You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/12/05 09:05:34 UTC
(incubator-paimon) 07/12: [core] AbstractFileStoreWrite should not be the interface (#2446)
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-0.6
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit 2ff74b2a0012e06ef746b6403905a21262ecc51e
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Mon Dec 4 19:51:59 2023 +0800
[core] AbstractFileStoreWrite should not be the interface (#2446)
---
.../paimon/operation/AbstractFileStoreWrite.java | 49 +--------------
.../apache/paimon/operation/FileStoreWrite.java | 73 +++++++++++++++++++---
.../paimon/table/sink/BatchWriteBuilderImpl.java | 2 +-
.../apache/paimon/table/sink/InnerTableWrite.java | 2 +-
.../apache/paimon/table/sink/TableWriteImpl.java | 19 +++---
.../paimon/table/DynamicBucketTableTest.java | 4 +-
.../cdc/CdcRecordStoreMultiWriteOperatorTest.java | 5 +-
.../paimon/flink/sink/StoreSinkWriteImpl.java | 6 +-
8 files changed, 86 insertions(+), 74 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index b6209199e..b1275e7a8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -38,7 +38,6 @@ import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.ExecutorThreadFactory;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.RecordWriter;
-import org.apache.paimon.utils.Restorable;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
@@ -60,8 +59,7 @@ import java.util.concurrent.Executors;
*
* @param <T> type of record to write.
*/
-public abstract class AbstractFileStoreWrite<T>
- implements FileStoreWrite<T>, Restorable<List<AbstractFileStoreWrite.State<T>>> {
+public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
private static final Logger LOG = LoggerFactory.getLogger(AbstractFileStoreWrite.class);
@@ -116,6 +114,7 @@ public abstract class AbstractFileStoreWrite<T>
this.ignorePreviousFiles = ignorePreviousFiles;
}
+ @Override
public void withCompactExecutor(ExecutorService compactExecutor) {
this.lazyCompactExecutor = compactExecutor;
this.closeCompactExecutorWhenLeaving = false;
@@ -352,7 +351,7 @@ public abstract class AbstractFileStoreWrite<T>
}
@Override
- public void isStreamingMode(boolean isStreamingMode) {
+ public void withExecutionMode(boolean isStreamingMode) {
this.isStreamingMode = isStreamingMode;
}
@@ -445,46 +444,4 @@ public abstract class AbstractFileStoreWrite<T>
this.lastModifiedCommitIdentifier = Long.MIN_VALUE;
}
}
-
- /** Recoverable state of {@link AbstractFileStoreWrite}. */
- public static class State<T> {
- protected final BinaryRow partition;
- protected final int bucket;
-
- protected final long baseSnapshotId;
- protected final long lastModifiedCommitIdentifier;
- protected final List<DataFileMeta> dataFiles;
- @Nullable protected final IndexMaintainer<T> indexMaintainer;
- protected final CommitIncrement commitIncrement;
-
- protected State(
- BinaryRow partition,
- int bucket,
- long baseSnapshotId,
- long lastModifiedCommitIdentifier,
- Collection<DataFileMeta> dataFiles,
- @Nullable IndexMaintainer<T> indexMaintainer,
- CommitIncrement commitIncrement) {
- this.partition = partition;
- this.bucket = bucket;
- this.baseSnapshotId = baseSnapshotId;
- this.lastModifiedCommitIdentifier = lastModifiedCommitIdentifier;
- this.dataFiles = new ArrayList<>(dataFiles);
- this.indexMaintainer = indexMaintainer;
- this.commitIncrement = commitIncrement;
- }
-
- @Override
- public String toString() {
- return String.format(
- "{%s, %d, %d, %d, %s, %s, %s}",
- partition,
- bucket,
- baseSnapshotId,
- lastModifiedCommitIdentifier,
- dataFiles,
- indexMaintainer,
- commitIncrement);
- }
- }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
index 96fe0e1f5..e6c5ed50f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
@@ -21,15 +21,23 @@ package org.apache.paimon.operation;
import org.apache.paimon.FileStore;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.index.IndexMaintainer;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.SinkRecord;
+import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.RecordWriter;
+import org.apache.paimon.utils.Restorable;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
+import java.util.concurrent.ExecutorService;
/**
* Write operation which provides {@link RecordWriter} creation and writes {@link SinkRecord} to
@@ -37,7 +45,7 @@ import java.util.List;
*
* @param <T> type of record to write.
*/
-public interface FileStoreWrite<T> {
+public interface FileStoreWrite<T> extends Restorable<List<FileStoreWrite.State<T>>> {
FileStoreWrite<T> withIOManager(IOManager ioManager);
@@ -64,6 +72,18 @@ public interface FileStoreWrite<T> {
*/
void withIgnorePreviousFiles(boolean ignorePreviousFiles);
+ /**
+ * Sets the execution mode; in batch mode (streaming mode disabled) some optimizations are applied.
+ *
+ * @param isStreamingMode whether in streaming mode
+ */
+ void withExecutionMode(boolean isStreamingMode);
+
+ /** Sets the metric registry used to measure compaction. */
+ FileStoreWrite<T> withMetricRegistry(MetricRegistry metricRegistry);
+
+ void withCompactExecutor(ExecutorService compactExecutor);
+
/**
* Write the data to the store according to the partition and bucket.
*
@@ -109,13 +129,6 @@ public interface FileStoreWrite<T> {
List<CommitMessage> prepareCommit(boolean waitCompaction, long commitIdentifier)
throws Exception;
- /**
- * We detect whether it is in batch mode, if so, we do some optimization.
- *
- * @param isStreamingMode whether in streaming mode
- */
- void isStreamingMode(boolean isStreamingMode);
-
/**
* Close the writer.
*
@@ -123,6 +136,46 @@ public interface FileStoreWrite<T> {
*/
void close() throws Exception;
- /** With metrics to measure compaction. */
- FileStoreWrite<T> withMetricRegistry(MetricRegistry metricRegistry);
+ /** Recoverable state of {@link FileStoreWrite}. */
+ class State<T> {
+
+ protected final BinaryRow partition;
+ protected final int bucket;
+
+ protected final long baseSnapshotId;
+ protected final long lastModifiedCommitIdentifier;
+ protected final List<DataFileMeta> dataFiles;
+ @Nullable protected final IndexMaintainer<T> indexMaintainer;
+ protected final CommitIncrement commitIncrement;
+
+ protected State(
+ BinaryRow partition,
+ int bucket,
+ long baseSnapshotId,
+ long lastModifiedCommitIdentifier,
+ Collection<DataFileMeta> dataFiles,
+ @Nullable IndexMaintainer<T> indexMaintainer,
+ CommitIncrement commitIncrement) {
+ this.partition = partition;
+ this.bucket = bucket;
+ this.baseSnapshotId = baseSnapshotId;
+ this.lastModifiedCommitIdentifier = lastModifiedCommitIdentifier;
+ this.dataFiles = new ArrayList<>(dataFiles);
+ this.indexMaintainer = indexMaintainer;
+ this.commitIncrement = commitIncrement;
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "{%s, %d, %d, %d, %s, %s, %s}",
+ partition,
+ bucket,
+ baseSnapshotId,
+ lastModifiedCommitIdentifier,
+ dataFiles,
+ indexMaintainer,
+ commitIncrement);
+ }
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
index bf4cf8800..87693ba0a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
@@ -60,7 +60,7 @@ public class BatchWriteBuilderImpl implements BatchWriteBuilder {
public BatchTableWrite newWrite() {
return table.newWrite(commitUser)
.withIgnorePreviousFiles(staticPartition != null)
- .isStreamingMode(false);
+ .withExecutionMode(false);
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableWrite.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableWrite.java
index d0231a224..5e822fc5b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableWrite.java
@@ -24,5 +24,5 @@ public interface InnerTableWrite extends StreamTableWrite, BatchTableWrite {
InnerTableWrite withIgnorePreviousFiles(boolean ignorePreviousFiles);
// we detect whether in streaming mode, and do some optimization
- InnerTableWrite isStreamingMode(boolean isStreamingMode);
+ InnerTableWrite withExecutionMode(boolean isStreamingMode);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index f34a38a44..2bea3c224 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -27,8 +27,8 @@ import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.metrics.MetricRegistry;
-import org.apache.paimon.operation.AbstractFileStoreWrite;
import org.apache.paimon.operation.FileStoreWrite;
+import org.apache.paimon.operation.FileStoreWrite.State;
import org.apache.paimon.utils.Restorable;
import java.util.List;
@@ -41,10 +41,9 @@ import static org.apache.paimon.utils.Preconditions.checkState;
*
* @param <T> type of record to write into {@link FileStore}.
*/
-public class TableWriteImpl<T>
- implements InnerTableWrite, Restorable<List<AbstractFileStoreWrite.State<T>>> {
+public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State<T>>> {
- private final AbstractFileStoreWrite<T> write;
+ private final FileStoreWrite<T> write;
private final KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor;
private final RecordExtractor<T> recordExtractor;
@@ -54,7 +53,7 @@ public class TableWriteImpl<T>
FileStoreWrite<T> write,
KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor,
RecordExtractor<T> recordExtractor) {
- this.write = (AbstractFileStoreWrite<T>) write;
+ this.write = write;
this.keyAndBucketExtractor = keyAndBucketExtractor;
this.recordExtractor = recordExtractor;
}
@@ -66,8 +65,8 @@ public class TableWriteImpl<T>
}
@Override
- public TableWriteImpl<T> isStreamingMode(boolean isStreamingMode) {
- write.isStreamingMode(isStreamingMode);
+ public TableWriteImpl<T> withExecutionMode(boolean isStreamingMode) {
+ write.withExecutionMode(isStreamingMode);
return this;
}
@@ -183,17 +182,17 @@ public class TableWriteImpl<T>
}
@Override
- public List<AbstractFileStoreWrite.State<T>> checkpoint() {
+ public List<State<T>> checkpoint() {
return write.checkpoint();
}
@Override
- public void restore(List<AbstractFileStoreWrite.State<T>> state) {
+ public void restore(List<State<T>> state) {
write.restore(state);
}
@VisibleForTesting
- public AbstractFileStoreWrite<T> getWrite() {
+ public FileStoreWrite<T> getWrite() {
return write;
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java
index d8f5e11a8..0f0be3937 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.index.HashIndexMaintainer;
+import org.apache.paimon.operation.AbstractFileStoreWrite;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
@@ -51,8 +52,7 @@ public class DynamicBucketTableTest extends TableTestBase {
TableWriteImpl batchTableWrite = (TableWriteImpl) builder.withOverwrite().newWrite();
HashIndexMaintainer indexMaintainer =
(HashIndexMaintainer)
- batchTableWrite
- .getWrite()
+ ((AbstractFileStoreWrite<?>) (batchTableWrite.getWrite()))
.createWriterContainer(BinaryRow.EMPTY_ROW, 0, true)
.indexMaintainer;
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
index 7d6eedf0b..e44c0ddca 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
@@ -28,6 +28,7 @@ import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
import org.apache.paimon.flink.utils.MetricUtils;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.operation.AbstractFileStoreWrite;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
@@ -686,7 +687,9 @@ public class CdcRecordStoreMultiWriteOperatorTest {
List<ExecutorService> compactExecutors = new ArrayList<>();
for (StoreSinkWrite storeSinkWrite : storeSinkWrites) {
StoreSinkWriteImpl storeSinkWriteImpl = (StoreSinkWriteImpl) storeSinkWrite;
- compactExecutors.add(storeSinkWriteImpl.getWrite().getWrite().getCompactExecutor());
+ compactExecutors.add(
+ ((AbstractFileStoreWrite<?>) storeSinkWriteImpl.getWrite().getWrite())
+ .getCompactExecutor());
}
assertThat(compactExecutors.get(0) == compactExecutors.get(1)).isTrue();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index 5cd72dc2f..ceffa7648 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -27,7 +27,7 @@ import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.memory.MemorySegmentPool;
-import org.apache.paimon.operation.AbstractFileStoreWrite;
+import org.apache.paimon.operation.FileStoreWrite;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.SinkRecord;
@@ -146,7 +146,7 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
state.stateValueFilter().filter(table.name(), part, bucket))
.withIOManager(paimonIOManager)
.withIgnorePreviousFiles(ignorePreviousFiles)
- .isStreamingMode(isStreamingMode);
+ .withExecutionMode(isStreamingMode);
if (metricGroup != null) {
tableWrite.withMetricRegistry(new FlinkMetricRegistry(metricGroup));
@@ -240,7 +240,7 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
return;
}
- List<? extends AbstractFileStoreWrite.State<?>> states = write.checkpoint();
+ List<? extends FileStoreWrite.State<?>> states = write.checkpoint();
write.close();
write = newTableWrite(newTable);
write.restore((List) states);