Posted to commits@paimon.apache.org by cz...@apache.org on 2023/03/21 02:49:51 UTC

[incubator-paimon] branch master updated: [FLINK-31432] Introduce a special StoreWriteOperator to deal with schema changes

This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 556443f09 [FLINK-31432] Introduce a special StoreWriteOperator to deal with schema changes
556443f09 is described below

commit 556443f098252a98610a03450ed8ad2a661f27dd
Author: tsreaper <ts...@gmail.com>
AuthorDate: Tue Mar 21 10:49:47 2023 +0800

    [FLINK-31432] Introduce a special StoreWriteOperator to deal with schema changes
    
    This closes #649.
---
 .../main/java/org/apache/paimon/cdc/CdcRecord.java |  58 ++++++
 ...erator.java => AbstractStoreWriteOperator.java} |  21 +--
 .../apache/paimon/flink/sink/FileStoreSink.java    |   2 +-
 .../flink/sink/FullChangelogStoreSinkWrite.java    |   3 +-
 .../paimon/flink/sink/PrepareCommitOperator.java   |   8 +-
 .../flink/sink/RowDataStoreWriteOperator.java      |  52 ++++++
 .../paimon/flink/sink/StoreCompactOperator.java    |   2 +-
 .../apache/paimon/flink/sink/StoreSinkWrite.java   |  19 +-
 .../paimon/flink/sink/StoreSinkWriteImpl.java      |  19 +-
 .../sink/cdc/SchemaAwareStoreWriteOperator.java    | 102 +++++++++++
 .../flink/sink/FileStoreShuffleBucketTest.java     |   6 +
 .../cdc/SchemaAwareStoreWriteOperatorTest.java     | 198 +++++++++++++++++++++
 12 files changed, 462 insertions(+), 28 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/cdc/CdcRecord.java b/paimon-core/src/main/java/org/apache/paimon/cdc/CdcRecord.java
new file mode 100644
index 000000000..3c65dec4e
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/cdc/CdcRecord.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.cdc;
+
+import org.apache.paimon.types.RowKind;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Objects;
+
+/** A data change message from the CDC source. */
+public class CdcRecord implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RowKind kind;
+    private final Map<String, String> fields;
+
+    public CdcRecord(RowKind kind, Map<String, String> fields) {
+        this.kind = kind;
+        this.fields = fields;
+    }
+
+    public RowKind kind() {
+        return kind;
+    }
+
+    /** Map key is the field's name, and map value is the field's value. */
+    public Map<String, String> fields() {
+        return fields;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof CdcRecord)) {
+            return false;
+        }
+
+        CdcRecord that = (CdcRecord) o;
+        return Objects.equals(kind, that.kind) && Objects.equals(fields, that.fields);
+    }
+}
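For context, a CdcRecord is deliberately minimal: a RowKind plus a string-to-string field map, with values cast to the table's column types only at write time. A hedged sketch of building one by hand (the field names match the test table at the end of this commit; a real CDC source would supply its own):

    import org.apache.paimon.cdc.CdcRecord;
    import org.apache.paimon.types.RowKind;

    import java.util.HashMap;
    import java.util.Map;

    // Keep every value as a string; SchemaAwareStoreWriteOperator casts it to
    // the column's DataType via TypeUtils.castFromString when writing.
    Map<String, String> fields = new HashMap<>();
    fields.put("pt", "0");   // partition column
    fields.put("k", "1");    // primary key column
    fields.put("v", "10");   // value column
    CdcRecord record = new CdcRecord(RowKind.INSERT, fields);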
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AbstractStoreWriteOperator.java
similarity index 93%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreWriteOperator.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AbstractStoreWriteOperator.java
index c04b1f6d8..9f9d34a4e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreWriteOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AbstractStoreWriteOperator.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.flink.sink;
 
-import org.apache.paimon.flink.FlinkRowWrapper;
 import org.apache.paimon.flink.log.LogWriteCallback;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.SinkRecord;
@@ -37,7 +36,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
-import org.apache.flink.table.data.RowData;
 
 import javax.annotation.Nullable;
 
@@ -45,17 +43,17 @@ import java.io.IOException;
 import java.util.List;
 
 /** A {@link PrepareCommitOperator} to write records. */
-public class StoreWriteOperator extends PrepareCommitOperator {
+public abstract class AbstractStoreWriteOperator<T> extends PrepareCommitOperator<T> {
 
     private static final long serialVersionUID = 2L;
 
-    protected final FileStoreTable table;
+    protected FileStoreTable table;
 
     @Nullable private final LogSinkFunction logSinkFunction;
 
     private final StoreSinkWrite.Provider storeSinkWriteProvider;
 
-    private transient StoreSinkWrite write;
+    protected transient StoreSinkWrite write;
 
     private transient SimpleContext sinkContext;
 
@@ -64,7 +62,7 @@ public class StoreWriteOperator extends PrepareCommitOperator {
 
     @Nullable private transient LogWriteCallback logCallback;
 
-    public StoreWriteOperator(
+    public AbstractStoreWriteOperator(
             FileStoreTable table,
             @Nullable LogSinkFunction logSinkFunction,
             StoreSinkWrite.Provider storeSinkWriteProvider) {
@@ -119,15 +117,10 @@ public class StoreWriteOperator extends PrepareCommitOperator {
     }
 
     @Override
-    public void processElement(StreamRecord<RowData> element) throws Exception {
+    public void processElement(StreamRecord<T> element) throws Exception {
         sinkContext.timestamp = element.hasTimestamp() ? element.getTimestamp() : null;
 
-        SinkRecord record;
-        try {
-            record = write.write(new FlinkRowWrapper(element.getValue()));
-        } catch (Exception e) {
-            throw new IOException(e);
-        }
+        SinkRecord record = processRecord(element.getValue());
 
         if (logSinkFunction != null) {
             // write to log store, need to preserve original pk (which includes partition fields)
@@ -136,6 +129,8 @@ public class StoreWriteOperator extends PrepareCommitOperator {
         }
     }
 
+    protected abstract SinkRecord processRecord(T record) throws Exception;
+
     @Override
     public void snapshotState(StateSnapshotContext context) throws Exception {
         super.snapshotState(context);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
index 702d3c050..5e6cf4181 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
@@ -54,7 +54,7 @@ public class FileStoreSink extends FlinkSink {
     @Override
     protected OneInputStreamOperator<RowData, Committable> createWriteOperator(
             StoreSinkWrite.Provider writeProvider, boolean isStreaming) {
-        return new StoreWriteOperator(table, logSinkFunction, writeProvider);
+        return new RowDataStoreWriteOperator(table, logSinkFunction, writeProvider);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FullChangelogStoreSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FullChangelogStoreSinkWrite.java
index 6e988dd05..c733c6435 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FullChangelogStoreSinkWrite.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FullChangelogStoreSinkWrite.java
@@ -59,6 +59,7 @@ public class FullChangelogStoreSinkWrite extends StoreSinkWriteImpl {
 
     private static final Logger LOG = LoggerFactory.getLogger(FullChangelogStoreSinkWrite.class);
 
+    private final SnapshotManager snapshotManager;
     private final long fullCompactionThresholdMs;
 
     private final Set<Tuple2<BinaryRow, Integer>> currentWrittenBuckets;
@@ -81,6 +82,7 @@ public class FullChangelogStoreSinkWrite extends StoreSinkWriteImpl {
             throws Exception {
         super(table, context, initialCommitUser, ioManager, isOverwrite);
 
+        this.snapshotManager = table.snapshotManager();
         this.fullCompactionThresholdMs = fullCompactionThresholdMs;
 
         currentWrittenBuckets = new HashSet<>();
@@ -197,7 +199,6 @@ public class FullChangelogStoreSinkWrite extends StoreSinkWriteImpl {
     }
 
     private void checkSuccessfulFullCompaction() {
-        SnapshotManager snapshotManager = table.snapshotManager();
         Long latestId = snapshotManager.latestSnapshotId();
         if (latestId == null) {
             return;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java
index e1d795e44..0b3683ff0 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java
@@ -23,14 +23,13 @@ import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.table.data.RowData;
 
 import java.io.IOException;
 import java.util.List;
 
 /** Prepare commit operator to emit {@link Committable}s. */
-public abstract class PrepareCommitOperator extends AbstractStreamOperator<Committable>
-        implements OneInputStreamOperator<RowData, Committable>, BoundedOneInput {
+public abstract class PrepareCommitOperator<T> extends AbstractStreamOperator<Committable>
+        implements OneInputStreamOperator<T, Committable>, BoundedOneInput {
 
     private boolean endOfInput = false;
 
@@ -38,9 +37,6 @@ public abstract class PrepareCommitOperator extends AbstractStreamOperator<Commi
         setChainingStrategy(ChainingStrategy.ALWAYS);
     }
 
-    @Override
-    public void processElement(StreamRecord<RowData> element) throws Exception {}
-
     @Override
     public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
         if (!endOfInput) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
new file mode 100644
index 000000000..55a6b8375
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.SinkRecord;
+
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * An {@link AbstractStoreWriteOperator} which accepts Flink's {@link RowData}. The schema of its
+ * writer never changes.
+ */
+public class RowDataStoreWriteOperator extends AbstractStoreWriteOperator<RowData> {
+
+    public RowDataStoreWriteOperator(
+            FileStoreTable table,
+            @Nullable LogSinkFunction logSinkFunction,
+            StoreSinkWrite.Provider storeSinkWriteProvider) {
+        super(table, logSinkFunction, storeSinkWriteProvider);
+    }
+
+    @Override
+    protected SinkRecord processRecord(RowData record) throws Exception {
+        try {
+            return write.write(new FlinkRowWrapper(record));
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
index 800744e3d..13fc24edc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
@@ -42,7 +42,7 @@ import java.util.List;
  * org.apache.paimon.flink.source.CompactorSourceBuilder}. The records will contain partition keys
  * in the first few columns, and bucket number in the last column.
  */
-public class StoreCompactOperator extends PrepareCommitOperator {
+public class StoreCompactOperator extends PrepareCommitOperator<RowData> {
 
     private final FileStoreTable table;
     private final StoreSinkWrite.Provider storeSinkWriteProvider;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
index eef42a2e7..ff6dce668 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.SinkRecord;
+import org.apache.paimon.table.sink.TableWriteImpl;
 
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.state.StateInitializationContext;
@@ -31,9 +32,10 @@ import org.apache.flink.runtime.state.StateSnapshotContext;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
+import java.util.function.Function;
 
-/** Helper class of {@link StoreWriteOperator} for different types of paimon sinks. */
-interface StoreSinkWrite {
+/** Helper class of {@link AbstractStoreWriteOperator} for different types of paimon sinks. */
+public interface StoreSinkWrite {
 
     SinkRecord write(InternalRow rowData) throws Exception;
 
@@ -49,6 +51,19 @@ interface StoreSinkWrite {
 
     void close() throws Exception;
 
+    /**
+     * Replace the internal {@link TableWriteImpl} with the one provided by {@code
+     * newWriteProvider}. The state of the old {@link TableWriteImpl} will also be transferred to
+     * the new {@link TableWriteImpl} by {@link TableWriteImpl#checkpoint()} and {@link
+     * TableWriteImpl#restore(List)}.
+     *
+     * <p>Currently, this method is only used by CDC sinks because they need to deal with schema
+     * changes. {@link TableWriteImpl} with the new schema will be provided by {@code
+     * newWriteProvider}.
+     */
+    void replace(Function<String, TableWriteImpl<?>> newWriteProvider) throws Exception;
+
+    /** Provider of {@link StoreSinkWrite}. */
     @FunctionalInterface
     interface Provider extends Serializable {
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index c9c1c3b27..5c24bd035 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManagerImpl;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.operation.AbstractFileStoreWrite;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.SinkRecord;
@@ -36,15 +37,15 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Function;
 
 /** Default implementation of {@link StoreSinkWrite}. This writer does not have states. */
 public class StoreSinkWriteImpl implements StoreSinkWrite {
 
     private static final Logger LOG = LoggerFactory.getLogger(StoreSinkWriteImpl.class);
 
-    protected final FileStoreTable table;
     protected final String commitUser;
-    protected final TableWriteImpl<?> write;
+    protected TableWriteImpl<?> write;
 
     public StoreSinkWriteImpl(
             FileStoreTable table,
@@ -53,8 +54,6 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
             IOManager ioManager,
             boolean isOverwrite)
             throws Exception {
-        this.table = table;
-
         // Each job can only have one user name and this name must be consistent across restarts.
         // We cannot use job id as commit user name here because user may change job id by creating
         // a savepoint, stop the job and then resume from savepoint.
@@ -136,4 +135,16 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
             write.close();
         }
     }
+
+    @Override
+    public void replace(Function<String, TableWriteImpl<?>> newWriteProvider) throws Exception {
+        if (commitUser == null) {
+            return;
+        }
+
+        List<AbstractFileStoreWrite.State> states = write.checkpoint();
+        write.close();
+        write = newWriteProvider.apply(commitUser);
+        write.restore(states);
+    }
 }
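For reference, operators obtain a StoreSinkWriteImpl through the StoreSinkWrite.Provider hook, usually a small lambda. A sketch modeled on the test harness later in this commit (the `commitUser` string is assumed to be in scope; the Provider's exact parameter list is truncated in this diff):

    StoreSinkWrite.Provider provider =
            (table, context, ioManager) ->
                    new StoreSinkWriteImpl(table, context, commitUser, ioManager, false);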
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java
new file mode 100644
index 000000000..4c109ca93
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.cdc.CdcRecord;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.sink.AbstractStoreWriteOperator;
+import org.apache.paimon.flink.sink.LogSinkFunction;
+import org.apache.paimon.flink.sink.StoreSinkWrite;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.SinkRecord;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.utils.TypeUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Map;
+
+/**
+ * An {@link AbstractStoreWriteOperator} which is aware of schema changes.
+ *
+ * <p>When an input {@link CdcRecord} contains a field name that is not in the current {@link TableSchema},
+ * this operator periodically fetches the latest schema until that schema contains the field name.
+ */
+public class SchemaAwareStoreWriteOperator extends AbstractStoreWriteOperator<CdcRecord> {
+
+    static final ConfigOption<Duration> RETRY_SLEEP_TIME =
+            ConfigOptions.key("cdc.retry-sleep-time")
+                    .durationType()
+                    .defaultValue(Duration.ofMillis(500));
+
+    private final long retrySleepMillis;
+
+    public SchemaAwareStoreWriteOperator(
+            FileStoreTable table,
+            @Nullable LogSinkFunction logSinkFunction,
+            StoreSinkWrite.Provider storeSinkWriteProvider) {
+        super(table, logSinkFunction, storeSinkWriteProvider);
+        retrySleepMillis = table.options().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
+    }
+
+    @Override
+    protected SinkRecord processRecord(CdcRecord record) throws Exception {
+        Map<String, String> fields = record.fields();
+
+        if (!schemaMatched(fields)) {
+            while (true) {
+                table = table.copyWithLatestSchema();
+                if (schemaMatched(fields)) {
+                    break;
+                }
+                Thread.sleep(retrySleepMillis);
+            }
+            write.replace(commitUser -> table.newWrite(commitUser));
+        }
+
+        TableSchema schema = table.schema();
+        GenericRow row = new GenericRow(schema.fields().size());
+        row.setRowKind(record.kind());
+        for (Map.Entry<String, String> field : fields.entrySet()) {
+            String key = field.getKey();
+            String value = field.getValue();
+            int idx = schema.fieldNames().indexOf(key);
+            DataType type = schema.fields().get(idx).type();
+            // TODO TypeUtils.castFromString cannot deal with complex types like arrays and maps.
+            //  Change type of CdcRecord#field if needed.
+            row.setField(idx, TypeUtils.castFromString(value, type));
+        }
+
+        try {
+            return write.write(row);
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+    }
+
+    private boolean schemaMatched(Map<String, String> fields) {
+        TableSchema currentSchema = table.schema();
+        return currentSchema.fieldNames().containsAll(fields.keySet());
+    }
+}
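The only knob the new operator adds is the retry interval ('cdc.retry-sleep-time', default 500 ms) used while waiting for the table schema to catch up. A hedged sketch of lowering it through table options, mirroring the test below (RETRY_SLEEP_TIME is package-private, so code outside that package would use the string key instead):

    Options conf = new Options();
    // Poll for the updated schema every 100 ms instead of the 500 ms default.
    conf.set(SchemaAwareStoreWriteOperator.RETRY_SLEEP_TIME, Duration.ofMillis(100));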
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FileStoreShuffleBucketTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FileStoreShuffleBucketTest.java
index b16335d65..e1d671964 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FileStoreShuffleBucketTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FileStoreShuffleBucketTest.java
@@ -27,6 +27,7 @@ import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.sink.SinkRecord;
+import org.apache.paimon.table.sink.TableWriteImpl;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -45,6 +46,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
 
 import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -159,5 +161,9 @@ public class FileStoreShuffleBucketTest extends CatalogITCaseBase {
 
         @Override
         public void close() throws Exception {}
+
+        @Override
+        public void replace(Function<String, TableWriteImpl<?>> newWriteProvider)
+                throws Exception {}
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperatorTest.java
new file mode 100644
index 000000000..c80196ef5
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperatorTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.cdc.CdcRecord;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.CommittableTypeInfo;
+import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.JavaSerializer;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Predicate;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link SchemaAwareStoreWriteOperator}. */
+public class SchemaAwareStoreWriteOperatorTest {
+
+    private static final RowType ROW_TYPE =
+            RowType.of(
+                    new DataType[] {DataTypes.INT(), DataTypes.BIGINT(), DataTypes.STRING()},
+                    new String[] {"pt", "k", "v"});
+
+    @TempDir java.nio.file.Path tempDir;
+
+    private Path tablePath;
+    private String commitUser;
+
+    @BeforeEach
+    public void before() {
+        tablePath = new Path(TraceableFileIO.SCHEME + "://" + tempDir.toString());
+        commitUser = UUID.randomUUID().toString();
+    }
+
+    @AfterEach
+    public void after() {
+        // assert all connections are closed
+        Predicate<Path> pathPredicate = path -> path.toString().contains(tempDir.toString());
+        assertThat(TraceableFileIO.openInputStreams(pathPredicate)).isEmpty();
+        assertThat(TraceableFileIO.openOutputStreams(pathPredicate)).isEmpty();
+    }
+
+    @Test
+    @Timeout(30)
+    public void testProcessRecord() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        OneInputStreamOperatorTestHarness<CdcRecord, Committable> harness =
+                createTestHarness(table);
+        harness.open();
+
+        BlockingQueue<CdcRecord> toProcess = new LinkedBlockingQueue<>();
+        BlockingQueue<CdcRecord> processed = new LinkedBlockingQueue<>();
+        AtomicBoolean running = new AtomicBoolean(true);
+        Runnable r =
+                () -> {
+                    long timestamp = 0;
+                    try {
+                        while (running.get()) {
+                            if (toProcess.isEmpty()) {
+                                Thread.sleep(10);
+                                continue;
+                            }
+
+                            CdcRecord record = toProcess.poll();
+                            harness.processElement(record, ++timestamp);
+                            processed.offer(record);
+                        }
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                };
+
+        Thread t = new Thread(r);
+        t.start();
+
+        // check that records with compatible schema can be processed immediately
+
+        Map<String, String> fields = new HashMap<>();
+        fields.put("pt", "0");
+        fields.put("k", "1");
+        fields.put("v", "10");
+        CdcRecord expected = new CdcRecord(RowKind.INSERT, fields);
+        toProcess.offer(expected);
+        CdcRecord actual = processed.take();
+        assertThat(actual).isEqualTo(expected);
+
+        fields = new HashMap<>();
+        fields.put("pt", "0");
+        fields.put("k", "2");
+        expected = new CdcRecord(RowKind.INSERT, fields);
+        toProcess.offer(expected);
+        actual = processed.take();
+        assertThat(actual).isEqualTo(expected);
+
+        // check that records with new fields should be processed after schema is updated
+
+        fields = new HashMap<>();
+        fields.put("pt", "0");
+        fields.put("k", "3");
+        fields.put("v", "30");
+        fields.put("v2", "300");
+        expected = new CdcRecord(RowKind.INSERT, fields);
+        toProcess.offer(expected);
+        actual = processed.poll(1, TimeUnit.SECONDS);
+        assertThat(actual).isNull();
+
+        SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location());
+        schemaManager.commitChanges(SchemaChange.addColumn("v2", DataTypes.INT()));
+        actual = processed.take();
+        assertThat(actual).isEqualTo(expected);
+
+        running.set(false);
+        t.join();
+        harness.close();
+    }
+
+    private OneInputStreamOperatorTestHarness<CdcRecord, Committable> createTestHarness(
+            FileStoreTable table) throws Exception {
+        SchemaAwareStoreWriteOperator operator =
+                new SchemaAwareStoreWriteOperator(
+                        table,
+                        null,
+                        (t, context, ioManager) ->
+                                new StoreSinkWriteImpl(t, context, commitUser, ioManager, false));
+        TypeSerializer<CdcRecord> inputSerializer = new JavaSerializer<>();
+        TypeSerializer<Committable> outputSerializer =
+                new CommittableTypeInfo().createSerializer(new ExecutionConfig());
+        OneInputStreamOperatorTestHarness<CdcRecord, Committable> harness =
+                new OneInputStreamOperatorTestHarness<>(operator, inputSerializer);
+        harness.setup(outputSerializer);
+        return harness;
+    }
+
+    private FileStoreTable createFileStoreTable() throws Exception {
+        Options conf = new Options();
+        conf.set(SchemaAwareStoreWriteOperator.RETRY_SLEEP_TIME, Duration.ofMillis(10));
+
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(LocalFileIO.create(), tablePath),
+                        new Schema(
+                                ROW_TYPE.getFields(),
+                                Collections.singletonList("pt"),
+                                Arrays.asList("pt", "k"),
+                                conf.toMap(),
+                                ""));
+        return FileStoreTableFactory.create(LocalFileIO.create(), tablePath, tableSchema);
+    }
+}