You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/11/22 02:53:31 UTC
[flink-table-store] branch master updated: [FLINK-30082] Enable write-buffer-spillable by default only for object storage
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 039f2ff5 [FLINK-30082] Enable write-buffer-spillable by default only for object storage
039f2ff5 is described below
commit 039f2ff5d2e25154b5ab4b04f4c36086ce51d9c3
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Tue Nov 22 10:53:26 2022 +0800
[FLINK-30082] Enable write-buffer-spillable by default only for object storage
This closes #389
---
.../layouts/shortcodes/generated/core_configuration.html | 4 ++--
.../java/org/apache/flink/table/store/CoreOptions.java | 9 +++++----
.../store/file/operation/KeyValueFileStoreWrite.java | 16 +++++++++++++++-
3 files changed, 22 insertions(+), 7 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 4a1a6b26..e6efa021 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -250,9 +250,9 @@
</tr>
<tr>
<td><h5>write-buffer-spillable</h5></td>
- <td style="word-wrap: break-word;">true</td>
+ <td style="word-wrap: break-word;">(none)</td>
<td>Boolean</td>
- <td>Whether the write buffer can be spillable.</td>
+ <td>Whether the write buffer can be spillable. Enabled by default when using object storage.</td>
</tr>
<tr>
<td><h5>write-mode</h5></td>
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index f54e8461..8e4d65dd 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -192,8 +192,9 @@ public class CoreOptions implements Serializable {
public static final ConfigOption<Boolean> WRITE_BUFFER_SPILLABLE =
ConfigOptions.key("write-buffer-spillable")
.booleanType()
- .defaultValue(true)
- .withDescription("Whether the write buffer can be spillable.");
+ .noDefaultValue()
+ .withDescription(
+ "Whether the write buffer can be spillable. Enabled by default when using object storage.");
public static final ConfigOption<Integer> LOCAL_SORT_MAX_NUM_FILE_HANDLES =
ConfigOptions.key("local-sort.max-num-file-handles")
@@ -459,8 +460,8 @@ public class CoreOptions implements Serializable {
return options.get(WRITE_BUFFER_SIZE).getBytes();
}
- public boolean writeBufferSpillable() {
- return options.get(WRITE_BUFFER_SPILLABLE);
+ public boolean writeBufferSpillable(boolean usingObjectStore) {
+ return options.getOptional(WRITE_BUFFER_SPILLABLE).orElse(usingObjectStore);
}
public int localSortMaxNumFileHandles() {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index 1cbef1b9..e06ea0c3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.store.file.operation;
+import org.apache.flink.core.fs.FileSystemKind;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.CoreOptions;
@@ -45,6 +46,8 @@ import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
@@ -63,6 +66,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
private final Supplier<Comparator<RowData>> keyComparatorSupplier;
private final MergeFunction<KeyValue> mergeFunction;
private final CoreOptions options;
+ private final FileStorePathFactory pathFactory;
public KeyValueFileStoreWrite(
SchemaManager schemaManager,
@@ -96,6 +100,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
this.keyComparatorSupplier = keyComparatorSupplier;
this.mergeFunction = mergeFunction;
this.options = options;
+ this.pathFactory = pathFactory;
}
@Override
@@ -140,7 +145,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
compactExecutor,
levels);
return new MergeTreeWriter(
- options.writeBufferSpillable(),
+ bufferSpillable(),
options.localSortMaxNumFileHandles(),
ioManager,
compactManager,
@@ -152,6 +157,15 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
options.changelogProducer());
}
+ private boolean bufferSpillable() {
+ try {
+ return options.writeBufferSpillable(
+ pathFactory.root().getFileSystem().getKind() != FileSystemKind.FILE_SYSTEM);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
private CompactManager createCompactManager(
BinaryRowData partition,
int bucket,