Posted to commits@flink.apache.org by lz...@apache.org on 2022/06/27 05:30:33 UTC
[flink-table-store] branch master updated: [FLINK-28068] Introduce MemoryPoolFactory to share and preempt memory for multiple writers
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 981b8e3 [FLINK-28068] Introduce MemoryPoolFactory to share and preempt memory for multiple writers
981b8e3 is described below
commit 981b8e36984803eb269e32a5f3116da8ce4185dc
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Mon Jun 27 13:30:28 2022 +0800
[FLINK-28068] Introduce MemoryPoolFactory to share and preempt memory for multiple writers
This closes #171
---
.../table/store/connector/sink/StoreSinkTest.java | 8 +-
.../table/store/connector/sink/TestFileStore.java | 6 +-
.../source/TestChangelogDataReadWrite.java | 33 ++++---
.../{utils => memory}/HeapMemorySegmentPool.java | 6 +-
.../flink/table/store/file/memory/MemoryOwner.java | 34 +++++++
.../table/store/file/memory/MemoryPoolFactory.java | 99 +++++++++++++++++++
.../flink/table/store/file/mergetree/MemTable.java | 3 +
.../store/file/mergetree/MergeTreeOptions.java | 10 +-
.../store/file/mergetree/MergeTreeWriter.java | 34 +++++--
.../store/file/mergetree/SortBufferMemTable.java | 11 ++-
.../file/operation/KeyValueFileStoreWrite.java | 10 +-
.../table/ChangelogValueCountFileStoreTable.java | 5 +-
.../table/ChangelogWithKeyFileStoreTable.java | 4 +-
.../table/store/table/sink/AbstractTableWrite.java | 31 +++---
.../table/store/table/sink/MemoryTableWrite.java | 82 ++++++++++++++++
.../flink/table/store/file/TestFileStore.java | 37 ++++---
.../table/store/file/mergetree/MergeTreeTest.java | 25 ++---
.../file/mergetree/SortBufferMemTableTestBase.java | 4 +-
.../store/file/operation/TestCommitThread.java | 16 ++-
.../table/store/table/WritePreemptMemoryTest.java | 109 +++++++++++++++++++++
20 files changed, 466 insertions(+), 101 deletions(-)
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
index 1ddbad7..d7f8666 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
@@ -234,11 +234,9 @@ public class StoreSinkTest {
writers.forEach(
(part, map) ->
map.forEach(
- (bucket, recordWriter) -> {
- TestRecordWriter testWriter = (TestRecordWriter) recordWriter;
- assertThat(testWriter.synced).isTrue();
- assertThat(testWriter.closed).isTrue();
- }));
+ (bucket, recordWriter) ->
+ assertThat(((TestRecordWriter) recordWriter).closed)
+ .isTrue()));
return committables;
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
index d048eff..aa8be1f 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
@@ -146,8 +146,6 @@ public class TestFileStore implements FileStore<KeyValue> {
final List<String> records = new ArrayList<>();
final boolean hasPk;
- boolean synced = false;
-
boolean closed = false;
TestRecordWriter(boolean hasPk) {
@@ -210,9 +208,7 @@ public class TestFileStore implements FileStore<KeyValue> {
}
@Override
- public void sync() {
- synced = true;
- }
+ public void sync() {}
@Override
public List<DataFileMeta> close() {
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
index d4d26f5..1f32f9b 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
@@ -29,6 +29,8 @@ import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
+import org.apache.flink.table.store.file.memory.MemoryOwner;
import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
import org.apache.flink.table.store.file.operation.KeyValueFileStoreRead;
@@ -149,18 +151,23 @@ public class TestChangelogDataReadWrite {
public RecordWriter<KeyValue> createMergeTreeWriter(BinaryRowData partition, int bucket) {
MergeTreeOptions options = new MergeTreeOptions(new Configuration());
- return new KeyValueFileStoreWrite(
- new SchemaManager(tablePath),
- 0,
- KEY_TYPE,
- VALUE_TYPE,
- () -> COMPARATOR,
- new DeduplicateMergeFunction(),
- avro,
- pathFactory,
- snapshotManager,
- null, // not used, we only create an empty writer
- options)
- .createEmptyWriter(partition, bucket, service);
+ RecordWriter<KeyValue> writer =
+ new KeyValueFileStoreWrite(
+ new SchemaManager(tablePath),
+ 0,
+ KEY_TYPE,
+ VALUE_TYPE,
+ () -> COMPARATOR,
+ new DeduplicateMergeFunction(),
+ avro,
+ pathFactory,
+ snapshotManager,
+ null, // not used, we only create an empty writer
+ options)
+ .createEmptyWriter(partition, bucket, service);
+ ((MemoryOwner) writer)
+ .setMemoryPool(
+ new HeapMemorySegmentPool(options.writeBufferSize, options.pageSize));
+ return writer;
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/HeapMemorySegmentPool.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/memory/HeapMemorySegmentPool.java
similarity index 92%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/HeapMemorySegmentPool.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/memory/HeapMemorySegmentPool.java
index 6174bc3..0dc48e7 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/HeapMemorySegmentPool.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/memory/HeapMemorySegmentPool.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.file.utils;
+package org.apache.flink.table.store.file.memory;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
@@ -34,9 +34,9 @@ public class HeapMemorySegmentPool implements MemorySegmentPool {
private int numPage;
- public HeapMemorySegmentPool(int maxPages, int pageSize) {
+ public HeapMemorySegmentPool(long maxMemory, int pageSize) {
this.segments = new LinkedList<>();
- this.maxPages = maxPages;
+ this.maxPages = (int) (maxMemory / pageSize);
this.pageSize = pageSize;
this.numPage = 0;
}
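
A note on this change: the pool's constructor now takes a byte budget rather than a page count and derives the page count itself, so callers can pass option values such as writeBufferSize directly. A minimal sketch of the arithmetic, using the new MergeTreeOptions defaults further down (the wrapper class and main are illustrative only):

    import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;

    public class PoolSizingSketch {
        public static void main(String[] args) {
            long writeBufferSize = 256L * 1024 * 1024; // 256 mb, the new WRITE_BUFFER_SIZE default
            int pageSize = 64 * 1024;                  // 64 kb, the new PAGE_SIZE default
            HeapMemorySegmentPool pool = new HeapMemorySegmentPool(writeBufferSize, pageSize);
            // The pool computes maxPages = (int) (writeBufferSize / pageSize) = 4096.
            System.out.println(pool.freePages()); // should print 4096 before any allocation
        }
    }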
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/memory/MemoryOwner.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/memory/MemoryOwner.java
new file mode 100644
index 0000000..4a35230
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/memory/MemoryOwner.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.memory;
+
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
+
+/** An interface that provides memory-related methods. */
+public interface MemoryOwner {
+
+ /** Set {@link MemorySegmentPool} for the owner. */
+ void setMemoryPool(MemorySegmentPool memoryPool);
+
+ /** Memory occupancy size of this owner. */
+ long memoryOccupancy();
+
+ /** Flush the owner's in-memory data to release memory. */
+ void flushMemory() throws Exception;
+}
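
For illustration, a toy implementation of the new interface; a real owner such as MergeTreeWriter backs these methods with an in-memory write buffer. CountingOwner and its fields are hypothetical, not part of the commit:

    import org.apache.flink.table.runtime.util.MemorySegmentPool;
    import org.apache.flink.table.store.file.memory.MemoryOwner;

    public class CountingOwner implements MemoryOwner {

        private MemorySegmentPool pool;
        private long occupancy;

        @Override
        public void setMemoryPool(MemorySegmentPool memoryPool) {
            this.pool = memoryPool; // a real owner builds its buffer on top of this pool
        }

        @Override
        public long memoryOccupancy() {
            return occupancy;
        }

        @Override
        public void flushMemory() {
            occupancy = 0; // a real owner spills its buffer to a data file here
        }
    }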
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/memory/MemoryPoolFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/memory/MemoryPoolFactory.java
new file mode 100644
index 0000000..2e06a9e
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/memory/MemoryPoolFactory.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.memory;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
+
+import java.util.List;
+
+/**
+ * A factory which creates a {@link MemorySegmentPool} for each {@link MemoryOwner}. The returned
+ * memory pool will try to preempt memory from other owners when there is no memory left.
+ */
+public class MemoryPoolFactory {
+
+ private final MemorySegmentPool innerPool;
+ private final Iterable<MemoryOwner> owners;
+
+ public MemoryPoolFactory(MemorySegmentPool innerPool, Iterable<MemoryOwner> owners) {
+ this.innerPool = innerPool;
+ this.owners = owners;
+ }
+
+ public void notifyNewOwner(MemoryOwner owner) {
+ MemorySegmentPool memoryPool = new OwnerMemoryPool(owner);
+ owner.setMemoryPool(memoryPool);
+ }
+
+ private void preemptMemory(MemoryOwner owner) {
+ long maxMemory = -1;
+ MemoryOwner max = null;
+ for (MemoryOwner other : owners) {
+ // Don't preempt yourself! Writing and flushing at the same time may lead to an
+ // inconsistent state.
+ if (other != owner && other.memoryOccupancy() > maxMemory) {
+ maxMemory = other.memoryOccupancy();
+ max = other;
+ }
+ }
+
+ if (max != null) {
+ try {
+ max.flushMemory();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private class OwnerMemoryPool implements MemorySegmentPool {
+
+ private final MemoryOwner owner;
+
+ public OwnerMemoryPool(MemoryOwner owner) {
+ this.owner = owner;
+ }
+
+ @Override
+ public int pageSize() {
+ return innerPool.pageSize();
+ }
+
+ @Override
+ public void returnAll(List<MemorySegment> memory) {
+ innerPool.returnAll(memory);
+ }
+
+ @Override
+ public int freePages() {
+ return innerPool.freePages();
+ }
+
+ @Override
+ public MemorySegment nextSegment() {
+ MemorySegment segment = innerPool.nextSegment();
+ if (segment == null) {
+ preemptMemory(owner);
+ return innerPool.nextSegment();
+ }
+ return segment;
+ }
+ }
+}
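
Putting the pieces together: each owner registered via notifyNewOwner allocates through an OwnerMemoryPool wrapper, and when nextSegment() finds the shared pool empty, the wrapper flushes the largest other owner and retries once. A hedged wiring sketch, reusing the hypothetical CountingOwner from above:

    import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
    import org.apache.flink.table.store.file.memory.MemoryOwner;
    import org.apache.flink.table.store.file.memory.MemoryPoolFactory;

    import java.util.ArrayList;
    import java.util.List;

    public class PreemptionSketch {
        public static void main(String[] args) {
            List<MemoryOwner> owners = new ArrayList<>();
            // One shared budget for all writers: three 4 kb pages.
            MemoryPoolFactory factory =
                    new MemoryPoolFactory(new HeapMemorySegmentPool(3 * 4096, 4096), owners);

            MemoryOwner a = new CountingOwner();
            MemoryOwner b = new CountingOwner();
            owners.add(a);
            owners.add(b);
            factory.notifyNewOwner(a); // a now allocates through an OwnerMemoryPool
            factory.notifyNewOwner(b);
            // Once the three shared pages are exhausted, the next allocation by a
            // triggers b.flushMemory() (b being the largest other owner) and retries.
        }
    }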
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MemTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MemTable.java
index e9ef179..944516c 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MemTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MemTable.java
@@ -44,6 +44,9 @@ public interface MemTable {
/** Record size of this table. */
int size();
+ /** Memory occupancy size of this table. */
+ long memoryOccupancy();
+
/**
* Returns an iterator over the records in this table. The elements are returned in the order of
* key and sequence number and elements with the same key will be merged by the given {@link
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeOptions.java
index 8cadbfb..9c7a5e4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeOptions.java
@@ -32,14 +32,14 @@ public class MergeTreeOptions {
public static final ConfigOption<MemorySize> WRITE_BUFFER_SIZE =
ConfigOptions.key("write-buffer-size")
.memoryType()
- .defaultValue(MemorySize.parse("128 mb"))
+ .defaultValue(MemorySize.parse("256 mb"))
.withDescription(
"Amount of data to build up in memory before converting to a sorted on-disk file.");
public static final ConfigOption<MemorySize> PAGE_SIZE =
ConfigOptions.key("page-size")
.memoryType()
- .defaultValue(MemorySize.parse("1 mb"))
+ .defaultValue(MemorySize.parse("64 kb"))
.withDescription("Memory page size.");
public static final ConfigOption<MemorySize> TARGET_FILE_SIZE =
@@ -95,7 +95,7 @@ public class MergeTreeOptions {
public final long writeBufferSize;
- public final long pageSize;
+ public final int pageSize;
public final long targetFileSize;
@@ -113,7 +113,7 @@ public class MergeTreeOptions {
public MergeTreeOptions(
long writeBufferSize,
- long pageSize,
+ int pageSize,
long targetFileSize,
int numSortedRunCompactionTrigger,
int numSortedRunStopTrigger,
@@ -138,7 +138,7 @@ public class MergeTreeOptions {
public MergeTreeOptions(ReadableConfig config) {
this(
config.get(WRITE_BUFFER_SIZE).getBytes(),
- config.get(PAGE_SIZE).getBytes(),
+ (int) config.get(PAGE_SIZE).getBytes(),
config.get(TARGET_FILE_SIZE).getBytes(),
config.get(NUM_SORTED_RUNS_COMPACTION_TRIGGER),
config.get(NUM_SORTED_RUNS_STOP_TRIGGER),
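
The defaults also move: the write buffer grows from 128 mb to 256 mb (plausibly because it is now a shared budget for all writers of a sink rather than a per-writer budget) and the page size shrinks from 1 mb to 64 kb, so memory can be handed between writers at a finer granularity. Setting the options explicitly, as a sketch:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.MemorySize;
    import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;

    public class OptionsSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.set(MergeTreeOptions.WRITE_BUFFER_SIZE, MemorySize.parse("256 mb"));
            conf.set(MergeTreeOptions.PAGE_SIZE, MemorySize.parse("64 kb"));
            MergeTreeOptions options = new MergeTreeOptions(conf);
            // pageSize is now an int; the constructor narrows getBytes() with a cast.
            System.out.println(options.pageSize); // 65536
        }
    }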
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
index 6dd1d04..319a1b8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -20,13 +20,16 @@ package org.apache.flink.table.store.file.mergetree;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.data.DataFileWriter;
+import org.apache.flink.table.store.file.memory.MemoryOwner;
import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
import org.apache.flink.table.store.file.mergetree.compact.CompactResult;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.CloseableIterator;
import java.util.ArrayList;
@@ -40,9 +43,11 @@ import java.util.Set;
import java.util.stream.Collectors;
/** A {@link RecordWriter} to write records and generate {@link Increment}. */
-public class MergeTreeWriter implements RecordWriter<KeyValue> {
+public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
- private final MemTable memTable;
+ private final RowType keyType;
+
+ private final RowType valueType;
private final CompactManager compactManager;
@@ -66,8 +71,11 @@ public class MergeTreeWriter implements RecordWriter<KeyValue> {
private long newSequenceNumber;
+ private MemTable memTable;
+
public MergeTreeWriter(
- MemTable memTable,
+ RowType keyType,
+ RowType valueType,
CompactManager compactManager,
Levels levels,
long maxSequenceNumber,
@@ -76,7 +84,8 @@ public class MergeTreeWriter implements RecordWriter<KeyValue> {
DataFileWriter dataFileWriter,
boolean commitForceCompact,
int numSortedRunStopTrigger) {
- this.memTable = memTable;
+ this.keyType = keyType;
+ this.valueType = valueType;
this.compactManager = compactManager;
this.levels = levels;
this.newSequenceNumber = maxSequenceNumber + 1;
@@ -99,12 +108,17 @@ public class MergeTreeWriter implements RecordWriter<KeyValue> {
return levels;
}
+ @Override
+ public void setMemoryPool(MemorySegmentPool memoryPool) {
+ this.memTable = new SortBufferMemTable(keyType, valueType, memoryPool);
+ }
+
@Override
public void write(KeyValue kv) throws Exception {
long sequenceNumber = newSequenceNumber();
boolean success = memTable.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
if (!success) {
- flush();
+ flushMemory();
success = memTable.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
if (!success) {
throw new RuntimeException("Mem table is too small to hold a single element.");
@@ -112,7 +126,13 @@ public class MergeTreeWriter implements RecordWriter<KeyValue> {
}
}
- private void flush() throws Exception {
+ @Override
+ public long memoryOccupancy() {
+ return memTable.memoryOccupancy();
+ }
+
+ @Override
+ public void flushMemory() throws Exception {
if (memTable.size() > 0) {
if (levels.numberOfSortedRuns() > numSortedRunStopTrigger) {
// stop writing, wait for compaction finished
@@ -130,7 +150,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue> {
@Override
public Increment prepareCommit() throws Exception {
- flush();
+ flushMemory();
if (commitForceCompact) {
finishCompaction(true);
}
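
Two writer changes are worth noting: the MemTable is no longer injected through the constructor but created lazily in setMemoryPool, and write() now follows a put-flush-retry shape. That shape, abstracted for illustration (putWithFlush and its parameters are hypothetical):

    import java.util.function.Predicate;

    public class PutRetrySketch {

        /** Generic shape of MergeTreeWriter#write: try, flush once, then try again. */
        static <T> void putWithFlush(Predicate<T> put, Runnable flushMemory, T record) {
            if (!put.test(record)) {
                flushMemory.run(); // spill the in-memory buffer to a data file
                if (!put.test(record)) {
                    throw new RuntimeException("Mem table is too small to hold a single element.");
                }
            }
        }
    }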
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java
index 363f69f..9258d2b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java
@@ -27,12 +27,12 @@ import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.operators.sort.BinaryInMemorySortBuffer;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
import org.apache.flink.table.store.codegen.CodeGenUtils;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.KeyValueSerializer;
import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
-import org.apache.flink.table.store.file.utils.HeapMemorySegmentPool;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
@@ -52,7 +52,7 @@ public class SortBufferMemTable implements MemTable {
private final KeyValueSerializer serializer;
private final BinaryInMemorySortBuffer buffer;
- public SortBufferMemTable(RowType keyType, RowType valueType, long maxMemSize, long pageSize) {
+ public SortBufferMemTable(RowType keyType, RowType valueType, MemorySegmentPool memoryPool) {
this.keyType = keyType;
this.valueType = valueType;
this.serializer = new KeyValueSerializer(keyType, valueType);
@@ -69,8 +69,6 @@ public class SortBufferMemTable implements MemTable {
CodeGenUtils.newRecordComparator(
new TableConfig(), sortKeyTypes, "MemTableComparator");
- HeapMemorySegmentPool memoryPool =
- new HeapMemorySegmentPool((int) (maxMemSize / pageSize), (int) pageSize);
if (memoryPool.freePages() < 3) {
throw new IllegalArgumentException(
"Write buffer requires a minimum of 3 page memory, please increase write buffer memory size.");
@@ -95,6 +93,11 @@ public class SortBufferMemTable implements MemTable {
return buffer.size();
}
+ @Override
+ public long memoryOccupancy() {
+ return buffer.getOccupancy();
+ }
+
@Override
public Iterator<KeyValue> iterator(
Comparator<RowData> keyComparator, MergeFunction mergeFunction) {
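
SortBufferMemTable now accepts any MemorySegmentPool instead of building its own heap pool, which is what allows MergeTreeWriter to hand it the shared, preemptible pool. A construction sketch; the key and value row types are arbitrary stand-ins:

    import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
    import org.apache.flink.table.store.file.mergetree.SortBufferMemTable;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.RowType;

    public class MemTableSketch {
        public static void main(String[] args) {
            RowType keyType = RowType.of(new IntType());
            RowType valueType = RowType.of(new IntType());
            // The pool must hold at least 3 free pages, or the constructor throws.
            SortBufferMemTable table =
                    new SortBufferMemTable(
                            keyType, valueType, new HeapMemorySegmentPool(3 * 4096, 4096));
            System.out.println(table.size()); // 0 records before any put
        }
    }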
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index 174fe25..c546a5d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -29,7 +29,6 @@ import org.apache.flink.table.store.file.mergetree.Levels;
import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
import org.apache.flink.table.store.file.mergetree.MergeTreeReader;
import org.apache.flink.table.store.file.mergetree.MergeTreeWriter;
-import org.apache.flink.table.store.file.mergetree.SortBufferMemTable;
import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
import org.apache.flink.table.store.file.mergetree.compact.CompactResult;
import org.apache.flink.table.store.file.mergetree.compact.CompactRewriter;
@@ -119,7 +118,7 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
return new CompactTask(keyComparator, options.targetFileSize, rewriter, unit, true);
}
- private RecordWriter<KeyValue> createMergeTreeWriter(
+ private MergeTreeWriter createMergeTreeWriter(
BinaryRowData partition,
int bucket,
List<DataFileMeta> restoreFiles,
@@ -127,11 +126,8 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
DataFileWriter dataFileWriter = dataFileWriterFactory.create(partition, bucket);
Comparator<RowData> keyComparator = keyComparatorSupplier.get();
return new MergeTreeWriter(
- new SortBufferMemTable(
- dataFileWriter.keyType(),
- dataFileWriter.valueType(),
- options.writeBufferSize,
- options.pageSize),
+ dataFileWriter.keyType(),
+ dataFileWriter.valueType(),
createCompactManager(
partition,
bucket,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index 299d4a1..842258f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -34,7 +34,7 @@ import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.writer.RecordWriter;
-import org.apache.flink.table.store.table.sink.AbstractTableWrite;
+import org.apache.flink.table.store.table.sink.MemoryTableWrite;
import org.apache.flink.table.store.table.sink.SinkRecord;
import org.apache.flink.table.store.table.sink.SinkRecordConverter;
import org.apache.flink.table.store.table.sink.TableWrite;
@@ -128,7 +128,8 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
public TableWrite newWrite() {
SinkRecordConverter recordConverter =
new SinkRecordConverter(store.options().bucket(), tableSchema);
- return new AbstractTableWrite<KeyValue>(store.newWrite(), recordConverter) {
+ return new MemoryTableWrite<KeyValue>(store.newWrite(), recordConverter, store.options()) {
+
@Override
protected void writeSinkRecord(SinkRecord record, RecordWriter<KeyValue> writer)
throws Exception {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index 8b7f95c..337c3c9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -36,7 +36,7 @@ import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.writer.RecordWriter;
-import org.apache.flink.table.store.table.sink.AbstractTableWrite;
+import org.apache.flink.table.store.table.sink.MemoryTableWrite;
import org.apache.flink.table.store.table.sink.SinkRecord;
import org.apache.flink.table.store.table.sink.SinkRecordConverter;
import org.apache.flink.table.store.table.sink.TableWrite;
@@ -177,7 +177,7 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
public TableWrite newWrite() {
SinkRecordConverter recordConverter =
new SinkRecordConverter(store.options().bucket(), tableSchema);
- return new AbstractTableWrite<KeyValue>(store.newWrite(), recordConverter) {
+ return new MemoryTableWrite<KeyValue>(store.newWrite(), recordConverter, store.options()) {
@Override
protected void writeSinkRecord(SinkRecord record, RecordWriter<KeyValue> writer)
throws Exception {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java
index 1e45320..8ba3bfc 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java
@@ -43,7 +43,7 @@ public abstract class AbstractTableWrite<T> implements TableWrite {
private final FileStoreWrite<T> write;
private final SinkRecordConverter recordConverter;
- private final Map<BinaryRowData, Map<Integer, RecordWriter<T>>> writers;
+ protected final Map<BinaryRowData, Map<Integer, RecordWriter<T>>> writers;
private final ExecutorService compactExecutor;
private boolean overwrite = false;
@@ -100,7 +100,7 @@ public abstract class AbstractTableWrite<T> implements TableWrite {
// we need a mechanism to clear writers, otherwise there will be more and more
// such as yesterday's partition that no longer needs to be written.
if (committable.increment().newFiles().isEmpty()) {
- closeWriter(writer);
+ writer.close();
bucketIter.remove();
}
}
@@ -113,20 +113,15 @@ public abstract class AbstractTableWrite<T> implements TableWrite {
return result;
}
- private void closeWriter(RecordWriter<T> writer) throws Exception {
- writer.sync();
- writer.close();
- }
-
@Override
public void close() throws Exception {
- compactExecutor.shutdownNow();
for (Map<Integer, RecordWriter<T>> bucketWriters : writers.values()) {
for (RecordWriter<T> writer : bucketWriters.values()) {
- closeWriter(writer);
+ writer.close();
}
}
writers.clear();
+ compactExecutor.shutdownNow();
}
@VisibleForTesting
@@ -143,11 +138,17 @@ public abstract class AbstractTableWrite<T> implements TableWrite {
buckets = new HashMap<>();
writers.put(partition.copy(), buckets);
}
- return buckets.computeIfAbsent(
- bucket,
- k ->
- overwrite
- ? write.createEmptyWriter(partition.copy(), bucket, compactExecutor)
- : write.createWriter(partition.copy(), bucket, compactExecutor));
+ return buckets.computeIfAbsent(bucket, k -> createWriter(partition.copy(), bucket));
}
+
+ private RecordWriter<T> createWriter(BinaryRowData partition, int bucket) {
+ RecordWriter<T> writer =
+ overwrite
+ ? write.createEmptyWriter(partition.copy(), bucket, compactExecutor)
+ : write.createWriter(partition.copy(), bucket, compactExecutor);
+ notifyNewWriter(writer);
+ return writer;
+ }
+
+ protected void notifyNewWriter(RecordWriter<T> writer) {}
}
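
The refactor routes every writer creation through a single helper so that the new protected notifyNewWriter hook sees each writer exactly once; the base class leaves the hook a no-op. The pattern in miniature (class and type names hypothetical):

    import java.util.function.Supplier;

    abstract class WriterRegistrySketch<W> {

        /** Every writer is created through here, so subclasses observe each new one. */
        final W createWriter(Supplier<W> factory) {
            W writer = factory.get();
            notifyNewWriter(writer); // e.g. MemoryTableWrite registers a memory owner
            return writer;
        }

        protected void notifyNewWriter(W writer) {} // no-op by default
    }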
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java
new file mode 100644
index 0000000..215840a
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.sink;
+
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
+import org.apache.flink.table.store.file.memory.MemoryOwner;
+import org.apache.flink.table.store.file.memory.MemoryPoolFactory;
+import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
+import org.apache.flink.table.store.file.operation.FileStoreWrite;
+import org.apache.flink.table.store.file.writer.RecordWriter;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Iterators;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A {@link TableWrite} which supports using shared memory and preempting memory from other writers.
+ */
+public abstract class MemoryTableWrite<T> extends AbstractTableWrite<T> {
+
+ private final MemoryPoolFactory memoryPoolFactory;
+
+ protected MemoryTableWrite(
+ FileStoreWrite<T> write,
+ SinkRecordConverter recordConverter,
+ FileStoreOptions options) {
+ super(write, recordConverter);
+
+ MergeTreeOptions mergeTreeOptions = options.mergeTreeOptions();
+ HeapMemorySegmentPool memoryPool =
+ new HeapMemorySegmentPool(
+ mergeTreeOptions.writeBufferSize, mergeTreeOptions.pageSize);
+ this.memoryPoolFactory = new MemoryPoolFactory(memoryPool, this::memoryOwners);
+ }
+
+ private Iterator<MemoryOwner> memoryOwners() {
+ Iterator<Map<Integer, RecordWriter<T>>> iterator = writers.values().iterator();
+ return Iterators.concat(
+ new Iterator<Iterator<MemoryOwner>>() {
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public Iterator<MemoryOwner> next() {
+ return Iterators.transform(
+ iterator.next().values().iterator(),
+ writer -> (MemoryOwner) writer);
+ }
+ });
+ }
+
+ @Override
+ protected void notifyNewWriter(RecordWriter<T> writer) {
+ if (!(writer instanceof MemoryOwner)) {
+ throw new RuntimeException(
+ "Should create a MemoryOwner for MemoryTableWrite,"
+ + " but this is: "
+ + writer.getClass());
+ }
+ memoryPoolFactory.notifyNewOwner((MemoryOwner) writer);
+ }
+}
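
Note how this::memoryOwners is passed where an Iterable<MemoryOwner> is expected: Iterable has the single abstract method iterator(), so the factory re-enumerates the live writers map on every preemption pass rather than holding a snapshot. The nested-iterator flattening could equally be written with streams; an equivalent sketch with hypothetical generic names:

    import java.util.Iterator;
    import java.util.Map;

    public class FlattenSketch {

        /** Stream-based equivalent of MemoryTableWrite#memoryOwners, for illustration. */
        static <K1, K2, V> Iterator<V> flatten(Map<K1, Map<K2, V>> nested) {
            return nested.values().stream()
                    .flatMap(inner -> inner.values().stream())
                    .iterator();
        }
    }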
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 655b38d..a2f0a5d 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -28,6 +28,8 @@ import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
+import org.apache.flink.table.store.file.memory.MemoryOwner;
import org.apache.flink.table.store.file.mergetree.Increment;
import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
@@ -42,7 +44,6 @@ import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.function.QuadFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,6 +74,10 @@ public class TestFileStore extends KeyValueFileStore {
private static final Logger LOG = LoggerFactory.getLogger(TestFileStore.class);
+ public static final MemorySize WRITE_BUFFER_SIZE = MemorySize.parse("16 kb");
+
+ public static final MemorySize PAGE_SIZE = MemorySize.parse("4 kb");
+
private final String root;
private final RowDataSerializer keySerializer;
private final RowDataSerializer valueSerializer;
@@ -87,8 +92,8 @@ public class TestFileStore extends KeyValueFileStore {
MergeFunction mergeFunction) {
Configuration conf = new Configuration();
- conf.set(MergeTreeOptions.WRITE_BUFFER_SIZE, MemorySize.parse("16 kb"));
- conf.set(MergeTreeOptions.PAGE_SIZE, MemorySize.parse("4 kb"));
+ conf.set(MergeTreeOptions.WRITE_BUFFER_SIZE, WRITE_BUFFER_SIZE);
+ conf.set(MergeTreeOptions.PAGE_SIZE, PAGE_SIZE);
conf.set(MergeTreeOptions.TARGET_FILE_SIZE, MemorySize.parse("1 kb"));
conf.set(
@@ -155,7 +160,7 @@ public class TestFileStore extends KeyValueFileStore {
kvs,
partitionCalculator,
bucketCalculator,
- FileStoreWrite::createWriter,
+ false,
(commit, committable) -> {
logOffsets.forEach(committable::addLogOffset);
commit.commit(committable, Collections.emptyMap());
@@ -172,7 +177,7 @@ public class TestFileStore extends KeyValueFileStore {
kvs,
partitionCalculator,
bucketCalculator,
- FileStoreWrite::createEmptyWriter,
+ true,
(commit, committable) ->
commit.overwrite(partition, committable, Collections.emptyMap()));
}
@@ -181,13 +186,7 @@ public class TestFileStore extends KeyValueFileStore {
List<KeyValue> kvs,
Function<KeyValue, BinaryRowData> partitionCalculator,
Function<KeyValue, Integer> bucketCalculator,
- QuadFunction<
- FileStoreWrite<KeyValue>,
- BinaryRowData,
- Integer,
- ExecutorService,
- RecordWriter<KeyValue>>
- createWriterFunction,
+ boolean emptyWriter,
BiConsumer<FileStoreCommit, ManifestCommittable> commitFunction)
throws Exception {
FileStoreWrite<KeyValue> write = newWrite();
@@ -201,8 +200,18 @@ public class TestFileStore extends KeyValueFileStore {
(b, w) -> {
if (w == null) {
ExecutorService service = Executors.newSingleThreadExecutor();
- return createWriterFunction.apply(
- write, partition, bucket, service);
+ RecordWriter<KeyValue> writer =
+ emptyWriter
+ ? write.createEmptyWriter(
+ partition, bucket, service)
+ : write.createWriter(
+ partition, bucket, service);
+ ((MemoryOwner) writer)
+ .setMemoryPool(
+ new HeapMemorySegmentPool(
+ WRITE_BUFFER_SIZE.getBytes(),
+ (int) PAGE_SIZE.getBytes()));
+ return writer;
} else {
return w;
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index e7607a1..72f5837 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.store.file.data.DataFileReader;
import org.apache.flink.table.store.file.data.DataFileWriter;
import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.format.FlushingFileFormat;
+import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
import org.apache.flink.table.store.file.mergetree.compact.CompactRewriter;
import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
@@ -264,20 +265,20 @@ public class MergeTreeTest {
private MergeTreeWriter createMergeTreeWriter(List<DataFileMeta> files) {
long maxSequenceNumber =
files.stream().map(DataFileMeta::maxSequenceNumber).max(Long::compare).orElse(-1L);
- return new MergeTreeWriter(
- new SortBufferMemTable(
+ MergeTreeWriter writer =
+ new MergeTreeWriter(
dataFileWriter.keyType(),
dataFileWriter.valueType(),
- options.writeBufferSize,
- options.pageSize),
- createCompactManager(dataFileWriter, service),
- new Levels(comparator, files, options.numLevels),
- maxSequenceNumber,
- comparator,
- new DeduplicateMergeFunction(),
- dataFileWriter,
- options.commitForceCompact,
- options.numSortedRunStopTrigger);
+ createCompactManager(dataFileWriter, service),
+ new Levels(comparator, files, options.numLevels),
+ maxSequenceNumber,
+ comparator,
+ new DeduplicateMergeFunction(),
+ dataFileWriter,
+ options.commitForceCompact,
+ options.numSortedRunStopTrigger);
+ writer.setMemoryPool(new HeapMemorySegmentPool(options.writeBufferSize, options.pageSize));
+ return writer;
}
private CompactManager createCompactManager(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTableTestBase.java
index c92fff2..3666b66 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTableTestBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.file.mergetree;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunctionTestUtils;
@@ -54,8 +55,7 @@ public abstract class SortBufferMemTableTestBase {
new RowType(
Collections.singletonList(
new RowType.RowField("value", new BigIntType()))),
- 32 * 1024 * 3L,
- 32 * 1024);
+ new HeapMemorySegmentPool(32 * 1024 * 3L, 32 * 1024));
protected abstract boolean addOnly();
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
index d51e441..3c92759 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.TestFileStore;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
import org.apache.flink.table.store.file.mergetree.MergeTreeWriter;
import org.slf4j.Logger;
@@ -38,6 +39,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
+import static org.apache.flink.table.store.file.TestFileStore.PAGE_SIZE;
+import static org.apache.flink.table.store.file.TestFileStore.WRITE_BUFFER_SIZE;
import static org.apache.flink.table.store.file.TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED;
/** Testing {@link Thread}s to perform concurrent commits. */
@@ -197,10 +200,13 @@ public class TestCommitThread extends Thread {
t.setName(Thread.currentThread().getName() + "-writer-service-pool");
return t;
});
- if (empty) {
- return (MergeTreeWriter) write.createEmptyWriter(partition, 0, service);
- } else {
- return (MergeTreeWriter) write.createWriter(partition, 0, service);
- }
+ MergeTreeWriter writer =
+ empty
+ ? (MergeTreeWriter) write.createEmptyWriter(partition, 0, service)
+ : (MergeTreeWriter) write.createWriter(partition, 0, service);
+ writer.setMemoryPool(
+ new HeapMemorySegmentPool(
+ WRITE_BUFFER_SIZE.getBytes(), (int) PAGE_SIZE.getBytes()));
+ return writer;
}
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
new file mode 100644
index 0000000..0e81bae
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.table.sink.FileCommittable;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.TableRead;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ChangelogWithKeyFileStoreTable}. */
+public class WritePreemptMemoryTest extends FileStoreTableTestBase {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ @Test
+ public void writeMultiplePartitions() throws Exception {
+ testWritePreemptMemory(false);
+ }
+
+ @Test
+ public void writeSinglePartition() throws Exception {
+ testWritePreemptMemory(true);
+ }
+
+ private void testWritePreemptMemory(boolean singlePartition) throws Exception {
+ // write
+ FileStoreTable table = createFileStoreTable();
+ TableWrite write = table.newWrite();
+ Random random = new Random();
+ List<String> expected = new ArrayList<>();
+ for (int i = 0; i < 10_000; i++) {
+ GenericRowData row =
+ GenericRowData.of(singlePartition ? 0 : random.nextInt(5), i, i * 10L);
+ write.write(row);
+ expected.add(BATCH_ROW_TO_STRING.apply(row));
+ }
+ List<FileCommittable> committables = write.prepareCommit();
+ table.newCommit().commit("0", committables);
+ write.close();
+
+ // read
+ List<Split> splits = table.newScan().plan().splits;
+ TableRead read = table.newRead();
+ List<String> results = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ results.addAll(getResult(read, splits, binaryRow(i), 0, BATCH_ROW_TO_STRING));
+ }
+ assertThat(results).containsExactlyInAnyOrder(expected.toArray(new String[0]));
+ }
+
+ @Override
+ protected FileStoreTable createFileStoreTable() throws Exception {
+ Path tablePath = new Path(tempDir.toString());
+ Configuration conf = new Configuration();
+ conf.set(FileStoreOptions.PATH, tablePath.toString());
+ conf.set(FileStoreOptions.FILE_FORMAT, "avro");
+ conf.set(FileStoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+ conf.set(MergeTreeOptions.WRITE_BUFFER_SIZE, new MemorySize(30 * 1024));
+ conf.set(MergeTreeOptions.PAGE_SIZE, new MemorySize(1024));
+ SchemaManager schemaManager = new SchemaManager(tablePath);
+ TableSchema schema =
+ schemaManager.commitNewVersion(
+ new UpdateSchema(
+ ROW_TYPE,
+ Collections.singletonList("pt"),
+ Arrays.asList("pt", "a"),
+ conf.toMap(),
+ ""));
+ return new ChangelogWithKeyFileStoreTable(
+ tablePath.getName(), schemaManager, schema, "user");
+ }
+}