You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/11/22 02:53:31 UTC

[flink-table-store] branch master updated: [FLINK-30082] Enable write-buffer-spillable by default only for object storage

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 039f2ff5 [FLINK-30082] Enable write-buffer-spillable by default only for object storage
039f2ff5 is described below

commit 039f2ff5d2e25154b5ab4b04f4c36086ce51d9c3
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Tue Nov 22 10:53:26 2022 +0800

    [FLINK-30082] Enable write-buffer-spillable by default only for object storage
    
    This closes #389
---
 .../layouts/shortcodes/generated/core_configuration.html |  4 ++--
 .../java/org/apache/flink/table/store/CoreOptions.java   |  9 +++++----
 .../store/file/operation/KeyValueFileStoreWrite.java     | 16 +++++++++++++++-
 3 files changed, 22 insertions(+), 7 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 4a1a6b26..e6efa021 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -250,9 +250,9 @@
         </tr>
         <tr>
             <td><h5>write-buffer-spillable</h5></td>
-            <td style="word-wrap: break-word;">true</td>
+            <td style="word-wrap: break-word;">(none)</td>
             <td>Boolean</td>
-            <td>Whether the write buffer can be spillable.</td>
+            <td>Whether the write buffer can be spillable. Enabled by default when using object storage.</td>
         </tr>
         <tr>
             <td><h5>write-mode</h5></td>
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index f54e8461..8e4d65dd 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -192,8 +192,9 @@ public class CoreOptions implements Serializable {
     public static final ConfigOption<Boolean> WRITE_BUFFER_SPILLABLE =
             ConfigOptions.key("write-buffer-spillable")
                     .booleanType()
-                    .defaultValue(true)
-                    .withDescription("Whether the write buffer can be spillable.");
+                    .noDefaultValue()
+                    .withDescription(
+                            "Whether the write buffer can be spillable. Enabled by default when using object storage.");
 
     public static final ConfigOption<Integer> LOCAL_SORT_MAX_NUM_FILE_HANDLES =
             ConfigOptions.key("local-sort.max-num-file-handles")
@@ -459,8 +460,8 @@ public class CoreOptions implements Serializable {
         return options.get(WRITE_BUFFER_SIZE).getBytes();
     }
 
-    public boolean writeBufferSpillable() {
-        return options.get(WRITE_BUFFER_SPILLABLE);
+    public boolean writeBufferSpillable(boolean usingObjectStore) { // usingObjectStore is only the fallback value
+        return options.getOptional(WRITE_BUFFER_SPILLABLE).orElse(usingObjectStore); // an explicitly configured value takes precedence
     }
 
     public int localSortMaxNumFileHandles() {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index 1cbef1b9..e06ea0c3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store.file.operation;
 
+import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.CoreOptions;
@@ -45,6 +46,8 @@ import org.apache.flink.table.types.logical.RowType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -63,6 +66,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
     private final Supplier<Comparator<RowData>> keyComparatorSupplier;
     private final MergeFunction<KeyValue> mergeFunction;
     private final CoreOptions options;
+    private final FileStorePathFactory pathFactory;
 
     public KeyValueFileStoreWrite(
             SchemaManager schemaManager,
@@ -96,6 +100,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
         this.keyComparatorSupplier = keyComparatorSupplier;
         this.mergeFunction = mergeFunction;
         this.options = options;
+        this.pathFactory = pathFactory;
     }
 
     @Override
@@ -140,7 +145,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
                         compactExecutor,
                         levels);
         return new MergeTreeWriter(
-                options.writeBufferSpillable(),
+                bufferSpillable(),
                 options.localSortMaxNumFileHandles(),
                 ioManager,
                 compactManager,
@@ -152,6 +157,15 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
                 options.changelogProducer());
     }
 
+    private boolean bufferSpillable() { // computes the spillable flag handed to MergeTreeWriter
+        try {
+            return options.writeBufferSpillable( // the argument below is used only when the option is not set
+                    pathFactory.root().getFileSystem().getKind() != FileSystemKind.FILE_SYSTEM); // true for any kind other than FILE_SYSTEM
+        } catch (IOException e) {
+            throw new UncheckedIOException(e); // rethrow unchecked, keeping the IOException as the cause
+        }
+    }
+
     private CompactManager createCompactManager(
             BinaryRowData partition,
             int bucket,