You are viewing a plain-text version of this content. The link to its canonical version (originally attached to the word "here") was not preserved in this plain-text conversion.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/12/05 09:05:34 UTC

(incubator-paimon) 07/12: [core] AbstractFileStoreWrite should not be the interface (#2446)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.6
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 2ff74b2a0012e06ef746b6403905a21262ecc51e
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Mon Dec 4 19:51:59 2023 +0800

    [core] AbstractFileStoreWrite should not be the interface (#2446)
---
 .../paimon/operation/AbstractFileStoreWrite.java   | 49 +--------------
 .../apache/paimon/operation/FileStoreWrite.java    | 73 +++++++++++++++++++---
 .../paimon/table/sink/BatchWriteBuilderImpl.java   |  2 +-
 .../apache/paimon/table/sink/InnerTableWrite.java  |  2 +-
 .../apache/paimon/table/sink/TableWriteImpl.java   | 19 +++---
 .../paimon/table/DynamicBucketTableTest.java       |  4 +-
 .../cdc/CdcRecordStoreMultiWriteOperatorTest.java  |  5 +-
 .../paimon/flink/sink/StoreSinkWriteImpl.java      |  6 +-
 8 files changed, 86 insertions(+), 74 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index b6209199e..b1275e7a8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -38,7 +38,6 @@ import org.apache.paimon.utils.CommitIncrement;
 import org.apache.paimon.utils.ExecutorThreadFactory;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.RecordWriter;
-import org.apache.paimon.utils.Restorable;
 import org.apache.paimon.utils.SnapshotManager;
 
 import org.slf4j.Logger;
@@ -60,8 +59,7 @@ import java.util.concurrent.Executors;
  *
  * @param <T> type of record to write.
  */
-public abstract class AbstractFileStoreWrite<T>
-        implements FileStoreWrite<T>, Restorable<List<AbstractFileStoreWrite.State<T>>> {
+public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractFileStoreWrite.class);
 
@@ -116,6 +114,7 @@ public abstract class AbstractFileStoreWrite<T>
         this.ignorePreviousFiles = ignorePreviousFiles;
     }
 
+    @Override
     public void withCompactExecutor(ExecutorService compactExecutor) {
         this.lazyCompactExecutor = compactExecutor;
         this.closeCompactExecutorWhenLeaving = false;
@@ -352,7 +351,7 @@ public abstract class AbstractFileStoreWrite<T>
     }
 
     @Override
-    public void isStreamingMode(boolean isStreamingMode) {
+    public void withExecutionMode(boolean isStreamingMode) {
         this.isStreamingMode = isStreamingMode;
     }
 
@@ -445,46 +444,4 @@ public abstract class AbstractFileStoreWrite<T>
             this.lastModifiedCommitIdentifier = Long.MIN_VALUE;
         }
     }
-
-    /** Recoverable state of {@link AbstractFileStoreWrite}. */
-    public static class State<T> {
-        protected final BinaryRow partition;
-        protected final int bucket;
-
-        protected final long baseSnapshotId;
-        protected final long lastModifiedCommitIdentifier;
-        protected final List<DataFileMeta> dataFiles;
-        @Nullable protected final IndexMaintainer<T> indexMaintainer;
-        protected final CommitIncrement commitIncrement;
-
-        protected State(
-                BinaryRow partition,
-                int bucket,
-                long baseSnapshotId,
-                long lastModifiedCommitIdentifier,
-                Collection<DataFileMeta> dataFiles,
-                @Nullable IndexMaintainer<T> indexMaintainer,
-                CommitIncrement commitIncrement) {
-            this.partition = partition;
-            this.bucket = bucket;
-            this.baseSnapshotId = baseSnapshotId;
-            this.lastModifiedCommitIdentifier = lastModifiedCommitIdentifier;
-            this.dataFiles = new ArrayList<>(dataFiles);
-            this.indexMaintainer = indexMaintainer;
-            this.commitIncrement = commitIncrement;
-        }
-
-        @Override
-        public String toString() {
-            return String.format(
-                    "{%s, %d, %d, %d, %s, %s, %s}",
-                    partition,
-                    bucket,
-                    baseSnapshotId,
-                    lastModifiedCommitIdentifier,
-                    dataFiles,
-                    indexMaintainer,
-                    commitIncrement);
-        }
-    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
index 96fe0e1f5..e6c5ed50f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
@@ -21,15 +21,23 @@ package org.apache.paimon.operation;
 import org.apache.paimon.FileStore;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.index.IndexMaintainer;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.memory.MemoryPoolFactory;
 import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.SinkRecord;
+import org.apache.paimon.utils.CommitIncrement;
 import org.apache.paimon.utils.RecordWriter;
+import org.apache.paimon.utils.Restorable;
 
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 
 /**
  * Write operation which provides {@link RecordWriter} creation and writes {@link SinkRecord} to
@@ -37,7 +45,7 @@ import java.util.List;
  *
  * @param <T> type of record to write.
  */
-public interface FileStoreWrite<T> {
+public interface FileStoreWrite<T> extends Restorable<List<FileStoreWrite.State<T>>> {
 
     FileStoreWrite<T> withIOManager(IOManager ioManager);
 
@@ -64,6 +72,18 @@ public interface FileStoreWrite<T> {
      */
     void withIgnorePreviousFiles(boolean ignorePreviousFiles);
 
+    /**
+     * We detect whether it is in batch mode, if so, we do some optimization.
+     *
+     * @param isStreamingMode whether in streaming mode
+     */
+    void withExecutionMode(boolean isStreamingMode);
+
+    /** With metrics to measure compaction. */
+    FileStoreWrite<T> withMetricRegistry(MetricRegistry metricRegistry);
+
+    void withCompactExecutor(ExecutorService compactExecutor);
+
     /**
      * Write the data to the store according to the partition and bucket.
      *
@@ -109,13 +129,6 @@ public interface FileStoreWrite<T> {
     List<CommitMessage> prepareCommit(boolean waitCompaction, long commitIdentifier)
             throws Exception;
 
-    /**
-     * We detect whether it is in batch mode, if so, we do some optimization.
-     *
-     * @param isStreamingMode whether in streaming mode
-     */
-    void isStreamingMode(boolean isStreamingMode);
-
     /**
      * Close the writer.
      *
@@ -123,6 +136,46 @@ public interface FileStoreWrite<T> {
      */
     void close() throws Exception;
 
-    /** With metrics to measure compaction. */
-    FileStoreWrite<T> withMetricRegistry(MetricRegistry metricRegistry);
+    /** Recoverable state of {@link FileStoreWrite}. */
+    class State<T> {
+
+        protected final BinaryRow partition;
+        protected final int bucket;
+
+        protected final long baseSnapshotId;
+        protected final long lastModifiedCommitIdentifier;
+        protected final List<DataFileMeta> dataFiles;
+        @Nullable protected final IndexMaintainer<T> indexMaintainer;
+        protected final CommitIncrement commitIncrement;
+
+        protected State(
+                BinaryRow partition,
+                int bucket,
+                long baseSnapshotId,
+                long lastModifiedCommitIdentifier,
+                Collection<DataFileMeta> dataFiles,
+                @Nullable IndexMaintainer<T> indexMaintainer,
+                CommitIncrement commitIncrement) {
+            this.partition = partition;
+            this.bucket = bucket;
+            this.baseSnapshotId = baseSnapshotId;
+            this.lastModifiedCommitIdentifier = lastModifiedCommitIdentifier;
+            this.dataFiles = new ArrayList<>(dataFiles);
+            this.indexMaintainer = indexMaintainer;
+            this.commitIncrement = commitIncrement;
+        }
+
+        @Override
+        public String toString() {
+            return String.format(
+                    "{%s, %d, %d, %d, %s, %s, %s}",
+                    partition,
+                    bucket,
+                    baseSnapshotId,
+                    lastModifiedCommitIdentifier,
+                    dataFiles,
+                    indexMaintainer,
+                    commitIncrement);
+        }
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
index bf4cf8800..87693ba0a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
@@ -60,7 +60,7 @@ public class BatchWriteBuilderImpl implements BatchWriteBuilder {
     public BatchTableWrite newWrite() {
         return table.newWrite(commitUser)
                 .withIgnorePreviousFiles(staticPartition != null)
-                .isStreamingMode(false);
+                .withExecutionMode(false);
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableWrite.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableWrite.java
index d0231a224..5e822fc5b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableWrite.java
@@ -24,5 +24,5 @@ public interface InnerTableWrite extends StreamTableWrite, BatchTableWrite {
     InnerTableWrite withIgnorePreviousFiles(boolean ignorePreviousFiles);
 
     // we detect whether in streaming mode, and do some optimization
-    InnerTableWrite isStreamingMode(boolean isStreamingMode);
+    InnerTableWrite withExecutionMode(boolean isStreamingMode);
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index f34a38a44..2bea3c224 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -27,8 +27,8 @@ import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.memory.MemoryPoolFactory;
 import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.metrics.MetricRegistry;
-import org.apache.paimon.operation.AbstractFileStoreWrite;
 import org.apache.paimon.operation.FileStoreWrite;
+import org.apache.paimon.operation.FileStoreWrite.State;
 import org.apache.paimon.utils.Restorable;
 
 import java.util.List;
@@ -41,10 +41,9 @@ import static org.apache.paimon.utils.Preconditions.checkState;
  *
  * @param <T> type of record to write into {@link FileStore}.
  */
-public class TableWriteImpl<T>
-        implements InnerTableWrite, Restorable<List<AbstractFileStoreWrite.State<T>>> {
+public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State<T>>> {
 
-    private final AbstractFileStoreWrite<T> write;
+    private final FileStoreWrite<T> write;
     private final KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor;
     private final RecordExtractor<T> recordExtractor;
 
@@ -54,7 +53,7 @@ public class TableWriteImpl<T>
             FileStoreWrite<T> write,
             KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor,
             RecordExtractor<T> recordExtractor) {
-        this.write = (AbstractFileStoreWrite<T>) write;
+        this.write = write;
         this.keyAndBucketExtractor = keyAndBucketExtractor;
         this.recordExtractor = recordExtractor;
     }
@@ -66,8 +65,8 @@ public class TableWriteImpl<T>
     }
 
     @Override
-    public TableWriteImpl<T> isStreamingMode(boolean isStreamingMode) {
-        write.isStreamingMode(isStreamingMode);
+    public TableWriteImpl<T> withExecutionMode(boolean isStreamingMode) {
+        write.withExecutionMode(isStreamingMode);
         return this;
     }
 
@@ -183,17 +182,17 @@ public class TableWriteImpl<T>
     }
 
     @Override
-    public List<AbstractFileStoreWrite.State<T>> checkpoint() {
+    public List<State<T>> checkpoint() {
         return write.checkpoint();
     }
 
     @Override
-    public void restore(List<AbstractFileStoreWrite.State<T>> state) {
+    public void restore(List<State<T>> state) {
         write.restore(state);
     }
 
     @VisibleForTesting
-    public AbstractFileStoreWrite<T> getWrite() {
+    public FileStoreWrite<T> getWrite() {
         return write;
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java
index d8f5e11a8..0f0be3937 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.index.HashIndexMaintainer;
+import org.apache.paimon.operation.AbstractFileStoreWrite;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
@@ -51,8 +52,7 @@ public class DynamicBucketTableTest extends TableTestBase {
         TableWriteImpl batchTableWrite = (TableWriteImpl) builder.withOverwrite().newWrite();
         HashIndexMaintainer indexMaintainer =
                 (HashIndexMaintainer)
-                        batchTableWrite
-                                .getWrite()
+                        ((AbstractFileStoreWrite<?>) (batchTableWrite.getWrite()))
                                 .createWriterContainer(BinaryRow.EMPTY_ROW, 0, true)
                                 .indexMaintainer;
 
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
index 7d6eedf0b..e44c0ddca 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
@@ -28,6 +28,7 @@ import org.apache.paimon.flink.sink.StoreSinkWrite;
 import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
 import org.apache.paimon.flink.utils.MetricUtils;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.operation.AbstractFileStoreWrite;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
@@ -686,7 +687,9 @@ public class CdcRecordStoreMultiWriteOperatorTest {
         List<ExecutorService> compactExecutors = new ArrayList<>();
         for (StoreSinkWrite storeSinkWrite : storeSinkWrites) {
             StoreSinkWriteImpl storeSinkWriteImpl = (StoreSinkWriteImpl) storeSinkWrite;
-            compactExecutors.add(storeSinkWriteImpl.getWrite().getWrite().getCompactExecutor());
+            compactExecutors.add(
+                    ((AbstractFileStoreWrite<?>) storeSinkWriteImpl.getWrite().getWrite())
+                            .getCompactExecutor());
         }
         assertThat(compactExecutors.get(0) == compactExecutors.get(1)).isTrue();
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index 5cd72dc2f..ceffa7648 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -27,7 +27,7 @@ import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.memory.MemoryPoolFactory;
 import org.apache.paimon.memory.MemorySegmentPool;
-import org.apache.paimon.operation.AbstractFileStoreWrite;
+import org.apache.paimon.operation.FileStoreWrite;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.SinkRecord;
@@ -146,7 +146,7 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
                                         state.stateValueFilter().filter(table.name(), part, bucket))
                         .withIOManager(paimonIOManager)
                         .withIgnorePreviousFiles(ignorePreviousFiles)
-                        .isStreamingMode(isStreamingMode);
+                        .withExecutionMode(isStreamingMode);
 
         if (metricGroup != null) {
             tableWrite.withMetricRegistry(new FlinkMetricRegistry(metricGroup));
@@ -240,7 +240,7 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
             return;
         }
 
-        List<? extends AbstractFileStoreWrite.State<?>> states = write.checkpoint();
+        List<? extends FileStoreWrite.State<?>> states = write.checkpoint();
         write.close();
         write = newTableWrite(newTable);
         write.restore((List) states);