Posted to commits@flink.apache.org by cz...@apache.org on 2023/01/06 08:17:10 UTC
[flink-table-store] branch release-0.3 updated: [FLINK-30573] Fix bug that Table Store dedicated compact job may skip some records when checkpoint interval is long
This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch release-0.3
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/release-0.3 by this push:
new 1ba8f372 [FLINK-30573] Fix bug that Table Store dedicated compact job may skip some records when checkpoint interval is long
1ba8f372 is described below
commit 1ba8f372a05d64e7370f7dcbb65b77e23079f8ef
Author: tsreaper <ts...@gmail.com>
AuthorDate: Fri Jan 6 16:06:50 2023 +0800
[FLINK-30573] Fix bug that Table Store dedicated compact job may skip some records when checkpoint interval is long
This closes #460
(cherry picked from commit 8fdbbd4e8c22a82d79509cab0add4a6aa7331672)
---
.../table/store/connector/sink/CompactorSink.java | 2 +-
.../store/connector/sink/CompactorSinkBuilder.java | 8 +-
.../sink/FullChangelogStoreSinkWrite.java | 122 ++++++++++++--------
...ava => OffsetRowDataHashStreamPartitioner.java} | 23 ++--
.../store/connector/sink/StoreCompactOperator.java | 30 ++++-
.../table/store/connector/sink/StoreSinkWrite.java | 4 +
.../store/connector/sink/StoreSinkWriteImpl.java | 20 ++++
.../connector/source/CompactorSourceBuilder.java | 16 ++-
.../ChangelogWithKeyFileStoreTableITCase.java | 123 +++++++++++++++++----
.../connector/action/CompactActionITCase.java | 43 +++----
.../store/connector/sink/CompactorSinkITCase.java | 41 +++++--
.../connector/source/CompactorSourceITCase.java | 120 ++++++++++----------
.../source/FileStoreSourceSplitGeneratorTest.java | 1 +
.../source/FileStoreSourceSplitSerializerTest.java | 2 +-
.../source/TestChangelogDataReadWrite.java | 3 +-
.../src/test/resources/log4j2-test.properties | 2 +-
.../table/store/file/append/AppendOnlyWriter.java | 5 +
.../store/file/mergetree/MergeTreeWriter.java | 5 +
.../file/mergetree/compact/CompactStrategy.java | 7 +-
.../file/operation/AbstractFileStoreWrite.java | 102 ++++++++++-------
.../file/operation/AppendOnlyFileStoreWrite.java | 27 ++++-
.../table/store/file/operation/FileStoreWrite.java | 15 +++
.../file/operation/KeyValueFileStoreWrite.java | 20 +++-
.../store/file/operation/MemoryFileStoreWrite.java | 8 +-
.../table/store/file/utils/ObjectSerializer.java | 17 +++
.../flink/table/store/file/utils/RecordWriter.java | 10 ++
.../flink/table/store/table/sink/TableWrite.java | 12 ++
.../table/store/table/sink/TableWriteImpl.java | 7 ++
.../store/table/source/AbstractDataTableScan.java | 28 +++--
.../flink/table/store/table/source/DataSplit.java | 28 ++++-
.../ContinuousCompactorFollowUpScanner.java | 5 +-
.../table/store/table/system/BucketsTable.java | 48 ++++++--
.../flink/table/store/file/TestFileStore.java | 11 +-
.../file/operation/KeyValueFileStoreReadTest.java | 5 +-
.../store/file/operation/TestCommitThread.java | 6 +-
.../flink/table/store/table/source/SplitTest.java | 8 +-
.../ContinuousCompactorFollowUpScannerTest.java | 32 ++++--
.../ContinuousCompactorStartingScannerTest.java | 4 +-
.../src/test/resources/log4j2-test.properties | 2 +-
.../table/store/tests/FlinkActionsE2eTest.java | 2 +-
.../src/test/resources/log4j2-test.properties | 2 +-
.../store/mapred/TableStoreInputSplitTest.java | 1 +
.../src/test/resources/log4j2-test.properties | 2 +-
.../src/test/resources/log4j2-test.properties | 2 +-
.../src/test/resources/log4j2-test.properties | 2 +-
.../src/test/resources/log4j2-test.properties | 2 +-
46 files changed, 691 insertions(+), 294 deletions(-)
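The key change sits in FullChangelogStoreSinkWrite below: instead of remembering a single snapshot identifier and waiting for it, the writer now records every commit identifier for which it has submitted a full compaction, and only clears its written-bucket bookkeeping once a COMPACT snapshot from the same commit user carries one of those identifiers. The following is a minimal, self-contained sketch of that bookkeeping using plain JDK collections; the field names mirror the diff, but the bucket type and the snapshot callback are simplified stand-ins, not the actual Table Store API.

import java.util.HashSet;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Simplified model of the state kept by FullChangelogStoreSinkWrite (names assumed from the diff).
class FullCompactionBookkeeping {
    // checkpoint id -> buckets written during that checkpoint (a bucket is modeled as a String here)
    private final NavigableMap<Long, Set<String>> writtenBuckets = new TreeMap<>();
    // checkpoint id -> timestamp of the first write in that checkpoint
    private final NavigableMap<Long, Long> firstWriteMs = new TreeMap<>();
    // commit identifiers of full compactions we submitted and have not yet seen committed
    private final TreeSet<Long> commitIdentifiersToCheck = new TreeSet<>();

    void recordWrite(long checkpointId, String bucket, long nowMs) {
        writtenBuckets.computeIfAbsent(checkpointId, k -> new HashSet<>()).add(bucket);
        firstWriteMs.putIfAbsent(checkpointId, nowMs);
    }

    // Called when prepareCommit submits a full compaction for this checkpoint.
    void fullCompactionSubmitted(long checkpointId) {
        commitIdentifiersToCheck.add(checkpointId);
    }

    // Called for each COMPACT snapshot committed by this user. State is cleared only when the
    // snapshot's commit identifier is one we submitted ourselves, because an ordinary compaction
    // also produces COMPACT snapshots but covers a single bucket rather than all written buckets.
    void onCompactSnapshot(long commitIdentifier) {
        if (commitIdentifiersToCheck.contains(commitIdentifier)) {
            writtenBuckets.headMap(commitIdentifier, true).clear();
            firstWriteMs.headMap(commitIdentifier, true).clear();
            commitIdentifiersToCheck.headSet(commitIdentifier).clear();
        }
    }
}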
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CompactorSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CompactorSink.java
index 4de06f34..c234858f 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CompactorSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CompactorSink.java
@@ -24,7 +24,7 @@ import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.util.function.SerializableFunction;
-/** {@link FlinkSink} for stand-alone compact jobs. */
+/** {@link FlinkSink} for dedicated compact jobs. */
public class CompactorSink extends FlinkSink {
private static final long serialVersionUID = 1L;
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CompactorSinkBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CompactorSinkBuilder.java
index aee210ae..c57063a2 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CompactorSinkBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CompactorSinkBuilder.java
@@ -50,9 +50,11 @@ public class CompactorSinkBuilder {
}
public DataStreamSink<?> build() {
- HashRowStreamPartitioner partitioner =
- new HashRowStreamPartitioner(
- BucketsTable.rowType(table.schema().logicalPartitionType()));
+ OffsetRowDataHashStreamPartitioner partitioner =
+ new OffsetRowDataHashStreamPartitioner(
+ BucketsTable.partitionWithBucketRowType(
+ table.schema().logicalPartitionType()),
+ 1);
PartitionTransformation<RowData> partitioned =
new PartitionTransformation<>(input.getTransformation(), partitioner);
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java
index 3d7b0395..1b12b677 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java
@@ -38,15 +38,18 @@ import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.sink.SinkRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
-import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
+import java.util.TreeSet;
/**
* {@link StoreSinkWrite} for {@link CoreOptions.ChangelogProducer#FULL_COMPACTION} changelog
@@ -54,6 +57,8 @@ import java.util.TreeMap;
*/
public class FullChangelogStoreSinkWrite extends StoreSinkWriteImpl {
+ private static final Logger LOG = LoggerFactory.getLogger(FullChangelogStoreSinkWrite.class);
+
private final long fullCompactionThresholdMs;
private final Set<Tuple2<BinaryRowData, Integer>> currentWrittenBuckets;
@@ -64,7 +69,7 @@ public class FullChangelogStoreSinkWrite extends StoreSinkWriteImpl {
private final NavigableMap<Long, Long> firstWriteMs;
private final ListState<Tuple2<Long, Long>> firstWriteMsState;
- private Long snapshotIdentifierToCheck;
+ private transient TreeSet<Long> commitIdentifiersToCheck;
public FullChangelogStoreSinkWrite(
FileStoreTable table,
@@ -116,7 +121,7 @@ public class FullChangelogStoreSinkWrite extends StoreSinkWriteImpl {
firstWriteMs = new TreeMap<>();
firstWriteMsState.get().forEach(t -> firstWriteMs.put(t.f0, t.f1));
- snapshotIdentifierToCheck = null;
+ commitIdentifiersToCheck = new TreeSet<>();
}
@Override
@@ -134,6 +139,10 @@ public class FullChangelogStoreSinkWrite extends StoreSinkWriteImpl {
}
private void touchBucket(BinaryRowData partition, int bucket) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("touch partition {}, bucket {}", partition, bucket);
+ }
+
// partition is a reused BinaryRowData
// we first check if the tuple exists to minimize copying
if (!currentWrittenBuckets.contains(Tuple2.of(partition, bucket))) {
@@ -148,15 +157,9 @@ public class FullChangelogStoreSinkWrite extends StoreSinkWriteImpl {
@Override
public List<Committable> prepareCommit(boolean doCompaction, long checkpointId)
throws IOException {
- if (snapshotIdentifierToCheck != null) {
- Optional<Snapshot> snapshot = findSnapshot(snapshotIdentifierToCheck);
- if (snapshot.map(s -> s.commitKind() == Snapshot.CommitKind.COMPACT).orElse(false)) {
- writtenBuckets.headMap(snapshotIdentifierToCheck, true).clear();
- firstWriteMs.headMap(snapshotIdentifierToCheck, true).clear();
- snapshotIdentifierToCheck = null;
- }
- }
+ checkSuccessfulFullCompaction();
+ // check what buckets we've modified during this checkpoint interval
if (!currentWrittenBuckets.isEmpty()) {
writtenBuckets
.computeIfAbsent(checkpointId, k -> new HashSet<>())
@@ -166,8 +169,18 @@ public class FullChangelogStoreSinkWrite extends StoreSinkWriteImpl {
currentFirstWriteMs = null;
}
- if (snapshotIdentifierToCheck == null // wait for last forced full compaction to complete
- && !writtenBuckets.isEmpty() // there should be something to compact
+ if (LOG.isDebugEnabled()) {
+ for (Map.Entry<Long, Set<Tuple2<BinaryRowData, Integer>>> checkpointIdAndBuckets :
+ writtenBuckets.entrySet()) {
+ LOG.debug(
+ "Written buckets for checkpoint #{} are:", checkpointIdAndBuckets.getKey());
+ for (Tuple2<BinaryRowData, Integer> bucket : checkpointIdAndBuckets.getValue()) {
+ LOG.debug(" * partition {}, bucket {}", bucket.f0, bucket.f1);
+ }
+ }
+ }
+
+ if (!writtenBuckets.isEmpty() // there should be something to compact
&& System.currentTimeMillis() - firstWriteMs.firstEntry().getValue()
>= fullCompactionThresholdMs // time without full compaction exceeds
) {
@@ -175,58 +188,75 @@ public class FullChangelogStoreSinkWrite extends StoreSinkWriteImpl {
}
if (doCompaction) {
- snapshotIdentifierToCheck = checkpointId;
- Set<Tuple2<BinaryRowData, Integer>> compactedBuckets = new HashSet<>();
- for (Set<Tuple2<BinaryRowData, Integer>> buckets : writtenBuckets.values()) {
- for (Tuple2<BinaryRowData, Integer> bucket : buckets) {
- if (compactedBuckets.contains(bucket)) {
- continue;
- }
- compactedBuckets.add(bucket);
- try {
- write.compact(bucket.f0, bucket.f1, true);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Submit full compaction for checkpoint #{}", checkpointId);
}
+ submitFullCompaction();
+ commitIdentifiersToCheck.add(checkpointId);
}
return super.prepareCommit(doCompaction, checkpointId);
}
- private Optional<Snapshot> findSnapshot(long identifierToCheck) {
- // TODO We need a mechanism to do timeout recovery in case of snapshot expiration.
+ private void checkSuccessfulFullCompaction() {
SnapshotManager snapshotManager = table.snapshotManager();
Long latestId = snapshotManager.latestSnapshotId();
if (latestId == null) {
- return Optional.empty();
+ return;
}
-
Long earliestId = snapshotManager.earliestSnapshotId();
if (earliestId == null) {
- return Optional.empty();
+ return;
}
- // We must find the snapshot whose identifier is exactly `identifierToCheck`.
- // We can't just compare with the latest snapshot identifier by this user (like what we do
- // in `AbstractFileStoreWrite`), because even if the latest snapshot identifier is newer,
- // compact changes may still be discarded due to commit conflicts.
for (long id = latestId; id >= earliestId; id--) {
Snapshot snapshot = snapshotManager.snapshot(id);
- if (!snapshot.commitUser().equals(commitUser)) {
- continue;
- }
- if (snapshot.commitIdentifier() == identifierToCheck) {
- return Optional.of(snapshot);
- } else if (snapshot.commitIdentifier() < identifierToCheck) {
- // We're searching from new snapshots to old ones. So if we find an older
- // identifier, we can make sure our target snapshot will never occur.
- return Optional.empty();
+ if (snapshot.commitUser().equals(commitUser)
+ && snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
+ long commitIdentifier = snapshot.commitIdentifier();
+ if (commitIdentifiersToCheck.contains(commitIdentifier)) {
+ // We found a full compaction snapshot triggered by `submitFullCompaction`
+ // method.
+ //
+ // Because `submitFullCompaction` compacts all buckets in `writtenBuckets`,
+ // a successful commit indicates that all previous buckets have been
+ // compacted.
+ //
+ // We must make sure that the compact snapshot is triggered by
+ // `submitFullCompaction`, because normal compaction may also trigger full
+ // compaction, but that only compacts a specific bucket, not all buckets
+ // recorded in `writtenBuckets`.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Found full compaction snapshot #{} with identifier {}",
+ id,
+ commitIdentifier);
+ }
+ writtenBuckets.headMap(commitIdentifier, true).clear();
+ firstWriteMs.headMap(commitIdentifier, true).clear();
+ commitIdentifiersToCheck.headSet(commitIdentifier).clear();
+ break;
+ }
}
}
+ }
- return Optional.empty();
+ private void submitFullCompaction() {
+ Set<Tuple2<BinaryRowData, Integer>> compactedBuckets = new HashSet<>();
+ writtenBuckets.forEach(
+ (checkpointId, buckets) -> {
+ for (Tuple2<BinaryRowData, Integer> bucket : buckets) {
+ if (compactedBuckets.contains(bucket)) {
+ continue;
+ }
+ compactedBuckets.add(bucket);
+ try {
+ write.compact(bucket.f0, bucket.f1, true);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
}
@Override
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/HashRowStreamPartitioner.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/OffsetRowDataHashStreamPartitioner.java
similarity index 70%
rename from flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/HashRowStreamPartitioner.java
rename to flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/OffsetRowDataHashStreamPartitioner.java
index f69443a0..d9ed04cd 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/HashRowStreamPartitioner.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/OffsetRowDataHashStreamPartitioner.java
@@ -24,28 +24,37 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.store.file.utils.OffsetRowData;
import org.apache.flink.table.types.logical.RowType;
-/** {@link StreamPartitioner} to partition {@link RowData} according to its hash value. */
-public class HashRowStreamPartitioner extends StreamPartitioner<RowData> {
+/**
+ * {@link StreamPartitioner} to partition {@link RowData} according to the hash value of its {@link
+ * OffsetRowData} view.
+ */
+public class OffsetRowDataHashStreamPartitioner extends StreamPartitioner<RowData> {
- private final RowType rowType;
+ private final RowType offsetRowType;
+ private final int offset;
+ private transient OffsetRowData offsetRowData;
private transient RowDataSerializer serializer;
- public HashRowStreamPartitioner(RowType rowType) {
- this.rowType = rowType;
+ public OffsetRowDataHashStreamPartitioner(RowType offsetRowType, int offset) {
+ this.offsetRowType = offsetRowType;
+ this.offset = offset;
}
@Override
public void setup(int numberOfChannels) {
super.setup(numberOfChannels);
- serializer = new RowDataSerializer(rowType);
+ this.offsetRowData = new OffsetRowData(offsetRowType.getFieldCount(), offset);
+ serializer = new RowDataSerializer(offsetRowType);
}
@Override
public int selectChannel(SerializationDelegate<StreamRecord<RowData>> record) {
- int hash = serializer.toBinaryRow(record.getInstance().getValue()).hashCode();
+ RowData rowData = record.getInstance().getValue();
+ int hash = serializer.toBinaryRow(offsetRowData.replace(rowData)).hashCode();
return Math.abs(hash) % numberOfChannels;
}
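The renamed partitioner above hashes an OffsetRowData view of the record, so the leading snapshot-id column emitted by the compact source does not influence channel selection, and all records for the same partition and bucket keep landing on the same task. Below is a small sketch of the same idea using plain arrays; the class and method names are hypothetical, not the Flink API.

import java.util.Arrays;

// Minimal sketch of offset-based hashing: ignore the first `offset` fields when picking a channel.
class OffsetHashPartitioner {
    private final int offset;            // leading fields to skip (1 = the snapshot id column)
    private final int numberOfChannels;

    OffsetHashPartitioner(int offset, int numberOfChannels) {
        this.offset = offset;
        this.numberOfChannels = numberOfChannels;
    }

    int selectChannel(Object[] row) {
        // Hash only the fields after the offset, e.g. (partition..., bucket).
        Object[] view = Arrays.copyOfRange(row, offset, row.length);
        // floorMod keeps the result non-negative even for Integer.MIN_VALUE hashes.
        return Math.floorMod(Arrays.deepHashCode(view), numberOfChannels);
    }
}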
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
index 9ac2957a..fdab6d1f 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
@@ -24,6 +24,8 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.io.DataFileMeta;
+import org.apache.flink.table.store.file.io.DataFileMetaSerializer;
import org.apache.flink.table.store.file.utils.OffsetRowData;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.util.Preconditions;
@@ -47,6 +49,7 @@ public class StoreCompactOperator extends PrepareCommitOperator {
private transient StoreSinkWrite write;
private transient RowDataSerializer partitionSerializer;
private transient OffsetRowData reusedPartition;
+ private transient DataFileMetaSerializer dataFileMetaSerializer;
public StoreCompactOperator(
FileStoreTable table,
@@ -73,17 +76,34 @@ public class StoreCompactOperator extends PrepareCommitOperator {
public void open() throws Exception {
super.open();
partitionSerializer = new RowDataSerializer(table.schema().logicalPartitionType());
- reusedPartition = new OffsetRowData(partitionSerializer.getArity(), 0);
+ reusedPartition = new OffsetRowData(partitionSerializer.getArity(), 1);
+ dataFileMetaSerializer = new DataFileMetaSerializer();
}
@Override
public void processElement(StreamRecord<RowData> element) throws Exception {
- RowData partitionAndBucket = element.getValue();
- reusedPartition.replace(partitionAndBucket);
+ RowData record = element.getValue();
+
+ long snapshotId = record.getLong(0);
+
+ reusedPartition.replace(record);
BinaryRowData partition = partitionSerializer.toBinaryRow(reusedPartition).copy();
- int bucket = partitionAndBucket.getInt(partitionSerializer.getArity());
- write.compact(partition, bucket, !isStreaming);
+ int bucket = record.getInt(partitionSerializer.getArity() + 1);
+
+ byte[] serializedFiles = record.getBinary(partitionSerializer.getArity() + 2);
+ List<DataFileMeta> files = dataFileMetaSerializer.deserializeList(serializedFiles);
+
+ if (isStreaming) {
+ write.notifyNewFiles(snapshotId, partition, bucket, files);
+ write.compact(partition, bucket, false);
+ } else {
+ Preconditions.checkArgument(
+ files.isEmpty(),
+ "Batch compact job does not concern what files are compacted. "
+ + "They only need to know what buckets are compacted.");
+ write.compact(partition, bucket, true);
+ }
}
@Override
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWrite.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWrite.java
index fe3465d9..f44e0de4 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWrite.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWrite.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.sink.SinkRecord;
@@ -39,6 +40,9 @@ interface StoreSinkWrite {
void compact(BinaryRowData partition, int bucket, boolean fullCompaction) throws Exception;
+ void notifyNewFiles(
+ long snapshotId, BinaryRowData partition, int bucket, List<DataFileMeta> files);
+
List<Committable> prepareCommit(boolean doCompaction, long checkpointId) throws IOException;
void snapshotState(StateSnapshotContext context) throws Exception;
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriteImpl.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriteImpl.java
index 374616ec..75186dac 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriteImpl.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriteImpl.java
@@ -23,11 +23,15 @@ import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.sink.FileCommittable;
import org.apache.flink.table.store.table.sink.SinkRecord;
import org.apache.flink.table.store.table.sink.TableWrite;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -35,6 +39,8 @@ import java.util.List;
/** Default implementation of {@link StoreSinkWrite}. This writer does not have states. */
public class StoreSinkWriteImpl implements StoreSinkWrite {
+ private static final Logger LOG = LoggerFactory.getLogger(StoreSinkWriteImpl.class);
+
protected final FileStoreTable table;
protected final String commitUser;
protected final TableWrite write;
@@ -84,6 +90,20 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
write.compact(partition, bucket, fullCompaction);
}
+ @Override
+ public void notifyNewFiles(
+ long snapshotId, BinaryRowData partition, int bucket, List<DataFileMeta> files) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Receive {} new files from snapshot {}, partition {}, bucket {}",
+ files.size(),
+ snapshotId,
+ partition,
+ bucket);
+ }
+ write.notifyNewFiles(snapshotId, partition, bucket, files);
+ }
+
@Override
public List<Committable> prepareCommit(boolean doCompaction, long checkpointId)
throws IOException {
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/CompactorSourceBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/CompactorSourceBuilder.java
index 649daa7e..43cbb3f0 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/CompactorSourceBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/CompactorSourceBuilder.java
@@ -44,12 +44,12 @@ import java.util.Map;
/**
* Source builder to build a Flink {@link StaticFileStoreSource} or {@link
- * ContinuousFileStoreSource}. This is for stand-alone compactor jobs.
+ * ContinuousFileStoreSource}. This is for dedicated compactor jobs.
*/
public class CompactorSourceBuilder {
private final String tableIdentifier;
- private final BucketsTable bucketsTable;
+ private final FileStoreTable table;
private boolean isContinuous = false;
private StreamExecutionEnvironment env;
@@ -57,7 +57,7 @@ public class CompactorSourceBuilder {
public CompactorSourceBuilder(String tableIdentifier, FileStoreTable table) {
this.tableIdentifier = tableIdentifier;
- this.bucketsTable = new BucketsTable(table);
+ this.table = table;
}
public CompactorSourceBuilder withContinuousMode(boolean isContinuous) {
@@ -79,13 +79,16 @@ public class CompactorSourceBuilder {
return this;
}
- private Source<RowData, ?, ?> buildSource() {
+ private Source<RowData, ?, ?> buildSource(BucketsTable bucketsTable) {
Predicate partitionPredicate = null;
if (specifiedPartitions != null) {
+ // This predicate is based on the row type of the original table, not the buckets table,
+ // because TableScan in BucketsTable is the same as in FileStoreTable
+ // and the partition filter is applied by the scan.
partitionPredicate =
PredicateBuilder.or(
specifiedPartitions.stream()
- .map(p -> PredicateConverter.fromMap(p, bucketsTable.rowType()))
+ .map(p -> PredicateConverter.fromMap(p, table.rowType()))
.toArray(Predicate[]::new));
}
@@ -122,9 +125,10 @@ public class CompactorSourceBuilder {
throw new IllegalArgumentException("StreamExecutionEnvironment should not be null.");
}
+ BucketsTable bucketsTable = new BucketsTable(table, isContinuous);
LogicalType produceType = bucketsTable.rowType();
return env.fromSource(
- buildSource(),
+ buildSource(bucketsTable),
WatermarkStrategy.noWatermarks(),
tableIdentifier + "-compact-source",
InternalTypeInfo.of(produceType));
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java
index b759040e..21a73a05 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java
@@ -18,6 +18,9 @@
package org.apache.flink.table.store.connector;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
@@ -105,24 +108,21 @@ public class ChangelogWithKeyFileStoreTableITCase extends TestBaseUtils {
return TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
}
- private TableEnvironment createStreamingTableEnvironment() {
+ private TableEnvironment createStreamingTableEnvironment(int checkpointIntervalMs) {
TableEnvironment sEnv =
TableEnvironment.create(
EnvironmentSettings.newInstance().inStreamingMode().build());
// set checkpoint interval to a random number to emulate different speed of commit
sEnv.getConfig()
.getConfiguration()
- .set(
- CHECKPOINTING_INTERVAL,
- Duration.ofMillis(ThreadLocalRandom.current().nextInt(900) + 100));
+ .set(CHECKPOINTING_INTERVAL, Duration.ofMillis(checkpointIntervalMs));
return sEnv;
}
- private StreamExecutionEnvironment createStreamExecutionEnvironment() {
+ private StreamExecutionEnvironment createStreamExecutionEnvironment(int checkpointIntervalMs) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
- env.getCheckpointConfig()
- .setCheckpointInterval(ThreadLocalRandom.current().nextInt(900) + 100);
+ env.getCheckpointConfig().setCheckpointInterval(checkpointIntervalMs);
return env;
}
@@ -132,7 +132,8 @@ public class ChangelogWithKeyFileStoreTableITCase extends TestBaseUtils {
@Test
public void testFullCompactionTriggerInterval() throws Exception {
- TableEnvironment sEnv = createStreamingTableEnvironment();
+ TableEnvironment sEnv =
+ createStreamingTableEnvironment(ThreadLocalRandom.current().nextInt(900) + 100);
sEnv.getConfig()
.getConfiguration()
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
@@ -206,40 +207,103 @@ public class ChangelogWithKeyFileStoreTableITCase extends TestBaseUtils {
Assert.assertEquals(expected, actual);
}
+ @Test
+ public void testFullCompactionWithLongCheckpointInterval() throws Exception {
+ // create table
+ TableEnvironment bEnv = createBatchTableEnvironment();
+ bEnv.getConfig()
+ .getConfiguration()
+ .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+ bEnv.executeSql(
+ String.format(
+ "CREATE CATALOG testCatalog WITH ('type'='table-store', 'warehouse'='%s')",
+ path));
+ bEnv.executeSql("USE CATALOG testCatalog");
+ bEnv.executeSql(
+ "CREATE TABLE T ("
+ + " k INT,"
+ + " v INT,"
+ + " PRIMARY KEY (k) NOT ENFORCED"
+ + ") WITH ("
+ + " 'bucket' = '1',"
+ + " 'changelog-producer' = 'full-compaction',"
+ + " 'changelog-producer.compaction-interval' = '2s',"
+ + " 'write.compaction-skip' = 'true'"
+ + ")");
+
+ // run select job
+ TableEnvironment sEnv = createStreamingTableEnvironment(100);
+ sEnv.getConfig()
+ .getConfiguration()
+ .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+ sEnv.executeSql(
+ String.format(
+ "CREATE CATALOG testCatalog WITH ('type'='table-store', 'warehouse'='%s')",
+ path));
+ sEnv.executeSql("USE CATALOG testCatalog");
+ CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM T").collect();
+
+ // run compact job
+ StreamExecutionEnvironment env = createStreamExecutionEnvironment(2000);
+ env.setParallelism(1);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ FlinkActions.compact(new Path(path + "/default.db/T")).build(env);
+ JobClient client = env.executeAsync();
+
+ // write records for a while
+ long startMs = System.currentTimeMillis();
+ int currentKey = 0;
+ while (System.currentTimeMillis() - startMs <= 10000) {
+ currentKey++;
+ bEnv.executeSql(
+ String.format(
+ "INSERT INTO T VALUES (%d, %d)", currentKey, currentKey * 100))
+ .await();
+ }
+
+ Assert.assertEquals(JobStatus.RUNNING, client.getJobStatus().get());
+
+ for (int i = 1; i <= currentKey; i++) {
+ Assert.assertTrue(it.hasNext());
+ Assert.assertEquals(String.format("+I[%d, %d]", i, i * 100), it.next().toString());
+ }
+ it.close();
+ }
+
// ------------------------------------------------------------------------
// Random Tests
// ------------------------------------------------------------------------
- @Test
+ @Test(timeout = 600000)
public void testNoChangelogProducerBatchRandom() throws Exception {
TableEnvironment bEnv = createBatchTableEnvironment();
testNoChangelogProducerRandom(bEnv, 1, false);
}
- @Test
+ @Test(timeout = 600000)
public void testNoChangelogProducerStreamingRandom() throws Exception {
- TableEnvironment sEnv = createStreamingTableEnvironment();
ThreadLocalRandom random = ThreadLocalRandom.current();
+ TableEnvironment sEnv = createStreamingTableEnvironment(random.nextInt(900) + 100);
testNoChangelogProducerRandom(sEnv, random.nextInt(1, 3), random.nextBoolean());
}
- @Test
+ @Test(timeout = 600000)
public void testFullCompactionChangelogProducerBatchRandom() throws Exception {
TableEnvironment bEnv = createBatchTableEnvironment();
testFullCompactionChangelogProducerRandom(bEnv, 1, false);
}
- @Test
+ @Test(timeout = 600000)
public void testFullCompactionChangelogProducerStreamingRandom() throws Exception {
- TableEnvironment sEnv = createStreamingTableEnvironment();
ThreadLocalRandom random = ThreadLocalRandom.current();
+ TableEnvironment sEnv = createStreamingTableEnvironment(random.nextInt(900) + 100);
testFullCompactionChangelogProducerRandom(sEnv, random.nextInt(1, 3), random.nextBoolean());
}
- @Test
+ @Test(timeout = 600000)
public void testStandAloneFullCompactJobRandom() throws Exception {
- TableEnvironment sEnv = createStreamingTableEnvironment();
ThreadLocalRandom random = ThreadLocalRandom.current();
+ TableEnvironment sEnv = createStreamingTableEnvironment(random.nextInt(900) + 100);
testStandAloneFullCompactJobRandom(sEnv, random.nextInt(1, 3), random.nextBoolean());
}
@@ -259,38 +323,49 @@ public class ChangelogWithKeyFileStoreTableITCase extends TestBaseUtils {
private void testFullCompactionChangelogProducerRandom(
TableEnvironment tEnv, int numProducers, boolean enableFailure) throws Exception {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+
testRandom(
tEnv,
numProducers,
enableFailure,
"'bucket' = '4',"
+ + String.format(
+ "'write-buffer-size' = '%s',",
+ random.nextBoolean() ? "512kb" : "1mb")
+ "'changelog-producer' = 'full-compaction',"
+ "'changelog-producer.compaction-interval' = '1s'");
// sleep for a random amount of time to check
// if we can first read complete records then read incremental records correctly
- Thread.sleep(ThreadLocalRandom.current().nextInt(5000));
+ Thread.sleep(random.nextInt(5000));
checkFullCompactionTestResult(numProducers);
}
private void testStandAloneFullCompactJobRandom(
TableEnvironment tEnv, int numProducers, boolean enableConflicts) throws Exception {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+
testRandom(
tEnv,
numProducers,
false,
"'bucket' = '4',"
+ + String.format(
+ "'write-buffer-size' = '%s',",
+ random.nextBoolean() ? "512kb" : "1mb")
+ "'changelog-producer' = 'full-compaction',"
- + "'changelog-producer.compaction-interval' = '1s',"
+ + "'changelog-producer.compaction-interval' = '2s',"
+ "'write.compaction-skip' = 'true'");
// sleep for a random amount of time to check
- // if stand-alone compactor job can find first snapshot to compact correctly
- Thread.sleep(ThreadLocalRandom.current().nextInt(2500));
+ // if dedicated compactor job can find first snapshot to compact correctly
+ Thread.sleep(random.nextInt(2500));
for (int i = enableConflicts ? 2 : 1; i > 0; i--) {
- StreamExecutionEnvironment env = createStreamExecutionEnvironment();
+ StreamExecutionEnvironment env =
+ createStreamExecutionEnvironment(random.nextInt(1900) + 100);
env.setParallelism(2);
FlinkActions.compact(new Path(path + "/default.db/T")).build(env);
env.executeAsync();
@@ -298,13 +373,13 @@ public class ChangelogWithKeyFileStoreTableITCase extends TestBaseUtils {
// sleep for a random amount of time to check
// if we can first read complete records then read incremental records correctly
- Thread.sleep(ThreadLocalRandom.current().nextInt(2500));
+ Thread.sleep(random.nextInt(2500));
checkFullCompactionTestResult(numProducers);
}
private void checkFullCompactionTestResult(int numProducers) throws Exception {
- TableEnvironment sEnv = createStreamingTableEnvironment();
+ TableEnvironment sEnv = createStreamingTableEnvironment(100);
sEnv.getConfig()
.getConfiguration()
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
@@ -385,7 +460,7 @@ public class ChangelogWithKeyFileStoreTableITCase extends TestBaseUtils {
+ (LIMIT + NUM_PARTS * NUM_KEYS)
+ "',"
+ " 'rows-per-second' = '"
- + (LIMIT / 5 + ThreadLocalRandom.current().nextInt(LIMIT / 10))
+ + (LIMIT / 20 + ThreadLocalRandom.current().nextInt(LIMIT / 20))
+ "'"
+ ")")
.await();
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java
index 332d04c7..803ea251 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
@@ -69,9 +70,9 @@ public class CompactActionITCase extends AbstractTestBase {
DataTypes.INT().getLogicalType(),
DataTypes.INT().getLogicalType(),
DataTypes.INT().getLogicalType(),
- DataTypes.INT().getLogicalType()
+ DataTypes.STRING().getLogicalType()
},
- new String[] {"dt", "hh", "k", "v"});
+ new String[] {"k", "v", "hh", "dt"});
private Path tablePath;
private String commitUser;
@@ -92,14 +93,14 @@ public class CompactActionITCase extends AbstractTestBase {
TableWrite write = table.newWrite(commitUser);
TableCommit commit = table.newCommit(commitUser);
- write.write(rowData(20221208, 15, 1, 100));
- write.write(rowData(20221208, 16, 1, 100));
- write.write(rowData(20221209, 15, 1, 100));
+ write.write(rowData(1, 100, 15, StringData.fromString("20221208")));
+ write.write(rowData(1, 100, 16, StringData.fromString("20221208")));
+ write.write(rowData(1, 100, 15, StringData.fromString("20221209")));
commit.commit(0, write.prepareCommit(true, 0));
- write.write(rowData(20221208, 15, 2, 200));
- write.write(rowData(20221208, 16, 2, 200));
- write.write(rowData(20221209, 15, 2, 200));
+ write.write(rowData(2, 200, 15, StringData.fromString("20221208")));
+ write.write(rowData(2, 200, 16, StringData.fromString("20221208")));
+ write.write(rowData(2, 200, 15, StringData.fromString("20221209")));
commit.commit(1, write.prepareCommit(true, 1));
Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
@@ -145,9 +146,9 @@ public class CompactActionITCase extends AbstractTestBase {
TableCommit commit = table.newCommit(commitUser);
// base records
- write.write(rowData(20221208, 15, 1, 100));
- write.write(rowData(20221208, 16, 1, 100));
- write.write(rowData(20221209, 15, 1, 100));
+ write.write(rowData(1, 100, 15, StringData.fromString("20221208")));
+ write.write(rowData(1, 100, 16, StringData.fromString("20221208")));
+ write.write(rowData(1, 100, 15, StringData.fromString("20221209")));
commit.commit(0, write.prepareCommit(true, 0));
Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
@@ -180,12 +181,12 @@ public class CompactActionITCase extends AbstractTestBase {
Assert.assertEquals(2, (long) plan.snapshotId);
List<String> actual = getResult(table.newRead(), plan.splits());
actual.sort(String::compareTo);
- Assert.assertEquals(Arrays.asList("+I 20221208|15|1|100", "+I 20221209|15|1|100"), actual);
+ Assert.assertEquals(Arrays.asList("+I 1|100|15|20221208", "+I 1|100|15|20221209"), actual);
// incremental records
- write.write(rowData(20221208, 15, 1, 101));
- write.write(rowData(20221208, 16, 1, 101));
- write.write(rowData(20221209, 15, 1, 101));
+ write.write(rowData(1, 101, 15, StringData.fromString("20221208")));
+ write.write(rowData(1, 101, 16, StringData.fromString("20221208")));
+ write.write(rowData(1, 101, 15, StringData.fromString("20221209")));
commit.commit(1, write.prepareCommit(true, 1));
write.close();
@@ -205,10 +206,10 @@ public class CompactActionITCase extends AbstractTestBase {
actual.sort(String::compareTo);
Assert.assertEquals(
Arrays.asList(
- "+U 20221208|15|1|101",
- "+U 20221209|15|1|101",
- "-U 20221208|15|1|100",
- "-U 20221209|15|1|100"),
+ "+U 1|101|15|20221208",
+ "+U 1|101|15|20221209",
+ "-U 1|100|15|20221208",
+ "-U 1|100|15|20221209"),
actual);
}
@@ -246,12 +247,12 @@ public class CompactActionITCase extends AbstractTestBase {
private String rowDataToString(RowData rowData) {
return String.format(
- "%s %d|%d|%d|%d",
+ "%s %d|%d|%d|%s",
rowData.getRowKind().shortString(),
rowData.getInt(0),
rowData.getInt(1),
rowData.getInt(2),
- rowData.getInt(3));
+ rowData.getString(3).toString());
}
private FileStoreTable createFileStoreTable(Map<String, String> options) throws Exception {
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CompactorSinkITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CompactorSinkITCase.java
index 774074cb..8477cf5c 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CompactorSinkITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CompactorSinkITCase.java
@@ -25,6 +25,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.store.connector.source.CompactorSourceBuilder;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.TableSchema;
@@ -47,6 +49,9 @@ import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.UUID;
/** IT cases for {@link CompactorSinkBuilder} and {@link CompactorSink}. */
@@ -58,9 +63,9 @@ public class CompactorSinkITCase extends AbstractTestBase {
DataTypes.INT().getLogicalType(),
DataTypes.INT().getLogicalType(),
DataTypes.INT().getLogicalType(),
- DataTypes.INT().getLogicalType()
+ DataTypes.STRING().getLogicalType()
},
- new String[] {"dt", "hh", "k", "v"});
+ new String[] {"k", "v", "hh", "dt"});
private Path tablePath;
private String commitUser;
@@ -78,14 +83,14 @@ public class CompactorSinkITCase extends AbstractTestBase {
TableWrite write = table.newWrite(commitUser);
TableCommit commit = table.newCommit(commitUser);
- write.write(rowData(20221208, 15, 1, 100));
- write.write(rowData(20221208, 16, 1, 100));
- write.write(rowData(20221209, 15, 1, 100));
+ write.write(rowData(1, 100, 15, StringData.fromString("20221208")));
+ write.write(rowData(1, 100, 16, StringData.fromString("20221208")));
+ write.write(rowData(1, 100, 15, StringData.fromString("20221209")));
commit.commit(0, write.prepareCommit(true, 0));
- write.write(rowData(20221208, 15, 2, 200));
- write.write(rowData(20221208, 16, 2, 200));
- write.write(rowData(20221209, 15, 2, 200));
+ write.write(rowData(2, 200, 15, StringData.fromString("20221208")));
+ write.write(rowData(2, 200, 16, StringData.fromString("20221208")));
+ write.write(rowData(2, 200, 15, StringData.fromString("20221209")));
commit.commit(1, write.prepareCommit(true, 1));
Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
@@ -97,8 +102,14 @@ public class CompactorSinkITCase extends AbstractTestBase {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ CompactorSourceBuilder sourceBuilder =
+ new CompactorSourceBuilder(tablePath.toString(), table);
DataStreamSource<RowData> source =
- env.fromElements(rowData(20221208, 15, 0), rowData(20221209, 15, 0));
+ sourceBuilder
+ .withEnv(env)
+ .withContinuousMode(false)
+ .withPartitions(getSpecifiedPartitions())
+ .build();
new CompactorSinkBuilder(table).withInput(source).build();
env.execute();
@@ -119,6 +130,18 @@ public class CompactorSinkITCase extends AbstractTestBase {
}
}
+ private List<Map<String, String>> getSpecifiedPartitions() {
+ Map<String, String> partition1 = new HashMap<>();
+ partition1.put("dt", "20221208");
+ partition1.put("hh", "15");
+
+ Map<String, String> partition2 = new HashMap<>();
+ partition2.put("dt", "20221209");
+ partition2.put("hh", "15");
+
+ return Arrays.asList(partition1, partition2);
+ }
+
private GenericRowData rowData(Object... values) {
return GenericRowData.of(values);
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/CompactorSourceITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/CompactorSourceITCase.java
index f7ebba03..49d037ca 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/CompactorSourceITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/CompactorSourceITCase.java
@@ -24,8 +24,10 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.writer.BinaryRowWriter;
+import org.apache.flink.table.store.file.io.DataFileMetaSerializer;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.schema.UpdateSchema;
@@ -42,6 +44,7 @@ import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -60,10 +63,12 @@ public class CompactorSourceITCase extends AbstractTestBase {
new LogicalType[] {
DataTypes.INT().getLogicalType(),
DataTypes.INT().getLogicalType(),
- DataTypes.INT().getLogicalType(),
+ DataTypes.STRING().getLogicalType(),
DataTypes.INT().getLogicalType()
},
- new String[] {"dt", "hh", "k", "v"});
+ new String[] {"k", "v", "dt", "hh"});
+
+ private final DataFileMetaSerializer dataFileMetaSerializer = new DataFileMetaSerializer();
private Path tablePath;
private String commitUser;
@@ -80,12 +85,12 @@ public class CompactorSourceITCase extends AbstractTestBase {
TableWrite write = table.newWrite(commitUser);
TableCommit commit = table.newCommit(commitUser);
- write.write(rowData(20221208, 15, 1, 1510));
- write.write(rowData(20221208, 16, 2, 1620));
+ write.write(rowData(1, 1510, StringData.fromString("20221208"), 15));
+ write.write(rowData(2, 1620, StringData.fromString("20221208"), 16));
commit.commit(0, write.prepareCommit(true, 0));
- write.write(rowData(20221208, 15, 1, 1511));
- write.write(rowData(20221209, 15, 1, 1510));
+ write.write(rowData(1, 1511, StringData.fromString("20221208"), 15));
+ write.write(rowData(1, 1510, StringData.fromString("20221209"), 15));
commit.commit(1, write.prepareCommit(true, 1));
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -102,7 +107,10 @@ public class CompactorSourceITCase extends AbstractTestBase {
}
assertThat(actual)
.hasSameElementsAs(
- Arrays.asList("+I 20221208|15|0", "+I 20221208|16|0", "+I 20221209|15|0"));
+ Arrays.asList(
+ "+I 2|20221208|15|0|0",
+ "+I 2|20221208|16|0|0",
+ "+I 2|20221209|15|0|0"));
write.close();
commit.close();
@@ -115,22 +123,22 @@ public class CompactorSourceITCase extends AbstractTestBase {
TableWrite write = table.newWrite(commitUser);
TableCommit commit = table.newCommit(commitUser);
- write.write(rowData(20221208, 15, 1, 1510));
- write.write(rowData(20221208, 16, 2, 1620));
+ write.write(rowData(1, 1510, StringData.fromString("20221208"), 15));
+ write.write(rowData(2, 1620, StringData.fromString("20221208"), 16));
commit.commit(0, write.prepareCommit(true, 0));
- write.write(rowData(20221208, 15, 1, 1511));
- write.write(rowData(20221209, 15, 1, 1510));
- write.compact(binaryRow(20221208, 15), 0, true);
- write.compact(binaryRow(20221209, 15), 0, true);
+ write.write(rowData(1, 1511, StringData.fromString("20221208"), 15));
+ write.write(rowData(1, 1510, StringData.fromString("20221209"), 15));
+ write.compact(binaryRow("20221208", 15), 0, true);
+ write.compact(binaryRow("20221209", 15), 0, true);
commit.commit(1, write.prepareCommit(true, 1));
- write.write(rowData(20221208, 15, 2, 1520));
- write.write(rowData(20221208, 16, 2, 1621));
+ write.write(rowData(2, 1520, StringData.fromString("20221208"), 15));
+ write.write(rowData(2, 1621, StringData.fromString("20221208"), 16));
commit.commit(2, write.prepareCommit(true, 2));
- write.write(rowData(20221208, 15, 1, 1512));
- write.write(rowData(20221209, 16, 2, 1620));
+ write.write(rowData(1, 1512, StringData.fromString("20221208"), 15));
+ write.write(rowData(2, 1620, StringData.fromString("20221209"), 16));
commit.commit(3, write.prepareCommit(true, 3));
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -148,35 +156,22 @@ public class CompactorSourceITCase extends AbstractTestBase {
assertThat(actual)
.hasSameElementsAs(
Arrays.asList(
- "+I 20221208|15|0",
- "+I 20221208|16|0",
- "+I 20221208|15|0",
- "+I 20221209|16|0"));
-
- write.write(rowData(20221209, 15, 2, 1520));
- write.write(rowData(20221208, 16, 1, 1510));
- write.write(rowData(20221209, 15, 1, 1511));
+ "+I 4|20221208|15|0|1",
+ "+I 4|20221208|16|0|1",
+ "+I 5|20221208|15|0|1",
+ "+I 5|20221209|16|0|1"));
+
+ write.write(rowData(2, 1520, StringData.fromString("20221209"), 15));
+ write.write(rowData(1, 1510, StringData.fromString("20221208"), 16));
+ write.write(rowData(1, 1511, StringData.fromString("20221209"), 15));
commit.commit(4, write.prepareCommit(true, 4));
actual.clear();
for (int i = 0; i < 2; i++) {
actual.add(toString(it.next()));
}
- assertThat(actual).hasSameElementsAs(Arrays.asList("+I 20221208|16|0", "+I 20221209|15|0"));
-
- write.close();
- commit.close();
-
- write = table.newWrite(commitUser).withOverwrite(true);
- Map<String, String> partitionMap = new HashMap<>();
- partitionMap.put("dt", "20221209");
- partitionMap.put("hh", "16");
- commit = table.newCommit(commitUser).withOverwritePartition(partitionMap);
- write.write(rowData(20221209, 16, 1, 1512));
- write.write(rowData(20221209, 16, 2, 1622));
- commit.commit(5, write.prepareCommit(true, 5));
-
- assertThat(toString(it.next())).isEqualTo("+I 20221209|16|0");
+ assertThat(actual)
+ .hasSameElementsAs(Arrays.asList("+I 6|20221208|16|0|1", "+I 6|20221209|15|0|1"));
write.close();
commit.close();
@@ -189,10 +184,10 @@ public class CompactorSourceITCase extends AbstractTestBase {
true,
getSpecifiedPartitions(),
Arrays.asList(
- "+I 20221208|16|0",
- "+I 20221209|15|0",
- "+I 20221208|16|0",
- "+I 20221209|15|0"));
+ "+I 1|20221208|16|0|1",
+ "+I 2|20221209|15|0|1",
+ "+I 3|20221208|16|0|1",
+ "+I 3|20221209|15|0|1"));
}
@Test
@@ -200,7 +195,7 @@ public class CompactorSourceITCase extends AbstractTestBase {
testPartitionSpec(
false,
getSpecifiedPartitions(),
- Arrays.asList("+I 20221208|16|0", "+I 20221209|15|0"));
+ Arrays.asList("+I 3|20221208|16|0|0", "+I 3|20221209|15|0|0"));
}
private List<Map<String, String>> getSpecifiedPartitions() {
@@ -224,17 +219,17 @@ public class CompactorSourceITCase extends AbstractTestBase {
TableWrite write = table.newWrite(commitUser);
TableCommit commit = table.newCommit(commitUser);
- write.write(rowData(20221208, 15, 1, 1510));
- write.write(rowData(20221208, 16, 2, 1620));
+ write.write(rowData(1, 1510, StringData.fromString("20221208"), 15));
+ write.write(rowData(2, 1620, StringData.fromString("20221208"), 16));
commit.commit(0, write.prepareCommit(true, 0));
- write.write(rowData(20221208, 15, 2, 1520));
- write.write(rowData(20221209, 15, 2, 1520));
+ write.write(rowData(2, 1520, StringData.fromString("20221208"), 15));
+ write.write(rowData(2, 1520, StringData.fromString("20221209"), 15));
commit.commit(1, write.prepareCommit(true, 1));
- write.write(rowData(20221208, 15, 1, 1511));
- write.write(rowData(20221208, 16, 1, 1610));
- write.write(rowData(20221209, 15, 1, 1510));
+ write.write(rowData(1, 1511, StringData.fromString("20221208"), 15));
+ write.write(rowData(1, 1610, StringData.fromString("20221208"), 16));
+ write.write(rowData(1, 1510, StringData.fromString("20221209"), 15));
commit.commit(2, write.prepareCommit(true, 2));
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -258,22 +253,31 @@ public class CompactorSourceITCase extends AbstractTestBase {
}
private String toString(RowData rowData) {
+ int numFiles;
+ try {
+ numFiles = dataFileMetaSerializer.deserializeList(rowData.getBinary(4)).size();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+
return String.format(
- "%s %d|%d|%d",
+ "%s %d|%s|%d|%d|%d",
rowData.getRowKind().shortString(),
- rowData.getInt(0),
- rowData.getInt(1),
- rowData.getInt(2));
+ rowData.getLong(0),
+ rowData.getString(1).toString(),
+ rowData.getInt(2),
+ rowData.getInt(3),
+ numFiles);
}
private GenericRowData rowData(Object... values) {
return GenericRowData.of(values);
}
- private BinaryRowData binaryRow(int dt, int hh) {
+ private BinaryRowData binaryRow(String dt, int hh) {
BinaryRowData b = new BinaryRowData(2);
BinaryRowWriter writer = new BinaryRowWriter(b);
- writer.writeInt(0, dt);
+ writer.writeString(0, StringData.fromString(dt));
writer.writeInt(1, hh);
writer.complete();
return b;
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
index 00ccf65f..8d7b1937 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
@@ -78,6 +78,7 @@ public class FileStoreSourceSplitGeneratorTest {
};
List<DataSplit> scanSplits =
AbstractDataTableScan.generateSplits(
+ 1L,
false,
Collections::singletonList,
plan.groupByPartFiles(plan.files(FileKind.ADD)));
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java
index 3ecc9983..f95c604a 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java
@@ -118,7 +118,7 @@ public class FileStoreSourceSplitSerializerTest {
boolean isIncremental,
long recordsToSkip) {
return new FileStoreSourceSplit(
- id, new DataSplit(partition, bucket, files, isIncremental), recordsToSkip);
+ id, new DataSplit(1L, partition, bucket, files, isIncremental), recordsToSkip);
}
private static FileStoreSourceSplit serializeAndDeserialize(FileStoreSourceSplit split)
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
index 3df55343..42d7cc13 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
@@ -184,7 +184,8 @@ public class TestChangelogDataReadWrite {
null, // not used, we only create an empty writer
options,
EXTRACTOR)
- .createEmptyWriter(partition, bucket, service);
+ .createEmptyWriterContainer(partition, bucket, service)
+ .writer;
((MemoryOwner) writer)
.setMemoryPool(
new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize()));
diff --git a/flink-table-store-connector/src/test/resources/log4j2-test.properties b/flink-table-store-connector/src/test/resources/log4j2-test.properties
index 835c2ec9..1b3980d1 100644
--- a/flink-table-store-connector/src/test/resources/log4j2-test.properties
+++ b/flink-table-store-connector/src/test/resources/log4j2-test.properties
@@ -25,4 +25,4 @@ appender.testlogger.name = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
-appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyWriter.java
index dfa94931..264f2c66 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyWriter.java
@@ -98,6 +98,11 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
flushWriter(true, fullCompaction);
}
+ @Override
+ public void addNewFiles(List<DataFileMeta> files) {
+ files.forEach(compactManager::addNewFile);
+ }
+
@Override
public CommitIncrement prepareCommit(boolean blocking) throws Exception {
flushWriter(false, false);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
index f17c8b0f..8432d109 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -143,6 +143,11 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
flushWriteBuffer(true, fullCompaction);
}
+ @Override
+ public void addNewFiles(List<DataFileMeta> files) {
+ files.forEach(compactManager::addNewFile);
+ }
+
@Override
public long memoryOccupancy() {
return writeBuffer.memoryOccupancy();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java
index 58e5902a..ba363ef4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java
@@ -41,6 +41,11 @@ public interface CompactStrategy {
/** Pick a compaction unit consisting of all existing files. */
static Optional<CompactUnit> pickFullCompaction(int numLevels, List<LevelSortedRun> runs) {
int maxLevel = numLevels - 1;
- return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
+ if (runs.size() == 1 && runs.get(0).level() == maxLevel) {
+ // only 1 sorted run on the max level, nothing to compact
+ return Optional.empty();
+ } else {
+ return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
+ }
}
}
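The branch added above skips a full compaction when the LSM tree already consists of a single sorted run sitting on the max level. A minimal, self-contained illustration of that condition, using plain integers as stand-ins for the real LevelSortedRun objects (the names below are placeholders, not the project's API):

    import java.util.Arrays;
    import java.util.List;

    public class FullCompactionPickSketch {
        // Mirrors the condition above: nothing to compact only if the single
        // remaining sorted run is already on the max level.
        static boolean nothingToCompact(int maxLevel, List<Integer> runLevels) {
            return runLevels.size() == 1 && runLevels.get(0) == maxLevel;
        }

        public static void main(String[] args) {
            int maxLevel = 5;                                                    // numLevels - 1
            System.out.println(nothingToCompact(maxLevel, Arrays.asList(5)));    // true  -> Optional.empty()
            System.out.println(nothingToCompact(maxLevel, Arrays.asList(0, 5))); // false -> full compaction unit
        }
    }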
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
index 98212081..c498d56d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
@@ -54,12 +54,12 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
private static final Logger LOG = LoggerFactory.getLogger(AbstractFileStoreWrite.class);
private final String commitUser;
- private final SnapshotManager snapshotManager;
+ protected final SnapshotManager snapshotManager;
private final FileStoreScan scan;
@Nullable protected IOManager ioManager;
- protected final Map<BinaryRowData, Map<Integer, WriterWithCommit<T>>> writers;
+ protected final Map<BinaryRowData, Map<Integer, WriterContainer<T>>> writers;
private final ExecutorService compactExecutor;
private boolean overwrite = false;
@@ -83,14 +83,13 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
return this;
}
- protected List<DataFileMeta> scanExistingFileMetas(BinaryRowData partition, int bucket) {
- Long latestSnapshotId = snapshotManager.latestSnapshotId();
+ protected List<DataFileMeta> scanExistingFileMetas(
+ Long snapshotId, BinaryRowData partition, int bucket) {
List<DataFileMeta> existingFileMetas = new ArrayList<>();
- if (latestSnapshotId != null) {
+ if (snapshotId != null) {
// Concat all the DataFileMeta of existing files into existingFileMetas.
- scan.withSnapshot(latestSnapshotId)
- .withPartitionFilter(Collections.singletonList(partition)).withBucket(bucket)
- .plan().files().stream()
+ scan.withSnapshot(snapshotId).withPartitionFilter(Collections.singletonList(partition))
+ .withBucket(bucket).plan().files().stream()
.map(ManifestEntry::file)
.forEach(existingFileMetas::add);
}
@@ -103,14 +102,32 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
@Override
public void write(BinaryRowData partition, int bucket, T data) throws Exception {
- RecordWriter<T> writer = getWriter(partition, bucket);
+ RecordWriter<T> writer = getWriterWrapper(partition, bucket).writer;
writer.write(data);
}
@Override
public void compact(BinaryRowData partition, int bucket, boolean fullCompaction)
throws Exception {
- getWriter(partition, bucket).compact(fullCompaction);
+ getWriterWrapper(partition, bucket).writer.compact(fullCompaction);
+ }
+
+ @Override
+ public void notifyNewFiles(
+ long snapshotId, BinaryRowData partition, int bucket, List<DataFileMeta> files) {
+ WriterContainer<T> writerContainer = getWriterWrapper(partition, bucket);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Get extra compact files for partition {}, bucket {}. Extra snapshot {}, base snapshot {}.\nFiles: {}",
+ partition,
+ bucket,
+ snapshotId,
+ writerContainer.baseSnapshotId,
+ files);
+ }
+ if (snapshotId > writerContainer.baseSnapshotId) {
+ writerContainer.writer.addNewFiles(files);
+ }
}
@Override
@@ -142,20 +159,20 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
List<FileCommittable> result = new ArrayList<>();
- Iterator<Map.Entry<BinaryRowData, Map<Integer, WriterWithCommit<T>>>> partIter =
+ Iterator<Map.Entry<BinaryRowData, Map<Integer, WriterContainer<T>>>> partIter =
writers.entrySet().iterator();
while (partIter.hasNext()) {
- Map.Entry<BinaryRowData, Map<Integer, WriterWithCommit<T>>> partEntry = partIter.next();
+ Map.Entry<BinaryRowData, Map<Integer, WriterContainer<T>>> partEntry = partIter.next();
BinaryRowData partition = partEntry.getKey();
- Iterator<Map.Entry<Integer, WriterWithCommit<T>>> bucketIter =
+ Iterator<Map.Entry<Integer, WriterContainer<T>>> bucketIter =
partEntry.getValue().entrySet().iterator();
while (bucketIter.hasNext()) {
- Map.Entry<Integer, WriterWithCommit<T>> entry = bucketIter.next();
+ Map.Entry<Integer, WriterContainer<T>> entry = bucketIter.next();
int bucket = entry.getKey();
- WriterWithCommit<T> writerWithCommit = entry.getValue();
+ WriterContainer<T> writerContainer = entry.getValue();
RecordWriter.CommitIncrement increment =
- writerWithCommit.writer.prepareCommit(blocking);
+ writerContainer.writer.prepareCommit(blocking);
FileCommittable committable =
new FileCommittable(
partition,
@@ -165,8 +182,7 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
result.add(committable);
if (committable.isEmpty()) {
- if (writerWithCommit.lastModifiedCommitIdentifier
- <= latestCommittedIdentifier) {
+ if (writerContainer.lastModifiedCommitIdentifier <= latestCommittedIdentifier) {
// Clear writer if no update, and if its latest modification has committed.
//
// We need a mechanism to clear writers, otherwise there will be more and
@@ -178,14 +194,14 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
+ "while latest committed identifier is {}",
partition,
bucket,
- writerWithCommit.lastModifiedCommitIdentifier,
+ writerContainer.lastModifiedCommitIdentifier,
latestCommittedIdentifier);
}
- writerWithCommit.writer.close();
+ writerContainer.writer.close();
bucketIter.remove();
}
} else {
- writerWithCommit.lastModifiedCommitIdentifier = commitIdentifier;
+ writerContainer.lastModifiedCommitIdentifier = commitIdentifier;
}
}
@@ -199,56 +215,64 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
@Override
public void close() throws Exception {
- for (Map<Integer, WriterWithCommit<T>> bucketWriters : writers.values()) {
- for (WriterWithCommit<T> writerWithCommit : bucketWriters.values()) {
- writerWithCommit.writer.close();
+ for (Map<Integer, WriterContainer<T>> bucketWriters : writers.values()) {
+ for (WriterContainer<T> writerContainer : bucketWriters.values()) {
+ writerContainer.writer.close();
}
}
writers.clear();
compactExecutor.shutdownNow();
}
- private RecordWriter<T> getWriter(BinaryRowData partition, int bucket) {
- Map<Integer, WriterWithCommit<T>> buckets = writers.get(partition);
+ private WriterContainer<T> getWriterWrapper(BinaryRowData partition, int bucket) {
+ Map<Integer, WriterContainer<T>> buckets = writers.get(partition);
if (buckets == null) {
buckets = new HashMap<>();
writers.put(partition.copy(), buckets);
}
- return buckets.computeIfAbsent(bucket, k -> createWriter(partition.copy(), bucket)).writer;
+ return buckets.computeIfAbsent(
+ bucket, k -> createWriterContainer(partition.copy(), bucket));
}
- private WriterWithCommit<T> createWriter(BinaryRowData partition, int bucket) {
+ private WriterContainer<T> createWriterContainer(BinaryRowData partition, int bucket) {
if (LOG.isDebugEnabled()) {
LOG.debug("Creating writer for partition {}, bucket {}", partition, bucket);
}
- RecordWriter<T> writer =
+ WriterContainer<T> writerContainer =
overwrite
- ? createEmptyWriter(partition.copy(), bucket, compactExecutor)
- : createWriter(partition.copy(), bucket, compactExecutor);
- notifyNewWriter(writer);
- return new WriterWithCommit<>(writer);
+ ? createEmptyWriterContainer(partition.copy(), bucket, compactExecutor)
+ : createWriterContainer(partition.copy(), bucket, compactExecutor);
+ notifyNewWriter(writerContainer.writer);
+ return writerContainer;
}
protected void notifyNewWriter(RecordWriter<T> writer) {}
/** Create a {@link RecordWriter} from partition and bucket. */
@VisibleForTesting
- public abstract RecordWriter<T> createWriter(
+ public abstract WriterContainer<T> createWriterContainer(
BinaryRowData partition, int bucket, ExecutorService compactExecutor);
/** Create an empty {@link RecordWriter} from partition and bucket. */
@VisibleForTesting
- public abstract RecordWriter<T> createEmptyWriter(
+ public abstract WriterContainer<T> createEmptyWriterContainer(
BinaryRowData partition, int bucket, ExecutorService compactExecutor);
- /** {@link RecordWriter} with identifier of its last modified commit. */
- protected static class WriterWithCommit<T> {
+ /**
+ * {@link RecordWriter} with the snapshot id it was created from and the identifier of its last
+ * modified commit.
+ */
+ @VisibleForTesting
+ public static class WriterContainer<T> {
- protected final RecordWriter<T> writer;
+ public final RecordWriter<T> writer;
+ private final long baseSnapshotId;
private long lastModifiedCommitIdentifier;
- private WriterWithCommit(RecordWriter<T> writer) {
+ protected WriterContainer(RecordWriter<T> writer, Long baseSnapshotId) {
this.writer = writer;
+ this.baseSnapshotId =
+ baseSnapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID - 1 : baseSnapshotId;
this.lastModifiedCommitIdentifier = Long.MIN_VALUE;
}
}
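The snapshot guard in notifyNewFiles is the heart of the fix: a writer container created from base snapshot N has already scanned every file up to N, so only files committed in strictly newer snapshots are forwarded to its compact manager. A tiny hypothetical walk-through of that rule with plain longs (not the real classes):

    public class NotifyNewFilesGuardSketch {
        public static void main(String[] args) {
            long baseSnapshotId = 4L;            // snapshot the writer container was created from
            long[] notified = {3L, 4L, 5L, 6L};  // snapshot ids reported via notifyNewFiles

            for (long snapshotId : notified) {
                // Only 5 and 6 would reach writer.addNewFiles(...); 3 and 4 were already
                // covered by scanExistingFileMetas when the writer was created.
                System.out.println(snapshotId + " forwarded: " + (snapshotId > baseSnapshotId));
            }
        }
    }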
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
index bb7921c4..261b7c3f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
@@ -81,16 +81,25 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<RowData> {
}
@Override
- public RecordWriter<RowData> createWriter(
+ public WriterContainer<RowData> createWriterContainer(
BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
- return createWriter(
- partition, bucket, scanExistingFileMetas(partition, bucket), compactExecutor);
+ Long latestSnapshotId = snapshotManager.latestSnapshotId();
+ RecordWriter<RowData> writer =
+ createWriter(
+ partition,
+ bucket,
+ scanExistingFileMetas(latestSnapshotId, partition, bucket),
+ compactExecutor);
+ return new WriterContainer<>(writer, latestSnapshotId);
}
@Override
- public RecordWriter<RowData> createEmptyWriter(
+ public WriterContainer<RowData> createEmptyWriterContainer(
BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
- return createWriter(partition, bucket, Collections.emptyList(), compactExecutor);
+ Long latestSnapshotId = snapshotManager.latestSnapshotId();
+ RecordWriter<RowData> writer =
+ createWriter(partition, bucket, Collections.emptyList(), compactExecutor);
+ return new WriterContainer<>(writer, latestSnapshotId);
}
private RecordWriter<RowData> createWriter(
@@ -140,7 +149,13 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<RowData> {
new LongCounter(toCompact.get(0).minSequenceNumber()));
rewriter.write(
new RecordReaderIterator<>(
- read.createReader(new DataSplit(partition, bucket, toCompact, false))));
+ read.createReader(
+ new DataSplit(
+ 0L /* unused */,
+ partition,
+ bucket,
+ toCompact,
+ false))));
rewriter.close();
return rewriter.result();
};
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
index 919a1eaf..87105b2c 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.file.operation;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.utils.RecordWriter;
import org.apache.flink.table.store.table.sink.FileCommittable;
import org.apache.flink.table.store.table.sink.SinkRecord;
@@ -65,6 +66,20 @@ public interface FileStoreWrite<T> {
*/
void compact(BinaryRowData partition, int bucket, boolean fullCompaction) throws Exception;
+ /**
+ * Notify that some new files are created at the given snapshot in the given bucket.
+ *
+ * <p>Most probably, these files are created by another job. Currently this method is only used
+ * by the dedicated compact job to see files created by writer jobs.
+ *
+ * @param snapshotId the snapshot id where new files are created
+ * @param partition the partition where new files are created
+ * @param bucket the bucket where new files are created
+ * @param files the new files themselves
+ */
+ void notifyNewFiles(
+ long snapshotId, BinaryRowData partition, int bucket, List<DataFileMeta> files);
+
/**
* Prepare commit in the write.
*
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index ec6f8453..2a01106b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -107,16 +107,25 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
}
@Override
- public RecordWriter<KeyValue> createWriter(
+ public WriterContainer<KeyValue> createWriterContainer(
BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
- return createMergeTreeWriter(
- partition, bucket, scanExistingFileMetas(partition, bucket), compactExecutor);
+ Long latestSnapshotId = snapshotManager.latestSnapshotId();
+ RecordWriter<KeyValue> writer =
+ createMergeTreeWriter(
+ partition,
+ bucket,
+ scanExistingFileMetas(latestSnapshotId, partition, bucket),
+ compactExecutor);
+ return new WriterContainer<>(writer, latestSnapshotId);
}
@Override
- public RecordWriter<KeyValue> createEmptyWriter(
+ public WriterContainer<KeyValue> createEmptyWriterContainer(
BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
- return createMergeTreeWriter(partition, bucket, Collections.emptyList(), compactExecutor);
+ Long latestSnapshotId = snapshotManager.latestSnapshotId();
+ RecordWriter<KeyValue> writer =
+ createMergeTreeWriter(partition, bucket, Collections.emptyList(), compactExecutor);
+ return new WriterContainer<>(writer, latestSnapshotId);
}
private MergeTreeWriter createMergeTreeWriter(
@@ -130,7 +139,6 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
partition,
bucket,
restoreFiles);
- new RuntimeException().printStackTrace();
}
KeyValueFileWriterFactory writerFactory = writerFactoryBuilder.build(partition, bucket);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/MemoryFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/MemoryFileStoreWrite.java
index 77bd7d7b..65788b4f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/MemoryFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/MemoryFileStoreWrite.java
@@ -51,7 +51,7 @@ public abstract class MemoryFileStoreWrite<T> extends AbstractFileStoreWrite<T>
}
private Iterator<MemoryOwner> memoryOwners() {
- Iterator<Map<Integer, WriterWithCommit<T>>> iterator = writers.values().iterator();
+ Iterator<Map<Integer, WriterContainer<T>>> iterator = writers.values().iterator();
return Iterators.concat(
new Iterator<Iterator<MemoryOwner>>() {
@Override
@@ -63,10 +63,10 @@ public abstract class MemoryFileStoreWrite<T> extends AbstractFileStoreWrite<T>
public Iterator<MemoryOwner> next() {
return Iterators.transform(
iterator.next().values().iterator(),
- writerWithCommit ->
- writerWithCommit == null
+ writerContainer ->
+ writerContainer == null
? null
- : (MemoryOwner) (writerWithCommit.writer));
+ : (MemoryOwner) (writerContainer.writer));
}
});
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/ObjectSerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/ObjectSerializer.java
index 19421d84..de9b0255 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/ObjectSerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/ObjectSerializer.java
@@ -19,12 +19,16 @@
package org.apache.flink.table.store.file.utils;
import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.RowType;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
@@ -80,6 +84,13 @@ public abstract class ObjectSerializer<T> implements Serializable {
}
}
+ public final byte[] serializeList(List<T> records) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(baos);
+ serializeList(records, view);
+ return baos.toByteArray();
+ }
+
/** De-serializes a record list from the given source input view. */
public final List<T> deserializeList(DataInputView source) throws IOException {
int size = source.readInt();
@@ -90,6 +101,12 @@ public abstract class ObjectSerializer<T> implements Serializable {
return records;
}
+ public final List<T> deserializeList(byte[] bytes) throws IOException {
+ ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+ DataInputViewStreamWrapper view = new DataInputViewStreamWrapper(bais);
+ return deserializeList(view);
+ }
+
/** Convert a {@link T} to {@link RowData}. */
public abstract RowData toRow(T record);
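The new byte[] overloads make it easy to ship a whole object list through a single VARBINARY column, which is exactly how BucketsTable transports DataFileMeta lists later in this commit. A quick round-trip sketch, assuming only the DataFileMetaSerializer used elsewhere in this diff and an empty list so no DataFileMeta has to be constructed:

    import java.util.Collections;
    import java.util.List;

    import org.apache.flink.table.store.file.io.DataFileMeta;
    import org.apache.flink.table.store.file.io.DataFileMetaSerializer;

    public class SerializeListRoundTrip {
        public static void main(String[] args) throws Exception {
            DataFileMetaSerializer serializer = new DataFileMetaSerializer();

            // Serialize a (here: empty) list of file metas into one byte array ...
            List<DataFileMeta> files = Collections.emptyList();
            byte[] bytes = serializer.serializeList(files);

            // ... and read it back, as the compact source does with rowData.getBinary(...).
            List<DataFileMeta> restored = serializer.deserializeList(bytes);
            System.out.println(restored.size()); // 0
        }
    }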
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
index bedac510..3dc1594c 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
@@ -19,8 +19,11 @@
package org.apache.flink.table.store.file.utils;
import org.apache.flink.table.store.file.io.CompactIncrement;
+import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.io.NewFilesIncrement;
+import java.util.List;
+
/**
* The {@code RecordWriter} is responsible for writing data and handling in-progress files used to
* write yet un-staged data. The incremental files ready to commit is returned to the system by the
@@ -41,6 +44,13 @@ public interface RecordWriter<T> {
*/
void compact(boolean fullCompaction) throws Exception;
+ /**
+ * Add files to the internal {@link org.apache.flink.table.store.file.compact.CompactManager}.
+ *
+ * @param files files to add
+ */
+ void addNewFiles(List<DataFileMeta> files);
+
/**
* Prepare for a commit.
*
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
index dce0eae4..13bfdd0c 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
@@ -18,9 +18,11 @@
package org.apache.flink.table.store.table.sink;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.io.DataFileMeta;
import java.util.List;
@@ -41,6 +43,16 @@ public interface TableWrite extends AutoCloseable {
void compact(BinaryRowData partition, int bucket, boolean fullCompaction) throws Exception;
+ /**
+ * Notify that some new files are created at the given snapshot in the given bucket.
+ *
+ * <p>Most probably, these files are created by another job. Currently this method is only used
+ * by the dedicated compact job to see files created by writer jobs.
+ */
+ @Internal
+ void notifyNewFiles(
+ long snapshotId, BinaryRowData partition, int bucket, List<DataFileMeta> files);
+
List<FileCommittable> prepareCommit(boolean blocking, long commitIdentifier) throws Exception;
void close() throws Exception;
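Combined with compact and prepareCommit on the same interface, a dedicated compact job can forward files produced by writer jobs before it triggers a compaction. A hedged sketch of that call sequence (the helper name and the assumption that blocking = true waits for the compaction result are mine; see StoreCompactOperator in this commit for the actual wiring):

    import java.util.List;

    import org.apache.flink.table.data.binary.BinaryRowData;
    import org.apache.flink.table.store.file.io.DataFileMeta;
    import org.apache.flink.table.store.table.sink.FileCommittable;
    import org.apache.flink.table.store.table.sink.TableWrite;

    public class CompactBucketSketch {
        /** Hypothetical helper: forward new files for one bucket, then compact it. */
        static List<FileCommittable> compactBucket(
                TableWrite write,
                long snapshotId,
                BinaryRowData partition,
                int bucket,
                List<DataFileMeta> newFiles,
                long commitIdentifier) throws Exception {
            // Let the writer's compact manager see files created by other jobs.
            write.notifyNewFiles(snapshotId, partition, bucket, newFiles);
            // Trigger a full compaction for this bucket.
            write.compact(partition, bucket, true);
            // blocking = true: wait for the compaction before collecting committables.
            return write.prepareCommit(true, commitIdentifier);
        }
    }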
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
index fb3c9b20..e5557650 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.table.sink;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.operation.FileStoreWrite;
import java.util.List;
@@ -75,6 +76,12 @@ public class TableWriteImpl<T> implements TableWrite {
write.compact(partition, bucket, fullCompaction);
}
+ @Override
+ public void notifyNewFiles(
+ long snapshotId, BinaryRowData partition, int bucket, List<DataFileMeta> files) {
+ write.notifyNewFiles(snapshotId, partition, bucket, files);
+ }
+
@Override
public List<FileCommittable> prepareCommit(boolean blocking, long commitIdentifier)
throws Exception {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/AbstractDataTableScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/AbstractDataTableScan.java
index c545aeca..73f9fcc2 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/AbstractDataTableScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/AbstractDataTableScan.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.table.source;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.manifest.FileKind;
import org.apache.flink.table.store.file.operation.FileStoreScan;
@@ -115,18 +116,20 @@ public abstract class AbstractDataTableScan implements DataTableScan {
@Override
public DataFilePlan plan() {
FileStoreScan.Plan plan = scan.plan();
- return new DataFilePlan(
- plan.snapshotId(), generateSplits(plan.groupByPartFiles(plan.files(FileKind.ADD))));
- }
-
- private List<DataSplit> generateSplits(
- Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupedDataFiles) {
- return generateSplits(
- scanKind != ScanKind.ALL, splitGenerator(pathFactory), groupedDataFiles);
+ Long snapshotId = plan.snapshotId();
+
+ List<DataSplit> splits =
+ generateSplits(
+ snapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID - 1 : snapshotId,
+ scanKind != ScanKind.ALL,
+ splitGenerator(pathFactory),
+ plan.groupByPartFiles(plan.files(FileKind.ADD)));
+ return new DataFilePlan(snapshotId, splits);
}
@VisibleForTesting
public static List<DataSplit> generateSplits(
+ long snapshotId,
boolean isIncremental,
SplitGenerator splitGenerator,
Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupedDataFiles) {
@@ -139,10 +142,15 @@ public abstract class AbstractDataTableScan implements DataTableScan {
int bucket = bucketEntry.getKey();
if (isIncremental) {
// Don't split when incremental
- splits.add(new DataSplit(partition, bucket, bucketEntry.getValue(), true));
+ splits.add(
+ new DataSplit(
+ snapshotId, partition, bucket, bucketEntry.getValue(), true));
} else {
splitGenerator.split(bucketEntry.getValue()).stream()
- .map(files -> new DataSplit(partition, bucket, files, false))
+ .map(
+ files ->
+ new DataSplit(
+ snapshotId, partition, bucket, files, false))
.forEach(splits::add);
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataSplit.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataSplit.java
index 8dfeac91..daba177c 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataSplit.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataSplit.java
@@ -37,26 +37,40 @@ import java.util.Objects;
/** Input splits. Needed by most batch computation engines. */
public class DataSplit implements Split {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
+ private long snapshotId;
private BinaryRowData partition;
private int bucket;
private List<DataFileMeta> files;
private boolean isIncremental;
public DataSplit(
- BinaryRowData partition, int bucket, List<DataFileMeta> files, boolean isIncremental) {
- init(partition, bucket, files, isIncremental);
+ long snapshotId,
+ BinaryRowData partition,
+ int bucket,
+ List<DataFileMeta> files,
+ boolean isIncremental) {
+ init(snapshotId, partition, bucket, files, isIncremental);
}
private void init(
- BinaryRowData partition, int bucket, List<DataFileMeta> files, boolean isIncremental) {
+ long snapshotId,
+ BinaryRowData partition,
+ int bucket,
+ List<DataFileMeta> files,
+ boolean isIncremental) {
+ this.snapshotId = snapshotId;
this.partition = partition;
this.bucket = bucket;
this.files = files;
this.isIncremental = isIncremental;
}
+ public long snapshotId() {
+ return snapshotId;
+ }
+
public BinaryRowData partition() {
return partition;
}
@@ -108,10 +122,11 @@ public class DataSplit implements Split {
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
DataSplit split = DataSplit.deserialize(new DataInputViewStreamWrapper(in));
- init(split.partition, split.bucket, split.files, split.isIncremental);
+ init(split.snapshotId, split.partition, split.bucket, split.files, split.isIncremental);
}
public void serialize(DataOutputView out) throws IOException {
+ out.writeLong(snapshotId);
SerializationUtils.serializeBinaryRow(partition, out);
out.writeInt(bucket);
out.writeInt(files.size());
@@ -123,6 +138,7 @@ public class DataSplit implements Split {
}
public static DataSplit deserialize(DataInputView in) throws IOException {
+ long snapshotId = in.readLong();
BinaryRowData partition = SerializationUtils.deserializeBinaryRow(in);
int bucket = in.readInt();
int fileNumber = in.readInt();
@@ -131,6 +147,6 @@ public class DataSplit implements Split {
for (int i = 0; i < fileNumber; i++) {
files.add(dataFileSer.deserialize(in));
}
- return new DataSplit(partition, bucket, files, in.readBoolean());
+ return new DataSplit(snapshotId, partition, bucket, files, in.readBoolean());
}
}
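DataSplit now carries the id of the snapshot it was produced from, and the custom wire format writes that id first. A small round-trip sketch in the spirit of SplitTest later in this commit, assuming a single-INT partition built with Flink's BinaryRowWriter and no data files:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.util.Collections;

    import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    import org.apache.flink.table.data.binary.BinaryRowData;
    import org.apache.flink.table.data.writer.BinaryRowWriter;
    import org.apache.flink.table.store.table.source.DataSplit;

    public class DataSplitRoundTrip {
        public static void main(String[] args) throws Exception {
            // Build a one-field partition row (e.g. dt = 20230106).
            BinaryRowData partition = new BinaryRowData(1);
            BinaryRowWriter writer = new BinaryRowWriter(partition);
            writer.writeInt(0, 20230106);
            writer.complete();

            DataSplit split = new DataSplit(5L, partition, 0, Collections.emptyList(), false);

            ByteArrayOutputStream out = new ByteArrayOutputStream();
            split.serialize(new DataOutputViewStreamWrapper(out));

            DataSplit restored =
                    DataSplit.deserialize(
                            new DataInputViewStreamWrapper(
                                    new ByteArrayInputStream(out.toByteArray())));
            System.out.println(restored.snapshotId()); // 5
        }
    }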
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorFollowUpScanner.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorFollowUpScanner.java
index 74e0f141..b69b5c82 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorFollowUpScanner.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorFollowUpScanner.java
@@ -33,13 +33,12 @@ public class ContinuousCompactorFollowUpScanner implements FollowUpScanner {
@Override
public boolean shouldScanSnapshot(Snapshot snapshot) {
- if (snapshot.commitKind() == Snapshot.CommitKind.APPEND
- || snapshot.commitKind() == Snapshot.CommitKind.OVERWRITE) {
+ if (snapshot.commitKind() == Snapshot.CommitKind.APPEND) {
return true;
}
LOG.debug(
- "Next snapshot id {} is neither APPEND nor OVERWRITE, but is {}, check next one.",
+ "Next snapshot id {} is not APPEND, but is {}, check next one.",
snapshot.id(),
snapshot.commitKind());
return false;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/BucketsTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/BucketsTable.java
index b94c3c32..8c222159 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/BucketsTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/BucketsTable.java
@@ -23,6 +23,8 @@ import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.io.DataFileMeta;
+import org.apache.flink.table.store.file.io.DataFileMetaSerializer;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.utils.IteratorRecordReader;
import org.apache.flink.table.store.file.utils.RecordReader;
@@ -34,8 +36,10 @@ import org.apache.flink.table.store.table.source.DataSplit;
import org.apache.flink.table.store.table.source.DataTableScan;
import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarBinaryType;
import java.io.IOException;
import java.util.ArrayList;
@@ -46,16 +50,18 @@ import java.util.Map;
/**
* A table to produce modified partitions and buckets for each snapshot.
*
- * <p>Only used internally by stand-alone compact job sources.
+ * <p>Only used internally by dedicated compact job sources.
*/
public class BucketsTable implements DataTable {
private static final long serialVersionUID = 1L;
private final FileStoreTable wrapped;
+ private final boolean isContinuous;
- public BucketsTable(FileStoreTable wrapped) {
+ public BucketsTable(FileStoreTable wrapped, boolean isContinuous) {
this.wrapped = wrapped;
+ this.isContinuous = isContinuous;
}
@Override
@@ -76,10 +82,17 @@ public class BucketsTable implements DataTable {
@Override
public RowType rowType() {
RowType partitionType = wrapped.schema().logicalPartitionType();
- return rowType(partitionType);
+
+ List<RowType.RowField> fields = new ArrayList<>();
+ fields.add(new RowType.RowField("_SNAPSHOT_ID", new BigIntType()));
+ fields.addAll(partitionType.getFields());
+ // same with ManifestEntry.schema
+ fields.add(new RowType.RowField("_BUCKET", new IntType()));
+ fields.add(new RowType.RowField("_FILES", new VarBinaryType()));
+ return new RowType(fields);
}
- public static RowType rowType(RowType partitionType) {
+ public static RowType partitionWithBucketRowType(RowType partitionType) {
List<RowType.RowField> fields = new ArrayList<>(partitionType.getFields());
// same with ManifestEntry.schema
fields.add(new RowType.RowField("_BUCKET", new IntType()));
@@ -103,13 +116,16 @@ public class BucketsTable implements DataTable {
@Override
public Table copy(Map<String, String> dynamicOptions) {
- return new BucketsTable(wrapped.copy(dynamicOptions));
+ return new BucketsTable(wrapped.copy(dynamicOptions), isContinuous);
}
- private static class BucketsRead implements TableRead {
+ private class BucketsRead implements TableRead {
+
+ private final DataFileMetaSerializer dataFileMetaSerializer = new DataFileMetaSerializer();
@Override
public TableRead withFilter(Predicate predicate) {
+ // filter is done by scan
return this;
}
@@ -125,9 +141,25 @@ public class BucketsTable implements DataTable {
}
DataSplit dataSplit = (DataSplit) split;
+
RowData row =
- new JoinedRowData()
- .replace(dataSplit.partition(), GenericRowData.of(dataSplit.bucket()));
+ new JoinedRowData(
+ GenericRowData.of(dataSplit.snapshotId()), dataSplit.partition());
+ row = new JoinedRowData(row, GenericRowData.of(dataSplit.bucket()));
+
+ List<DataFileMeta> files = Collections.emptyList();
+ if (isContinuous) {
+ // Serialized files are only useful in streaming jobs.
+ // Batch compact jobs only run once, so they only need to know what buckets should
+ // be compacted and don't need to care about incremental new files.
+ files = dataSplit.files();
+ }
+ row =
+ new JoinedRowData(
+ row,
+ GenericRowData.of(
+ (Object) dataFileMetaSerializer.serializeList(files)));
+
return new IteratorRecordReader<>(Collections.singletonList(row).iterator());
}
}
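Each row emitted by BucketsRead is therefore laid out as [_SNAPSHOT_ID BIGINT, &lt;partition fields&gt;, _BUCKET INT, _FILES VARBINARY], with _FILES left empty for batch compaction. A hedged decoding sketch for the single-INT-partition layout used in the tests below (field indices shift with the number of partition fields):

    import java.io.IOException;
    import java.util.List;

    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.store.file.io.DataFileMeta;
    import org.apache.flink.table.store.file.io.DataFileMetaSerializer;

    public class BucketsRowDecoder {
        private final DataFileMetaSerializer serializer = new DataFileMetaSerializer();

        /** Decode one BucketsTable row, assuming exactly one INT partition field. */
        String describe(RowData row) throws IOException {
            long snapshotId = row.getLong(0);                                        // _SNAPSHOT_ID
            int partition = row.getInt(1);                                           // partition field
            int bucket = row.getInt(2);                                              // _BUCKET
            List<DataFileMeta> files = serializer.deserializeList(row.getBinary(3)); // _FILES
            return snapshotId + "|" + partition + "|" + bucket + "|" + files.size();
        }
    }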
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index d8d46e03..376befa8 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -193,10 +193,12 @@ public class TestFileStore extends KeyValueFileStore {
ExecutorService service = Executors.newSingleThreadExecutor();
RecordWriter<KeyValue> writer =
emptyWriter
- ? write.createEmptyWriter(
- partition, bucket, service)
- : write.createWriter(
- partition, bucket, service);
+ ? write.createEmptyWriterContainer(
+ partition, bucket, service)
+ .writer
+ : write.createWriterContainer(
+ partition, bucket, service)
+ .writer;
((MemoryOwner) writer)
.setMemoryPool(
new HeapMemorySegmentPool(
@@ -312,6 +314,7 @@ public class TestFileStore extends KeyValueFileStore {
new RecordReaderIterator<>(
read.createReader(
new DataSplit(
+ 0L /* unused */,
entryWithPartition.getKey(),
entryWithBucket.getKey(),
entryWithBucket.getValue(),
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
index a191c20d..7f4ecf93 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
@@ -223,9 +223,9 @@ public class KeyValueFileStoreReadTest {
throws Exception {
store.commitData(data, partitionCalculator, kv -> 0);
FileStoreScan scan = store.newScan();
+ Long snapshotId = store.snapshotManager().latestSnapshotId();
Map<BinaryRowData, List<ManifestEntry>> filesGroupedByPartition =
- scan.withSnapshot(store.snapshotManager().latestSnapshotId()).plan().files()
- .stream()
+ scan.withSnapshot(snapshotId).plan().files().stream()
.collect(Collectors.groupingBy(ManifestEntry::partition));
KeyValueFileStoreRead read = store.newRead();
if (keyProjection != null) {
@@ -241,6 +241,7 @@ public class KeyValueFileStoreReadTest {
RecordReader<KeyValue> reader =
read.createReader(
new DataSplit(
+ snapshotId,
entry.getKey(),
0,
entry.getValue().stream()
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
index a3018088..03b9bc88 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
@@ -279,8 +279,10 @@ public class TestCommitThread extends Thread {
});
MergeTreeWriter writer =
empty
- ? (MergeTreeWriter) write.createEmptyWriter(partition, 0, service)
- : (MergeTreeWriter) write.createWriter(partition, 0, service);
+ ? (MergeTreeWriter)
+ write.createEmptyWriterContainer(partition, 0, service).writer
+ : (MergeTreeWriter)
+ write.createWriterContainer(partition, 0, service).writer;
writer.setMemoryPool(
new HeapMemorySegmentPool(
WRITE_BUFFER_SIZE.getBytes(), (int) PAGE_SIZE.getBytes()));
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/SplitTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/SplitTest.java
index d2cbb40f..5c1b815c 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/SplitTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/SplitTest.java
@@ -44,7 +44,13 @@ public class SplitTest {
for (int i = 0; i < ThreadLocalRandom.current().nextInt(10); i++) {
files.add(gen.next().meta);
}
- DataSplit split = new DataSplit(data.partition, data.bucket, files, false);
+ DataSplit split =
+ new DataSplit(
+ ThreadLocalRandom.current().nextLong(100),
+ data.partition,
+ data.bucket,
+ files,
+ false);
ByteArrayOutputStream out = new ByteArrayOutputStream();
split.serialize(new DataOutputViewStreamWrapper(out));
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
index 66a21e3b..c23975c6 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.table.source.snapshot;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.io.DataFileMetaSerializer;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.sink.TableCommit;
@@ -31,6 +32,8 @@ import org.apache.flink.types.RowKind;
import org.junit.jupiter.api.Test;
+import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -41,6 +44,8 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link ContinuousCompactorFollowUpScanner}. */
public class ContinuousCompactorFollowUpScannerTest extends SnapshotEnumeratorTestBase {
+ private final DataFileMetaSerializer dataFileMetaSerializer = new DataFileMetaSerializer();
+
@Test
public void testGetPlan() throws Exception {
FileStoreTable table = createFileStoreTable();
@@ -74,7 +79,7 @@ public class ContinuousCompactorFollowUpScannerTest extends SnapshotEnumeratorTe
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(4);
- BucketsTable bucketsTable = new BucketsTable(table);
+ BucketsTable bucketsTable = new BucketsTable(table, true);
DataTableScan scan = bucketsTable.newScan();
TableRead read = bucketsTable.newRead();
ContinuousCompactorFollowUpScanner scanner = new ContinuousCompactorFollowUpScanner();
@@ -85,7 +90,7 @@ public class ContinuousCompactorFollowUpScannerTest extends SnapshotEnumeratorTe
DataTableScan.DataFilePlan plan = scanner.getPlan(1, scan);
assertThat(plan.snapshotId).isEqualTo(1);
assertThat(getResult(read, plan.splits()))
- .hasSameElementsAs(Arrays.asList("+I 1|0", "+I 2|0"));
+ .hasSameElementsAs(Arrays.asList("+I 1|1|0|1", "+I 1|2|0|1"));
snapshot = snapshotManager.snapshot(2);
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
@@ -93,7 +98,7 @@ public class ContinuousCompactorFollowUpScannerTest extends SnapshotEnumeratorTe
plan = scanner.getPlan(2, scan);
assertThat(plan.snapshotId).isEqualTo(2);
assertThat(getResult(read, plan.splits()))
- .hasSameElementsAs(Collections.singletonList("+I 2|0"));
+ .hasSameElementsAs(Collections.singletonList("+I 2|2|0|1"));
snapshot = snapshotManager.snapshot(3);
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
@@ -101,17 +106,24 @@ public class ContinuousCompactorFollowUpScannerTest extends SnapshotEnumeratorTe
snapshot = snapshotManager.snapshot(4);
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
- assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
- plan = scanner.getPlan(4, scan);
- assertThat(plan.snapshotId).isEqualTo(4);
- assertThat(getResult(read, plan.splits()))
- .hasSameElementsAs(Collections.singletonList("+I 1|0"));
+ assertThat(scanner.shouldScanSnapshot(snapshot)).isFalse();
}
@Override
protected String rowDataToString(RowData rowData) {
+ int numFiles;
+ try {
+ numFiles = dataFileMetaSerializer.deserializeList(rowData.getBinary(3)).size();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+
return String.format(
- "%s %d|%d",
- rowData.getRowKind().shortString(), rowData.getInt(0), rowData.getInt(1));
+ "%s %d|%d|%d|%d",
+ rowData.getRowKind().shortString(),
+ rowData.getLong(0),
+ rowData.getInt(1),
+ rowData.getInt(2),
+ numFiles);
}
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorStartingScannerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorStartingScannerTest.java
index ef89f24e..6d8d87a3 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorStartingScannerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorStartingScannerTest.java
@@ -62,7 +62,7 @@ public class ContinuousCompactorStartingScannerTest extends SnapshotEnumeratorTe
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(5);
- BucketsTable bucketsTable = new BucketsTable(table);
+ BucketsTable bucketsTable = new BucketsTable(table, true);
ContinuousCompactorStartingScanner scanner = new ContinuousCompactorStartingScanner();
DataTableScan.DataFilePlan plan = scanner.getPlan(snapshotManager, bucketsTable.newScan());
assertThat(plan.snapshotId).isEqualTo(3);
@@ -76,7 +76,7 @@ public class ContinuousCompactorStartingScannerTest extends SnapshotEnumeratorTe
public void testNoSnapshot() throws Exception {
FileStoreTable table = createFileStoreTable();
SnapshotManager snapshotManager = table.snapshotManager();
- BucketsTable bucketsTable = new BucketsTable(table);
+ BucketsTable bucketsTable = new BucketsTable(table, true);
ContinuousCompactorStartingScanner scanner = new ContinuousCompactorStartingScanner();
assertThat(scanner.getPlan(snapshotManager, bucketsTable.newScan())).isNull();
}
diff --git a/flink-table-store-core/src/test/resources/log4j2-test.properties b/flink-table-store-core/src/test/resources/log4j2-test.properties
index 835c2ec9..1b3980d1 100644
--- a/flink-table-store-core/src/test/resources/log4j2-test.properties
+++ b/flink-table-store-core/src/test/resources/log4j2-test.properties
@@ -25,4 +25,4 @@ appender.testlogger.name = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
-appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
diff --git a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
index ada923b2..3d682efa 100644
--- a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
+++ b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
@@ -89,7 +89,7 @@ public class FlinkActionsE2eTest extends E2eTestBase {
tableDdl,
testDataSourceDdl);
- // run stand-alone compact job
+ // run dedicated compact job
Container.ExecResult execResult =
jobManager.execInContainer(
"bin/flink",
diff --git a/flink-table-store-e2e-tests/src/test/resources/log4j2-test.properties b/flink-table-store-e2e-tests/src/test/resources/log4j2-test.properties
index 835c2ec9..1b3980d1 100644
--- a/flink-table-store-e2e-tests/src/test/resources/log4j2-test.properties
+++ b/flink-table-store-e2e-tests/src/test/resources/log4j2-test.properties
@@ -25,4 +25,4 @@ appender.testlogger.name = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
-appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java
index ef3c3042..7f12b093 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java
@@ -55,6 +55,7 @@ public class TableStoreInputSplitTest {
new TableStoreInputSplit(
tempDir.toString(),
new DataSplit(
+ ThreadLocalRandom.current().nextLong(100),
wantedPartition,
0,
generated.stream()
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/resources/log4j2-test.properties b/flink-table-store-hive/flink-table-store-hive-connector/src/test/resources/log4j2-test.properties
index 835c2ec9..1b3980d1 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/resources/log4j2-test.properties
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/resources/log4j2-test.properties
@@ -25,4 +25,4 @@ appender.testlogger.name = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
-appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
diff --git a/flink-table-store-kafka/src/test/resources/log4j2-test.properties b/flink-table-store-kafka/src/test/resources/log4j2-test.properties
index 863665cf..6f324f58 100644
--- a/flink-table-store-kafka/src/test/resources/log4j2-test.properties
+++ b/flink-table-store-kafka/src/test/resources/log4j2-test.properties
@@ -25,7 +25,7 @@ appender.testlogger.name = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
-appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
logger.kafka.name = kafka
logger.kafka.level = OFF
diff --git a/flink-table-store-spark/src/test/resources/log4j2-test.properties b/flink-table-store-spark/src/test/resources/log4j2-test.properties
index 863665cf..6f324f58 100644
--- a/flink-table-store-spark/src/test/resources/log4j2-test.properties
+++ b/flink-table-store-spark/src/test/resources/log4j2-test.properties
@@ -25,7 +25,7 @@ appender.testlogger.name = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
-appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
logger.kafka.name = kafka
logger.kafka.level = OFF
diff --git a/flink-table-store-spark2/src/test/resources/log4j2-test.properties b/flink-table-store-spark2/src/test/resources/log4j2-test.properties
index 863665cf..6f324f58 100644
--- a/flink-table-store-spark2/src/test/resources/log4j2-test.properties
+++ b/flink-table-store-spark2/src/test/resources/log4j2-test.properties
@@ -25,7 +25,7 @@ appender.testlogger.name = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
-appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
logger.kafka.name = kafka
logger.kafka.level = OFF