Posted to commits@paimon.apache.org by cz...@apache.org on 2023/03/21 02:49:51 UTC
[incubator-paimon] branch master updated: [FLINK-31432] Introduce a special StoreWriteOperator to deal with schema changes
This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 556443f09 [FLINK-31432] Introduce a special StoreWriteOperator to deal with schema changes
556443f09 is described below
commit 556443f098252a98610a03450ed8ad2a661f27dd
Author: tsreaper <ts...@gmail.com>
AuthorDate: Tue Mar 21 10:49:47 2023 +0800
[FLINK-31432] Introduce a special StoreWriteOperator to deal with schema changes
This closes #649.
---
.../main/java/org/apache/paimon/cdc/CdcRecord.java | 58 ++++++
...erator.java => AbstractStoreWriteOperator.java} | 21 +--
.../apache/paimon/flink/sink/FileStoreSink.java | 2 +-
.../flink/sink/FullChangelogStoreSinkWrite.java | 3 +-
.../paimon/flink/sink/PrepareCommitOperator.java | 8 +-
.../flink/sink/RowDataStoreWriteOperator.java | 52 ++++++
.../paimon/flink/sink/StoreCompactOperator.java | 2 +-
.../apache/paimon/flink/sink/StoreSinkWrite.java | 19 +-
.../paimon/flink/sink/StoreSinkWriteImpl.java | 19 +-
.../sink/cdc/SchemaAwareStoreWriteOperator.java | 102 +++++++++++
.../flink/sink/FileStoreShuffleBucketTest.java | 6 +
.../cdc/SchemaAwareStoreWriteOperatorTest.java | 198 +++++++++++++++++++++
12 files changed, 462 insertions(+), 28 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/cdc/CdcRecord.java b/paimon-core/src/main/java/org/apache/paimon/cdc/CdcRecord.java
new file mode 100644
index 000000000..3c65dec4e
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/cdc/CdcRecord.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.cdc;
+
+import org.apache.paimon.types.RowKind;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Objects;
+
+/** A data change message from the CDC source. */
+public class CdcRecord implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final RowKind kind;
+ private final Map<String, String> fields;
+
+ public CdcRecord(RowKind kind, Map<String, String> fields) {
+ this.kind = kind;
+ this.fields = fields;
+ }
+
+ public RowKind kind() {
+ return kind;
+ }
+
+    /** Map keys are field names and map values are the corresponding field values, as strings. */
+ public Map<String, String> fields() {
+ return fields;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof CdcRecord)) {
+ return false;
+ }
+
+ CdcRecord that = (CdcRecord) o;
+ return Objects.equals(kind, that.kind) && Objects.equals(fields, that.fields);
+ }
+}
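For orientation, a minimal usage sketch of the class above (hypothetical caller code, not part of this commit): values stay as strings so the sink can cast them against whatever schema is current at write time. The field names pt/k/v match the test table added later in this commit.

    import org.apache.paimon.cdc.CdcRecord;
    import org.apache.paimon.types.RowKind;

    import java.util.HashMap;
    import java.util.Map;

    class CdcRecordSketch {
        // Build an INSERT change event; the writer casts the string values
        // against the current table schema when the record is written.
        static CdcRecord insertEvent() {
            Map<String, String> fields = new HashMap<>();
            fields.put("pt", "0");  // partition field
            fields.put("k", "1");   // primary key field
            fields.put("v", "10");  // value field
            return new CdcRecord(RowKind.INSERT, fields);
        }
    }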
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AbstractStoreWriteOperator.java
similarity index 93%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreWriteOperator.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AbstractStoreWriteOperator.java
index c04b1f6d8..9f9d34a4e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreWriteOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AbstractStoreWriteOperator.java
@@ -18,7 +18,6 @@
package org.apache.paimon.flink.sink;
-import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.log.LogWriteCallback;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.SinkRecord;
@@ -37,7 +36,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
-import org.apache.flink.table.data.RowData;
import javax.annotation.Nullable;
@@ -45,17 +43,17 @@ import java.io.IOException;
import java.util.List;
/** A {@link PrepareCommitOperator} to write records. */
-public class StoreWriteOperator extends PrepareCommitOperator {
+public abstract class AbstractStoreWriteOperator<T> extends PrepareCommitOperator<T> {
private static final long serialVersionUID = 2L;
- protected final FileStoreTable table;
+ protected FileStoreTable table;
@Nullable private final LogSinkFunction logSinkFunction;
private final StoreSinkWrite.Provider storeSinkWriteProvider;
- private transient StoreSinkWrite write;
+ protected transient StoreSinkWrite write;
private transient SimpleContext sinkContext;
@@ -64,7 +62,7 @@ public class StoreWriteOperator extends PrepareCommitOperator {
@Nullable private transient LogWriteCallback logCallback;
- public StoreWriteOperator(
+ public AbstractStoreWriteOperator(
FileStoreTable table,
@Nullable LogSinkFunction logSinkFunction,
StoreSinkWrite.Provider storeSinkWriteProvider) {
@@ -119,15 +117,10 @@ public class StoreWriteOperator extends PrepareCommitOperator {
}
@Override
- public void processElement(StreamRecord<RowData> element) throws Exception {
+ public void processElement(StreamRecord<T> element) throws Exception {
sinkContext.timestamp = element.hasTimestamp() ? element.getTimestamp() : null;
- SinkRecord record;
- try {
- record = write.write(new FlinkRowWrapper(element.getValue()));
- } catch (Exception e) {
- throw new IOException(e);
- }
+ SinkRecord record = processRecord(element.getValue());
if (logSinkFunction != null) {
// write to log store, need to preserve original pk (which includes partition fields)
@@ -136,6 +129,8 @@ public class StoreWriteOperator extends PrepareCommitOperator {
}
}
+ protected abstract SinkRecord processRecord(T record) throws Exception;
+
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
index 702d3c050..5e6cf4181 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
@@ -54,7 +54,7 @@ public class FileStoreSink extends FlinkSink {
@Override
protected OneInputStreamOperator<RowData, Committable> createWriteOperator(
StoreSinkWrite.Provider writeProvider, boolean isStreaming) {
- return new StoreWriteOperator(table, logSinkFunction, writeProvider);
+ return new RowDataStoreWriteOperator(table, logSinkFunction, writeProvider);
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FullChangelogStoreSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FullChangelogStoreSinkWrite.java
index 6e988dd05..c733c6435 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FullChangelogStoreSinkWrite.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FullChangelogStoreSinkWrite.java
@@ -59,6 +59,7 @@ public class FullChangelogStoreSinkWrite extends StoreSinkWriteImpl {
private static final Logger LOG = LoggerFactory.getLogger(FullChangelogStoreSinkWrite.class);
+ private final SnapshotManager snapshotManager;
private final long fullCompactionThresholdMs;
private final Set<Tuple2<BinaryRow, Integer>> currentWrittenBuckets;
@@ -81,6 +82,7 @@ public class FullChangelogStoreSinkWrite extends StoreSinkWriteImpl {
throws Exception {
super(table, context, initialCommitUser, ioManager, isOverwrite);
+ this.snapshotManager = table.snapshotManager();
this.fullCompactionThresholdMs = fullCompactionThresholdMs;
currentWrittenBuckets = new HashSet<>();
@@ -197,7 +199,6 @@ public class FullChangelogStoreSinkWrite extends StoreSinkWriteImpl {
}
private void checkSuccessfulFullCompaction() {
- SnapshotManager snapshotManager = table.snapshotManager();
Long latestId = snapshotManager.latestSnapshotId();
if (latestId == null) {
return;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java
index e1d795e44..0b3683ff0 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java
@@ -23,14 +23,13 @@ import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.table.data.RowData;
import java.io.IOException;
import java.util.List;
/** Prepare commit operator to emit {@link Committable}s. */
-public abstract class PrepareCommitOperator extends AbstractStreamOperator<Committable>
- implements OneInputStreamOperator<RowData, Committable>, BoundedOneInput {
+public abstract class PrepareCommitOperator<T> extends AbstractStreamOperator<Committable>
+ implements OneInputStreamOperator<T, Committable>, BoundedOneInput {
private boolean endOfInput = false;
@@ -38,9 +37,6 @@ public abstract class PrepareCommitOperator extends AbstractStreamOperator<Commi
setChainingStrategy(ChainingStrategy.ALWAYS);
}
- @Override
- public void processElement(StreamRecord<RowData> element) throws Exception {}
-
@Override
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
if (!endOfInput) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
new file mode 100644
index 000000000..55a6b8375
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.SinkRecord;
+
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * An {@link AbstractStoreWriteOperator} which accepts Flink's {@link RowData}. The schema of its
+ * writer never changes.
+ */
+public class RowDataStoreWriteOperator extends AbstractStoreWriteOperator<RowData> {
+
+ public RowDataStoreWriteOperator(
+ FileStoreTable table,
+ @Nullable LogSinkFunction logSinkFunction,
+ StoreSinkWrite.Provider storeSinkWriteProvider) {
+ super(table, logSinkFunction, storeSinkWriteProvider);
+ }
+
+ @Override
+ protected SinkRecord processRecord(RowData record) throws Exception {
+ try {
+ return write.write(new FlinkRowWrapper(record));
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
index 800744e3d..13fc24edc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
@@ -42,7 +42,7 @@ import java.util.List;
* org.apache.paimon.flink.source.CompactorSourceBuilder}. The records will contain partition keys
* in the first few columns, and bucket number in the last column.
*/
-public class StoreCompactOperator extends PrepareCommitOperator {
+public class StoreCompactOperator extends PrepareCommitOperator<RowData> {
private final FileStoreTable table;
private final StoreSinkWrite.Provider storeSinkWriteProvider;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
index eef42a2e7..ff6dce668 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.SinkRecord;
+import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.state.StateInitializationContext;
@@ -31,9 +32,10 @@ import org.apache.flink.runtime.state.StateSnapshotContext;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
+import java.util.function.Function;
-/** Helper class of {@link StoreWriteOperator} for different types of paimon sinks. */
-interface StoreSinkWrite {
+/** Helper class of {@link AbstractStoreWriteOperator} for different types of paimon sinks. */
+public interface StoreSinkWrite {
SinkRecord write(InternalRow rowData) throws Exception;
@@ -49,6 +51,19 @@ interface StoreSinkWrite {
void close() throws Exception;
+ /**
+ * Replace the internal {@link TableWriteImpl} with the one provided by {@code
+ * newWriteProvider}. The state of the old {@link TableWriteImpl} will also be transferred to
+ * the new {@link TableWriteImpl} by {@link TableWriteImpl#checkpoint()} and {@link
+ * TableWriteImpl#restore(List)}.
+ *
+ * <p>Currently, this method is only used by CDC sinks because they need to deal with schema
+ * changes. {@link TableWriteImpl} with the new schema will be provided by {@code
+ * newWriteProvider}.
+ */
+ void replace(Function<String, TableWriteImpl<?>> newWriteProvider) throws Exception;
+
+ /** Provider of {@link StoreSinkWrite}. */
@FunctionalInterface
interface Provider extends Serializable {
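To make the new replace contract concrete, here is a sketch of the intended call pattern on a schema change, mirroring the SchemaAwareStoreWriteOperator added later in this commit; the helper name onSchemaChange is illustrative only.

    import org.apache.paimon.flink.sink.StoreSinkWrite;
    import org.apache.paimon.table.FileStoreTable;

    class ReplaceSketch {
        // Refresh the table to the latest schema, then swap the internal
        // TableWriteImpl; replace() carries the old writer's state over via
        // checkpoint() and restore(List).
        static FileStoreTable onSchemaChange(StoreSinkWrite write, FileStoreTable table)
                throws Exception {
            FileStoreTable refreshed = table.copyWithLatestSchema();
            write.replace(commitUser -> refreshed.newWrite(commitUser));
            return refreshed;
        }
    }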
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index c9c1c3b27..5c24bd035 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.operation.AbstractFileStoreWrite;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.SinkRecord;
@@ -36,15 +37,15 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.function.Function;
/** Default implementation of {@link StoreSinkWrite}. This writer does not have states. */
public class StoreSinkWriteImpl implements StoreSinkWrite {
private static final Logger LOG = LoggerFactory.getLogger(StoreSinkWriteImpl.class);
- protected final FileStoreTable table;
protected final String commitUser;
- protected final TableWriteImpl<?> write;
+ protected TableWriteImpl<?> write;
public StoreSinkWriteImpl(
FileStoreTable table,
@@ -53,8 +54,6 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
IOManager ioManager,
boolean isOverwrite)
throws Exception {
- this.table = table;
-
// Each job can only have one user name and this name must be consistent across restarts.
// We cannot use job id as commit user name here because user may change job id by creating
// a savepoint, stop the job and then resume from savepoint.
@@ -136,4 +135,16 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
write.close();
}
}
+
+ @Override
+ public void replace(Function<String, TableWriteImpl<?>> newWriteProvider) throws Exception {
+ if (commitUser == null) {
+ return;
+ }
+
+ List<AbstractFileStoreWrite.State> states = write.checkpoint();
+ write.close();
+ write = newWriteProvider.apply(commitUser);
+ write.restore(states);
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java
new file mode 100644
index 000000000..4c109ca93
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.cdc.CdcRecord;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.sink.AbstractStoreWriteOperator;
+import org.apache.paimon.flink.sink.LogSinkFunction;
+import org.apache.paimon.flink.sink.StoreSinkWrite;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.SinkRecord;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.utils.TypeUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Map;
+
+/**
+ * An {@link AbstractStoreWriteOperator} which is aware of schema changes.
+ *
+ * <p>When the input {@link CdcRecord} contains a field name that is not in the current {@link
+ * TableSchema}, this operator periodically reloads the latest schema until it contains that
+ * field name.
+ */
+public class SchemaAwareStoreWriteOperator extends AbstractStoreWriteOperator<CdcRecord> {
+
+ static final ConfigOption<Duration> RETRY_SLEEP_TIME =
+ ConfigOptions.key("cdc.retry-sleep-time")
+ .durationType()
+ .defaultValue(Duration.ofMillis(500));
+
+ private final long retrySleepMillis;
+
+ public SchemaAwareStoreWriteOperator(
+ FileStoreTable table,
+ @Nullable LogSinkFunction logSinkFunction,
+ StoreSinkWrite.Provider storeSinkWriteProvider) {
+ super(table, logSinkFunction, storeSinkWriteProvider);
+ retrySleepMillis = table.options().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
+ }
+
+ @Override
+ protected SinkRecord processRecord(CdcRecord record) throws Exception {
+ Map<String, String> fields = record.fields();
+
+ if (!schemaMatched(fields)) {
+ while (true) {
+ table = table.copyWithLatestSchema();
+ if (schemaMatched(fields)) {
+ break;
+ }
+ Thread.sleep(retrySleepMillis);
+ }
+ write.replace(commitUser -> table.newWrite(commitUser));
+ }
+
+ TableSchema schema = table.schema();
+ GenericRow row = new GenericRow(schema.fields().size());
+ row.setRowKind(record.kind());
+ for (Map.Entry<String, String> field : fields.entrySet()) {
+ String key = field.getKey();
+ String value = field.getValue();
+ int idx = schema.fieldNames().indexOf(key);
+ DataType type = schema.fields().get(idx).type();
+ // TODO TypeUtils.castFromString cannot deal with complex types like arrays and maps.
+ // Change type of CdcRecord#field if needed.
+ row.setField(idx, TypeUtils.castFromString(value, type));
+ }
+
+ try {
+ return write.write(row);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ private boolean schemaMatched(Map<String, String> fields) {
+ TableSchema currentSchema = table.schema();
+ return currentSchema.fieldNames().containsAll(fields.keySet());
+ }
+}
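The retry interval comes from the table options, so it can be tuned per table. A small sketch under that assumption (the class sits in the same package because RETRY_SLEEP_TIME is package-private; 10 ms matches the test below, while the default is 500 ms):

    package org.apache.paimon.flink.sink.cdc;

    import org.apache.paimon.options.Options;

    import java.time.Duration;

    class RetryOptionSketch {
        static Options fastRetryOptions() {
            Options conf = new Options();
            // Poll for the updated schema every 10 ms instead of the default 500 ms;
            // conf.toMap() is then passed into the table schema's options.
            conf.set(SchemaAwareStoreWriteOperator.RETRY_SLEEP_TIME, Duration.ofMillis(10));
            return conf;
        }
    }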
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FileStoreShuffleBucketTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FileStoreShuffleBucketTest.java
index b16335d65..e1d671964 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FileStoreShuffleBucketTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FileStoreShuffleBucketTest.java
@@ -27,6 +27,7 @@ import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.SinkRecord;
+import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -45,6 +46,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -159,5 +161,9 @@ public class FileStoreShuffleBucketTest extends CatalogITCaseBase {
@Override
public void close() throws Exception {}
+
+ @Override
+ public void replace(Function<String, TableWriteImpl<?>> newWriteProvider)
+ throws Exception {}
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperatorTest.java
new file mode 100644
index 000000000..c80196ef5
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperatorTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.cdc.CdcRecord;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.CommittableTypeInfo;
+import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.JavaSerializer;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Predicate;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link SchemaAwareStoreWriteOperator}. */
+public class SchemaAwareStoreWriteOperatorTest {
+
+ private static final RowType ROW_TYPE =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.BIGINT(), DataTypes.STRING()},
+ new String[] {"pt", "k", "v"});
+
+ @TempDir java.nio.file.Path tempDir;
+
+ private Path tablePath;
+ private String commitUser;
+
+ @BeforeEach
+ public void before() {
+ tablePath = new Path(TraceableFileIO.SCHEME + "://" + tempDir.toString());
+ commitUser = UUID.randomUUID().toString();
+ }
+
+ @AfterEach
+ public void after() {
+ // assert all connections are closed
+ Predicate<Path> pathPredicate = path -> path.toString().contains(tempDir.toString());
+ assertThat(TraceableFileIO.openInputStreams(pathPredicate)).isEmpty();
+ assertThat(TraceableFileIO.openOutputStreams(pathPredicate)).isEmpty();
+ }
+
+ @Test
+ @Timeout(30)
+ public void testProcessRecord() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+ OneInputStreamOperatorTestHarness<CdcRecord, Committable> harness =
+ createTestHarness(table);
+ harness.open();
+
+ BlockingQueue<CdcRecord> toProcess = new LinkedBlockingQueue<>();
+ BlockingQueue<CdcRecord> processed = new LinkedBlockingQueue<>();
+ AtomicBoolean running = new AtomicBoolean(true);
+ Runnable r =
+ () -> {
+ long timestamp = 0;
+ try {
+ while (running.get()) {
+ if (toProcess.isEmpty()) {
+ Thread.sleep(10);
+ continue;
+ }
+
+ CdcRecord record = toProcess.poll();
+ harness.processElement(record, ++timestamp);
+ processed.offer(record);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ };
+
+ Thread t = new Thread(r);
+ t.start();
+
+ // check that records with compatible schema can be processed immediately
+
+ Map<String, String> fields = new HashMap<>();
+ fields.put("pt", "0");
+ fields.put("k", "1");
+ fields.put("v", "10");
+ CdcRecord expected = new CdcRecord(RowKind.INSERT, fields);
+ toProcess.offer(expected);
+ CdcRecord actual = processed.take();
+ assertThat(actual).isEqualTo(expected);
+
+ fields = new HashMap<>();
+ fields.put("pt", "0");
+ fields.put("k", "2");
+ expected = new CdcRecord(RowKind.INSERT, fields);
+ toProcess.offer(expected);
+ actual = processed.take();
+ assertThat(actual).isEqualTo(expected);
+
+        // check that records with new fields are processed only after the schema is updated
+
+ fields = new HashMap<>();
+ fields.put("pt", "0");
+ fields.put("k", "3");
+ fields.put("v", "30");
+ fields.put("v2", "300");
+ expected = new CdcRecord(RowKind.INSERT, fields);
+ toProcess.offer(expected);
+ actual = processed.poll(1, TimeUnit.SECONDS);
+ assertThat(actual).isNull();
+
+ SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location());
+ schemaManager.commitChanges(SchemaChange.addColumn("v2", DataTypes.INT()));
+ actual = processed.take();
+ assertThat(actual).isEqualTo(expected);
+
+ running.set(false);
+ t.join();
+ harness.close();
+ }
+
+ private OneInputStreamOperatorTestHarness<CdcRecord, Committable> createTestHarness(
+ FileStoreTable table) throws Exception {
+ SchemaAwareStoreWriteOperator operator =
+ new SchemaAwareStoreWriteOperator(
+ table,
+ null,
+ (t, context, ioManager) ->
+ new StoreSinkWriteImpl(t, context, commitUser, ioManager, false));
+ TypeSerializer<CdcRecord> inputSerializer = new JavaSerializer<>();
+ TypeSerializer<Committable> outputSerializer =
+ new CommittableTypeInfo().createSerializer(new ExecutionConfig());
+ OneInputStreamOperatorTestHarness<CdcRecord, Committable> harness =
+ new OneInputStreamOperatorTestHarness<>(operator, inputSerializer);
+ harness.setup(outputSerializer);
+ return harness;
+ }
+
+ private FileStoreTable createFileStoreTable() throws Exception {
+ Options conf = new Options();
+ conf.set(SchemaAwareStoreWriteOperator.RETRY_SLEEP_TIME, Duration.ofMillis(10));
+
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(LocalFileIO.create(), tablePath),
+ new Schema(
+ ROW_TYPE.getFields(),
+ Collections.singletonList("pt"),
+ Arrays.asList("pt", "k"),
+ conf.toMap(),
+ ""));
+ return FileStoreTableFactory.create(LocalFileIO.create(), tablePath, tableSchema);
+ }
+}
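Finally, a hedged sketch of how the new operator could be wired into a Flink pipeline. The commit itself adds no such builder, so the transform() wiring and the writeProvider here are assumptions modeled on FileStoreSink#createWriteOperator and the test harness above.

    package org.apache.paimon.flink.sink.cdc;

    import org.apache.paimon.cdc.CdcRecord;
    import org.apache.paimon.flink.sink.Committable;
    import org.apache.paimon.flink.sink.CommittableTypeInfo;
    import org.apache.paimon.flink.sink.StoreSinkWrite;
    import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
    import org.apache.paimon.table.FileStoreTable;

    import org.apache.flink.streaming.api.datastream.DataStream;

    class CdcWriteSketch {
        static DataStream<Committable> write(
                DataStream<CdcRecord> cdcStream, FileStoreTable table, String commitUser) {
            StoreSinkWrite.Provider writeProvider =
                    (t, context, ioManager) ->
                            new StoreSinkWriteImpl(t, context, commitUser, ioManager, false);
            // No log sink here (second argument null), matching the test above.
            return cdcStream.transform(
                    "Schema Aware Writer",
                    new CommittableTypeInfo(),
                    new SchemaAwareStoreWriteOperator(table, null, writeProvider));
        }
    }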