Posted to commits@flink.apache.org by lz...@apache.org on 2022/06/10 10:10:08 UTC
[flink-table-store] branch master updated: [FLINK-27707] Implement ManagedTableFactory#onCompactTable
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new da742766 [FLINK-27707] Implement ManagedTableFactory#onCompactTable
da742766 is described below
commit da7427662b24f6f0c150b2a19aa7b12c6597a0bd
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Fri Jun 10 18:10:03 2022 +0800
[FLINK-27707] Implement ManagedTableFactory#onCompactTable
This closes #138
---
.../connector/StatefulPrecommittingSinkWriter.java | 28 +-
.../flink/table/store/connector/TableStore.java | 25 +-
.../store/connector/TableStoreFactoryOptions.java | 18 +-
.../store/connector/TableStoreManagedFactory.java | 10 +-
.../table/store/connector/sink/StoreSink.java | 26 +-
.../store/connector/sink/StoreSinkCompactor.java | 149 ++++++++++
.../store/connector/sink/StoreSinkWriter.java | 4 +-
.../table/store/connector/sink/TableStoreSink.java | 3 +-
.../connector/source/FileStoreEmptySource.java | 99 +++++++
.../store/connector/source/FileStoreSource.java | 26 +-
.../source/StaticFileStoreSplitEnumerator.java | 2 +-
.../store/connector/AlterTableCompactITCase.java | 292 ++++++++++++++++++++
.../store/connector/FileStoreTableITCase.java | 3 +-
.../connector/TableStoreManagedFactoryTest.java | 179 ++++++------
.../table/store/connector/sink/StoreSinkTest.java | 88 +++++-
.../table/store/connector/sink/TestFileStore.java | 10 +
.../connector/source/FileStoreSourceTest.java | 299 ---------------------
.../flink/table/store/file/FileStoreImpl.java | 21 +-
.../table/store/file/mergetree/Increment.java | 6 +
.../file/mergetree/compact/CompactManager.java | 43 +--
.../store/file/mergetree/compact/CompactUnit.java | 2 +-
.../store/file/operation/FileStoreCommitImpl.java | 14 +-
.../table/store/file/operation/FileStoreWrite.java | 9 +
.../store/file/operation/FileStoreWriteImpl.java | 41 ++-
.../store/file/predicate/PredicateConverter.java | 1 +
.../store/file/utils/FileStorePathFactory.java | 23 +-
.../store/file/utils/KeyComparatorSupplier.java | 49 ++++
.../store/file/utils/PartitionedManifestMeta.java | 131 ---------
.../table/store/file/writer/CompactWriter.java | 82 ++++++
.../table/store/file/KeyValueSerializerTest.java | 4 +-
.../table/store/file/TestKeyValueGenerator.java | 154 +++++++++--
.../flink/table/store/file/data/DataFileTest.java | 10 +-
.../store/file/data/DataFileTestDataGenerator.java | 2 +-
.../store/file/manifest/ManifestFileTest.java | 7 +-
.../store/file/manifest/ManifestListTest.java | 4 +-
.../file/manifest/ManifestTestDataGenerator.java | 2 +-
.../store/file/operation/FileStoreCommitTest.java | 27 +-
.../store/file/operation/FileStoreExpireTest.java | 4 +-
.../store/file/operation/FileStoreReadTest.java | 4 +-
.../store/file/operation/FileStoreScanTest.java | 4 +-
.../store/file/operation/TestCommitThread.java | 4 +-
41 files changed, 1232 insertions(+), 677 deletions(-)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/StatefulPrecommittingSinkWriter.java
similarity index 54%
copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
copy to flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/StatefulPrecommittingSinkWriter.java
index 253bf8f6..f4266d82 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/StatefulPrecommittingSinkWriter.java
@@ -16,20 +16,18 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.file.operation;
+package org.apache.flink.table.store.connector;
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.connector.sink.Committable;
-import java.util.concurrent.ExecutorService;
-
-/** Write operation which provides {@link RecordWriter} creation. */
-public interface FileStoreWrite {
-
- /** Create a {@link RecordWriter} from partition and bucket. */
- RecordWriter createWriter(BinaryRowData partition, int bucket, ExecutorService compactExecutor);
-
- /** Create an empty {@link RecordWriter} from partition and bucket. */
- RecordWriter createEmptyWriter(
- BinaryRowData partition, int bucket, ExecutorService compactExecutor);
-}
+/**
+ * The base interface for file store sink writers.
+ *
+ * @param <WriterStateT> The type of the writer's state.
+ */
+public interface StatefulPrecommittingSinkWriter<WriterStateT>
+ extends StatefulSink.StatefulSinkWriter<RowData, WriterStateT>,
+ TwoPhaseCommittingSink.PrecommittingSinkWriter<RowData, Committable> {}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
index 3e9b1f7c..224949b8 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
@@ -41,6 +41,7 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.store.connector.sink.BucketStreamPartitioner;
import org.apache.flink.table.store.connector.sink.StoreSink;
import org.apache.flink.table.store.connector.sink.global.GlobalCommittingSinkTranslator;
+import org.apache.flink.table.store.connector.source.FileStoreEmptySource;
import org.apache.flink.table.store.connector.source.FileStoreSource;
import org.apache.flink.table.store.connector.source.LogHybridSourceFactory;
import org.apache.flink.table.store.connector.source.StaticFileStoreSplitEnumerator;
@@ -52,6 +53,7 @@ import org.apache.flink.table.store.file.WriteMode;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.schema.Schema;
import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
import org.apache.flink.table.store.log.LogOptions.LogStartupMode;
import org.apache.flink.table.store.log.LogSinkProvider;
import org.apache.flink.table.store.log.LogSourceProvider;
@@ -66,6 +68,8 @@ import java.util.Map;
import java.util.Optional;
import java.util.UUID;
+import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.COMPACTION_MANUAL_TRIGGERED;
+import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.COMPACTION_PARTITION_SPEC;
import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
import static org.apache.flink.table.store.file.FileStoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
import static org.apache.flink.table.store.file.FileStoreOptions.MERGE_ENGINE;
@@ -118,6 +122,20 @@ public class TableStore {
return schema.partitionKeys().size() > 0;
}
+ public boolean isCompactionTask() {
+ return options.get(COMPACTION_MANUAL_TRIGGERED);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Nullable
+ public Map<String, String> getCompactPartSpec() {
+ String json = options.get(COMPACTION_PARTITION_SPEC);
+ if (json == null) {
+ return null;
+ }
+ return JsonSerdeUtil.fromJson(json, Map.class);
+ }
+
public boolean valueCountMode() {
return trimmedPrimaryKeys().size() == 0;
}
@@ -308,8 +326,7 @@ public class TableStore {
continuousScanLatest,
projectedFields,
partitionPredicate,
- fieldPredicate,
- null);
+ fieldPredicate);
}
private Source<RowData, ?, ?> buildSource() {
@@ -352,7 +369,7 @@ public class TableStore {
.orElse(type);
DataStreamSource<RowData> dataStream =
env.fromSource(
- buildSource(),
+ isCompactionTask() ? new FileStoreEmptySource() : buildSource(),
WatermarkStrategy.noWatermarks(),
tableIdentifier.asSummaryString(),
InternalTypeInfo.of(produceType));
@@ -428,6 +445,8 @@ public class TableStore {
trimmedPrimaryKeysIndex(),
fullPrimaryKeysIndex(),
numBucket,
+ isCompactionTask(),
+ getCompactPartSpec(),
lockFactory,
overwritePartition,
logSinkProvider);
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
index b0aa9302..18d04a68 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
@@ -47,15 +47,20 @@ public class TableStoreFactoryOptions {
+ "of a partition/table.");
@Internal
- public static final ConfigOption<String> COMPACTION_SCANNED_MANIFEST =
- ConfigOptions.key("compaction.scanned-manifest")
+ public static final ConfigOption<Boolean> COMPACTION_MANUAL_TRIGGERED =
+ ConfigOptions.key("compaction.manual-triggered")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "An internal flag to indicate a manual triggered compaction job.");
+
+ @Internal
+ public static final ConfigOption<String> COMPACTION_PARTITION_SPEC =
+ ConfigOptions.key("compaction.partition-spec")
.stringType()
.noDefaultValue()
.withDescription(
- "The serialized json string of manifest entries which are scanned during manual compaction "
- + "planning phase and injected back into enriched options. The json format contains "
- + "snapshot id and each partition's data file meta list (among which "
- + "each data file meta is encoded by Base64 format) tagged with bucket id.");
+ "An internal json string to record the user-specified partition spec for the manual triggered compaction.");
public static final ConfigOption<String> LOG_SYSTEM =
ConfigOptions.key("log.system")
@@ -77,7 +82,6 @@ public class TableStoreFactoryOptions {
public static Set<ConfigOption<?>> allOptions() {
Set<ConfigOption<?>> allOptions = new HashSet<>();
allOptions.add(COMPACTION_RESCALE_BUCKET);
- allOptions.add(COMPACTION_SCANNED_MANIFEST);
allOptions.add(LOG_SYSTEM);
allOptions.add(SINK_PARALLELISM);
allOptions.add(SCAN_PARALLELISM);
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
index e858ca5e..bdd6bb3d 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.WriteMode;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
import org.apache.flink.table.store.log.LogStoreTableFactory;
import org.apache.flink.util.Preconditions;
@@ -39,6 +40,8 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.COMPACTION_MANUAL_TRIGGERED;
+import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.COMPACTION_PARTITION_SPEC;
import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
@@ -183,6 +186,11 @@ public class TableStoreManagedFactory extends AbstractTableStoreFactory
@Override
public Map<String, String> onCompactTable(
Context context, CatalogPartitionSpec catalogPartitionSpec) {
- throw new UnsupportedOperationException("Not implement yet");
+ Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
+ newOptions.put(COMPACTION_MANUAL_TRIGGERED.key(), String.valueOf(true));
+ newOptions.put(
+ COMPACTION_PARTITION_SPEC.key(),
+ JsonSerdeUtil.toJson(catalogPartitionSpec.getPartitionSpec()));
+ return newOptions;
}
}
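[Editor's illustration, not part of the commit: a minimal standalone sketch of the enriched options that onCompactTable returns, assuming a hypothetical single-partition spec (dt = '2022-06-10'). The two keys are the internal options added above; the literal JSON value stands in for what JsonSerdeUtil.toJson(catalogPartitionSpec.getPartitionSpec()) would produce.]

import java.util.HashMap;
import java.util.Map;

public class OnCompactTableOptionsSketch {
    public static void main(String[] args) {
        // Start from a copy of the table's existing options (empty here for brevity).
        Map<String, String> newOptions = new HashMap<>();
        // Mark the job as a manually triggered compaction.
        newOptions.put("compaction.manual-triggered", "true");
        // Record the user-specified partition spec as a JSON string.
        newOptions.put("compaction.partition-spec", "{\"dt\":\"2022-06-10\"}");
        newOptions.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}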
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index 5a3d498c..b02823aa 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -27,6 +27,7 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.table.catalog.CatalogLock;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.connector.StatefulPrecommittingSinkWriter;
import org.apache.flink.table.store.connector.sink.global.GlobalCommittingSink;
import org.apache.flink.table.store.file.FileStore;
import org.apache.flink.table.store.file.WriteMode;
@@ -44,6 +45,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collection;
+import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.function.Consumer;
@@ -69,6 +71,10 @@ public class StoreSink<WriterStateT, LogCommT>
private final int numBucket;
+ private final boolean compactionTask;
+
+ @Nullable private final Map<String, String> compactPartitionSpec;
+
@Nullable private final CatalogLock.Factory lockFactory;
@Nullable private final Map<String, String> overwritePartition;
@@ -83,6 +89,8 @@ public class StoreSink<WriterStateT, LogCommT>
int[] primaryKeys,
int[] logPrimaryKeys,
int numBucket,
+ boolean compactionTask,
+ @Nullable Map<String, String> compactPartitionSpec,
@Nullable CatalogLock.Factory lockFactory,
@Nullable Map<String, String> overwritePartition,
@Nullable LogSinkProvider logSinkProvider) {
@@ -93,19 +101,33 @@ public class StoreSink<WriterStateT, LogCommT>
this.primaryKeys = primaryKeys;
this.logPrimaryKeys = logPrimaryKeys;
this.numBucket = numBucket;
+ this.compactionTask = compactionTask;
+ this.compactPartitionSpec = compactPartitionSpec;
this.lockFactory = lockFactory;
this.overwritePartition = overwritePartition;
this.logSinkProvider = logSinkProvider;
}
@Override
- public StoreSinkWriter<WriterStateT> createWriter(InitContext initContext) throws IOException {
+ public StatefulPrecommittingSinkWriter<WriterStateT> createWriter(InitContext initContext)
+ throws IOException {
return restoreWriter(initContext, null);
}
+ @SuppressWarnings("unchecked")
@Override
- public StoreSinkWriter<WriterStateT> restoreWriter(
+ public StatefulPrecommittingSinkWriter<WriterStateT> restoreWriter(
InitContext initContext, Collection<WriterStateT> states) throws IOException {
+ if (compactionTask) {
+ return (StatefulPrecommittingSinkWriter<WriterStateT>)
+ new StoreSinkCompactor(
+ initContext.getSubtaskId(),
+ initContext.getNumberOfParallelSubtasks(),
+ fileStore,
+ compactPartitionSpec == null
+ ? Collections.emptyMap()
+ : compactPartitionSpec);
+ }
SinkWriter<SinkRecord> logWriter = null;
LogWriteCallback logCallback = null;
if (logSinkProvider != null) {
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkCompactor.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkCompactor.java
new file mode 100644
index 00000000..d5fcf374
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkCompactor.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.connector.StatefulPrecommittingSinkWriter;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.predicate.PredicateConverter;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.table.sink.FileCommittable;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/** A dedicated {@link SinkWriter} for manually triggered compaction. */
+public class StoreSinkCompactor implements StatefulPrecommittingSinkWriter<Void> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StoreSinkCompactor.class);
+
+ private final int subTaskId;
+ private final int numOfParallelInstances;
+
+ private final FileStore fileStore;
+ private final Map<String, String> partitionSpec;
+ private final ExecutorService compactExecutor;
+
+ public StoreSinkCompactor(
+ int subTaskId,
+ int numOfParallelInstances,
+ FileStore fileStore,
+ Map<String, String> partitionSpec) {
+ this.subTaskId = subTaskId;
+ this.numOfParallelInstances = numOfParallelInstances;
+ this.fileStore = fileStore;
+ this.partitionSpec = partitionSpec;
+ this.compactExecutor =
+ Executors.newSingleThreadScheduledExecutor(
+ new ExecutorThreadFactory(
+ String.format("compaction-subtask-%d", subTaskId)));
+ }
+
+ @Override
+ public void flush(boolean endOfInput) {}
+
+ @Override
+ public void write(RowData element, Context context) throws IOException, InterruptedException {
+ // nothing to write
+ }
+
+ @Override
+ public void close() throws Exception {
+ compactExecutor.shutdownNow();
+ }
+
+ @Override
+ public List<Void> snapshotState(long checkpointId) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public Collection<Committable> prepareCommit() throws IOException {
+ List<Committable> committables = new ArrayList<>();
+
+ FileStoreScan.Plan plan =
+ fileStore
+ .newScan()
+ .withPartitionFilter(
+ PredicateConverter.CONVERTER.fromMap(
+ partitionSpec, fileStore.partitionType()))
+ .plan();
+ for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
+ plan.groupByPartFiles().entrySet()) {
+ BinaryRowData partition = partEntry.getKey();
+ for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
+ partEntry.getValue().entrySet()) {
+ int bucket = bucketEntry.getKey();
+ List<DataFileMeta> restoredFiles = bucketEntry.getValue();
+ if (select(partition, bucket)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Assign partition {}, bucket {} to subtask {}",
+ FileStorePathFactory.getPartitionComputer(
+ fileStore.partitionType(),
+ FileSystemConnectorOptions.PARTITION_DEFAULT_NAME
+ .defaultValue())
+ .generatePartValues(partition),
+ bucket,
+ subTaskId);
+ }
+ RecordWriter writer =
+ fileStore
+ .newWrite()
+ .createCompactWriter(
+ partition.copy(),
+ bucket,
+ compactExecutor,
+ restoredFiles);
+ FileCommittable committable;
+ try {
+ committable =
+ new FileCommittable(
+ partition, bucketEntry.getKey(), writer.prepareCommit());
+ committables.add(new Committable(Committable.Kind.FILE, committable));
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ }
+ }
+ return committables;
+ }
+
+ private boolean select(BinaryRowData partition, int bucket) {
+ return subTaskId == Math.abs(Objects.hash(partition, bucket) % numOfParallelInstances);
+ }
+}
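[Editor's illustration, not part of the commit: a minimal standalone sketch of the select() logic above. Each (partition, bucket) pair is claimed by exactly one compaction subtask, chosen by hashing the pair modulo the sink parallelism, so the scanned buckets are spread across subtasks without coordination. The partition string and parallelism below are hypothetical.]

import java.util.Objects;

public class CompactionBucketSelectionSketch {

    // Mirrors StoreSinkCompactor#select: true when the pair hashes to this subtask.
    static boolean select(int subTaskId, int parallelism, Object partition, int bucket) {
        return subTaskId == Math.abs(Objects.hash(partition, bucket) % parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        for (int bucket = 0; bucket < 8; bucket++) {
            for (int subtask = 0; subtask < parallelism; subtask++) {
                if (select(subtask, parallelism, "dt=2022-06-10", bucket)) {
                    System.out.printf("bucket %d -> subtask %d%n", bucket, subtask);
                }
            }
        }
    }
}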
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
index 9a914636..62e2025e 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.binary.BinaryRowDataUtil;
+import org.apache.flink.table.store.connector.StatefulPrecommittingSinkWriter;
import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.WriteMode;
import org.apache.flink.table.store.file.operation.FileStoreWrite;
@@ -54,8 +55,7 @@ import java.util.concurrent.Executors;
/** A {@link SinkWriter} for dynamic store. */
public class StoreSinkWriter<WriterStateT>
- implements StatefulSinkWriter<RowData, WriterStateT>,
- PrecommittingSinkWriter<RowData, Committable> {
+ implements StatefulPrecommittingSinkWriter<WriterStateT> {
private static final BinaryRowData DUMMY_KEY = BinaryRowDataUtil.EMPTY_ROW;
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
index 962aa0f4..0704c3ec 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
@@ -127,7 +127,8 @@ public class TableStoreSink
});
}
// Do not sink to log store when overwrite mode
- final LogSinkProvider finalLogSinkProvider = overwrite ? null : logSinkProvider;
+ final LogSinkProvider finalLogSinkProvider =
+ overwrite || tableStore.isCompactionTask() ? null : logSinkProvider;
return (DataStreamSinkProvider)
(providerContext, dataStream) ->
tableStore
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreEmptySource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreEmptySource.java
new file mode 100644
index 00000000..bb671f23
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreEmptySource.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.table.data.RowData;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/** A dedicated empty source for "ALTER TABLE ... COMPACT". */
+public class FileStoreEmptySource
+ implements Source<RowData, FileStoreSourceSplit, PendingSplitsCheckpoint> {
+
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.BOUNDED;
+ }
+
+ @Override
+ public SourceReader<RowData, FileStoreSourceSplit> createReader(
+ SourceReaderContext readerContext) {
+ return new SourceReader<RowData, FileStoreSourceSplit>() {
+ @Override
+ public void start() {}
+
+ @Override
+ public InputStatus pollNext(ReaderOutput<RowData> output) {
+ return InputStatus.END_OF_INPUT;
+ }
+
+ @Override
+ public List<FileStoreSourceSplit> snapshotState(long checkpointId) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public CompletableFuture<Void> isAvailable() {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public void addSplits(List<FileStoreSourceSplit> splits) {}
+
+ @Override
+ public void notifyNoMoreSplits() {}
+
+ @Override
+ public void close() {}
+ };
+ }
+
+ @Override
+ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> createEnumerator(
+ SplitEnumeratorContext<FileStoreSourceSplit> context) {
+ return restoreEnumerator(context, null);
+ }
+
+ @Override
+ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnumerator(
+ SplitEnumeratorContext<FileStoreSourceSplit> context,
+ PendingSplitsCheckpoint checkpoint) {
+ return new StaticFileStoreSplitEnumerator(context, null, Collections.emptyList());
+ }
+
+ @Override
+ public FileStoreSourceSplitSerializer getSplitSerializer() {
+ return new FileStoreSourceSplitSerializer();
+ }
+
+ @Override
+ public PendingSplitsCheckpointSerializer getEnumeratorCheckpointSerializer() {
+ return new PendingSplitsCheckpointSerializer(getSplitSerializer());
+ }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
index 61f70997..fccd3c7e 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.store.connector.source;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
@@ -32,7 +31,6 @@ import org.apache.flink.table.store.file.WriteMode;
import org.apache.flink.table.store.file.operation.FileStoreRead;
import org.apache.flink.table.store.file.operation.FileStoreScan;
import org.apache.flink.table.store.file.predicate.Predicate;
-import org.apache.flink.table.store.file.utils.PartitionedManifestMeta;
import javax.annotation.Nullable;
@@ -66,12 +64,6 @@ public class FileStoreSource
@Nullable private final Predicate fieldPredicate;
- /**
- * The partitioned manifest meta collected at planning phase when manual compaction is
- * triggered.
- */
- @Nullable private final PartitionedManifestMeta specifiedPartManifests;
-
public FileStoreSource(
FileStore fileStore,
WriteMode writeMode,
@@ -81,8 +73,7 @@ public class FileStoreSource
boolean latestContinuous,
@Nullable int[][] projectedFields,
@Nullable Predicate partitionPredicate,
- @Nullable Predicate fieldPredicate,
- @Nullable PartitionedManifestMeta specifiedPartManifests) {
+ @Nullable Predicate fieldPredicate) {
this.fileStore = fileStore;
this.writeMode = writeMode;
this.valueCountMode = valueCountMode;
@@ -92,7 +83,6 @@ public class FileStoreSource
this.projectedFields = projectedFields;
this.partitionPredicate = partitionPredicate;
this.fieldPredicate = fieldPredicate;
- this.specifiedPartManifests = specifiedPartManifests;
}
@Override
@@ -138,14 +128,6 @@ public class FileStoreSource
PendingSplitsCheckpoint checkpoint) {
FileStoreScan scan = fileStore.newScan();
- if (specifiedPartManifests != null) {
- return new StaticFileStoreSplitEnumerator(
- context,
- scan.snapshot(specifiedPartManifests.getSnapshotId()),
- new FileStoreSourceSplitGenerator()
- .createSplits(specifiedPartManifests.getManifestEntries()));
- }
-
if (partitionPredicate != null) {
scan.withPartitionFilter(partitionPredicate);
}
@@ -205,10 +187,4 @@ public class FileStoreSource
public PendingSplitsCheckpointSerializer getEnumeratorCheckpointSerializer() {
return new PendingSplitsCheckpointSerializer(getSplitSerializer());
}
-
- @VisibleForTesting
- @Nullable
- PartitionedManifestMeta getSpecifiedPartManifests() {
- return specifiedPartManifests;
- }
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
index c289a606..599417f1 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
@@ -44,7 +44,7 @@ public class StaticFileStoreSplitEnumerator
public StaticFileStoreSplitEnumerator(
SplitEnumeratorContext<FileStoreSourceSplit> context,
- Snapshot snapshot,
+ @Nullable Snapshot snapshot,
Collection<FileStoreSourceSplit> splits) {
this.context = context;
this.snapshot = snapshot;
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
new file mode 100644
index 00000000..d664f5ee
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.utils.SnapshotFinder;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
+import static org.apache.flink.table.store.file.FileStoreOptions.relativeTablePath;
+import static org.apache.flink.table.store.file.TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED;
+import static org.apache.flink.table.store.file.TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED;
+import static org.apache.flink.table.store.file.TestKeyValueGenerator.GeneratorMode.SINGLE_PARTITIONED;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for 'ALTER TABLE ... COMPACT'. */
+public class AlterTableCompactITCase extends FileStoreTableITCase {
+
+ private TestKeyValueGenerator generator;
+
+ @Override
+ protected List<String> ddl() {
+ return Arrays.asList(
+ "CREATE TABLE IF NOT EXISTS T0 (\n"
+ + "shopId INT\n, "
+ + "orderId BIGINT NOT NULL\n, "
+ + "itemId BIGINT)",
+ "CREATE TABLE IF NOT EXISTS T1 (\n"
+ + "dt STRING\n, "
+ + "shopId INT\n, "
+ + "orderId BIGINT NOT NULL\n, "
+ + "itemId BIGINT)\n "
+ + "PARTITIONED BY (dt)",
+ "CREATE TABLE IF NOT EXISTS T2 (\n"
+ + "dt STRING\n, "
+ + "hr INT\n, "
+ + "shopId INT\n, "
+ + "orderId BIGINT NOT NULL\n, "
+ + "itemId BIGINT)"
+ + "PARTITIONED BY (dt, hr)");
+ }
+
+ @Test
+ public void testNonPartitioned() throws IOException {
+ generator = new TestKeyValueGenerator(NON_PARTITIONED);
+ Random random = new Random();
+ innerTest("T0", random.nextInt(10) + 1, NON_PARTITIONED);
+ }
+
+ @Test
+ public void testSinglePartitioned() throws IOException {
+ generator = new TestKeyValueGenerator(SINGLE_PARTITIONED);
+ Random random = new Random();
+ innerTest("T1", random.nextInt(10) + 1, SINGLE_PARTITIONED);
+ }
+
+ @Test
+ public void testMultiPartitioned() throws IOException {
+ generator = new TestKeyValueGenerator(MULTI_PARTITIONED);
+ Random random = new Random();
+ innerTest("T2", random.nextInt(10) + 1, MULTI_PARTITIONED);
+ }
+
+ private void innerTest(String tableName, int batchNum, TestKeyValueGenerator.GeneratorMode mode)
+ throws IOException {
+ // increase trigger to avoid auto-compaction
+ batchSql(
+ String.format(
+ "ALTER TABLE %s SET ('num-sorted-run.compaction-trigger' = '50')",
+ tableName));
+ batchSql(
+ String.format(
+ "ALTER TABLE %s SET ('num-sorted-run.stop-trigger' = '50')", tableName));
+
+ Random random = new Random();
+ List<KeyValue> dataset = new ArrayList<>();
+ long latestSnapshot = 0L;
+ for (int i = 0; i < batchNum; i++) {
+ List<KeyValue> data = generateData(random.nextInt(200) + 1);
+ String insertQuery =
+ String.format(
+ "INSERT INTO %s VALUES \n%s",
+ tableName,
+ data.stream()
+ .map(kv -> kvAsString(kv, mode))
+ .collect(Collectors.joining(",\n")));
+ batchSql(insertQuery);
+ Snapshot snapshot = findLatestSnapshot(tableName);
+ assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+ latestSnapshot = snapshot.id();
+ dataset.addAll(data);
+ }
+ if (mode == NON_PARTITIONED) {
+ String compactQuery = String.format("ALTER TABLE %s COMPACT", tableName);
+ String selectQuery = String.format("SELECT * FROM %s", tableName);
+ compactAndCheck(
+ tableName,
+ compactQuery,
+ selectQuery,
+ latestSnapshot,
+ dataset.stream()
+ .map(kv -> convertToRow(kv, mode))
+ .collect(Collectors.toList()));
+ } else {
+ List<BinaryRowData> partitions =
+ dataset.stream()
+ .map(kv -> generator.getPartition(kv))
+ .distinct()
+ .collect(Collectors.toList());
+ while (!partitions.isEmpty()) {
+ BinaryRowData part = pickPartition(partitions);
+ Map<String, String> partSpec = TestKeyValueGenerator.toPartitionMap(part, mode);
+ String compactQuery =
+ String.format(
+ "ALTER TABLE %s PARTITION (%s) COMPACT",
+ tableName, partAsString(partSpec, false));
+ String selectQuery =
+ String.format(
+ "SELECT * FROM %s WHERE %s",
+ tableName, partAsString(partSpec, true));
+ compactAndCheck(
+ tableName,
+ compactQuery,
+ selectQuery,
+ latestSnapshot,
+ dataset.stream()
+ .filter(kv -> partFilter(kv, part, mode))
+ .map(kv -> convertToRow(kv, mode))
+ .collect(Collectors.toList()));
+ latestSnapshot = findLatestSnapshot(tableName).id();
+ }
+ }
+ }
+
+ private void compactAndCheck(
+ String tableName,
+ String compactQuery,
+ String selectQuery,
+ long latestSnapshot,
+ List<Row> expectedData)
+ throws IOException {
+ batchSql(compactQuery);
+ Snapshot snapshot = findLatestSnapshot(tableName);
+ assertThat(snapshot.id()).isEqualTo(latestSnapshot + 1);
+ assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
+ // check idempotence
+ batchSql(compactQuery);
+ assertThat(findLatestSnapshot(tableName).id()).isEqualTo(snapshot.id());
+
+ // read data
+ List<Row> readData = batchSql(selectQuery);
+ assertThat(readData).containsExactlyInAnyOrderElementsOf(expectedData);
+ }
+
+ private boolean partFilter(
+ KeyValue kv, BinaryRowData partition, TestKeyValueGenerator.GeneratorMode mode) {
+ RowData record = kv.value();
+ if (mode == SINGLE_PARTITIONED) {
+ return record.getString(0).equals(partition.getString(0));
+ } else if (mode == MULTI_PARTITIONED) {
+ return record.getString(0).equals(partition.getString(0))
+ && record.getInt(1) == partition.getInt(1);
+ }
+ return true;
+ }
+
+ private String partAsString(Map<String, String> partSpec, boolean predicate) {
+ String dt = String.format("dt = '%s'", partSpec.get("dt"));
+ String hr = partSpec.get("hr");
+ if (hr == null) {
+ return dt;
+ }
+ hr = String.format("hr = %s", hr);
+ return predicate ? String.join(" AND ", dt, hr) : String.join(", ", dt, hr);
+ }
+
+ private BinaryRowData pickPartition(List<BinaryRowData> partitions) {
+ Random random = new Random();
+ int idx = random.nextInt(partitions.size());
+ return partitions.remove(idx);
+ }
+
+ private List<KeyValue> generateData(int numRecords) {
+ List<KeyValue> data = new ArrayList<>();
+ for (int i = 0; i < numRecords; i++) {
+ KeyValue kv = generator.next();
+ if (kv.valueKind() == ValueKind.ADD) {
+ data.add(kv);
+ }
+ }
+ return data;
+ }
+
+ private Row convertToRow(KeyValue keyValue, TestKeyValueGenerator.GeneratorMode mode) {
+ byte kind = keyValue.valueKind().toByteValue();
+ RowData record = keyValue.value();
+ String rowKind = RowKind.fromByteValue(kind == 0 ? kind : 3).shortString();
+ if (mode == NON_PARTITIONED) {
+ return changelogRow(rowKind, record.getInt(0), record.getLong(1), record.getLong(2));
+ } else if (mode == SINGLE_PARTITIONED) {
+ return changelogRow(
+ rowKind,
+ record.getString(0).toString(),
+ record.getInt(1),
+ record.getLong(2),
+ record.getLong(3));
+ }
+ return changelogRow(
+ rowKind,
+ record.getString(0).toString(),
+ record.getInt(1),
+ record.getInt(2),
+ record.getLong(3),
+ record.getLong(4));
+ }
+
+ private String kvAsString(KeyValue keyValue, TestKeyValueGenerator.GeneratorMode mode) {
+ RowData record = keyValue.value();
+ switch (mode) {
+ case NON_PARTITIONED:
+ return String.format(
+ "(%d, %d, %d)", record.getInt(0), record.getLong(1), record.getLong(2));
+ case SINGLE_PARTITIONED:
+ return String.format(
+ "('%s', %d, %d, %d)",
+ record.getString(0),
+ record.getInt(1),
+ record.getLong(2),
+ record.getLong(3));
+ case MULTI_PARTITIONED:
+ return String.format(
+ "('%s', %d, %d, %d, %d)",
+ record.getString(0),
+ record.getInt(1),
+ record.getInt(2),
+ record.getLong(3),
+ record.getLong(4));
+ default:
+ throw new UnsupportedOperationException("unsupported mode");
+ }
+ }
+
+ private String getSnapshotDir(String tableName) {
+ return path
+ + relativeTablePath(
+ ObjectIdentifier.of(
+ bEnv.getCurrentCatalog(), bEnv.getCurrentDatabase(), tableName))
+ + "/snapshot";
+ }
+
+ private Snapshot findLatestSnapshot(String tableName) throws IOException {
+ String snapshotDir = getSnapshotDir(tableName);
+ Long latest = SnapshotFinder.findLatest(new Path(URI.create(snapshotDir)));
+ return Snapshot.fromPath(
+ new Path(URI.create(snapshotDir + String.format("/snapshot-%d", latest))));
+ }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
index 59f3d966..002480fd 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
@@ -44,13 +44,14 @@ public abstract class FileStoreTableITCase extends AbstractTestBase {
protected TableEnvironment bEnv;
protected TableEnvironment sEnv;
+ protected String path;
@Before
public void before() throws IOException {
bEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
sEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(100));
- String path = TEMPORARY_FOLDER.newFolder().toURI().toString();
+ path = TEMPORARY_FOLDER.newFolder().toURI().toString();
prepareEnv(bEnv, path);
prepareEnv(sEnv, path);
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
index ed756242..7716526a 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
@@ -21,13 +21,14 @@ package org.apache.flink.table.store.connector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
import org.apache.flink.table.store.log.LogOptions;
import org.apache.flink.table.types.logical.RowType;
@@ -36,11 +37,11 @@ import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -49,14 +50,22 @@ import java.util.UUID;
import java.util.function.Predicate;
import java.util.stream.Stream;
+import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
+import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.COMPACTION_MANUAL_TRIGGERED;
+import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.COMPACTION_PARTITION_SPEC;
import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
import static org.apache.flink.table.store.connector.TableStoreTestBase.createResolvedTable;
import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
import static org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
-import static org.apache.flink.table.store.file.FileStoreOptions.relativeTablePath;
+import static org.apache.flink.table.store.file.FileStoreOptions.path;
+import static org.apache.flink.table.store.file.TestKeyValueGenerator.DEFAULT_PART_TYPE;
+import static org.apache.flink.table.store.file.TestKeyValueGenerator.DEFAULT_ROW_TYPE;
+import static org.apache.flink.table.store.file.TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED;
+import static org.apache.flink.table.store.file.TestKeyValueGenerator.getPrimaryKeys;
import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
import static org.apache.flink.table.store.log.LogOptions.CONSISTENCY;
import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
@@ -66,8 +75,11 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test cases for {@link TableStoreManagedFactory}. */
public class TableStoreManagedFactoryTest {
+ private static final String CATALOG = "catalog";
+ private static final String DATABASE = "database";
+ private static final String TABLE = "table";
private static final ObjectIdentifier TABLE_IDENTIFIER =
- ObjectIdentifier.of("catalog", "database", "table");
+ ObjectIdentifier.of(CATALOG, DATABASE, TABLE);
private final TableStoreManagedFactory tableStoreManagedFactory =
new TableStoreManagedFactory();
@@ -76,12 +88,12 @@ public class TableStoreManagedFactoryTest {
private DynamicTableFactory.Context context;
@ParameterizedTest
- @MethodSource("providingOptions")
+ @MethodSource("provideOptionsToEnrich")
public void testEnrichOptions(
Map<String, String> sessionOptions,
Map<String, String> tableOptions,
Map<String, String> expectedEnrichedOptions) {
- context = createTableContext(sessionOptions, tableOptions);
+ context = createNonEnrichedContext(sessionOptions, tableOptions);
Map<String, String> actualEnrichedOptions = tableStoreManagedFactory.enrichOptions(context);
assertThat(actualEnrichedOptions)
.containsExactlyInAnyOrderEntriesOf(expectedEnrichedOptions);
@@ -92,12 +104,12 @@ public class TableStoreManagedFactoryTest {
Map<String, String> sessionMap = new HashMap<>();
sessionMap.put("table-store.root-path", "my_path");
sessionMap.put("table-store.path", "another_path");
- context = createTableContext(sessionMap, emptyMap());
+ context = createNonEnrichedContext(sessionMap, emptyMap());
assertThatThrownBy(() -> tableStoreManagedFactory.enrichOptions(context))
.hasMessage(
"Managed table can not contain table path. You need to remove path in table options or session config.");
- context = createTableContext(emptyMap(), emptyMap());
+ context = createNonEnrichedContext(emptyMap(), emptyMap());
assertThatThrownBy(() -> tableStoreManagedFactory.enrichOptions(context))
.hasMessage(
"Please specify a root path by setting session level configuration as `SET 'table-store.root-path' = '...'`.");
@@ -109,13 +121,13 @@ public class TableStoreManagedFactoryTest {
sessionMap.put("table-store.root-path", "my_path");
sessionMap.put("table-store.log.system", "kafka");
sessionMap.put("table-store.log.topic", "my_topic");
- context = createTableContext(sessionMap, emptyMap());
+ context = createNonEnrichedContext(sessionMap, emptyMap());
assertThatThrownBy(() -> tableStoreManagedFactory.enrichOptions(context))
.hasMessage(
"Managed table can not contain custom topic. You need to remove topic in table options or session config.");
sessionMap.remove("table-store.log.topic");
- context = createTableContext(sessionMap, emptyMap());
+ context = createNonEnrichedContext(sessionMap, emptyMap());
Map<String, String> enriched = tableStoreManagedFactory.enrichOptions(context);
Map<String, String> expected = new HashMap<>();
@@ -126,13 +138,10 @@ public class TableStoreManagedFactoryTest {
}
@ParameterizedTest
- @MethodSource("providingEnrichedOptionsForCreation")
+ @MethodSource("provideOptionsToCreate")
public void testOnCreateTable(Map<String, String> enrichedOptions, boolean ignoreIfExists) {
- context = enrichContext(createTableContext(emptyMap(), enrichedOptions));
- Path expectedPath =
- Paths.get(
- sharedTempDir.toAbsolutePath().toString(),
- relativeTablePath(TABLE_IDENTIFIER));
+ context = createEnrichedContext(enrichedOptions);
+ Path expectedPath = Paths.get(path(enrichedOptions).getPath());
boolean exist = expectedPath.toFile().exists();
if (ignoreIfExists || !exist) {
tableStoreManagedFactory.onCreateTable(context, ignoreIfExists);
@@ -156,26 +165,11 @@ public class TableStoreManagedFactoryTest {
}
}
- private DynamicTableFactory.Context enrichContext(DynamicTableFactory.Context context) {
- Map<String, String> newOptions = tableStoreManagedFactory.enrichOptions(context);
- ResolvedCatalogTable table = context.getCatalogTable().copy(newOptions);
- return new FactoryUtil.DefaultDynamicTableContext(
- context.getObjectIdentifier(),
- table,
- emptyMap(),
- context.getConfiguration(),
- context.getClassLoader(),
- context.isTemporary());
- }
-
@ParameterizedTest
- @MethodSource("providingEnrichedOptionsForDrop")
+ @MethodSource("provideOptionsToDrop")
public void testOnDropTable(Map<String, String> enrichedOptions, boolean ignoreIfNotExists) {
- context = enrichContext(createTableContext(emptyMap(), enrichedOptions));
- Path expectedPath =
- Paths.get(
- sharedTempDir.toAbsolutePath().toString(),
- relativeTablePath(TABLE_IDENTIFIER));
+ context = createEnrichedContext(enrichedOptions);
+ Path expectedPath = Paths.get(path(enrichedOptions).getPath());
boolean exist = expectedPath.toFile().exists();
if (exist || ignoreIfNotExists) {
tableStoreManagedFactory.onDropTable(context, ignoreIfNotExists);
@@ -214,7 +208,7 @@ public class TableStoreManagedFactoryTest {
}
@ParameterizedTest
- @MethodSource("providingResolvedTable")
+ @MethodSource("provideResolvedTable")
public void testCreateAndCheckTableStore(
RowType rowType,
List<String> partitions,
@@ -223,18 +217,12 @@ public class TableStoreManagedFactoryTest {
ResolvedCatalogTable catalogTable =
createResolvedTable(
singletonMap(
- "path", sharedTempDir.toAbsolutePath() + "/" + UUID.randomUUID()),
+ PATH.key(),
+ sharedTempDir.toAbsolutePath() + "/" + UUID.randomUUID()),
rowType,
partitions,
primaryKeys);
- context =
- new FactoryUtil.DefaultDynamicTableContext(
- TABLE_IDENTIFIER,
- catalogTable,
- emptyMap(),
- Configuration.fromMap(emptyMap()),
- Thread.currentThread().getContextClassLoader(),
- false);
+ context = createEnrichedContext(TABLE_IDENTIFIER, catalogTable);
if (expectedResult.success) {
tableStoreManagedFactory.onCreateTable(context, false);
TableStore tableStore = AbstractTableStoreFactory.buildTableStore(context);
@@ -256,9 +244,59 @@ public class TableStoreManagedFactoryTest {
}
}
+ @ParameterizedTest
+ @ValueSource(ints = {0, 1, 2})
+ public void testOnCompactTable(int partitionNum) {
+ context = createEnrichedContext(Collections.emptyMap());
+
+ Map<String, String> partSpec =
+ partitionNum == 0
+ ? Collections.emptyMap()
+ : partitionNum == 1 ? of("foo", "bar") : of("foo", "bar", "meow", "burr");
+ CatalogPartitionSpec catalogPartSpec = new CatalogPartitionSpec(partSpec);
+ Map<String, String> newOptions =
+ tableStoreManagedFactory.onCompactTable(context, catalogPartSpec);
+ assertThat(newOptions)
+ .containsEntry(COMPACTION_MANUAL_TRIGGERED.key(), String.valueOf(true));
+ assertThat(newOptions)
+ .containsEntry(COMPACTION_PARTITION_SPEC.key(), JsonSerdeUtil.toJson(partSpec));
+ }
+
// ~ Tools ------------------------------------------------------------------
- private static Stream<Arguments> providingOptions() {
+ private static ResolvedCatalogTable getDummyTable(Map<String, String> tableOptions) {
+ return new ResolvedCatalogTable(
+ CatalogTable.of(Schema.derived(), "a comment", emptyList(), tableOptions),
+ ResolvedSchema.of(emptyList()));
+ }
+
+ private static DynamicTableFactory.Context createNonEnrichedContext(
+ Map<String, String> sessionOptions, Map<String, String> tableOptions) {
+ return new FactoryUtil.DefaultDynamicTableContext(
+ TABLE_IDENTIFIER,
+ getDummyTable(tableOptions),
+ emptyMap(),
+ Configuration.fromMap(sessionOptions),
+ Thread.currentThread().getContextClassLoader(),
+ false);
+ }
+
+ private static DynamicTableFactory.Context createEnrichedContext(Map<String, String> options) {
+ return createEnrichedContext(TABLE_IDENTIFIER, getDummyTable(options));
+ }
+
+ private static DynamicTableFactory.Context createEnrichedContext(
+ ObjectIdentifier tableIdentifier, ResolvedCatalogTable catalogTable) {
+ return new FactoryUtil.DefaultDynamicTableContext(
+ tableIdentifier,
+ catalogTable,
+ emptyMap(),
+ Configuration.fromMap(emptyMap()),
+ Thread.currentThread().getContextClassLoader(),
+ false);
+ }
+
+ private static Stream<Arguments> provideOptionsToEnrich() {
Map<String, String> enrichedOptions =
of(
BUCKET.key(),
@@ -315,16 +353,17 @@ public class TableStoreManagedFactoryTest {
return expected;
}
- private static Stream<Arguments> providingEnrichedOptionsForCreation() {
- Map<String, String> enrichedOptions = new HashMap<>();
- enrichedOptions.put(ROOT_PATH.key(), sharedTempDir.toAbsolutePath().toString());
+ private static Stream<Arguments> provideOptionsToCreate() {
+ Map<String, String> enrichedOptions =
+ of(ROOT_PATH.key(), sharedTempDir.toAbsolutePath().toString());
+ enrichedOptions = generateTablePath(enrichedOptions);
return Stream.of(
Arguments.of(enrichedOptions, false),
Arguments.of(enrichedOptions, true),
Arguments.of(enrichedOptions, false));
}
- private static Stream<Arguments> providingEnrichedOptionsForDrop() {
+ private static Stream<Arguments> provideOptionsToDrop() {
File tablePath =
Paths.get(
sharedTempDir.toAbsolutePath().toString(),
@@ -333,30 +372,31 @@ public class TableStoreManagedFactoryTest {
if (!tablePath.exists()) {
tablePath.mkdirs();
}
- Map<String, String> enrichedOptions = new HashMap<>();
- enrichedOptions.put(ROOT_PATH.key(), sharedTempDir.toAbsolutePath().toString());
+ Map<String, String> enrichedOptions =
+ of(ROOT_PATH.key(), sharedTempDir.toAbsolutePath().toString());
+ enrichedOptions = generateTablePath(enrichedOptions);
return Stream.of(
Arguments.of(enrichedOptions, false),
Arguments.of(enrichedOptions, true),
Arguments.of(enrichedOptions, false));
}
- private static Stream<Arguments> providingResolvedTable() {
- RowType rowType = TestKeyValueGenerator.ROW_TYPE;
+ private static Stream<Arguments> provideResolvedTable() {
+ RowType rowType = DEFAULT_ROW_TYPE;
// success case
Arguments arg0 =
Arguments.of(
rowType,
- Arrays.asList("dt", "hr"),
- Arrays.asList("dt", "hr", "shopId"), // pk is [dt, hr, shopId]
+ DEFAULT_PART_TYPE.getFieldNames(), // partition is [dt, hr]
+ getPrimaryKeys(MULTI_PARTITIONED), // pk is [dt, hr, shopId]
new TableStoreTestBase.ExpectedResult().success(true));
// failed case: pk doesn't contain partition key
Arguments arg1 =
Arguments.of(
rowType,
- Arrays.asList("dt", "hr"),
- Collections.singletonList("shopId"), // pk is [shopId]
+ DEFAULT_PART_TYPE.getFieldNames(), // partition is [dt, hr]
+ singletonList("shopId"), // pk is [shopId]
new TableStoreTestBase.ExpectedResult()
.success(false)
.expectedType(IllegalStateException.class)
@@ -367,13 +407,13 @@ public class TableStoreManagedFactoryTest {
Arguments arg2 =
Arguments.of(
rowType,
- Arrays.asList("dt", "hr", "shopId"),
- Arrays.asList("dt", "hr", "shopId"), // pk is [dt, hr, shopId]
+ DEFAULT_PART_TYPE.getFieldNames(), // partition is [dt, hr]
+ DEFAULT_PART_TYPE.getFieldNames(), // pk is [dt, hr]
new TableStoreTestBase.ExpectedResult()
.success(false)
.expectedType(IllegalStateException.class)
.expectedMessage(
- "Primary key constraint [dt, hr, shopId] should not be same with partition fields [dt, hr, shopId],"
+ "Primary key constraint [dt, hr] should not be same with partition fields [dt, hr],"
+ " this will result in only one record in a partition"));
return Stream.of(arg0, arg1, arg2);
@@ -391,25 +431,6 @@ public class TableStoreManagedFactoryTest {
return newOptions;
}
- private static DynamicTableFactory.Context createTableContext(
- Map<String, String> sessionOptions, Map<String, String> tableOptions) {
- ResolvedCatalogTable resolvedCatalogTable =
- new ResolvedCatalogTable(
- CatalogTable.of(
- Schema.derived(),
- "a comment",
- Collections.emptyList(),
- tableOptions),
- ResolvedSchema.of(Collections.emptyList()));
- return new FactoryUtil.DefaultDynamicTableContext(
- TABLE_IDENTIFIER,
- resolvedCatalogTable,
- emptyMap(),
- Configuration.fromMap(sessionOptions),
- Thread.currentThread().getContextClassLoader(),
- false);
- }
-
private static Map<String, String> of(String... kvs) {
assert kvs != null && kvs.length % 2 == 0;
Map<String, String> map = new HashMap<>();
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
index 47ba4fd0..35227a39 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
@@ -18,11 +18,17 @@
package org.apache.flink.table.store.connector.sink;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.table.catalog.CatalogLock;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.connector.StatefulPrecommittingSinkWriter;
import org.apache.flink.table.store.connector.sink.TestFileStore.TestRecordWriter;
import org.apache.flink.table.store.file.WriteMode;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
@@ -31,6 +37,7 @@ import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
+import org.apache.flink.util.UserCodeClassLoader;
import org.junit.Assume;
import org.junit.Before;
@@ -38,11 +45,14 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import javax.annotation.Nullable;
+
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.OptionalLong;
import java.util.concurrent.Callable;
import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
@@ -126,6 +136,8 @@ public class StoreSinkTest {
primaryKeys,
primaryKeys,
2,
+ false,
+ null,
() -> lock,
new HashMap<>(),
null);
@@ -187,6 +199,26 @@ public class StoreSinkTest {
.isEqualTo(Collections.singletonList("ADD-key-8-value-0/8/9"));
}
+ @Test
+ public void testCreateCompactor() throws Exception {
+ StoreSink<?, ?> sink =
+ new StoreSink<>(
+ identifier,
+ fileStore,
+ WriteMode.CHANGE_LOG,
+ partitions,
+ primaryKeys,
+ primaryKeys,
+ 2,
+ true,
+ null,
+ () -> lock,
+ null,
+ null);
+ StatefulPrecommittingSinkWriter<?> writer = sink.createWriter(initContext());
+ assertThat(writer).isInstanceOf(StoreSinkCompactor.class);
+ }
+
private void writeAndAssert(StoreSink<?, ?> sink) throws Exception {
writeAndCommit(
sink,
@@ -207,13 +239,14 @@ public class StoreSinkTest {
}
private List<Committable> write(StoreSink<?, ?> sink, RowData... rows) throws Exception {
- StoreSinkWriter<?> writer = sink.createWriter(null);
+ StatefulPrecommittingSinkWriter<?> writer = sink.createWriter(null);
for (RowData row : rows) {
writer.write(row, null);
}
- List<Committable> committables = writer.prepareCommit();
- Map<BinaryRowData, Map<Integer, RecordWriter>> writers = new HashMap<>(writer.writers());
+ List<Committable> committables = ((StoreSinkWriter) writer).prepareCommit();
+ Map<BinaryRowData, Map<Integer, RecordWriter>> writers =
+ new HashMap<>(((StoreSinkWriter) writer).writers());
assertThat(writers.size()).isGreaterThan(0);
writer.close();
@@ -249,7 +282,7 @@ public class StoreSinkTest {
assertThat(lock.closed).isTrue();
}
- private StoreSink<?, ?> newSink(Map<String, String> overwritePartition) {
+ private StoreSink<?, ?> newSink(@Nullable Map<String, String> overwritePartition) {
return new StoreSink<>(
identifier,
fileStore,
@@ -258,6 +291,8 @@ public class StoreSinkTest {
primaryKeys,
primaryKeys,
2,
+ false,
+ null,
() -> lock,
overwritePartition,
null);
@@ -283,4 +318,49 @@ public class StoreSinkTest {
closed = true;
}
}
+
+ private Sink.InitContext initContext() {
+ return new Sink.InitContext() {
+ @Override
+ public UserCodeClassLoader getUserCodeClassLoader() {
+ return null;
+ }
+
+ @Override
+ public MailboxExecutor getMailboxExecutor() {
+ return null;
+ }
+
+ @Override
+ public ProcessingTimeService getProcessingTimeService() {
+ return null;
+ }
+
+ @Override
+ public int getSubtaskId() {
+ return 0;
+ }
+
+ @Override
+ public int getNumberOfParallelSubtasks() {
+ return 1;
+ }
+
+ @Override
+ public SinkWriterMetricGroup metricGroup() {
+ return null;
+ }
+
+ @Override
+ public OptionalLong getRestoredCheckpointId() {
+ return OptionalLong.empty();
+ }
+
+ @Override
+ public SerializationSchema.InitializationContext
+ asSerializationSchemaInitializationContext() {
+ return null;
+ }
+ };
+ }
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
index b80462c3..5c8d255c 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.store.file.operation.FileStoreScan;
import org.apache.flink.table.store.file.operation.FileStoreWrite;
import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.file.stats.StatsTestUtils;
+import org.apache.flink.table.store.file.writer.CompactWriter;
import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.types.logical.RowType;
@@ -89,6 +90,15 @@ public class TestFileStore implements FileStore {
BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
return new TestRecordWriter(hasPk);
}
+
+ @Override
+ public CompactWriter createCompactWriter(
+ BinaryRowData partition,
+ int bucket,
+ ExecutorService compactExecutor,
+ List<DataFileMeta> restoreFiles) {
+ throw new UnsupportedOperationException();
+ }
};
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceTest.java
deleted file mode 100644
index a33816a9..00000000
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceTest.java
+++ /dev/null
@@ -1,299 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.source;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
-import org.apache.flink.table.store.file.FileStore;
-import org.apache.flink.table.store.file.FileStoreImpl;
-import org.apache.flink.table.store.file.FileStoreOptions;
-import org.apache.flink.table.store.file.WriteMode;
-import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
-import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
-import org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
-import org.apache.flink.table.store.file.stats.FieldStats;
-import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
-import org.apache.flink.table.store.file.utils.PartitionedManifestMeta;
-import org.apache.flink.table.types.logical.BigIntType;
-import org.apache.flink.table.types.logical.DoubleType;
-import org.apache.flink.table.types.logical.IntType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.VarCharType;
-
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.UUID;
-import java.util.stream.Stream;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test for {@link FileStoreSource}. */
-public class FileStoreSourceTest {
-
- private static final RowType RECORD_TYPE =
- RowType.of(
- new LogicalType[] {
- new IntType(), new VarCharType(), new DoubleType(), new VarCharType()
- },
- new String[] {"k0", "k1", "v0", "v1"});
-
- @MethodSource("parameters")
- @ParameterizedTest
- public void testSerDe(boolean hasPk, boolean partitioned, boolean specified)
- throws ClassNotFoundException, IOException {
- FileStore fileStore = buildFileStore(hasPk, partitioned);
- Long specifiedSnapshotId = specified ? 1L : null;
- Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> specifiedManifestEntries =
- specified ? buildManifestEntries(hasPk, partitioned) : null;
- PartitionedManifestMeta partitionedManifestMeta =
- specified
- ? new PartitionedManifestMeta(specifiedSnapshotId, specifiedManifestEntries)
- : null;
- FileStoreSource source =
- new FileStoreSource(
- fileStore,
- WriteMode.CHANGE_LOG,
- !hasPk,
- true,
- Duration.ofSeconds(1).toMillis(),
- true,
- null,
- null,
- null,
- partitionedManifestMeta);
- Object object = readObject(writeObject(source));
- assertThat(object).isInstanceOf(FileStoreSource.class);
- FileStoreSource deserialized = (FileStoreSource) object;
- assertThat(deserialized.getBoundedness()).isEqualTo(source.getBoundedness());
- if (specified) {
- assertThat(deserialized.getSpecifiedPartManifests())
- .isEqualTo(source.getSpecifiedPartManifests());
- } else {
- assertThat(deserialized.getSpecifiedPartManifests()).isNull();
- }
- }
-
- private byte[] writeObject(FileStoreSource source) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
- ObjectOutputStream oos = new ObjectOutputStream(baos);
- oos.writeObject(source);
- oos.close();
- return baos.toByteArray();
- }
-
- private Object readObject(byte[] bytes) throws IOException, ClassNotFoundException {
- ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
- ObjectInputStream ois = new ObjectInputStream(bais);
- Object object = ois.readObject();
- ois.close();
- return object;
- }
-
- public static Stream<Arguments> parameters() {
- // hasPk, partitioned, specified
- return Stream.of(
- Arguments.of(true, true, false),
- Arguments.of(true, false, false),
- Arguments.of(false, false, false),
- Arguments.of(false, true, false),
- Arguments.of(true, true, true),
- Arguments.of(true, false, true),
- Arguments.of(false, false, true),
- Arguments.of(false, true, true));
- }
-
- private FileStore buildFileStore(boolean hasPk, boolean partitioned) {
- String user = "user";
- RowType partitionType = getPartitionType(partitioned);
- RowType keyType = getKeyType(hasPk);
- RowType valueType = getValueType(hasPk);
- MergeFunction mergeFunction =
- hasPk ? new DeduplicateMergeFunction() : new ValueCountMergeFunction();
- return new FileStoreImpl(
- 0,
- new FileStoreOptions(new Configuration()),
- WriteMode.CHANGE_LOG,
- user,
- partitionType,
- keyType,
- valueType,
- mergeFunction);
- }
-
- private static Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> buildManifestEntries(
- boolean hasPk, boolean partitioned) {
- Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> manifestEntries = new HashMap<>();
- Map<Integer, List<DataFileMeta>> bucketEntries = new HashMap<>();
- int totalBuckets = new Random().nextInt(10) + 1;
- for (int bucket = 0; bucket < totalBuckets; bucket++) {
- List<DataFileMeta> metaList = new ArrayList<>();
- for (Tuple2<BinaryRowData, BinaryRowData> tuple : genMinMax(hasPk)) {
- metaList.add(genDataFileMeta(hasPk, tuple.f0, tuple.f1));
- }
- bucketEntries.put(bucket, metaList);
- }
- genPartitionValues(partitioned)
- .forEach(partValue -> manifestEntries.put(partValue, bucketEntries));
- return manifestEntries;
- }
-
- private static List<BinaryRowData> genPartitionValues(boolean partitioned) {
- RowDataSerializer partSerializer = new RowDataSerializer(getPartitionType(partitioned));
- if (partitioned) {
- int partSize = new Random().nextInt(10) + 1;
- List<BinaryRowData> partKeys = new ArrayList<>();
- for (int i = 0; i < partSize; i++) {
- partKeys.add(
- partSerializer
- .toBinaryRow(
- GenericRowData.of(
- StringData.fromString(
- UUID.randomUUID().toString())))
- .copy());
- }
- return partKeys;
- }
- return Collections.singletonList(partSerializer.toBinaryRow(GenericRowData.of()).copy());
- }
-
- private static List<Tuple2<BinaryRowData, BinaryRowData>> genMinMax(boolean hasPk) {
- RowDataSerializer keySerializer = new RowDataSerializer(getKeyType(hasPk));
- int size = new Random().nextInt(20);
- List<Tuple2<BinaryRowData, BinaryRowData>> minMaxKeys = new ArrayList<>();
- for (int i = 0; i < size; i++) {
- int k0 = new Random().nextInt(1000);
- String k1 = UUID.randomUUID().toString();
- BinaryRowData keyMin;
- BinaryRowData keyMax;
- if (hasPk) {
- keyMin =
- keySerializer.toBinaryRow(GenericRowData.of(k0, StringData.fromString(k1)));
- keyMax =
- keySerializer.toBinaryRow(
- GenericRowData.of(k0 + 100, StringData.fromString(k1)));
- } else {
- double v0 = new Random().nextDouble();
- String v1 = UUID.randomUUID().toString();
- keyMin =
- keySerializer.toBinaryRow(
- GenericRowData.of(
- k0,
- StringData.fromString(k1),
- v0,
- StringData.fromString(v1)));
- keyMax =
- keySerializer.toBinaryRow(
- GenericRowData.of(
- k0 + 100,
- StringData.fromString(k1),
- v0 + 1.5,
- StringData.fromString(v1)));
- }
- minMaxKeys.add(Tuple2.of(keyMin, keyMax));
- }
- return minMaxKeys;
- }
-
- private static DataFileMeta genDataFileMeta(
- boolean hasPk, BinaryRowData keyMin, BinaryRowData keyMax) {
- long seqNumber = new Random().nextLong() + 1;
- FieldStats k0Stats = new FieldStats(keyMin.getInt(0), keyMax.getInt(1), 0);
- FieldStats k1Stats = new FieldStats(keyMin.getString(1), keyMax.getString(1), 0);
- FieldStats v0Status =
- new FieldStats(
- hasPk ? null : keyMin.getDouble(2), hasPk ? null : keyMax.getDouble(2), 0);
- FieldStats v1Status =
- new FieldStats(
- hasPk ? null : keyMin.getString(3), hasPk ? null : keyMax.getString(3), 0);
- long count = new Random().nextLong();
-
- FieldStatsArraySerializer keyStatsSer = new FieldStatsArraySerializer(getKeyType(hasPk));
- FieldStatsArraySerializer valueStatsSer =
- new FieldStatsArraySerializer(getValueType(hasPk));
-
- FieldStats[] keyStats =
- hasPk
- ? new FieldStats[] {k0Stats, k1Stats}
- : new FieldStats[] {
- k0Stats, k1Stats,
- v0Status, v1Status
- };
- FieldStats[] valueStats =
- hasPk
- ? new FieldStats[] {
- k0Stats, k1Stats,
- v0Status, v1Status
- }
- : new FieldStats[] {
- new FieldStats(count, count + new Random().nextInt(100), 0)
- };
- return new DataFileMeta(
- "data-" + UUID.randomUUID(),
- new Random().nextInt(100),
- new Random().nextInt(100),
- keyMin,
- keyMax,
- keyStatsSer.toBinary(keyStats),
- valueStatsSer.toBinary(valueStats),
- seqNumber,
- seqNumber + new Random().nextInt(100),
- new Random().nextInt(4));
- }
-
- private static RowType getKeyType(boolean hasPk) {
- return hasPk
- ? RowType.of(
- new LogicalType[] {new IntType(), new VarCharType()},
- new String[] {"k0", "k1"})
- : RECORD_TYPE;
- }
-
- private static RowType getValueType(boolean hasPk) {
- return hasPk
- ? RECORD_TYPE
- : RowType.of(
- new LogicalType[] {new BigIntType(false)}, new String[] {"_VALUE_COUNT"});
- }
-
- private static RowType getPartitionType(boolean partitioned) {
- return partitioned
- ? RowType.of(new LogicalType[] {new VarCharType()}, new String[] {"k1"})
- : RowType.of();
- }
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
index ea196f4e..ee7428a9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
@@ -19,11 +19,7 @@
package org.apache.flink.table.store.file;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
-import org.apache.flink.table.runtime.generated.RecordComparator;
-import org.apache.flink.table.store.codegen.CodeGenUtils;
import org.apache.flink.table.store.file.manifest.ManifestFile;
import org.apache.flink.table.store.file.manifest.ManifestList;
import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
@@ -36,13 +32,16 @@ import org.apache.flink.table.store.file.operation.FileStoreReadImpl;
import org.apache.flink.table.store.file.operation.FileStoreScanImpl;
import org.apache.flink.table.store.file.operation.FileStoreWriteImpl;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.KeyComparatorSupplier;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import javax.annotation.Nullable;
+import java.util.Comparator;
import java.util.List;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
/** File store implementation. */
@@ -55,8 +54,8 @@ public class FileStoreImpl implements FileStore {
private final RowType partitionType;
private final RowType keyType;
private final RowType valueType;
+ private final Supplier<Comparator<RowData>> keyComparatorSupplier;
@Nullable private final MergeFunction mergeFunction;
- private final GeneratedRecordComparator genRecordComparator;
public FileStoreImpl(
long schemaId,
@@ -75,9 +74,7 @@ public class FileStoreImpl implements FileStore {
this.keyType = keyType;
this.valueType = valueType;
this.mergeFunction = mergeFunction;
- this.genRecordComparator =
- CodeGenUtils.generateRecordComparator(
- new TableConfig(), keyType.getChildren(), "KeyComparator");
+ this.keyComparatorSupplier = new KeyComparatorSupplier(keyType);
}
public FileStorePathFactory pathFactory() {
@@ -106,17 +103,13 @@ public class FileStoreImpl implements FileStore {
return new ManifestList.Factory(partitionType, options.manifestFormat(), pathFactory());
}
- private RecordComparator newKeyComparator() {
- return genRecordComparator.newInstance(Thread.currentThread().getContextClassLoader());
- }
-
@Override
public FileStoreWriteImpl newWrite() {
return new FileStoreWriteImpl(
writeMode,
keyType,
valueType,
- this::newKeyComparator,
+ keyComparatorSupplier,
mergeFunction,
options.fileFormat(),
pathFactory(),
@@ -130,7 +123,7 @@ public class FileStoreImpl implements FileStore {
writeMode,
keyType,
valueType,
- newKeyComparator(),
+ keyComparatorSupplier.get(),
mergeFunction,
options.fileFormat(),
pathFactory());
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
index 965d1d82..1cf74743 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
@@ -35,6 +35,7 @@ import java.util.Objects;
*/
public class Increment {
+ private static final List<DataFileMeta> EMPTY_NEW_FILES = Collections.emptyList();
private static final List<DataFileMeta> EMPTY_COMPACT_BEFORE = Collections.emptyList();
private static final List<DataFileMeta> EMPTY_COMPACT_AFTER = Collections.emptyList();
@@ -48,6 +49,11 @@ public class Increment {
return new Increment(newFiles, EMPTY_COMPACT_BEFORE, EMPTY_COMPACT_AFTER);
}
+ public static Increment forCompact(
+ List<DataFileMeta> compactBefore, List<DataFileMeta> compactAfter) {
+ return new Increment(EMPTY_NEW_FILES, compactBefore, compactAfter);
+ }
+
public Increment(
List<DataFileMeta> newFiles,
List<DataFileMeta> beCompacted,
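
For context, a minimal sketch of how the new factory method might be used by a compaction-only writer; the wrapper class is a placeholder and the two lists stand for whatever a finished compaction produced:

    import org.apache.flink.table.store.file.data.DataFileMeta;
    import org.apache.flink.table.store.file.mergetree.Increment;

    import java.util.List;

    class IncrementSketch {
        // Builds an increment that carries only compaction changes; the newFiles part stays
        // empty, so committing it does not produce an APPEND snapshot.
        static Increment compactionOnly(List<DataFileMeta> before, List<DataFileMeta> after) {
            return Increment.forCompact(before, after);
        }
    }
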
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java
index 7879965e..133e2a12 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java
@@ -97,40 +97,51 @@ public class CompactManager {
if (LOG.isDebugEnabled()) {
LOG.debug(
- "Submit compaction with files (level, size): "
+ "Submit compaction with files (name, level, size): "
+ levels.levelSortedRuns().stream()
.flatMap(lsr -> lsr.run().files().stream())
.map(
file ->
String.format(
- "(%d, %d)",
- file.level(),
- file.fileSize()))
- .collect(Collectors.joining(", ")));
- LOG.debug(
- "Pick these files (level, size) for compaction: "
- + unit.files().stream()
- .map(
- file ->
- String.format(
- "(%d, %d)",
+ "(%s, %d, %d)",
+ file.fileName(),
file.level(),
file.fileSize()))
.collect(Collectors.joining(", ")));
}
-
- CompactTask task = new CompactTask(unit, dropDelete);
- taskFuture = executor.submit(task);
+ submitCompaction(unit, dropDelete);
});
}
+ public void submitCompaction(CompactUnit unit, boolean dropDelete) {
+ CompactTask task = new CompactTask(unit, dropDelete);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Pick these files (name, level, size) for compaction: {}",
+ unit.files().stream()
+ .map(
+ file ->
+ String.format(
+ "(%s, %d, %d)",
+ file.fileName(), file.level(), file.fileSize()))
+ .collect(Collectors.joining(", ")));
+ }
+ taskFuture = executor.submit(task);
+ }
+
/** Finish current task, and update result files to {@link Levels}. */
public Optional<CompactResult> finishCompaction(Levels levels, boolean blocking)
throws ExecutionException, InterruptedException {
+ Optional<CompactResult> result = finishCompaction(blocking);
+ result.ifPresent(r -> levels.update(r.before(), r.after()));
+ return result;
+ }
+
+ public Optional<CompactResult> finishCompaction(boolean blocking)
+ throws ExecutionException, InterruptedException {
if (taskFuture != null) {
if (blocking || taskFuture.isDone()) {
CompactResult result = taskFuture.get();
- levels.update(result.before(), result.after());
taskFuture = null;
return Optional.of(result);
}
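
A minimal sketch of driving the now-public submitCompaction/finishCompaction pair directly, for callers that do not maintain Levels themselves; the class is placed in the same package only so that CompactResult is visible, and its name is a placeholder:

    package org.apache.flink.table.store.file.mergetree.compact;

    import java.util.Optional;
    import java.util.concurrent.ExecutionException;

    class ManualCompactionSketch {
        // Submits one compaction unit and waits for its result without updating any Levels.
        static Optional<CompactResult> runOnce(CompactManager manager, CompactUnit unit)
                throws ExecutionException, InterruptedException {
            if (!manager.isCompactionFinished()) {
                throw new IllegalStateException("Previous compaction is still running.");
            }
            manager.submitCompaction(unit, true); // dropDelete = true
            return manager.finishCompaction(true); // blocking = true
        }
    }
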
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactUnit.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactUnit.java
index dd1645a5..af8510e1 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactUnit.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactUnit.java
@@ -25,7 +25,7 @@ import java.util.ArrayList;
import java.util.List;
/** A files unit for compaction. */
-interface CompactUnit {
+public interface CompactUnit {
int outputLevel();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index e8e09fc1..1c45ecfa 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -170,12 +170,14 @@ public class FileStoreCommitImpl implements FileStoreCommit {
}
List<ManifestEntry> appendChanges = collectChanges(committable.newFiles(), ValueKind.ADD);
- tryCommit(
- appendChanges,
- committable.identifier(),
- committable.logOffsets(),
- Snapshot.CommitKind.APPEND,
- false);
+ if (!appendChanges.isEmpty()) {
+ tryCommit(
+ appendChanges,
+ committable.identifier(),
+ committable.logOffsets(),
+ Snapshot.CommitKind.APPEND,
+ false);
+ }
List<ManifestEntry> compactChanges = new ArrayList<>();
compactChanges.addAll(collectChanges(committable.compactBefore(), ValueKind.DELETE));
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
index 253bf8f6..d9e020b1 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
@@ -19,8 +19,10 @@
package org.apache.flink.table.store.file.operation;
import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.writer.RecordWriter;
+import java.util.List;
import java.util.concurrent.ExecutorService;
/** Write operation which provides {@link RecordWriter} creation. */
@@ -32,4 +34,11 @@ public interface FileStoreWrite {
/** Create an empty {@link RecordWriter} from partition and bucket. */
RecordWriter createEmptyWriter(
BinaryRowData partition, int bucket, ExecutorService compactExecutor);
+
+ /** Create a compact {@link RecordWriter} from partition, bucket and restored files. */
+ RecordWriter createCompactWriter(
+ BinaryRowData partition,
+ int bucket,
+ ExecutorService compactExecutor,
+ List<DataFileMeta> restoredFiles);
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
index 45d47206..53c2f3b3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.store.file.operation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.WriteMode;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.data.DataFilePathFactory;
@@ -35,11 +34,13 @@ import org.apache.flink.table.store.file.mergetree.MergeTreeWriter;
import org.apache.flink.table.store.file.mergetree.SortBufferMemTable;
import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
+import org.apache.flink.table.store.file.mergetree.compact.CompactUnit;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.store.file.writer.AppendOnlyWriter;
+import org.apache.flink.table.store.file.writer.CompactWriter;
import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.types.logical.RowType;
@@ -48,6 +49,7 @@ import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
@@ -136,6 +138,22 @@ public class FileStoreWriteImpl implements FileStoreWrite {
return createMergeTreeWriter(partition, bucket, Collections.emptyList(), compactExecutor);
}
+ @Override
+ public RecordWriter createCompactWriter(
+ BinaryRowData partition,
+ int bucket,
+ ExecutorService compactExecutor,
+ List<DataFileMeta> restoredFiles) {
+ return new CompactWriter(
+ CompactUnit.fromFiles(options.numLevels - 1, restoredFiles),
+ createCompactManager(
+ partition,
+ bucket,
+ (numLevels, runs) ->
+ Optional.of(CompactUnit.fromLevelRuns(numLevels - 1, runs)),
+ compactExecutor));
+ }
+
private RecordWriter createMergeTreeWriter(
BinaryRowData partition,
int bucket,
@@ -154,7 +172,14 @@ public class FileStoreWriteImpl implements FileStoreWrite {
dataFileWriter.valueType(),
options.writeBufferSize,
options.pageSize),
- createCompactManager(partition, bucket, compactExecutor),
+ createCompactManager(
+ partition,
+ bucket,
+ new UniversalCompaction(
+ options.maxSizeAmplificationPercent,
+ options.sizeRatio,
+ options.numSortedRunCompactionTrigger),
+ compactExecutor),
new Levels(keyComparator, restoreFiles, options.numLevels),
maxSequenceNumber,
keyComparator,
@@ -165,18 +190,16 @@ public class FileStoreWriteImpl implements FileStoreWrite {
}
private CompactManager createCompactManager(
- BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
- CompactStrategy compactStrategy =
- new UniversalCompaction(
- options.maxSizeAmplificationPercent,
- options.sizeRatio,
- options.numSortedRunCompactionTrigger);
+ BinaryRowData partition,
+ int bucket,
+ CompactStrategy compactStrategy,
+ ExecutorService compactExecutor) {
DataFileWriter dataFileWriter = dataFileWriterFactory.create(partition, bucket);
Comparator<RowData> keyComparator = keyComparatorSupplier.get();
CompactManager.Rewriter rewriter =
(outputLevel, dropDelete, sections) ->
dataFileWriter.write(
- new RecordReaderIterator<KeyValue>(
+ new RecordReaderIterator<>(
new MergeTreeReader(
sections,
dropDelete,
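
The strategy lambda passed to createCompactManager in createCompactWriter above always compacts every sorted run into one unit at the top level; a standalone sketch of the same idea, with the constant name chosen here only for illustration:

    import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
    import org.apache.flink.table.store.file.mergetree.compact.CompactUnit;

    import java.util.Optional;

    class FullCompactionStrategySketch {
        // Picks all given runs into a single unit written to the highest level (numLevels - 1).
        static final CompactStrategy FULL_COMPACTION =
                (numLevels, runs) -> Optional.of(CompactUnit.fromLevelRuns(numLevels - 1, runs));
    }
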
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
index 0656e2f6..377d0fac 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
@@ -58,6 +58,7 @@ public class PredicateConverter implements ExpressionVisitor<Predicate> {
@Nullable
public Predicate fromMap(Map<String, String> map, RowType rowType) {
+ // TODO: It is somewhat misleading that an empty map creates a null predicate filter
List<String> fieldNames = rowType.getFieldNames();
Predicate predicate = null;
for (Map.Entry<String, String> entry : map.entrySet()) {
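
To make the new TODO concrete, a minimal sketch of the behaviour it refers to; the wrapper class is a placeholder and converter/rowType stand for an existing PredicateConverter instance and schema:

    import org.apache.flink.table.store.file.predicate.Predicate;
    import org.apache.flink.table.store.file.predicate.PredicateConverter;
    import org.apache.flink.table.types.logical.RowType;

    import java.util.Collections;

    class FromMapSketch {
        // An empty partition spec yields null, which callers have to interpret as "no filter".
        static Predicate noFilter(PredicateConverter converter, RowType rowType) {
            return converter.fromMap(Collections.emptyMap(), rowType);
        }
    }
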
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
index 7b182acd..4f79a568 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
@@ -68,21 +68,26 @@ public class FileStorePathFactory {
this.root = root;
this.uuid = UUID.randomUUID().toString();
- String[] partitionColumns = partitionType.getFieldNames().toArray(new String[0]);
- this.partitionComputer =
- new RowDataPartitionComputer(
- defaultPartValue,
- partitionColumns,
- partitionType.getFields().stream()
- .map(f -> LogicalTypeDataTypeConverter.toDataType(f.getType()))
- .toArray(DataType[]::new),
- partitionColumns);
+ this.partitionComputer = getPartitionComputer(partitionType, defaultPartValue);
this.formatIdentifier = formatIdentifier;
this.manifestFileCount = new AtomicInteger(0);
this.manifestListCount = new AtomicInteger(0);
}
+ @VisibleForTesting
+ public static RowDataPartitionComputer getPartitionComputer(
+ RowType partitionType, String defaultPartValue) {
+ String[] partitionColumns = partitionType.getFieldNames().toArray(new String[0]);
+ return new RowDataPartitionComputer(
+ defaultPartValue,
+ partitionColumns,
+ partitionType.getFields().stream()
+ .map(f -> LogicalTypeDataTypeConverter.toDataType(f.getType()))
+ .toArray(DataType[]::new),
+ partitionColumns);
+ }
+
public Path newManifestFile() {
return new Path(
root + "/manifest/manifest-" + uuid + "-" + manifestFileCount.getAndIncrement());
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/KeyComparatorSupplier.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/KeyComparatorSupplier.java
new file mode 100644
index 00000000..1b03d88b
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/KeyComparatorSupplier.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
+import org.apache.flink.table.runtime.generated.RecordComparator;
+import org.apache.flink.table.store.codegen.CodeGenUtils;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import java.util.Comparator;
+import java.util.function.Supplier;
+
+/** A {@link Supplier} that returns the comparator for the file store key. */
+public class KeyComparatorSupplier implements SerializableSupplier<Comparator<RowData>> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final GeneratedRecordComparator genRecordComparator;
+
+ public KeyComparatorSupplier(RowType keyType) {
+ genRecordComparator =
+ CodeGenUtils.generateRecordComparator(
+ new TableConfig(), keyType.getChildren(), "KeyComparator");
+ }
+
+ @Override
+ public RecordComparator get() {
+ return genRecordComparator.newInstance(Thread.currentThread().getContextClassLoader());
+ }
+}
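
A minimal sketch of the intended usage, mirroring how FileStoreImpl now passes the supplier around; the wrapper class and keyType are placeholders:

    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.store.file.utils.KeyComparatorSupplier;
    import org.apache.flink.table.types.logical.RowType;

    import java.util.Comparator;
    import java.util.function.Supplier;

    class KeyComparatorSketch {
        // The supplier itself is serializable; the code-generated comparator is only
        // instantiated when get() is called, e.g. on the task side.
        static Comparator<RowData> comparatorFor(RowType keyType) {
            Supplier<Comparator<RowData>> supplier = new KeyComparatorSupplier(keyType);
            return supplier.get();
        }
    }
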
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/PartitionedManifestMeta.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/PartitionedManifestMeta.java
deleted file mode 100644
index e867ba4b..00000000
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/PartitionedManifestMeta.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.file.utils;
-
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.data.DataFileMetaSerializer;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-import static org.apache.flink.table.store.file.utils.SerializationUtils.deserializeBinaryRow;
-import static org.apache.flink.table.store.file.utils.SerializationUtils.serializeBinaryRow;
-
-/** Manifest entries per partitioned with the corresponding snapshot id. */
-public class PartitionedManifestMeta implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- /** The latest snapshot id seen at planning phase when manual compaction is triggered. */
- private final Long snapshotId;
-
- /** The manifest entries collected at planning phase when manual compaction is triggered. */
- private transient Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> manifestEntries;
-
- public PartitionedManifestMeta(
- Long snapshotId,
- Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> specifiedManifestEntries) {
- Preconditions.checkNotNull(snapshotId, "Specified snapshot should not be null.");
- Preconditions.checkNotNull(
- specifiedManifestEntries, "Specified manifest entries should not be null.");
- this.snapshotId = snapshotId;
- this.manifestEntries = specifiedManifestEntries;
- }
-
- public Long getSnapshotId() {
- return snapshotId;
- }
-
- public Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> getManifestEntries() {
- return manifestEntries;
- }
-
- private void writeObject(ObjectOutputStream out) throws IOException {
- out.defaultWriteObject();
-
- DataFileMetaSerializer metaSerializer = new DataFileMetaSerializer();
- DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
- view.writeInt(manifestEntries.size());
- for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
- manifestEntries.entrySet()) {
- serializeBinaryRow(partEntry.getKey(), view);
- Map<Integer, List<DataFileMeta>> bucketEntry = partEntry.getValue();
- view.writeInt(bucketEntry.size());
- for (Map.Entry<Integer, List<DataFileMeta>> entry : bucketEntry.entrySet()) {
- view.writeInt(entry.getKey());
- view.writeInt(entry.getValue().size());
- for (DataFileMeta meta : entry.getValue()) {
- metaSerializer.serialize(meta, view);
- }
- }
- }
- }
-
- private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException {
- in.defaultReadObject();
-
- manifestEntries = new HashMap<>();
- DataFileMetaSerializer metaSerializer = new DataFileMetaSerializer();
- DataInputViewStreamWrapper view = new DataInputViewStreamWrapper(in);
- int partitionNum = view.readInt();
- for (int i = 0; i < partitionNum; i++) {
- BinaryRowData partition = deserializeBinaryRow(view);
- Map<Integer, List<DataFileMeta>> bucketEntry = new HashMap<>();
- int bucketNum = view.readInt();
- for (int j = 0; j < bucketNum; j++) {
- int bucket = view.readInt();
- int entryNum = view.readInt();
- List<DataFileMeta> metas = new ArrayList<>();
- for (int k = 0; k < entryNum; k++) {
- metas.add(metaSerializer.deserialize(view));
- }
- bucketEntry.put(bucket, metas);
- }
- manifestEntries.put(partition, bucketEntry);
- }
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof PartitionedManifestMeta)) {
- return false;
- }
- PartitionedManifestMeta that = (PartitionedManifestMeta) o;
- return snapshotId.equals(that.snapshotId) && manifestEntries.equals(that.manifestEntries);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(snapshotId, manifestEntries);
- }
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/CompactWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/CompactWriter.java
new file mode 100644
index 00000000..de625beb
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/CompactWriter.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
+import org.apache.flink.table.store.file.mergetree.compact.CompactUnit;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * A {@link RecordWriter} implementation that only performs compaction on existing records and does
+ * not generate new records.
+ */
+public class CompactWriter implements RecordWriter {
+
+ private final CompactUnit unit;
+ private final CompactManager compactManager;
+
+ public CompactWriter(CompactUnit unit, CompactManager compactManager) {
+ this.unit = unit;
+ this.compactManager = compactManager;
+ }
+
+ @Override
+ public Increment prepareCommit() throws IOException, InterruptedException {
+ List<DataFileMeta> compactBefore = new ArrayList<>();
+ List<DataFileMeta> compactAfter = new ArrayList<>();
+ if (compactManager.isCompactionFinished()) {
+ compactManager.submitCompaction(unit, true);
+ try {
+ compactManager
+ .finishCompaction(true)
+ .ifPresent(
+ result -> {
+ compactBefore.addAll(result.before());
+ compactAfter.addAll(result.after());
+ });
+ return Increment.forCompact(compactBefore, compactAfter);
+ } catch (ExecutionException e) {
+ throw new IOException(e.getCause());
+ }
+ }
+ throw new IllegalStateException("Compact manager should have finished previous task.");
+ }
+
+ @Override
+ public List<DataFileMeta> close() throws Exception {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void write(ValueKind valueKind, RowData key, RowData value) throws Exception {
+ // nothing to write
+ }
+
+ @Override
+ public void sync() throws Exception {}
+}
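
A minimal sketch of the writer's lifecycle when obtained through FileStoreWrite#createCompactWriter; the wrapper class is a placeholder and the single-thread executor is created inline only for illustration:

    import org.apache.flink.table.data.binary.BinaryRowData;
    import org.apache.flink.table.store.file.data.DataFileMeta;
    import org.apache.flink.table.store.file.mergetree.Increment;
    import org.apache.flink.table.store.file.operation.FileStoreWrite;
    import org.apache.flink.table.store.file.writer.RecordWriter;

    import java.util.List;
    import java.util.concurrent.Executors;

    class CompactWriterLifecycleSketch {
        // write()/sync() are no-ops for this writer; prepareCommit() runs one blocking
        // compaction and returns an Increment carrying only compactBefore/compactAfter files.
        static Increment compactOnce(
                FileStoreWrite write,
                BinaryRowData partition,
                int bucket,
                List<DataFileMeta> restoredFiles)
                throws Exception {
            RecordWriter writer =
                    write.createCompactWriter(
                            partition, bucket, Executors.newSingleThreadExecutor(), restoredFiles);
            Increment increment = writer.prepareCommit();
            writer.close();
            return increment;
        }
    }
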
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/KeyValueSerializerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/KeyValueSerializerTest.java
index 27563671..75a44155 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/KeyValueSerializerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/KeyValueSerializerTest.java
@@ -32,7 +32,7 @@ public class KeyValueSerializerTest extends ObjectSerializerTestBase<KeyValue> {
@Override
protected ObjectSerializer<KeyValue> serializer() {
return new KeyValueSerializer(
- TestKeyValueGenerator.KEY_TYPE, TestKeyValueGenerator.ROW_TYPE);
+ TestKeyValueGenerator.KEY_TYPE, TestKeyValueGenerator.DEFAULT_ROW_TYPE);
}
@Override
@@ -47,7 +47,7 @@ public class KeyValueSerializerTest extends ObjectSerializerTestBase<KeyValue> {
expected,
actual,
TestKeyValueGenerator.KEY_SERIALIZER,
- TestKeyValueGenerator.ROW_SERIALIZER))
+ TestKeyValueGenerator.DEFAULT_ROW_SERIALIZER))
.isTrue();
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java
index bb73ea79..29dc549d 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.file;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.generated.RecordComparator;
@@ -34,10 +35,16 @@ import org.apache.flink.table.types.logical.VarCharType;
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.store.file.TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED;
+import static org.apache.flink.table.store.file.TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED;
+import static org.apache.flink.table.store.file.TestKeyValueGenerator.GeneratorMode.SINGLE_PARTITIONED;
/** Random {@link KeyValue} generator. */
public class TestKeyValueGenerator {
@@ -53,7 +60,7 @@ public class TestKeyValueGenerator {
// * itemId: long, any value
// * price & amount: int[], [1 ~ 100, 1 ~ 10]
// * comment: string, length from 10 to 1000
- public static final RowType ROW_TYPE =
+ public static final RowType DEFAULT_ROW_TYPE =
RowType.of(
new LogicalType[] {
new VarCharType(false, 8),
@@ -67,18 +74,42 @@ public class TestKeyValueGenerator {
new String[] {
"dt", "hr", "shopId", "orderId", "itemId", "priceAmount", "comment"
});
- public static final RowType PARTITION_TYPE =
+ public static final RowType DEFAULT_PART_TYPE =
+ new RowType(DEFAULT_ROW_TYPE.getFields().subList(0, 2));
+
+ public static final RowType SINGLE_PARTITIONED_ROW_TYPE =
+ RowType.of(
+ new LogicalType[] {
+ new VarCharType(false, 8),
+ new IntType(false),
+ new BigIntType(false),
+ new BigIntType(),
+ new ArrayType(new IntType()),
+ new VarCharType(Integer.MAX_VALUE)
+ },
+ new String[] {"dt", "shopId", "orderId", "itemId", "priceAmount", "comment"});
+ public static final RowType SINGLE_PARTITIONED_PART_TYPE =
+ RowType.of(SINGLE_PARTITIONED_ROW_TYPE.getTypeAt(0));
+
+ public static final RowType NON_PARTITIONED_ROW_TYPE =
RowType.of(
- new LogicalType[] {new VarCharType(false, 8), new IntType(false)},
- new String[] {"dt", "hr"});
+ new LogicalType[] {
+ new IntType(false),
+ new BigIntType(false),
+ new BigIntType(),
+ new ArrayType(new IntType()),
+ new VarCharType(Integer.MAX_VALUE)
+ },
+ new String[] {"shopId", "orderId", "itemId", "priceAmount", "comment"});
+ public static final RowType NON_PARTITIONED_PART_TYPE = RowType.of();
+
public static final RowType KEY_TYPE =
RowType.of(
new LogicalType[] {new IntType(false), new BigIntType(false)},
new String[] {"key_shopId", "key_orderId"});
- public static final RowDataSerializer ROW_SERIALIZER = new RowDataSerializer(ROW_TYPE);
- private static final RowDataSerializer PARTITION_SERIALIZER =
- new RowDataSerializer(PARTITION_TYPE);
+ public static final RowDataSerializer DEFAULT_ROW_SERIALIZER =
+ new RowDataSerializer(DEFAULT_ROW_TYPE);
public static final RowDataSerializer KEY_SERIALIZER = new RowDataSerializer(KEY_TYPE);
public static final RecordComparator KEY_COMPARATOR =
(a, b) -> {
@@ -89,6 +120,7 @@ public class TestKeyValueGenerator {
return Long.compare(a.getLong(1), b.getLong(1));
};
+ private final GeneratorMode mode;
private final Random random;
private final List<Order> addedOrders;
@@ -96,13 +128,42 @@ public class TestKeyValueGenerator {
private long sequenceNumber;
+ private final RowDataSerializer rowSerializer;
+ private final RowDataSerializer partitionSerializer;
+
public TestKeyValueGenerator() {
+ this(MULTI_PARTITIONED);
+ }
+
+ public TestKeyValueGenerator(GeneratorMode mode) {
+ this.mode = mode;
this.random = new Random();
this.addedOrders = new ArrayList<>();
this.deletedOrders = new ArrayList<>();
this.sequenceNumber = 0;
+
+ RowType rowType;
+ RowType partitionType;
+ switch (mode) {
+ case NON_PARTITIONED:
+ rowType = NON_PARTITIONED_ROW_TYPE;
+ partitionType = NON_PARTITIONED_PART_TYPE;
+ break;
+ case SINGLE_PARTITIONED:
+ rowType = SINGLE_PARTITIONED_ROW_TYPE;
+ partitionType = SINGLE_PARTITIONED_PART_TYPE;
+ break;
+ case MULTI_PARTITIONED:
+ rowType = DEFAULT_ROW_TYPE;
+ partitionType = DEFAULT_PART_TYPE;
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported generator mode: " + mode);
+ }
+ rowSerializer = new RowDataSerializer(rowType);
+ partitionSerializer = new RowDataSerializer(partitionType);
}
public KeyValue next() {
@@ -140,31 +201,65 @@ public class TestKeyValueGenerator {
.copy(),
sequenceNumber++,
kind,
- ROW_SERIALIZER
- .toBinaryRow(
- GenericRowData.of(
- StringData.fromString(order.dt),
- order.hr,
- order.shopId,
- order.orderId,
- order.itemId,
- order.priceAmount == null
- ? null
- : new GenericArrayData(order.priceAmount),
- StringData.fromString(order.comment)))
- .copy());
+ rowSerializer.toBinaryRow(convertToRow(order)).copy());
+ }
+
+ private RowData convertToRow(Order order) {
+ List<Object> values =
+ new ArrayList<>(
+ Arrays.asList(
+ order.shopId,
+ order.orderId,
+ order.itemId,
+ order.priceAmount == null
+ ? null
+ : new GenericArrayData(order.priceAmount),
+ StringData.fromString(order.comment)));
+
+ if (mode == MULTI_PARTITIONED) {
+ values.add(0, StringData.fromString(order.dt));
+ values.add(1, order.hr);
+ } else if (mode == SINGLE_PARTITIONED) {
+ values.add(0, StringData.fromString(order.dt));
+ }
+ return GenericRowData.of(values.toArray(new Object[0]));
}
public BinaryRowData getPartition(KeyValue kv) {
- return PARTITION_SERIALIZER
- .toBinaryRow(GenericRowData.of(kv.value().getString(0), kv.value().getInt(1)))
- .copy();
+ Object[] values;
+ if (mode == MULTI_PARTITIONED) {
+ values = new Object[] {kv.value().getString(0), kv.value().getInt(1)};
+ } else if (mode == SINGLE_PARTITIONED) {
+ values = new Object[] {kv.value().getString(0)};
+ } else {
+ values = new Object[0];
+ }
+ return partitionSerializer.toBinaryRow(GenericRowData.of(values)).copy();
}
- public static Map<String, String> toPartitionMap(BinaryRowData partition) {
+ public static List<String> getPrimaryKeys(GeneratorMode mode) {
+ List<String> trimmedPk =
+ KEY_TYPE.getFieldNames().stream()
+ .map(f -> f.replaceFirst("key_", ""))
+ .collect(Collectors.toList());
+ if (mode != NON_PARTITIONED) {
+ trimmedPk = new ArrayList<>(trimmedPk);
+ trimmedPk.addAll(
+ mode == MULTI_PARTITIONED
+ ? DEFAULT_PART_TYPE.getFieldNames()
+ : SINGLE_PARTITIONED_PART_TYPE.getFieldNames());
+ }
+ return trimmedPk;
+ }
+
+ public static Map<String, String> toPartitionMap(BinaryRowData partition, GeneratorMode mode) {
Map<String, String> map = new HashMap<>();
- map.put("dt", partition.getString(0).toString());
- map.put("hr", String.valueOf(partition.getInt(1)));
+ if (mode == MULTI_PARTITIONED) {
+ map.put("dt", partition.getString(0).toString());
+ map.put("hr", String.valueOf(partition.getInt(1)));
+ } else if (mode == SINGLE_PARTITIONED) {
+ map.put("dt", partition.getString(0).toString());
+ }
return map;
}
@@ -220,4 +315,11 @@ public class TestKeyValueGenerator {
comment = random.nextInt(10) == 0 ? null : randomString(random.nextInt(1001 - 10) + 10);
}
}
+
+ /** Generator mode for {@link TestKeyValueGenerator}. */
+ public enum GeneratorMode {
+ NON_PARTITIONED,
+ SINGLE_PARTITIONED,
+ MULTI_PARTITIONED
+ }
}
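
A minimal sketch of the partitioning-aware generator in a test; the wrapper class is a placeholder and assertions are omitted:

    import org.apache.flink.table.data.binary.BinaryRowData;
    import org.apache.flink.table.store.file.KeyValue;
    import org.apache.flink.table.store.file.TestKeyValueGenerator;
    import org.apache.flink.table.store.file.TestKeyValueGenerator.GeneratorMode;

    import java.util.Map;

    class GeneratorModeSketch {
        // Generates a record for a single-partitioned schema and derives its partition spec.
        static Map<String, String> samplePartitionSpec() {
            TestKeyValueGenerator gen = new TestKeyValueGenerator(GeneratorMode.SINGLE_PARTITIONED);
            KeyValue kv = gen.next();
            BinaryRowData partition = gen.getPartition(kv); // only the "dt" field in this mode
            return TestKeyValueGenerator.toPartitionMap(partition, GeneratorMode.SINGLE_PARTITIONED);
        }
    }
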
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java
index 800c565a..781a91e6 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java
@@ -81,7 +81,7 @@ public class DataFileTest {
checkRollingFiles(
TestKeyValueGenerator.KEY_TYPE,
- TestKeyValueGenerator.ROW_TYPE,
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE,
data.meta,
actualMetas,
writer.suggestedFileSize());
@@ -91,7 +91,7 @@ public class DataFileTest {
data,
actualMetas,
TestKeyValueGenerator.KEY_SERIALIZER,
- TestKeyValueGenerator.ROW_SERIALIZER,
+ TestKeyValueGenerator.DEFAULT_ROW_SERIALIZER,
serializer,
reader,
kv -> kv);
@@ -137,7 +137,7 @@ public class DataFileTest {
data,
actualMetas,
projectedKeySerializer,
- TestKeyValueGenerator.ROW_SERIALIZER,
+ TestKeyValueGenerator.DEFAULT_ROW_SERIALIZER,
serializer,
fileReader,
kv ->
@@ -208,7 +208,7 @@ public class DataFileTest {
int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
return new DataFileWriter.Factory(
TestKeyValueGenerator.KEY_TYPE,
- TestKeyValueGenerator.ROW_TYPE,
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE,
// normal format will buffer changes in memory and we can't determine
// if the written file size is really larger than suggested, so we use a
// special format which flushes for every added element
@@ -224,7 +224,7 @@ public class DataFileTest {
DataFileReader.Factory factory =
new DataFileReader.Factory(
TestKeyValueGenerator.KEY_TYPE,
- TestKeyValueGenerator.ROW_TYPE,
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE,
new FlushingFileFormat(format),
pathFactory);
if (keyProjection != null) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestDataGenerator.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestDataGenerator.java
index f436f552..2163ed62 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestDataGenerator.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestDataGenerator.java
@@ -102,7 +102,7 @@ public class DataFileTestDataGenerator {
FieldStatsCollector keyStatsCollector =
new FieldStatsCollector(TestKeyValueGenerator.KEY_TYPE);
FieldStatsCollector valueStatsCollector =
- new FieldStatsCollector(TestKeyValueGenerator.ROW_TYPE);
+ new FieldStatsCollector(TestKeyValueGenerator.DEFAULT_ROW_TYPE);
long totalSize = 0;
BinaryRowData minKey = null;
BinaryRowData maxKey = null;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
index 5b9a85cf..535d63dd 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.file.FileStoreOptions;
-import org.apache.flink.table.store.file.TestKeyValueGenerator;
import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.stats.StatsTestUtils;
import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
@@ -37,6 +36,7 @@ import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
+import static org.apache.flink.table.store.file.TestKeyValueGenerator.DEFAULT_PART_TYPE;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link ManifestFile}. */
@@ -96,12 +96,11 @@ public class ManifestFileTest {
FileStorePathFactory pathFactory =
new FileStorePathFactory(
new Path(path),
- TestKeyValueGenerator.PARTITION_TYPE,
+ DEFAULT_PART_TYPE,
"default",
FileStoreOptions.FILE_FORMAT.defaultValue());
int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
- return new ManifestFile.Factory(
- TestKeyValueGenerator.PARTITION_TYPE, avro, pathFactory, suggestedFileSize)
+ return new ManifestFile.Factory(DEFAULT_PART_TYPE, avro, pathFactory, suggestedFileSize)
.create();
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
index 5db2111d..38a4d68d 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
@@ -94,10 +94,10 @@ public class ManifestListTest {
FileStorePathFactory pathFactory =
new FileStorePathFactory(
new Path(path),
- TestKeyValueGenerator.PARTITION_TYPE,
+ TestKeyValueGenerator.DEFAULT_PART_TYPE,
"default",
FileStoreOptions.FILE_FORMAT.defaultValue());
- return new ManifestList.Factory(TestKeyValueGenerator.PARTITION_TYPE, avro, pathFactory)
+ return new ManifestList.Factory(TestKeyValueGenerator.DEFAULT_PART_TYPE, avro, pathFactory)
.create();
}
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestTestDataGenerator.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestTestDataGenerator.java
index db5dbd8c..659ae3b8 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestTestDataGenerator.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestTestDataGenerator.java
@@ -83,7 +83,7 @@ public class ManifestTestDataGenerator {
!entries.isEmpty(), "Manifest entries are empty. Invalid test data.");
FieldStatsCollector collector =
- new FieldStatsCollector(TestKeyValueGenerator.PARTITION_TYPE);
+ new FieldStatsCollector(TestKeyValueGenerator.DEFAULT_PART_TYPE);
long numAddedFiles = 0;
long numDeletedFiles = 0;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index fca49e69..ed270788 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -263,6 +263,24 @@ public class FileStoreCommitTest {
assertThat(snapshot.getLogOffsets()).isEqualTo(expected);
}
+ @Test
+ public void testCommitEmpty() throws Exception {
+ TestFileStore store = createStore(false, 2);
+ Snapshot snapshot =
+ store.commitData(
+ generateDataList(10),
+ gen::getPartition,
+ kv -> 0,
+ Collections.emptyMap())
+ .get(0);
+
+ store.commitData(
+ Collections.emptyList(), gen::getPartition, kv -> 0, Collections.emptyMap());
+ Path snapshotDir = store.pathFactory().snapshotDirectory();
+
+ assertThat(SnapshotFinder.findLatest(snapshotDir)).isEqualTo(snapshot.id());
+ }
+
private TestFileStore createStore(boolean failing) {
return createStore(failing, 1);
}
@@ -276,9 +294,9 @@ public class FileStoreCommitTest {
"avro",
root,
numBucket,
- TestKeyValueGenerator.PARTITION_TYPE,
+ TestKeyValueGenerator.DEFAULT_PART_TYPE,
TestKeyValueGenerator.KEY_TYPE,
- TestKeyValueGenerator.ROW_TYPE,
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE,
new DeduplicateMergeFunction());
}
@@ -310,7 +328,10 @@ public class FileStoreCommitTest {
LOG.debug("========== Beginning of " + name + " ==========");
for (KeyValue kv : supplier.get()) {
- LOG.debug(kv.toString(TestKeyValueGenerator.KEY_TYPE, TestKeyValueGenerator.ROW_TYPE));
+ LOG.debug(
+ kv.toString(
+ TestKeyValueGenerator.KEY_TYPE,
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE));
}
LOG.debug("========== End of " + name + " ==========");
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
index 00022240..25516460 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
@@ -58,9 +58,9 @@ public class FileStoreExpireTest {
"avro",
tempDir.toString(),
1,
- TestKeyValueGenerator.PARTITION_TYPE,
+ TestKeyValueGenerator.DEFAULT_PART_TYPE,
TestKeyValueGenerator.KEY_TYPE,
- TestKeyValueGenerator.ROW_TYPE,
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE,
new DeduplicateMergeFunction());
pathFactory = store.pathFactory();
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java
index c9aff153..e236d517 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java
@@ -133,9 +133,9 @@ public class FileStoreReadTest {
}
TestFileStore store =
createStore(
- TestKeyValueGenerator.PARTITION_TYPE,
+ TestKeyValueGenerator.DEFAULT_PART_TYPE,
TestKeyValueGenerator.KEY_TYPE,
- TestKeyValueGenerator.ROW_TYPE,
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE,
new DeduplicateMergeFunction());
RowDataSerializer projectedValueSerializer =
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
index e590b76b..e2ca788c 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
@@ -65,9 +65,9 @@ public class FileStoreScanTest {
"avro",
tempDir.toString(),
NUM_BUCKETS,
- TestKeyValueGenerator.PARTITION_TYPE,
+ TestKeyValueGenerator.DEFAULT_PART_TYPE,
TestKeyValueGenerator.KEY_TYPE,
- TestKeyValueGenerator.ROW_TYPE,
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE,
new DeduplicateMergeFunction());
pathFactory = store.pathFactory();
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
index 7ed31a87..c65ef8bb 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
@@ -38,6 +38,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
+import static org.apache.flink.table.store.file.TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED;
+
/** Testing {@link Thread}s to perform concurrent commits. */
public class TestCommitThread extends Thread {
@@ -114,7 +116,7 @@ public class TestCommitThread extends Thread {
committable,
() ->
commit.overwrite(
- TestKeyValueGenerator.toPartitionMap(partition),
+ TestKeyValueGenerator.toPartitionMap(partition, MULTI_PARTITIONED),
committable,
Collections.emptyMap()));
}