Posted to commits@flink.apache.org by cz...@apache.org on 2022/12/13 09:12:59 UTC

[flink-table-store] branch master updated (3e476570 -> 333bb8da)

This is an automated email from the ASF dual-hosted git repository.

czweng pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


    from 3e476570 [FLINK-30373] Flink-table-runtime free for flink-table-store-codegen
     new 2073bd66 [FLINK-30210] Refactor StoreWriteOperator so periodic full compaction writer can be reused in compactor sink
     new 333bb8da [FLINK-30210] Refactor StoreCompactOperator to accept records containing partitions and buckets in Table Store

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../store/connector/TableStoreManagedFactory.java  |  11 +-
 .../store/connector/FlinkConnectorOptions.java     |  18 --
 .../store/connector/sink/FlinkSinkBuilder.java     |  24 +-
 ...rator.java => FullChangelogStoreSinkWrite.java} | 108 ++++---
 .../store/connector/sink/StoreCompactOperator.java | 114 +++----
 .../table/store/connector/sink/StoreSink.java      |  37 +--
 .../table/store/connector/sink/StoreSinkWrite.java |  32 +-
 .../store/connector/sink/StoreSinkWriteImpl.java   | 116 ++++++++
 .../store/connector/sink/StoreWriteOperator.java   |  80 ++---
 .../table/store/connector/sink/TableStoreSink.java |   4 +-
 .../connector/source/FileStoreEmptySource.java     |  99 -------
 .../store/connector/source/FlinkSourceBuilder.java |   6 +-
 .../store/connector/AlterTableCompactITCase.java   | 329 ---------------------
 .../connector/TableStoreManagedFactoryTest.java    |  24 --
 .../file/append/AppendOnlyCompactManager.java      |   2 +-
 15 files changed, 278 insertions(+), 726 deletions(-)
 rename flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/{FullChangelogStoreWriteOperator.java => FullChangelogStoreSinkWrite.java} (84%)
 copy flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java => flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWrite.java (58%)
 create mode 100644 flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriteImpl.java
 delete mode 100644 flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreEmptySource.java
 delete mode 100644 flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java


[flink-table-store] 01/02: [FLINK-30210] Refactor StoreWriteOperator so periodic full compaction writer can be reused in compactor sink

Posted by cz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit 2073bd66a4e41ac47a98b0c0370b0ccdeb516c62
Author: tsreaper <ts...@gmail.com>
AuthorDate: Wed Nov 23 15:05:17 2022 +0800

    [FLINK-30210] Refactor StoreWriteOperator so periodic full compaction writer can be reused in compactor sink
---
 ...rator.java => FullChangelogStoreSinkWrite.java} | 108 +++++++++----------
 .../table/store/connector/sink/StoreSink.java      |  27 +++--
 .../table/store/connector/sink/StoreSinkWrite.java |  55 ++++++++++
 .../store/connector/sink/StoreSinkWriteImpl.java   | 116 +++++++++++++++++++++
 .../store/connector/sink/StoreWriteOperator.java   |  80 ++++----------
 5 files changed, 261 insertions(+), 125 deletions(-)
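
The gist of this commit: StoreWriteOperator no longer hard-wires its writing logic, and FullChangelogStoreWriteOperator disappears as an operator subclass. Instead, the operator receives a serializable StoreSinkWrite.Provider and builds a concrete StoreSinkWrite in initializeState(), which is what lets the periodic full-compaction writer be reused by the compactor sink later. A minimal sketch of the new wiring, mirroring StoreSink#createWriteOperator in the diff below; the wrapper class WriteOperatorSketch is hypothetical, all other types and methods are taken from this commit:

    // Illustrative sketch: mirrors StoreSink#createWriteOperator after this commit.
    // The wrapper class is hypothetical; every other name below appears in the diff.
    package org.apache.flink.table.store.connector.sink;

    import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.store.CoreOptions;
    import org.apache.flink.table.store.table.FileStoreTable;
    import org.apache.flink.table.store.table.sink.LogSinkFunction;

    import javax.annotation.Nullable;

    import java.util.Map;

    class WriteOperatorSketch {

        OneInputStreamOperator<RowData, Committable> createWriteOperator(
                FileStoreTable table,
                String initialCommitUser,
                @Nullable Map<String, String> overwritePartition,
                @Nullable LogSinkFunction logSinkFunction) {
            boolean isOverwrite = overwritePartition != null;
            StoreSinkWrite.Provider writeProvider;
            if (table.options().changelogProducer()
                    == CoreOptions.ChangelogProducer.FULL_COMPACTION) {
                long fullCompactionThresholdMs =
                        table.options()
                                .changelogProducerFullCompactionTriggerInterval()
                                .toMillis();
                // Stateful writer: remembers written buckets and triggers full
                // compaction once the configured interval has passed.
                writeProvider =
                        (tbl, context, ioManager) ->
                                new FullChangelogStoreSinkWrite(
                                        tbl,
                                        context,
                                        initialCommitUser,
                                        ioManager,
                                        isOverwrite,
                                        fullCompactionThresholdMs);
            } else {
                // Stateless default writer.
                writeProvider =
                        (tbl, context, ioManager) ->
                                new StoreSinkWriteImpl(
                                        tbl, context, initialCommitUser, ioManager, isOverwrite);
            }
            // Only the serializable provider travels with the operator; the concrete
            // StoreSinkWrite is created in initializeState(), where the state context
            // and the task's IOManager are available.
            return new StoreWriteOperator(table, logSinkFunction, writeProvider);
        }
    }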

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreWriteOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java
similarity index 84%
rename from flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreWriteOperator.java
rename to flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java
index 59f392cf..58827b4f 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreWriteOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java
@@ -26,9 +26,9 @@ import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
@@ -36,11 +36,8 @@ import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.sink.LogSinkFunction;
 import org.apache.flink.table.store.table.sink.SinkRecord;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -52,40 +49,36 @@ import java.util.Set;
 import java.util.TreeMap;
 
 /**
- * A {@link StoreWriteOperator} for {@link
- * org.apache.flink.table.store.CoreOptions.ChangelogProducer#FULL_COMPACTION} changelog producer.
+ * {@link StoreSinkWrite} for {@link CoreOptions.ChangelogProducer#FULL_COMPACTION} changelog
+ * producer. This writer will perform full compaction once in a while.
  */
-public class FullChangelogStoreWriteOperator extends StoreWriteOperator {
-
-    private static final long serialVersionUID = 1L;
+public class FullChangelogStoreSinkWrite extends StoreSinkWriteImpl {
 
     private final long fullCompactionThresholdMs;
 
-    private transient Set<Tuple2<BinaryRowData, Integer>> currentWrittenBuckets;
-    private transient NavigableMap<Long, Set<Tuple2<BinaryRowData, Integer>>> writtenBuckets;
-    private transient ListState<Tuple3<Long, BinaryRowData, Integer>> writtenBucketState;
+    private final Set<Tuple2<BinaryRowData, Integer>> currentWrittenBuckets;
+    private final NavigableMap<Long, Set<Tuple2<BinaryRowData, Integer>>> writtenBuckets;
+    private final ListState<Tuple3<Long, BinaryRowData, Integer>> writtenBucketState;
 
-    private transient Long currentFirstWriteMs;
-    private transient NavigableMap<Long, Long> firstWriteMs;
-    private transient ListState<Tuple2<Long, Long>> firstWriteMsState;
+    private Long currentFirstWriteMs;
+    private final NavigableMap<Long, Long> firstWriteMs;
+    private final ListState<Tuple2<Long, Long>> firstWriteMsState;
 
-    private transient Long snapshotIdentifierToCheck;
+    private Long snapshotIdentifierToCheck;
 
-    public FullChangelogStoreWriteOperator(
+    public FullChangelogStoreSinkWrite(
             FileStoreTable table,
+            StateInitializationContext context,
             String initialCommitUser,
-            @Nullable Map<String, String> overwritePartition,
-            @Nullable LogSinkFunction logSinkFunction,
-            long fullCompactionThresholdMs) {
-        super(table, initialCommitUser, overwritePartition, logSinkFunction);
-        this.fullCompactionThresholdMs = fullCompactionThresholdMs;
-    }
+            IOManager ioManager,
+            boolean isOverwrite,
+            long fullCompactionThresholdMs)
+            throws Exception {
+        super(table, context, initialCommitUser, ioManager, isOverwrite);
 
-    @SuppressWarnings("unchecked")
-    @Override
-    public void initializeState(StateInitializationContext context) throws Exception {
-        super.initializeState(context);
+        this.fullCompactionThresholdMs = fullCompactionThresholdMs;
 
+        currentWrittenBuckets = new HashSet<>();
         TupleSerializer<Tuple3<Long, BinaryRowData, Integer>> writtenBucketStateSerializer =
                 new TupleSerializer<>(
                         (Class<Tuple3<Long, BinaryRowData, Integer>>) (Class<?>) Tuple3.class,
@@ -110,6 +103,7 @@ public class FullChangelogStoreWriteOperator extends StoreWriteOperator {
                                         .computeIfAbsent(t.f0, k -> new HashSet<>())
                                         .add(Tuple2.of(t.f1, t.f2)));
 
+        currentFirstWriteMs = null;
         TupleSerializer<Tuple2<Long, Long>> firstWriteMsStateSerializer =
                 new TupleSerializer<>(
                         (Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
@@ -126,18 +120,20 @@ public class FullChangelogStoreWriteOperator extends StoreWriteOperator {
     }
 
     @Override
-    public void open() throws Exception {
-        super.open();
-        currentWrittenBuckets = new HashSet<>();
-        currentFirstWriteMs = null;
+    public SinkRecord write(RowData rowData) throws Exception {
+        SinkRecord sinkRecord = super.write(rowData);
+        touchBucket(sinkRecord.partition(), sinkRecord.bucket());
+        return sinkRecord;
     }
 
     @Override
-    public void processElement(StreamRecord<RowData> element) throws Exception {
-        SinkRecord record = writeRecord(element);
+    public void compact(BinaryRowData partition, int bucket, boolean fullCompaction)
+            throws Exception {
+        super.compact(partition, bucket, fullCompaction);
+        touchBucket(partition, bucket);
+    }
 
-        BinaryRowData partition = record.partition();
-        int bucket = record.bucket();
+    private void touchBucket(BinaryRowData partition, int bucket) {
         // partition is a reused BinaryRowData
         // we first check if the tuple exists to minimize copying
         if (!currentWrittenBuckets.contains(Tuple2.of(partition, bucket))) {
@@ -150,27 +146,7 @@ public class FullChangelogStoreWriteOperator extends StoreWriteOperator {
     }
 
     @Override
-    public void snapshotState(StateSnapshotContext context) throws Exception {
-        super.snapshotState(context);
-
-        List<Tuple3<Long, BinaryRowData, Integer>> writtenBucketList = new ArrayList<>();
-        for (Map.Entry<Long, Set<Tuple2<BinaryRowData, Integer>>> entry :
-                writtenBuckets.entrySet()) {
-            for (Tuple2<BinaryRowData, Integer> bucket : entry.getValue()) {
-                writtenBucketList.add(Tuple3.of(entry.getKey(), bucket.f0, bucket.f1));
-            }
-        }
-        writtenBucketState.update(writtenBucketList);
-
-        List<Tuple2<Long, Long>> firstWriteMsList = new ArrayList<>();
-        for (Map.Entry<Long, Long> entry : firstWriteMs.entrySet()) {
-            firstWriteMsList.add(Tuple2.of(entry.getKey(), entry.getValue()));
-        }
-        firstWriteMsState.update(firstWriteMsList);
-    }
-
-    @Override
-    protected List<Committable> prepareCommit(boolean doCompaction, long checkpointId)
+    public List<Committable> prepareCommit(boolean doCompaction, long checkpointId)
             throws IOException {
         if (snapshotIdentifierToCheck != null) {
             Optional<Snapshot> snapshot = findSnapshot(snapshotIdentifierToCheck);
@@ -260,4 +236,24 @@ public class FullChangelogStoreWriteOperator extends StoreWriteOperator {
                         commitUser,
                         identifierToCheck));
     }
+
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        super.snapshotState(context);
+
+        List<Tuple3<Long, BinaryRowData, Integer>> writtenBucketList = new ArrayList<>();
+        for (Map.Entry<Long, Set<Tuple2<BinaryRowData, Integer>>> entry :
+                writtenBuckets.entrySet()) {
+            for (Tuple2<BinaryRowData, Integer> bucket : entry.getValue()) {
+                writtenBucketList.add(Tuple3.of(entry.getKey(), bucket.f0, bucket.f1));
+            }
+        }
+        writtenBucketState.update(writtenBucketList);
+
+        List<Tuple2<Long, Long>> firstWriteMsList = new ArrayList<>();
+        for (Map.Entry<Long, Long> entry : firstWriteMs.entrySet()) {
+            firstWriteMsList.add(Tuple2.of(entry.getKey(), entry.getValue()));
+        }
+        firstWriteMsState.update(firstWriteMsList);
+    }
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index b556011b..5e95c875 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -82,17 +82,28 @@ public class StoreSink implements Serializable {
             return new StoreCompactOperator(table, initialCommitUser, compactPartitionSpec);
         }
 
+        boolean isOverwrite = overwritePartition != null;
+        StoreSinkWrite.Provider writeProvider;
         if (table.options().changelogProducer() == CoreOptions.ChangelogProducer.FULL_COMPACTION) {
-            return new FullChangelogStoreWriteOperator(
-                    table,
-                    initialCommitUser,
-                    overwritePartition,
-                    logSinkFunction,
-                    table.options().changelogProducerFullCompactionTriggerInterval().toMillis());
+            long fullCompactionThresholdMs =
+                    table.options().changelogProducerFullCompactionTriggerInterval().toMillis();
+            writeProvider =
+                    (table, context, ioManager) ->
+                            new FullChangelogStoreSinkWrite(
+                                    table,
+                                    context,
+                                    initialCommitUser,
+                                    ioManager,
+                                    isOverwrite,
+                                    fullCompactionThresholdMs);
+        } else {
+            writeProvider =
+                    (table, context, ioManager) ->
+                            new StoreSinkWriteImpl(
+                                    table, context, initialCommitUser, ioManager, isOverwrite);
         }
 
-        return new StoreWriteOperator(
-                table, initialCommitUser, overwritePartition, logSinkFunction);
+        return new StoreWriteOperator(table, logSinkFunction, writeProvider);
     }
 
     private StoreCommitter createCommitter(String user, boolean createEmptyCommit) {
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWrite.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWrite.java
new file mode 100644
index 00000000..fe3465d9
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWrite.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.SinkRecord;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/** Helper class of {@link StoreWriteOperator} for different types of table store sinks. */
+interface StoreSinkWrite {
+
+    SinkRecord write(RowData rowData) throws Exception;
+
+    SinkRecord toLogRecord(SinkRecord record);
+
+    void compact(BinaryRowData partition, int bucket, boolean fullCompaction) throws Exception;
+
+    List<Committable> prepareCommit(boolean doCompaction, long checkpointId) throws IOException;
+
+    void snapshotState(StateSnapshotContext context) throws Exception;
+
+    void close() throws Exception;
+
+    @FunctionalInterface
+    interface Provider extends Serializable {
+
+        StoreSinkWrite provide(
+                FileStoreTable table, StateInitializationContext context, IOManager ioManager)
+                throws Exception;
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriteImpl.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriteImpl.java
new file mode 100644
index 00000000..374616ec
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriteImpl.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.FileCommittable;
+import org.apache.flink.table.store.table.sink.SinkRecord;
+import org.apache.flink.table.store.table.sink.TableWrite;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Default implementation of {@link StoreSinkWrite}. This writer does not have states. */
+public class StoreSinkWriteImpl implements StoreSinkWrite {
+
+    protected final FileStoreTable table;
+    protected final String commitUser;
+    protected final TableWrite write;
+
+    public StoreSinkWriteImpl(
+            FileStoreTable table,
+            StateInitializationContext context,
+            String initialCommitUser,
+            IOManager ioManager,
+            boolean isOverwrite)
+            throws Exception {
+        this.table = table;
+
+        // Each job can only have one user name and this name must be consistent across restarts.
+        // We cannot use job id as commit user name here because user may change job id by creating
+        // a savepoint, stop the job and then resume from savepoint.
+        commitUser =
+                StateUtils.getSingleValueFromState(
+                        context, "commit_user_state", String.class, initialCommitUser);
+
+        // State will be null if the upstream of this subtask has finished, but some other subtasks
+        // are still running.
+        // See comments of StateUtils.getSingleValueFromState for more detail.
+        //
+        // If the state is null, no new records will come. We only need to deal with checkpoints and
+        // close events.
+        if (commitUser == null) {
+            write = null;
+        } else {
+            write = table.newWrite(commitUser).withIOManager(ioManager).withOverwrite(isOverwrite);
+        }
+    }
+
+    @Override
+    public SinkRecord write(RowData rowData) throws Exception {
+        return write.write(rowData);
+    }
+
+    @Override
+    public SinkRecord toLogRecord(SinkRecord record) {
+        return write.toLogRecord(record);
+    }
+
+    @Override
+    public void compact(BinaryRowData partition, int bucket, boolean fullCompaction)
+            throws Exception {
+        write.compact(partition, bucket, fullCompaction);
+    }
+
+    @Override
+    public List<Committable> prepareCommit(boolean doCompaction, long checkpointId)
+            throws IOException {
+        List<Committable> committables = new ArrayList<>();
+        if (write != null) {
+            try {
+                for (FileCommittable committable :
+                        write.prepareCommit(doCompaction, checkpointId)) {
+                    committables.add(
+                            new Committable(checkpointId, Committable.Kind.FILE, committable));
+                }
+            } catch (Exception e) {
+                throw new IOException(e);
+            }
+        }
+        return committables;
+    }
+
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        // do nothing
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (write != null) {
+            write.close();
+        }
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
index f3bfcb19..05696adc 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
@@ -35,57 +35,41 @@ import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.log.LogWriteCallback;
 import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.sink.FileCommittable;
 import org.apache.flink.table.store.table.sink.LogSinkFunction;
 import org.apache.flink.table.store.table.sink.SinkRecord;
-import org.apache.flink.table.store.table.sink.TableWrite;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
 /** A {@link PrepareCommitOperator} to write records. */
 public class StoreWriteOperator extends PrepareCommitOperator {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
     protected final FileStoreTable table;
 
-    /**
-     * This commitUser is valid only for new jobs. After the job starts, this commitUser will be
-     * recorded into the states of write and commit operators. When the job restarts, commitUser
-     * will be recovered from states and this value is ignored.
-     */
-    private final String initialCommitUser;
+    @Nullable private final LogSinkFunction logSinkFunction;
 
-    @Nullable private final Map<String, String> overwritePartition;
+    private final StoreSinkWrite.Provider storeSinkWriteProvider;
 
-    @Nullable private final LogSinkFunction logSinkFunction;
+    private transient StoreSinkWrite write;
 
     private transient SimpleContext sinkContext;
 
     /** We listen to this ourselves because we don't have an {@link InternalTimerService}. */
     private long currentWatermark = Long.MIN_VALUE;
 
-    @Nullable protected transient TableWrite write;
-
-    /** This is the real commit user read from state. */
-    @Nullable protected transient String commitUser;
-
     @Nullable private transient LogWriteCallback logCallback;
 
     public StoreWriteOperator(
             FileStoreTable table,
-            String initialCommitUser,
-            @Nullable Map<String, String> overwritePartition,
-            @Nullable LogSinkFunction logSinkFunction) {
+            @Nullable LogSinkFunction logSinkFunction,
+            StoreSinkWrite.Provider storeSinkWriteProvider) {
         this.table = table;
-        this.initialCommitUser = initialCommitUser;
-        this.overwritePartition = overwritePartition;
         this.logSinkFunction = logSinkFunction;
+        this.storeSinkWriteProvider = storeSinkWriteProvider;
     }
 
     @Override
@@ -102,31 +86,18 @@ public class StoreWriteOperator extends PrepareCommitOperator {
     @Override
     public void initializeState(StateInitializationContext context) throws Exception {
         super.initializeState(context);
-
+        write =
+                storeSinkWriteProvider.provide(
+                        table, context, getContainingTask().getEnvironment().getIOManager());
         if (logSinkFunction != null) {
             StreamingFunctionUtils.restoreFunctionState(context, logSinkFunction);
         }
-
-        // each job can only have one user name and this name must be consistent across restarts
-        // we cannot use job id as commit user name here because user may change job id by creating
-        // a savepoint, stop the job and then resume from savepoint
-        commitUser =
-                StateUtils.getSingleValueFromState(
-                        context, "commit_user_state", String.class, initialCommitUser);
-        // see comments of StateUtils.getSingleValueFromState for why commitUser may be null
-        if (commitUser == null) {
-            write = null;
-        } else {
-            write =
-                    table.newWrite(commitUser)
-                            .withIOManager(getContainingTask().getEnvironment().getIOManager())
-                            .withOverwrite(overwritePartition != null);
-        }
     }
 
     @Override
     public void open() throws Exception {
         super.open();
+
         this.sinkContext = new SimpleContext(getProcessingTimeService());
         if (logSinkFunction != null) {
             FunctionUtils.openFunction(logSinkFunction, new Configuration());
@@ -138,6 +109,7 @@ public class StoreWriteOperator extends PrepareCommitOperator {
     @Override
     public void processWatermark(Watermark mark) throws Exception {
         super.processWatermark(mark);
+
         this.currentWatermark = mark.getTimestamp();
         if (logSinkFunction != null) {
             logSinkFunction.writeWatermark(
@@ -147,10 +119,6 @@ public class StoreWriteOperator extends PrepareCommitOperator {
 
     @Override
     public void processElement(StreamRecord<RowData> element) throws Exception {
-        writeRecord(element);
-    }
-
-    protected SinkRecord writeRecord(StreamRecord<RowData> element) throws Exception {
         sinkContext.timestamp = element.hasTimestamp() ? element.getTimestamp() : null;
 
         SinkRecord record;
@@ -165,13 +133,14 @@ public class StoreWriteOperator extends PrepareCommitOperator {
             SinkRecord logRecord = write.toLogRecord(record);
             logSinkFunction.invoke(logRecord, sinkContext);
         }
-
-        return record;
     }
 
     @Override
     public void snapshotState(StateSnapshotContext context) throws Exception {
         super.snapshotState(context);
+
+        write.snapshotState(context);
+
         if (logSinkFunction != null) {
             StreamingFunctionUtils.snapshotFunctionState(
                     context, getOperatorStateBackend(), logSinkFunction);
@@ -181,6 +150,7 @@ public class StoreWriteOperator extends PrepareCommitOperator {
     @Override
     public void finish() throws Exception {
         super.finish();
+
         if (logSinkFunction != null) {
             logSinkFunction.finish();
         }
@@ -189,9 +159,8 @@ public class StoreWriteOperator extends PrepareCommitOperator {
     @Override
     public void close() throws Exception {
         super.close();
-        if (write != null) {
-            write.close();
-        }
+
+        write.close();
 
         if (logSinkFunction != null) {
             FunctionUtils.closeFunction(logSinkFunction);
@@ -219,18 +188,7 @@ public class StoreWriteOperator extends PrepareCommitOperator {
     @Override
     protected List<Committable> prepareCommit(boolean doCompaction, long checkpointId)
             throws IOException {
-        List<Committable> committables = new ArrayList<>();
-        if (write != null) {
-            try {
-                for (FileCommittable committable :
-                        write.prepareCommit(doCompaction, checkpointId)) {
-                    committables.add(
-                            new Committable(checkpointId, Committable.Kind.FILE, committable));
-                }
-            } catch (Exception e) {
-                throw new IOException(e);
-            }
-        }
+        List<Committable> committables = write.prepareCommit(doCompaction, checkpointId);
 
         if (logCallback != null) {
             try {


[flink-table-store] 02/02: [FLINK-30210] Refactor StoreCompactOperator to accept records containing partitions and buckets in Table Store

Posted by cz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit 333bb8dad697d65e70a4d672748bc1e55d7ded03
Author: tsreaper <ts...@gmail.com>
AuthorDate: Fri Dec 9 14:58:40 2022 +0800

    [FLINK-30210] Refactor StoreCompactOperator to accept records containing partitions and buckets in Table Store
---
 .../store/connector/TableStoreManagedFactory.java  |  11 +-
 .../store/connector/FlinkConnectorOptions.java     |  18 --
 .../store/connector/sink/FlinkSinkBuilder.java     |  24 +-
 .../store/connector/sink/StoreCompactOperator.java | 114 +++----
 .../table/store/connector/sink/StoreSink.java      |  10 -
 .../table/store/connector/sink/TableStoreSink.java |   4 +-
 .../connector/source/FileStoreEmptySource.java     |  99 -------
 .../store/connector/source/FlinkSourceBuilder.java |   6 +-
 .../store/connector/AlterTableCompactITCase.java   | 329 ---------------------
 .../connector/TableStoreManagedFactoryTest.java    |  24 --
 .../file/append/AppendOnlyCompactManager.java      |   2 +-
 11 files changed, 52 insertions(+), 589 deletions(-)
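
The gist of this commit: StoreCompactOperator no longer scans the table itself or reads a partition spec from table options. It consumes records produced by sources built from CompactorSourceBuilder, where each row carries the partition fields in its leading columns and the bucket number in the last column, and forwards them to StoreSinkWrite#compact (full compaction for batch jobs, normal compaction when streaming). A minimal sketch of that decoding; the standalone class CompactRecordSketch is hypothetical, all other types and methods are taken from the diff below:

    // Illustrative sketch: mirrors how StoreCompactOperator decodes its input after this commit.
    // The standalone class is hypothetical; every other name below appears in the diff.
    package org.apache.flink.table.store.connector.sink;

    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.binary.BinaryRowData;
    import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
    import org.apache.flink.table.store.file.utils.OffsetRowData;
    import org.apache.flink.table.store.table.FileStoreTable;

    class CompactRecordSketch {

        private final RowDataSerializer partitionSerializer;
        private final OffsetRowData reusedPartition;
        private final StoreSinkWrite write; // obtained through StoreSinkWrite.Provider
        private final boolean isStreaming;

        CompactRecordSketch(FileStoreTable table, StoreSinkWrite write, boolean isStreaming) {
            // The serializer covers only the partition columns; the bucket number
            // sits in the column directly after them.
            this.partitionSerializer =
                    new RowDataSerializer(table.schema().logicalPartitionType());
            this.reusedPartition = new OffsetRowData(partitionSerializer.getArity(), 0);
            this.write = write;
            this.isStreaming = isStreaming;
        }

        void processElement(RowData partitionAndBucket) throws Exception {
            // View the leading columns as the partition row and copy it out of the reused buffer.
            reusedPartition.replace(partitionAndBucket);
            BinaryRowData partition = partitionSerializer.toBinaryRow(reusedPartition).copy();
            // The bucket number is the last column, right after the partition fields.
            int bucket = partitionAndBucket.getInt(partitionSerializer.getArity());
            // Batch compaction jobs (isStreaming == false) request a full compaction.
            write.compact(partition, bucket, !isStreaming);
        }
    }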

diff --git a/flink-table-store-connector/src/main/1.15/org/apache/flink/table/store/connector/TableStoreManagedFactory.java b/flink-table-store-connector/src/main/1.15/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
index 19358f25..6c254ad4 100644
--- a/flink-table-store-connector/src/main/1.15/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
+++ b/flink-table-store-connector/src/main/1.15/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
@@ -25,7 +25,6 @@ import org.apache.flink.table.factories.ManagedTableFactory;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
-import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
 import org.apache.flink.table.store.log.LogStoreTableFactory;
 import org.apache.flink.util.Preconditions;
 
@@ -39,8 +38,6 @@ import java.util.Optional;
 import static org.apache.flink.table.store.CoreOptions.BUCKET;
 import static org.apache.flink.table.store.CoreOptions.PATH;
 import static org.apache.flink.table.store.CoreOptions.WRITE_MODE;
-import static org.apache.flink.table.store.connector.FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED;
-import static org.apache.flink.table.store.connector.FlinkConnectorOptions.COMPACTION_PARTITION_SPEC;
 import static org.apache.flink.table.store.connector.FlinkConnectorOptions.ROOT_PATH;
 import static org.apache.flink.table.store.connector.FlinkConnectorOptions.TABLE_STORE_PREFIX;
 import static org.apache.flink.table.store.connector.FlinkConnectorOptions.relativeTablePath;
@@ -173,11 +170,7 @@ public class TableStoreManagedFactory extends AbstractTableStoreFactory
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
-        newOptions.put(COMPACTION_MANUAL_TRIGGERED.key(), String.valueOf(true));
-        newOptions.put(
-                COMPACTION_PARTITION_SPEC.key(),
-                JsonSerdeUtil.toJson(catalogPartitionSpec.getPartitionSpec()));
-        return newOptions;
+        throw new UnsupportedOperationException(
+                "Table store does not support ALTER TABLE ... COMPACT ...");
     }
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java
index 07b21581..8ff00d0a 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java
@@ -46,24 +46,6 @@ public class FlinkConnectorOptions {
                     .noDefaultValue()
                     .withDescription("The root file path of the table store in the filesystem.");
 
-    @Internal
-    @Documentation.ExcludeFromDocumentation("Internal use only")
-    public static final ConfigOption<Boolean> COMPACTION_MANUAL_TRIGGERED =
-            ConfigOptions.key("compaction.manual-triggered")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription(
-                            "An internal flag to indicate a manual triggered compaction job.");
-
-    @Internal
-    @Documentation.ExcludeFromDocumentation("Internal use only")
-    public static final ConfigOption<String> COMPACTION_PARTITION_SPEC =
-            ConfigOptions.key("compaction.partition-spec")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "An internal json string to record the user-specified partition spec for the manual triggered compaction.");
-
     public static final ConfigOption<String> LOG_SYSTEM =
             ConfigOptions.key("log.system")
                     .stringType()
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
index 858eb847..9107e19d 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
@@ -18,15 +18,12 @@
 
 package org.apache.flink.table.store.connector.sink;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.connector.FlinkConnectorOptions;
 import org.apache.flink.table.store.file.operation.Lock;
-import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.sink.LogSinkFunction;
 
@@ -38,7 +35,6 @@ import java.util.Map;
 public class FlinkSinkBuilder {
 
     private final FileStoreTable table;
-    private final Configuration conf;
 
     private DataStream<RowData> input;
     private Lock.Factory lockFactory = Lock.emptyFactory();
@@ -48,7 +44,6 @@ public class FlinkSinkBuilder {
 
     public FlinkSinkBuilder(FileStoreTable table) {
         this.table = table;
-        this.conf = Configuration.fromMap(table.schema().options());
     }
 
     public FlinkSinkBuilder withInput(DataStream<RowData> input) {
@@ -76,16 +71,6 @@ public class FlinkSinkBuilder {
         return this;
     }
 
-    @SuppressWarnings("unchecked")
-    @Nullable
-    private Map<String, String> getCompactPartSpec() {
-        String json = conf.get(FlinkConnectorOptions.COMPACTION_PARTITION_SPEC);
-        if (json == null) {
-            return null;
-        }
-        return JsonSerdeUtil.fromJson(json, Map.class);
-    }
-
     public DataStreamSink<?> build() {
         BucketStreamPartitioner partitioner = new BucketStreamPartitioner(table.schema());
         PartitionTransformation<RowData> partitioned =
@@ -95,14 +80,7 @@ public class FlinkSinkBuilder {
         }
 
         StreamExecutionEnvironment env = input.getExecutionEnvironment();
-        StoreSink sink =
-                new StoreSink(
-                        table,
-                        lockFactory,
-                        conf.get(FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED),
-                        getCompactPartSpec(),
-                        overwritePartition,
-                        logSinkFunction);
+        StoreSink sink = new StoreSink(table, lockFactory, overwritePartition, logSinkFunction);
         return sink.sinkFrom(new DataStream<>(env, partitioned));
     }
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
index b331059c..2cdfad51 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
@@ -18,99 +18,77 @@
 
 package org.apache.flink.table.store.connector.sink;
 
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.predicate.PredicateConverter;
-import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.utils.OffsetRowData;
 import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.sink.TableWrite;
-import org.apache.flink.table.store.table.source.DataSplit;
-import org.apache.flink.table.store.table.source.Split;
-import org.apache.flink.table.store.table.source.TableScan;
-import org.apache.flink.table.types.logical.RowType;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
+import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.stream.Collectors;
 
-/** A dedicated operator for manual triggered compaction. */
+/**
+ * A dedicated operator for manual triggered compaction.
+ *
+ * <p>In-coming records are generated by sources built from {@link
+ * org.apache.flink.table.store.connector.source.CompactorSourceBuilder}. The records will contain
+ * partition keys in the first few columns, and bucket number in the last column.
+ */
 public class StoreCompactOperator extends PrepareCommitOperator {
 
-    private static final Logger LOG = LoggerFactory.getLogger(StoreCompactOperator.class);
-
     private final FileStoreTable table;
-    private final String commitUser;
-    @Nullable private final Map<String, String> compactPartitionSpec;
+    private final StoreSinkWrite.Provider storeSinkWriteProvider;
+    private final boolean isStreaming;
 
-    private TableScan scan;
-    private TableWrite write;
+    private transient StoreSinkWrite write;
+    private transient RowDataSerializer partitionSerializer;
+    private transient OffsetRowData reusedPartition;
 
     public StoreCompactOperator(
             FileStoreTable table,
-            String commitUser,
-            @Nullable Map<String, String> compactPartitionSpec) {
+            StoreSinkWrite.Provider storeSinkWriteProvider,
+            boolean isStreaming) {
+        Preconditions.checkArgument(
+                !table.options().writeCompactionSkip(),
+                CoreOptions.WRITE_COMPACTION_SKIP.key()
+                        + " should not be true for StoreCompactOperator. This is unexpected.");
         this.table = table;
-        this.commitUser = commitUser;
-        this.compactPartitionSpec = compactPartitionSpec;
+        this.storeSinkWriteProvider = storeSinkWriteProvider;
+        this.isStreaming = isStreaming;
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        super.initializeState(context);
+        write =
+                storeSinkWriteProvider.provide(
+                        table, context, getContainingTask().getEnvironment().getIOManager());
     }
 
     @Override
     public void open() throws Exception {
         super.open();
+        partitionSerializer = new RowDataSerializer(table.schema().logicalPartitionType());
+        reusedPartition = new OffsetRowData(partitionSerializer.getArity(), 0);
+    }
 
-        scan = table.newScan();
-        if (compactPartitionSpec != null) {
-            scan.withFilter(
-                    PredicateConverter.fromMap(
-                            compactPartitionSpec, table.schema().logicalPartitionType()));
-        }
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        RowData partitionAndBucket = element.getValue();
+        reusedPartition.replace(partitionAndBucket);
+        BinaryRowData partition = partitionSerializer.toBinaryRow(reusedPartition).copy();
+        int bucket = partitionAndBucket.getInt(partitionSerializer.getArity());
 
-        write = table.newWrite(commitUser);
+        write.compact(partition, bucket, !isStreaming);
     }
 
     @Override
     protected List<Committable> prepareCommit(boolean doCompaction, long checkpointId)
             throws IOException {
-        int task = getRuntimeContext().getIndexOfThisSubtask();
-        int numTask = getRuntimeContext().getNumberOfParallelSubtasks();
-
-        for (Split split : scan.plan().splits()) {
-            DataSplit dataSplit = (DataSplit) split;
-            BinaryRowData partition = dataSplit.partition();
-            int bucket = dataSplit.bucket();
-            if (Math.abs(Objects.hash(partition, bucket)) % numTask != task) {
-                continue;
-            }
-
-            if (LOG.isDebugEnabled()) {
-                RowType partitionType = table.schema().logicalPartitionType();
-                LOG.debug(
-                        "Do compaction for partition {}, bucket {}",
-                        FileStorePathFactory.getPartitionComputer(
-                                        partitionType,
-                                        FileStorePathFactory.PARTITION_DEFAULT_NAME.defaultValue())
-                                .generatePartValues(partition),
-                        bucket);
-            }
-            try {
-                write.compact(partition, bucket, true);
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-
-        try {
-            return write.prepareCommit(true, checkpointId).stream()
-                    .map(c -> new Committable(checkpointId, Committable.Kind.FILE, c))
-                    .collect(Collectors.toList());
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
+        return write.prepareCommit(doCompaction, checkpointId);
     }
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index 5e95c875..0823e6a0 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -56,32 +56,22 @@ public class StoreSink implements Serializable {
 
     private final FileStoreTable table;
     private final Lock.Factory lockFactory;
-    private final boolean compactionTask;
-    @Nullable private final Map<String, String> compactPartitionSpec;
     @Nullable private final Map<String, String> overwritePartition;
     @Nullable private final LogSinkFunction logSinkFunction;
 
     public StoreSink(
             FileStoreTable table,
             Lock.Factory lockFactory,
-            boolean compactionTask,
-            @Nullable Map<String, String> compactPartitionSpec,
             @Nullable Map<String, String> overwritePartition,
             @Nullable LogSinkFunction logSinkFunction) {
         this.table = table;
         this.lockFactory = lockFactory;
-        this.compactionTask = compactionTask;
-        this.compactPartitionSpec = compactPartitionSpec;
         this.overwritePartition = overwritePartition;
         this.logSinkFunction = logSinkFunction;
     }
 
     private OneInputStreamOperator<RowData, Committable> createWriteOperator(
             String initialCommitUser) {
-        if (compactionTask) {
-            return new StoreCompactOperator(table, initialCommitUser, compactPartitionSpec);
-        }
-
         boolean isOverwrite = overwritePartition != null;
         StoreSinkWrite.Provider writeProvider;
         if (table.options().changelogProducer() == CoreOptions.ChangelogProducer.FULL_COMPACTION) {
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
index e63e0f25..27b4aaf3 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
@@ -120,9 +120,7 @@ public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, Supp
         Configuration conf = Configuration.fromMap(table.schema().options());
         // Do not sink to log store when overwrite mode
         final LogSinkFunction logSinkFunction =
-                overwrite || conf.get(FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED)
-                        ? null
-                        : (logSinkProvider == null ? null : logSinkProvider.createSink());
+                overwrite ? null : (logSinkProvider == null ? null : logSinkProvider.createSink());
         return new TableStoreDataStreamSinkProvider(
                 (dataStream) ->
                         new FlinkSinkBuilder(table)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreEmptySource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreEmptySource.java
deleted file mode 100644
index bb671f23..00000000
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreEmptySource.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.source;
-
-import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.api.connector.source.ReaderOutput;
-import org.apache.flink.api.connector.source.Source;
-import org.apache.flink.api.connector.source.SourceReader;
-import org.apache.flink.api.connector.source.SourceReaderContext;
-import org.apache.flink.api.connector.source.SplitEnumerator;
-import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.core.io.InputStatus;
-import org.apache.flink.table.data.RowData;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-
-/** A dedicated empty source for "ALTER TABLE ... COMPACT". */
-public class FileStoreEmptySource
-        implements Source<RowData, FileStoreSourceSplit, PendingSplitsCheckpoint> {
-
-    @Override
-    public Boundedness getBoundedness() {
-        return Boundedness.BOUNDED;
-    }
-
-    @Override
-    public SourceReader<RowData, FileStoreSourceSplit> createReader(
-            SourceReaderContext readerContext) {
-        return new SourceReader<RowData, FileStoreSourceSplit>() {
-            @Override
-            public void start() {}
-
-            @Override
-            public InputStatus pollNext(ReaderOutput<RowData> output) {
-                return InputStatus.END_OF_INPUT;
-            }
-
-            @Override
-            public List<FileStoreSourceSplit> snapshotState(long checkpointId) {
-                return Collections.emptyList();
-            }
-
-            @Override
-            public CompletableFuture<Void> isAvailable() {
-                return CompletableFuture.completedFuture(null);
-            }
-
-            @Override
-            public void addSplits(List<FileStoreSourceSplit> splits) {}
-
-            @Override
-            public void notifyNoMoreSplits() {}
-
-            @Override
-            public void close() {}
-        };
-    }
-
-    @Override
-    public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> createEnumerator(
-            SplitEnumeratorContext<FileStoreSourceSplit> context) {
-        return restoreEnumerator(context, null);
-    }
-
-    @Override
-    public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnumerator(
-            SplitEnumeratorContext<FileStoreSourceSplit> context,
-            PendingSplitsCheckpoint checkpoint) {
-        return new StaticFileStoreSplitEnumerator(context, null, Collections.emptyList());
-    }
-
-    @Override
-    public FileStoreSourceSplitSerializer getSplitSerializer() {
-        return new FileStoreSourceSplitSerializer();
-    }
-
-    @Override
-    public PendingSplitsCheckpointSerializer getEnumeratorCheckpointSerializer() {
-        return new PendingSplitsCheckpointSerializer(getSplitSerializer());
-    }
-}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
index 99aa81cb..5f40ae8b 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
@@ -42,8 +42,6 @@ import javax.annotation.Nullable;
 
 import java.util.Optional;
 
-import static org.apache.flink.table.store.connector.FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED;
-
 /**
  * Source builder to build a Flink {@link StaticFileStoreSource} or {@link
  * ContinuousFileStoreSource}. This is for normal read/write jobs.
@@ -155,9 +153,7 @@ public class FlinkSourceBuilder {
                         .orElse(rowType);
         DataStreamSource<RowData> dataStream =
                 env.fromSource(
-                        conf.get(COMPACTION_MANUAL_TRIGGERED)
-                                ? new FileStoreEmptySource()
-                                : buildSource(),
+                        buildSource(),
                         watermarkStrategy == null
                                 ? WatermarkStrategy.noWatermarks()
                                 : watermarkStrategy,
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
deleted file mode 100644
index fa9fdc32..00000000
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector;
-
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.Snapshot;
-import org.apache.flink.table.store.file.TestKeyValueGenerator;
-import org.apache.flink.types.Row;
-import org.apache.flink.types.RowKind;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
-import static org.apache.flink.table.store.file.TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED;
-import static org.apache.flink.table.store.file.TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED;
-import static org.apache.flink.table.store.file.TestKeyValueGenerator.GeneratorMode.SINGLE_PARTITIONED;
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** ITCase for 'ALTER TABLE ... COMPACT'. */
-public class AlterTableCompactITCase extends FileStoreTableITCase {
-
-    private TestKeyValueGenerator generator;
-
-    @Override
-    protected List<String> ddl() {
-        return Arrays.asList(
-                "CREATE TABLE IF NOT EXISTS T0 (\n"
-                        + "shopId INT\n, "
-                        + "orderId BIGINT NOT NULL\n, "
-                        + "itemId BIGINT)",
-                "CREATE TABLE IF NOT EXISTS T1 (\n"
-                        + "dt STRING\n, "
-                        + "shopId INT\n, "
-                        + "orderId BIGINT NOT NULL\n, "
-                        + "itemId BIGINT)\n "
-                        + "PARTITIONED BY (dt)",
-                "CREATE TABLE IF NOT EXISTS T2 (\n"
-                        + "dt STRING\n, "
-                        + "hr INT\n, "
-                        + "shopId INT\n, "
-                        + "orderId BIGINT NOT NULL\n, "
-                        + "itemId BIGINT)"
-                        + "PARTITIONED BY (dt, hr)",
-                "CREATE TABLE IF NOT EXISTS T3 (\n"
-                        + "f0 INT\n, "
-                        + "f1 STRING NOT NULL\n"
-                        + ") WITH (\n"
-                        + "'write-mode' = 'append-only'\n"
-                        + ")");
-    }
-
-    @Test
-    public void testNonPartitioned() {
-        generator = new TestKeyValueGenerator(NON_PARTITIONED);
-        Random random = new Random();
-        innerTest("T0", random.nextInt(10) + 1, NON_PARTITIONED);
-    }
-
-    @Test
-    public void testSinglePartitioned() {
-        generator = new TestKeyValueGenerator(SINGLE_PARTITIONED);
-        Random random = new Random();
-        innerTest("T1", random.nextInt(10) + 1, SINGLE_PARTITIONED);
-    }
-
-    @Test
-    public void testMultiPartitioned() {
-        generator = new TestKeyValueGenerator(MULTI_PARTITIONED);
-        Random random = new Random();
-        innerTest("T2", random.nextInt(10) + 1, MULTI_PARTITIONED);
-    }
-
-    @Test
-    public void testChangeNumOfSortedRunTrigger() {
-        // increase trigger
-        batchSql("ALTER TABLE T0 SET ('num-sorted-run.compaction-trigger' = '5')");
-
-        // write duplicates
-        batchSql("INSERT INTO T0 VALUES(1, 1, 1)," + "(2, 2, 2), " + "(3, 3, 3), " + "(4, 4, 4)");
-        batchSql("INSERT INTO T0 VALUES(1, 1, 1)," + "(2, 2, 2), " + "(3, 3, 3), " + "(4, 4, 4)");
-        batchSql("INSERT INTO T0 VALUES(1, 1, 1)," + "(2, 2, 2), " + "(3, 3, 3), " + "(4, 4, 4)");
-        batchSql("INSERT INTO T0 VALUES(1, 1, 1)," + "(2, 2, 2), " + "(3, 3, 3), " + "(4, 4, 4)");
-        batchSql("INSERT INTO T0 VALUES(1, 1, 1)," + "(2, 2, 2), " + "(3, 3, 3), " + "(4, 4, 4)");
-        Snapshot snapshot = findLatestSnapshot("T0", true);
-        assertThat(snapshot.id()).isEqualTo(6);
-        assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
-
-        // decrease trigger
-        batchSql("ALTER TABLE T0 SET ('num-sorted-run.compaction-trigger' = '1')");
-        batchSql("ALTER TABLE T0 COMPACT");
-        assertThat(findLatestSnapshot("T0", true))
-                .usingComparator(Comparator.comparing(Snapshot::id))
-                .isEqualTo(snapshot);
-    }
-
-    @Test
-    public void testAppendOnlyTable() {
-        innerTest("INSERT INTO T3 VALUES(1, 'AAA')", 1L, Snapshot.CommitKind.APPEND);
-        innerTest("ALTER TABLE T3 COMPACT", 1L, Snapshot.CommitKind.APPEND);
-
-        innerTest("INSERT INTO T3 VALUES(2, 'BBB')", 2L, Snapshot.CommitKind.APPEND);
-        innerTest("ALTER TABLE T3 COMPACT", 2L, Snapshot.CommitKind.APPEND);
-
-        innerTest("INSERT INTO T3 VALUES(3, 'CCC')", 3L, Snapshot.CommitKind.APPEND);
-        innerTest("ALTER TABLE T3 COMPACT", 3L, Snapshot.CommitKind.APPEND);
-
-        innerTest("INSERT INTO T3 VALUES(4, 'DDD')", 4L, Snapshot.CommitKind.APPEND);
-        innerTest("ALTER TABLE T3 COMPACT", 4L, Snapshot.CommitKind.APPEND);
-
-        innerTest("INSERT INTO T3 VALUES(5, 'AAA')", 5L, Snapshot.CommitKind.APPEND);
-
-        batchSql("ALTER TABLE T3 SET ('compaction.early-max.file-num' = '5')");
-        innerTest("ALTER TABLE T3 COMPACT", 6L, Snapshot.CommitKind.COMPACT);
-    }
-
-    private void innerTest(
-            String sql, long expectedSnapshotId, Snapshot.CommitKind expectedCommitKind) {
-        batchSql(sql);
-        Snapshot snapshot = findLatestSnapshot("T3", true);
-        assertThat(snapshot.id()).isEqualTo(expectedSnapshotId);
-        assertThat(snapshot.commitKind()).isEqualTo(expectedCommitKind);
-    }
-
-    private void innerTest(
-            String tableName, int batchNum, TestKeyValueGenerator.GeneratorMode mode) {
-        // increase trigger to avoid auto-compaction
-        batchSql(
-                String.format(
-                        "ALTER TABLE %s SET ('num-sorted-run.compaction-trigger' = '50')",
-                        tableName));
-        batchSql(
-                String.format(
-                        "ALTER TABLE %s SET ('num-sorted-run.stop-trigger' = '50')", tableName));
-
-        Random random = new Random();
-        List<KeyValue> dataset = new ArrayList<>();
-        long latestSnapshot = 0L;
-        for (int i = 0; i < batchNum; i++) {
-            List<KeyValue> data = generateData(random.nextInt(200) + 1);
-            String insertQuery =
-                    String.format(
-                            "INSERT INTO %s VALUES \n%s",
-                            tableName,
-                            data.stream()
-                                    .map(kv -> kvAsString(kv, mode))
-                                    .collect(Collectors.joining(",\n")));
-            batchSql(insertQuery);
-            Snapshot snapshot = findLatestSnapshot(tableName, true);
-            assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
-            latestSnapshot = snapshot.id();
-            dataset.addAll(data);
-        }
-        if (mode == NON_PARTITIONED) {
-            String compactQuery = String.format("ALTER TABLE %s COMPACT", tableName);
-            String selectQuery = String.format("SELECT * FROM %s", tableName);
-            compactAndCheck(
-                    tableName,
-                    compactQuery,
-                    selectQuery,
-                    latestSnapshot,
-                    dataset.stream()
-                            .map(kv -> convertToRow(kv, mode))
-                            .collect(Collectors.toList()));
-        } else {
-            List<BinaryRowData> partitions =
-                    dataset.stream()
-                            .map(kv -> generator.getPartition(kv))
-                            .distinct()
-                            .collect(Collectors.toList());
-            while (!partitions.isEmpty()) {
-                BinaryRowData part = pickPartition(partitions);
-                Map<String, String> partSpec = TestKeyValueGenerator.toPartitionMap(part, mode);
-                String compactQuery =
-                        String.format(
-                                "ALTER TABLE %s PARTITION (%s) COMPACT",
-                                tableName, partAsString(partSpec, false));
-                String selectQuery =
-                        String.format(
-                                "SELECT * FROM %s WHERE %s",
-                                tableName, partAsString(partSpec, true));
-                compactAndCheck(
-                        tableName,
-                        compactQuery,
-                        selectQuery,
-                        latestSnapshot,
-                        dataset.stream()
-                                .filter(kv -> partFilter(kv, part, mode))
-                                .map(kv -> convertToRow(kv, mode))
-                                .collect(Collectors.toList()));
-                latestSnapshot = findLatestSnapshot(tableName, true).id();
-            }
-        }
-    }
-
-    private void compactAndCheck(
-            String tableName,
-            String compactQuery,
-            String selectQuery,
-            long latestSnapshot,
-            List<Row> expectedData) {
-        batchSql(compactQuery);
-        Snapshot snapshot = findLatestSnapshot(tableName, true);
-        assertThat(snapshot.id()).isEqualTo(latestSnapshot + 1);
-        assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
-        // check idempotence
-        batchSql(compactQuery);
-        assertThat(findLatestSnapshot(tableName, true).id()).isEqualTo(snapshot.id());
-
-        // read data
-        List<Row> readData = batchSql(selectQuery);
-        assertThat(readData).containsExactlyInAnyOrderElementsOf(expectedData);
-    }
-
-    private boolean partFilter(
-            KeyValue kv, BinaryRowData partition, TestKeyValueGenerator.GeneratorMode mode) {
-        RowData record = kv.value();
-        if (mode == SINGLE_PARTITIONED) {
-            return record.getString(0).equals(partition.getString(0));
-        } else if (mode == MULTI_PARTITIONED) {
-            return record.getString(0).equals(partition.getString(0))
-                    && record.getInt(1) == partition.getInt(1);
-        }
-        return true;
-    }
-
-    private String partAsString(Map<String, String> partSpec, boolean predicate) {
-        String dt = String.format("dt = '%s'", partSpec.get("dt"));
-        String hr = partSpec.get("hr");
-        if (hr == null) {
-            return dt;
-        }
-        hr = String.format("hr = %s", hr);
-        return predicate ? String.join(" AND ", dt, hr) : String.join(", ", dt, hr);
-    }
-
-    private BinaryRowData pickPartition(List<BinaryRowData> partitions) {
-        Random random = new Random();
-        int idx = random.nextInt(partitions.size());
-        return partitions.remove(idx);
-    }
-
-    private List<KeyValue> generateData(int numRecords) {
-        List<KeyValue> data = new ArrayList<>();
-        for (int i = 0; i < numRecords; i++) {
-            KeyValue kv = generator.next();
-            if (kv.valueKind() == RowKind.INSERT) {
-                data.add(kv);
-            } else {
-                data.add(kv.replace(kv.key(), RowKind.INSERT, kv.value()));
-            }
-        }
-        return data;
-    }
-
-    private Row convertToRow(KeyValue keyValue, TestKeyValueGenerator.GeneratorMode mode) {
-        byte kind = keyValue.valueKind().toByteValue();
-        RowData record = keyValue.value();
-        String rowKind = RowKind.fromByteValue(kind == 0 ? kind : 3).shortString();
-        if (mode == NON_PARTITIONED) {
-            return changelogRow(rowKind, record.getInt(0), record.getLong(1), record.getLong(2));
-        } else if (mode == SINGLE_PARTITIONED) {
-            return changelogRow(
-                    rowKind,
-                    record.getString(0).toString(),
-                    record.getInt(1),
-                    record.getLong(2),
-                    record.getLong(3));
-        }
-        return changelogRow(
-                rowKind,
-                record.getString(0).toString(),
-                record.getInt(1),
-                record.getInt(2),
-                record.getLong(3),
-                record.getLong(4));
-    }
-
-    private String kvAsString(KeyValue keyValue, TestKeyValueGenerator.GeneratorMode mode) {
-        RowData record = keyValue.value();
-        switch (mode) {
-            case NON_PARTITIONED:
-                return String.format(
-                        "(%d, %d, %d)", record.getInt(0), record.getLong(1), record.getLong(2));
-            case SINGLE_PARTITIONED:
-                return String.format(
-                        "('%s', %d, %d, %d)",
-                        record.getString(0),
-                        record.getInt(1),
-                        record.getLong(2),
-                        record.getLong(3));
-            case MULTI_PARTITIONED:
-                return String.format(
-                        "('%s', %d, %d, %d, %d)",
-                        record.getString(0),
-                        record.getInt(1),
-                        record.getInt(2),
-                        record.getLong(3),
-                        record.getLong(4));
-            default:
-                throw new UnsupportedOperationException("unsupported mode");
-        }
-    }
-}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
index fa39d08c..dcf421e8 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
@@ -21,14 +21,12 @@ package org.apache.flink.table.store.connector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -37,12 +35,10 @@ import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
-import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -58,8 +54,6 @@ import static org.apache.flink.table.store.CoreOptions.BUCKET;
 import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY;
 import static org.apache.flink.table.store.CoreOptions.PATH;
 import static org.apache.flink.table.store.CoreOptions.path;
-import static org.apache.flink.table.store.connector.FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED;
-import static org.apache.flink.table.store.connector.FlinkConnectorOptions.COMPACTION_PARTITION_SPEC;
 import static org.apache.flink.table.store.connector.FlinkConnectorOptions.ROOT_PATH;
 import static org.apache.flink.table.store.connector.FlinkConnectorOptions.TABLE_STORE_PREFIX;
 import static org.apache.flink.table.store.connector.FlinkConnectorOptions.relativeTablePath;
@@ -227,24 +221,6 @@ public class TableStoreManagedFactoryTest {
         }
     }
 
-    @ParameterizedTest
-    @ValueSource(ints = {0, 1, 2})
-    public void testOnCompactTable(int partitionNum) {
-        context = createEnrichedContext(Collections.emptyMap());
-
-        Map<String, String> partSpec =
-                partitionNum == 0
-                        ? Collections.emptyMap()
-                        : partitionNum == 1 ? of("foo", "bar") : of("foo", "bar", "meow", "burr");
-        CatalogPartitionSpec catalogPartSpec = new CatalogPartitionSpec(partSpec);
-        Map<String, String> newOptions =
-                tableStoreManagedFactory.onCompactTable(context, catalogPartSpec);
-        assertThat(newOptions)
-                .containsEntry(COMPACTION_MANUAL_TRIGGERED.key(), String.valueOf(true));
-        assertThat(newOptions)
-                .containsEntry(COMPACTION_PARTITION_SPEC.key(), JsonSerdeUtil.toJson(partSpec));
-    }
-
     // ~ Tools ------------------------------------------------------------------
 
     private static ResolvedCatalogTable getDummyTable(Map<String, String> tableOptions) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManager.java
index f2af051b..aab46b81 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManager.java
@@ -174,7 +174,7 @@ public class AppendOnlyCompactManager extends CompactFutureManager {
     }
 
     /**
-     * A {@link CompactTask} impl for ALTER TABLE COMPACT of append-only table.
+     * A {@link CompactTask} impl for full compaction of an append-only table.
      *
      * <p>This task accepts a pre-scanned file list as input and picks the candidate files to compact
      * iteratively until reaching the end of the input. There might be multiple times of rewrite