Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/10/19 01:11:35 UTC

[GitHub] [inlong] thesumery opened a new pull request, #6215: [INLONG-6214][Sort] Support multiple sink for IcebergLoadNode

thesumery opened a new pull request, #6215:
URL: https://github.com/apache/inlong/pull/6215

   ### Prepare a Pull Request
   -  [INLONG-6214][Sort] Support multiple sink for IcebergLoadNode
   - Fixes #6214 
   
   ### Motivation
   
   *Support multiple sinks for IcebergLoadNode*
   
   ### Modifications
   
   *Support multiple sinks for IcebergLoadNode*
   




[GitHub] [inlong] thesumery commented on a diff in pull request #6215: [INLONG-6214][Sort] Support multiple sink for IcebergLoadNode

Posted by GitBox <gi...@apache.org>.
thesumery commented on code in PR #6215:
URL: https://github.com/apache/inlong/pull/6215#discussion_r1003192882


##########
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java:
##########
@@ -0,0 +1,227 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.OutputTag;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.inlong.sort.iceberg.sink.IcebergStreamWriter;
+import org.apache.inlong.sort.iceberg.sink.RowDataTaskWriterFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.UPSERT_ENABLED;
+import static org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+
+/**
+ * Iceberg writer that can distinguish between different sink tables and route data to the corresponding IcebergStreamWriter.
+ */
+public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWithSchema, MultipleWriteResult>
+        implements CheckpointedFunction, BoundedOneInput {
+    private static final Logger LOG = LoggerFactory.getLogger(IcebergMultipleStreamWriter.class);
+
+    private final boolean appendMode;
+    // the catalogLoader here could be encapsulated inside the tableLoader
+    private final CatalogLoader catalogLoader;
+
+    private transient Catalog catalog;
+    // the keys of the maps below are the full db.table path
+    private transient Map<TableIdentifier, IcebergSingleStreamWriter<RowData>> multipleWriters;
+    private transient Map<TableIdentifier, Table> multipleTables;
+    private transient Map<TableIdentifier, Schema> multipleSchemas;
+    private transient FunctionInitializationContext functionInitializationContext;
+
+
+    public IcebergMultipleStreamWriter(boolean appendMode, CatalogLoader catalogLoader) {
+        this.appendMode = appendMode;
+        this.catalogLoader = catalogLoader;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        this.catalog = catalogLoader.loadCatalog();
+        this.multipleWriters = new HashMap<>();
+        this.multipleTables = new HashMap<>();
+        this.multipleSchemas = new HashMap<>();
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (catalog instanceof Closeable) {
+            ((Closeable) catalog).close();
+        }
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        for (Entry<TableIdentifier, IcebergSingleStreamWriter<RowData>> entry: multipleWriters.entrySet()) {
+            entry.getValue().endInput();
+        }
+    }
+
+    @Override
+    public void dispose() throws Exception {
+        super.dispose();
+        for (Entry<TableIdentifier, IcebergSingleStreamWriter<RowData>> entry: multipleWriters.entrySet()) {
+            entry.getValue().dispose();
+        }
+        multipleWriters.clear();
+        multipleTables.clear();
+        multipleSchemas.clear();
+    }
+
+    @Override
+    public void processElement(RecordWithSchema recordWithSchema) throws Exception {
+        TableIdentifier tableId = recordWithSchema.getTableId();
+
+        if (isSchemaUpdate(recordWithSchema)) {
+            // when the schema changes, the files being written under the old schema should be closed and a new writer started
+            if (multipleTables.get(tableId) == null) {
+                Table table = catalog.loadTable(recordWithSchema.getTableId());
+                multipleTables.put(tableId, table);
+            }
+
+            // refresh some runtime table properties
+            Table table = multipleTables.get(recordWithSchema.getTableId());
+            Map<String, String> tableProperties = table.properties();
+            boolean upsertMode = PropertyUtil.propertyAsBoolean(tableProperties,
+                    UPSERT_ENABLED, UPSERT_ENABLED_DEFAULT);
+            long targetFileSizeBytes = PropertyUtil.propertyAsLong(tableProperties,
+                    WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+            String formatString = tableProperties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
+            FileFormat fileFormat = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
+            List<Integer> equalityFieldIds = recordWithSchema.getPrimaryKeys().stream()
+                    .map(pk -> recordWithSchema.getSchema().findField(pk).fieldId())
+                    .collect(Collectors.toList());
+            // if no physical primary key exists, use all fields as the logical primary key
+            if (equalityFieldIds.isEmpty()) {
+                equalityFieldIds = recordWithSchema.getSchema().columns().stream()
+                        .map(NestedField::fieldId)
+                        .collect(Collectors.toList());
+            }
+
+            TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(
+                    table,
+                    recordWithSchema.getSchema(),
+                    FlinkSchemaUtil.convert(recordWithSchema.getSchema()),
+                    targetFileSizeBytes,
+                    fileFormat,
+                    equalityFieldIds,
+                    upsertMode,
+                    appendMode);
+
+            if (multipleWriters.get(tableId) == null) {
+                IcebergSingleStreamWriter<RowData> writer = new IcebergSingleStreamWriter<>(
+                        tableId.toString(), taskWriterFactory, null, null);  // TODO: consider metric compatibility later
+                writer.setup(getRuntimeContext(),
+                        new CallbackCollector<>(writeResult ->
+                                collector.collect(new MultipleWriteResult(tableId, writeResult))),
+                        context);
+                writer.initializeState(functionInitializationContext);
+                writer.open(new Configuration());
+                multipleWriters.put(tableId, writer);
+            } else {  // the schema only evolves from the second distinct schema onwards
+                multipleWriters.get(tableId).schemaEvolution(taskWriterFactory);
+            }
+
+        }
+
+        if (multipleWriters.get(tableId) != null) {
+            for (RowData data : recordWithSchema.getData()) {
+                multipleWriters.get(tableId).processElement(data);
+            }
+        } else {
+            LOG.error("Unregistered table schema for {}.", recordWithSchema.getTableId());
+        }
+    }
+
+    @Override
+    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+        for (Entry<TableIdentifier, IcebergSingleStreamWriter<RowData>> entry: multipleWriters.entrySet()) {
+            entry.getValue().prepareSnapshotPreBarrier(checkpointId);
+        }
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        for (Entry<TableIdentifier, IcebergSingleStreamWriter<RowData>> entry: multipleWriters.entrySet()) {
+            entry.getValue().snapshotState(context);
+        }
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        this.functionInitializationContext = context;
+    }
+
+    private boolean isSchemaUpdate(RecordWithSchema recordWithSchema) {
+        TableIdentifier tableId = recordWithSchema.getTableId();
+        recordWithSchema.replaceSchema();
+        if (multipleSchemas.get(tableId) != null
+                && multipleSchemas.get(tableId).sameSchema(recordWithSchema.getSchema())) {
+            return false;
+        }
+        LOG.info("Schema evolution with table {}, old schema: {}, new Schema: {}",
+                tableId, multipleSchemas.get(tableId), recordWithSchema.getSchema());
+        multipleSchemas.put(recordWithSchema.getTableId(), recordWithSchema.getSchema());

Review Comment:
   The schema is put into the cache because every time data comes in, we need to know whether its schema has been updated or not. If it has, a new writer is created and the old writer should be closed, so the schema is cached in order to compare the incoming data schema with the previous one.
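   
   For context, a hedged sketch of the writer roll-over this implies (a hypothetical method body, not the exact PR code; `taskWriter`, `taskWriterFactory`, `collector`, `subTaskId` and `attemptId` are assumed fields, while `TaskWriter#complete` and `TaskWriterFactory#initialize/create` are Iceberg's Flink sink APIs):
   
   ```java
   // Hypothetical sketch: close out the files written under the old schema,
   // then switch to a task writer factory bound to the new schema.
   public void schemaEvolution(TaskWriterFactory<RowData> newFactory) throws IOException {
       WriteResult result = taskWriter.complete();    // finish the data files of the old schema
       collector.collect(result);                     // hand them over to the committer
       this.taskWriterFactory = newFactory;           // the new factory carries the new schema
       taskWriterFactory.initialize(subTaskId, attemptId);
       this.taskWriter = taskWriterFactory.create();  // fresh writer for the new schema
   }
   ```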





[GitHub] [inlong] EMsnap commented on a diff in pull request #6215: [INLONG-6214][Sort] Support multiple sink for IcebergLoadNode

Posted by GitBox <gi...@apache.org>.
EMsnap commented on code in PR #6215:
URL: https://github.com/apache/inlong/pull/6215#discussion_r1003959521


##########
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java:
##########
@@ -0,0 +1,137 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.UpdateSchema;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.inlong.sort.base.sink.TableChange;
+import org.apache.inlong.sort.iceberg.FlinkTypeToType;
+import org.apache.inlong.sort.base.sink.TableChange.AddColumn;
+import org.apache.inlong.sort.base.sink.TableChange.ColumnPosition;
+import org.apache.inlong.sort.base.sink.TableChange.UnknownColumnChange;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class SchemaChangeUtils {
+    private static final Joiner DOT = Joiner.on(".");
+
+    /**
+     * Compare two schemas and get the schema changes that happened in them.
+     * TODO: currently only support add column
+     *
+     * @param oldSchema
+     * @param newSchema
+     * @return
+     */
+    static List<TableChange> diffSchema(Schema oldSchema, Schema newSchema) {
+        List<String> oldFields = oldSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
+        List<String> newFields = newSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
+        int oi = 0;
+        int ni = 0;
+        List<TableChange> tableChanges = new ArrayList<>();
+        while (ni < newFields.size()) {
+            if (oi < oldFields.size() && oldFields.get(oi).equals(newFields.get(ni))) {
+                oi++;
+                ni++;
+                continue;

Review Comment:
   useless continue?
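   
   For illustration, the `continue` adds nothing because the `if`/`else` branches are already mutually exclusive; the same loop from this diff, minus the `continue`, behaves identically:
   
   ```java
   // Equivalent loop without the redundant continue (illustrative only, body copied from the diff)
   while (ni < newFields.size()) {
       if (oi < oldFields.size() && oldFields.get(oi).equals(newFields.get(ni))) {
           oi++;
           ni++;
       } else {
           NestedField newField = newSchema.findField(newFields.get(ni));
           tableChanges.add(
                   new AddColumn(
                           new String[]{newField.name()},
                           FlinkSchemaUtil.convert(newField.type()),
                           !newField.isRequired(),
                           newField.doc(),
                           ni == 0 ? ColumnPosition.first() : ColumnPosition.after(newFields.get(ni - 1))));
           ni++;
       }
   }
   ```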





[GitHub] [inlong] thesumery commented on a diff in pull request #6215: [INLONG-6214][Sort] Support multiple sink for IcebergLoadNode

Posted by GitBox <gi...@apache.org>.
thesumery commented on code in PR #6215:
URL: https://github.com/apache/inlong/pull/6215#discussion_r1000720450


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java:
##########
@@ -43,8 +66,39 @@
  */
 public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaFormat<JsonNode> {
 
+    private static final Map<Integer, LogicalType> SQL_TYPE_2_ICEBERG_TYPE_MAPPING =

Review Comment:
   The name here is wrong; it has nothing to do with Iceberg. It should be called SQL_TYPE_2_FLINK_TYPE_MAPPING.
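   
   For reference, a minimal sketch of what such a mapping typically looks like, assuming JDBC's java.sql.Types codes as keys and Flink's org.apache.flink.table.types.logical types as values (the entries are illustrative, not necessarily the ones in this PR, and ImmutableMap stands in for whichever map builder the class already uses):
   
   ```java
   // Illustrative only: JDBC SQL type codes mapped to Flink logical types,
   // hence a Flink-oriented name such as SQL_TYPE_2_FLINK_TYPE_MAPPING.
   private static final Map<Integer, LogicalType> SQL_TYPE_2_FLINK_TYPE_MAPPING =
           ImmutableMap.<Integer, LogicalType>builder()
                   .put(java.sql.Types.INTEGER, new IntType())
                   .put(java.sql.Types.BIGINT, new BigIntType())
                   .put(java.sql.Types.DOUBLE, new DoubleType())
                   .put(java.sql.Types.VARCHAR, new VarCharType(VarCharType.MAX_LENGTH))
                   .build();
   ```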





[GitHub] [inlong] thesumery commented on a diff in pull request #6215: [INLONG-6214][Sort] Support multiple sink for IcebergLoadNode

Posted by GitBox <gi...@apache.org>.
thesumery commented on code in PR #6215:
URL: https://github.com/apache/inlong/pull/6215#discussion_r1003201417


##########
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/WholeDatabaseMigrationOperator.java:
##########
@@ -0,0 +1,228 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.sink.MultipleSinkOption;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+public class WholeDatabaseMigrationOperator extends AbstractStreamOperator<RecordWithSchema>
+        implements OneInputStreamOperator<RowData, RecordWithSchema> {
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    private final CatalogLoader catalogLoader;
+    private final MultipleSinkOption multipleSinkOption;
+
+    private transient Catalog catalog;
+    private transient SupportsNamespaces asNamespaceCatalog;
+    private transient AbstractDynamicSchemaFormat<JsonNode> dynamicSchemaFormat;
+
+    // record cache: records wait here for their schema before being consumed
+    private transient Map<TableIdentifier, Queue<RecordWithSchema>> recordQueues;
+
+    // schema cache
+    private transient Map<TableIdentifier, Schema> schemaCache;
+
+    public WholeDatabaseMigrationOperator(CatalogLoader catalogLoader,
+            MultipleSinkOption multipleSinkOption) {
+        this.catalogLoader = catalogLoader;
+        this.multipleSinkOption = multipleSinkOption;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.catalog = catalogLoader.loadCatalog();
+        this.asNamespaceCatalog =
+                catalog instanceof SupportsNamespaces ? (SupportsNamespaces) catalog : null;
+        this.recordQueues = new HashMap<>();
+        this.schemaCache = new HashMap<>();
+        this.dynamicSchemaFormat = DynamicSchemaFormatFactory.getFormat(multipleSinkOption.getFormat());
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (catalog instanceof Closeable) {
+            ((Closeable) catalog).close();
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        String wholeData = element.getValue().getString(0).toString();

Review Comment:
   change it





[GitHub] [inlong] EMsnap commented on a diff in pull request #6215: [INLONG-6214][Sort] Support multiple sink for IcebergLoadNode

Posted by GitBox <gi...@apache.org>.
EMsnap commented on code in PR #6215:
URL: https://github.com/apache/inlong/pull/6215#discussion_r1003207476


##########
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java:
##########
@@ -0,0 +1,395 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.runtime.typeutils.SortedMapTypeInfo;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ReplacePartitions;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotUpdate;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.inlong.sort.iceberg.sink.DeltaManifests;
+import org.apache.inlong.sort.iceberg.sink.DeltaManifestsSerializer;
+import org.apache.inlong.sort.iceberg.sink.FlinkManifestUtil;
+import org.apache.inlong.sort.iceberg.sink.ManifestOutputFileFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.SortedMap;
+
+public class IcebergSingleFileCommiter extends IcebergProcessFunction<WriteResult, Void>
+        implements CheckpointedFunction, CheckpointListener {
+    private static final long serialVersionUID = 1L;
+    private static final long INITIAL_CHECKPOINT_ID = -1L;
+    private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
+
+    private static final Logger LOG = LoggerFactory.getLogger(IcebergSingleFileCommiter.class);
+    private static final String FLINK_JOB_ID = "flink.job-id";
+
+    // The max checkpoint id we've committed to the iceberg table. Since flink's checkpoint id is always increasing,
+    // we can safely commit all the data files whose checkpoint id is greater than the max committed one, which
+    // avoids committing the same data files twice. This id will be attached to iceberg's meta when committing the
+    // iceberg transaction.
+    private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";
+    static final String MAX_CONTINUOUS_EMPTY_COMMITS = "flink.max-continuous-empty-commits";
+
+    // TableLoader to load iceberg table lazily.
+    private final TableLoader tableLoader;
+    private final boolean replacePartitions;
+
+    // A sorted map to maintain the completed data files for each pending checkpointId (which have not been committed
+    // to the iceberg table). We need a sorted map here because it's possible that a few checkpoint snapshots failed;
+    // for example: the 1st checkpoint has 2 data files <1, <file0, file1>>, the 2nd checkpoint has 1 data file
+    // <2, <file3>>. The snapshot for checkpoint#1 was interrupted by a network/disk failure etc., while we don't
+    // expect any data loss in the iceberg table. So we keep the finished files <1, <file0, file1>> in memory and
+    // retry committing to the iceberg table when the next checkpoint happens.
+    private final NavigableMap<Long, byte[]> dataFilesPerCheckpoint = Maps.newTreeMap();
+
+    // The completed files cache for current checkpoint. Once the snapshot barrier received, it will be flushed to the
+    // 'dataFilesPerCheckpoint'.
+    private final List<WriteResult> writeResultsOfCurrentCkpt = Lists.newArrayList();
+
+    // It will have a unique identifier for one job.
+    private transient String flinkJobId;
+    private transient Table table;
+    private transient ManifestOutputFileFactory manifestOutputFileFactory;
+    private transient long maxCommittedCheckpointId;
+    private transient int continuousEmptyCheckpoints;
+    private transient int maxContinuousEmptyCommits;
+    // There're two cases that we restore from flink checkpoints: the first case is restoring from snapshot created by
+    // the same flink job; another case is restoring from snapshot created by another different job. For the second
+    // case, we need to maintain the old flink job's id in flink state backend to find the max-committed-checkpoint-id
+    // when traversing iceberg table's snapshots.
+    private final ListStateDescriptor<String> JOB_ID_DESCRIPTOR;
+    private transient ListState<String> jobIdState;
+    // All pending checkpoints states for this function.
+    private final ListStateDescriptor<SortedMap<Long, byte[]>> STATE_DESCRIPTOR;
+    private transient ListState<SortedMap<Long, byte[]>> checkpointsState;
+
+    public IcebergSingleFileCommiter(TableIdentifier tableId, TableLoader tableLoader, boolean replacePartitions) {
+        // The state descriptors must be distinguished by tableId, because all IcebergSingleFileCommiter instances
+        // in one IcebergMultipleFilesCommiter share the same state store.
+        this.tableLoader = tableLoader;
+        this.replacePartitions = replacePartitions;
+        this.JOB_ID_DESCRIPTOR= new ListStateDescriptor<>(
+                String.format("iceberg(%s)-flink-job-id", tableId.toString()), BasicTypeInfo.STRING_TYPE_INFO);
+        this.STATE_DESCRIPTOR = buildStateDescriptor(tableId);
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        this.flinkJobId = getRuntimeContext().getJobId().toString();
+
+        // Open the table loader and load the table.
+        this.tableLoader.open();
+        this.table = tableLoader.loadTable();
+
+        maxContinuousEmptyCommits = PropertyUtil.propertyAsInt(table.properties(), MAX_CONTINUOUS_EMPTY_COMMITS, 10);
+        Preconditions.checkArgument(maxContinuousEmptyCommits > 0,
+                MAX_CONTINUOUS_EMPTY_COMMITS + " must be positive");
+
+        int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
+        int attemptId = getRuntimeContext().getAttemptNumber();
+        this.manifestOutputFileFactory = FlinkManifestUtil
+                .createOutputFileFactory(table, flinkJobId, subTaskId, attemptId);
+        this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
+
+        this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
+        this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
+        if (context.isRestored()) {
+            String restoredFlinkJobId = jobIdState.get().iterator().next();
+            Preconditions.checkState(!Strings.isNullOrEmpty(restoredFlinkJobId),
+                    "Flink job id parsed from checkpoint snapshot shouldn't be null or empty");
+
+            // Since flink's checkpoint id will start from the max-committed-checkpoint-id + 1 in the new flink job,
+            // even if it's restored from a snapshot created by another flink job, it's safe to assign the max
+            // committed checkpoint id from the restored flink job to the current flink job.
+            this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, restoredFlinkJobId);
+
+            NavigableMap<Long, byte[]> uncommittedDataFiles = Maps
+                    .newTreeMap(checkpointsState.get().iterator().next())
+                    .tailMap(maxCommittedCheckpointId, false);
+            if (!uncommittedDataFiles.isEmpty()) {
+                // Committed all uncommitted data files from the old flink job to iceberg table.
+                long maxUncommittedCheckpointId = uncommittedDataFiles.lastKey();
+                commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId, maxUncommittedCheckpointId);
+            }
+        }
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        long checkpointId = context.getCheckpointId();
+        LOG.info("Start to flush snapshot state to state backend, table: {}, checkpointId: {}", table, checkpointId);
+
+        // Update the checkpoint state.
+        dataFilesPerCheckpoint.put(checkpointId, writeToManifest(checkpointId));
+        // Reset the snapshot state to the latest state.
+        checkpointsState.clear();
+        checkpointsState.add(dataFilesPerCheckpoint);
+
+        jobIdState.clear();
+        jobIdState.add(flinkJobId);
+
+        // Clear the local buffer for current checkpoint.
+        writeResultsOfCurrentCkpt.clear();
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        // It's possible that we have the following events:
+        //   1. snapshotState(ckpId);
+        //   2. snapshotState(ckpId+1);
+        //   3. notifyCheckpointComplete(ckpId+1);
+        //   4. notifyCheckpointComplete(ckpId);
+        // For step#4, we don't need to commit iceberg table again because in step#3 we've committed all the files,
+        // Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
+        if (checkpointId > maxCommittedCheckpointId) {
+            commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
+            this.maxCommittedCheckpointId = checkpointId;
+        }
+    }
+
+    private void commitUpToCheckpoint(NavigableMap<Long, byte[]> deltaManifestsMap,
+            String newFlinkJobId,
+            long checkpointId) throws IOException {
+        NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);

Review Comment:
   ok





[GitHub] [inlong] EMsnap commented on a diff in pull request #6215: [INLONG-6214][Sort] Support multiple sink for IcebergLoadNode

Posted by GitBox <gi...@apache.org>.
EMsnap commented on code in PR #6215:
URL: https://github.com/apache/inlong/pull/6215#discussion_r1003959924


##########
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java:
##########
@@ -0,0 +1,137 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.UpdateSchema;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.inlong.sort.base.sink.TableChange;
+import org.apache.inlong.sort.iceberg.FlinkTypeToType;
+import org.apache.inlong.sort.base.sink.TableChange.AddColumn;
+import org.apache.inlong.sort.base.sink.TableChange.ColumnPosition;
+import org.apache.inlong.sort.base.sink.TableChange.UnknownColumnChange;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class SchemaChangeUtils {
+    private static final Joiner DOT = Joiner.on(".");
+
+    /**
+     * Compare two schemas and get the schema changes that happened in them.
+     * TODO: currently only support add column
+     *
+     * @param oldSchema
+     * @param newSchema
+     * @return
+     */
+    static List<TableChange> diffSchema(Schema oldSchema, Schema newSchema) {
+        List<String> oldFields = oldSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
+        List<String> newFields = newSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
+        int oi = 0;
+        int ni = 0;
+        List<TableChange> tableChanges = new ArrayList<>();
+        while (ni < newFields.size()) {
+            if (oi < oldFields.size() && oldFields.get(oi).equals(newFields.get(ni))) {
+                oi++;
+                ni++;
+                continue;
+            } else {
+                NestedField newField = newSchema.findField(newFields.get(ni));
+                tableChanges.add(
+                        new AddColumn(
+                                new String[]{newField.name()},
+                                FlinkSchemaUtil.convert(newField.type()),
+                                !newField.isRequired(),
+                                newField.doc(),
+                                ni == 0 ? ColumnPosition.first() : ColumnPosition.after(newFields.get(ni - 1))));
+                ni++;
+            }
+        }
+
+        if (oi != oldFields.size()) {
+            tableChanges.clear();
+            tableChanges.add(
+                    new UnknownColumnChange(
+                            String.format("Unspported schema update.\n"

Review Comment:
   unsupported





[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6215: [INLONG-6214][Sort] Support multiple sink for IcebergLoadNode

Posted by GitBox <gi...@apache.org>.
yunqingmoswu commented on code in PR #6215:
URL: https://github.com/apache/inlong/pull/6215#discussion_r1000226942


##########
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java:
##########
@@ -0,0 +1,227 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.OutputTag;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.inlong.sort.iceberg.sink.IcebergStreamWriter;
+import org.apache.inlong.sort.iceberg.sink.RowDataTaskWriterFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.UPSERT_ENABLED;
+import static org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+
+/**
+ * Iceberg writer that can distinguish between different sink tables and route data to the corresponding IcebergStreamWriter.
+ */
+public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWithSchema, MultipleWriteResult>
+        implements CheckpointedFunction, BoundedOneInput {
+    private static final Logger LOG = LoggerFactory.getLogger(IcebergMultipleStreamWriter.class);
+
+    private final boolean appendMode;
+    // the catalogLoader here could be encapsulated inside the tableLoader
+    private final CatalogLoader catalogLoader;
+
+    private transient Catalog catalog;
+    // the keys of the maps below are the full db.table path
+    private transient Map<TableIdentifier, IcebergSingleStreamWriter<RowData>> multipleWriters;
+    private transient Map<TableIdentifier, Table> multipleTables;
+    private transient Map<TableIdentifier, Schema> multipleSchemas;
+    private transient FunctionInitializationContext functionInitializationContext;
+
+
+    public IcebergMultipleStreamWriter(boolean appendMode, CatalogLoader catalogLoader) {
+        this.appendMode = appendMode;
+        this.catalogLoader = catalogLoader;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        this.catalog = catalogLoader.loadCatalog();
+        this.multipleWriters = new HashMap<>();
+        this.multipleTables = new HashMap<>();
+        this.multipleSchemas = new HashMap<>();
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (catalog instanceof Closeable) {
+            ((Closeable) catalog).close();
+        }
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        for (Entry<TableIdentifier, IcebergSingleStreamWriter<RowData>> entry: multipleWriters.entrySet()) {
+            entry.getValue().endInput();
+        }
+    }
+
+    @Override
+    public void dispose() throws Exception {
+        super.dispose();
+        for (Entry<TableIdentifier, IcebergSingleStreamWriter<RowData>> entry: multipleWriters.entrySet()) {
+            entry.getValue().dispose();
+        }
+        multipleWriters.clear();
+        multipleTables.clear();
+        multipleSchemas.clear();
+    }
+
+    @Override
+    public void processElement(RecordWithSchema recordWithSchema) throws Exception {
+        TableIdentifier tableId = recordWithSchema.getTableId();
+
+        if (isSchemaUpdate(recordWithSchema)) {
+            // when the schema changes, the files being written under the old schema should be closed and a new writer started
+            if (multipleTables.get(tableId) == null) {
+                Table table = catalog.loadTable(recordWithSchema.getTableId());
+                multipleTables.put(tableId, table);
+            }
+
+            // refresh some runtime table properties
+            Table table = multipleTables.get(recordWithSchema.getTableId());
+            Map<String, String> tableProperties = table.properties();
+            boolean upsertMode = PropertyUtil.propertyAsBoolean(tableProperties,
+                    UPSERT_ENABLED, UPSERT_ENABLED_DEFAULT);
+            long targetFileSizeBytes = PropertyUtil.propertyAsLong(tableProperties,
+                    WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+            String formatString = tableProperties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
+            FileFormat fileFormat = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
+            List<Integer> equalityFieldIds = recordWithSchema.getPrimaryKeys().stream()
+                    .map(pk -> recordWithSchema.getSchema().findField(pk).fieldId())
+                    .collect(Collectors.toList());
+            // if no physical primary key exists, use all fields as the logical primary key
+            if (equalityFieldIds.isEmpty()) {
+                equalityFieldIds = recordWithSchema.getSchema().columns().stream()
+                        .map(NestedField::fieldId)
+                        .collect(Collectors.toList());
+            }
+
+            TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(
+                    table,
+                    recordWithSchema.getSchema(),
+                    FlinkSchemaUtil.convert(recordWithSchema.getSchema()),
+                    targetFileSizeBytes,
+                    fileFormat,
+                    equalityFieldIds,
+                    upsertMode,
+                    appendMode);
+
+            if (multipleWriters.get(tableId) == null) {
+                IcebergSingleStreamWriter<RowData> writer = new IcebergSingleStreamWriter<>(
+                        tableId.toString(), taskWriterFactory, null, null);  // TODO: consider metric compatibility later
+                writer.setup(getRuntimeContext(),
+                        new CallbackCollector<>(writeResult ->
+                                collector.collect(new MultipleWriteResult(tableId, writeResult))),
+                        context);
+                writer.initializeState(functionInitializationContext);
+                writer.open(new Configuration());
+                multipleWriters.put(tableId, writer);
+            } else {  // the schema only evolves from the second distinct schema onwards
+                multipleWriters.get(tableId).schemaEvolution(taskWriterFactory);
+            }
+
+        }
+
+        if (multipleWriters.get(tableId) != null) {
+            for (RowData data : recordWithSchema.getData()) {
+                multipleWriters.get(tableId).processElement(data);
+            }
+        } else {
+            LOG.error("Unregistered table schema for {}.", recordWithSchema.getTableId());
+        }
+    }
+
+    @Override
+    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+        for (Entry<TableIdentifier, IcebergSingleStreamWriter<RowData>> entry: multipleWriters.entrySet()) {
+            entry.getValue().prepareSnapshotPreBarrier(checkpointId);
+        }
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        for (Entry<TableIdentifier, IcebergSingleStreamWriter<RowData>> entry: multipleWriters.entrySet()) {
+            entry.getValue().snapshotState(context);
+        }
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        this.functionInitializationContext = context;
+    }
+
+    private boolean isSchemaUpdate(RecordWithSchema recordWithSchema) {
+        TableIdentifier tableId = recordWithSchema.getTableId();
+        recordWithSchema.replaceSchema();
+        if (multipleSchemas.get(tableId) != null
+                && multipleSchemas.get(tableId).sameSchema(recordWithSchema.getSchema())) {
+            return false;
+        }
+        LOG.info("Schema evolution with table {}, old schema: {}, new Schema: {}",
+                tableId, multipleSchemas.get(tableId), recordWithSchema.getSchema());
+        multipleSchemas.put(recordWithSchema.getTableId(), recordWithSchema.getSchema());

Review Comment:
   Why put it in multipleSchemas here?



##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java:
##########
@@ -43,8 +66,39 @@
  */
 public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaFormat<JsonNode> {
 
+    private static final Map<Integer, LogicalType> SQL_TYPE_2_ICEBERG_TYPE_MAPPING =

Review Comment:
   Maybe it is better to call SQL_TYPE_2_ICEBERG_FLINK_TYPE_MAPPING? 



##########
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/WholeDatabaseMigrationOperator.java:
##########
@@ -0,0 +1,228 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.sink.MultipleSinkOption;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+public class WholeDatabaseMigrationOperator extends AbstractStreamOperator<RecordWithSchema>
+        implements OneInputStreamOperator<RowData, RecordWithSchema> {
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    private final CatalogLoader catalogLoader;
+    private final MultipleSinkOption multipleSinkOption;
+
+    private transient Catalog catalog;
+    private transient SupportsNamespaces asNamespaceCatalog;
+    private transient AbstractDynamicSchemaFormat<JsonNode> dynamicSchemaFormat;
+
+    // record cache: records wait here for their schema before being consumed
+    private transient Map<TableIdentifier, Queue<RecordWithSchema>> recordQueues;
+
+    // schema cache
+    private transient Map<TableIdentifier, Schema> schemaCache;
+
+    public WholeDatabaseMigrationOperator(CatalogLoader catalogLoader,
+            MultipleSinkOption multipleSinkOption) {
+        this.catalogLoader = catalogLoader;
+        this.multipleSinkOption = multipleSinkOption;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.catalog = catalogLoader.loadCatalog();
+        this.asNamespaceCatalog =
+                catalog instanceof SupportsNamespaces ? (SupportsNamespaces) catalog : null;
+        this.recordQueues = new HashMap<>();
+        this.schemaCache = new HashMap<>();
+        this.dynamicSchemaFormat = DynamicSchemaFormatFactory.getFormat(multipleSinkOption.getFormat());
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (catalog instanceof Closeable) {
+            ((Closeable) catalog).close();
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        String wholeData = element.getValue().getString(0).toString();
+
+        JsonNode jsonNode = objectMapper.readTree(wholeData);
+        boolean isDDL = dynamicSchemaFormat.extractDDLFlag(jsonNode);
+        if (isDDL) {
+            execDDL(jsonNode);
+        } else {
+            execDML(jsonNode);
+        }
+    }
+
+    private void execDDL(JsonNode jsonNode) {
+        // todo:parse ddl sql
+    }
+
+    private void execDML(JsonNode jsonNode) throws IOException {
+        RecordWithSchema record = parseRecord(jsonNode);
+        Schema schema = schemaCache.get(record.getTableId());
+        Schema dataSchema = record.getSchema();
+        recordQueues.compute(record.getTableId(), (k, v) -> {
+            if (v == null) {
+                v = new LinkedList<>();
+            }
+            v.add(record);
+            return v;
+        });
+
+        if (schema == null) {
+            handleTableCreateEventFromOperator(record.getTableId(), dataSchema);
+        } else {
+            handleSchemaInfoEvent(record.getTableId(), schema);
+        }
+    }
+
+
+    // ================ All request/response methods for interacting with the coordinator ================
+    private void handleSchemaInfoEvent(TableIdentifier tableId, Schema schema) {
+        schemaCache.put(tableId, schema);
+        Schema currentSchema = schemaCache.get(tableId);
+        Queue<RecordWithSchema> queue = recordQueues.get(tableId);
+        while (queue != null && !queue.isEmpty()) {
+            Schema dataSchema = queue.peek().getSchema();
+            // if compatible, the current schema is already the latest schema;
+            // if not, the current schema needs to be updated
+            if (isCompatible(currentSchema, dataSchema)) {
+                RecordWithSchema recordWithSchema = queue.poll();
+                output.collect(new StreamRecord<>(
+                        recordWithSchema
+                                .refreshFieldId(currentSchema)
+                                .refreshRowData((jsonNode, schema1) ->

Review Comment:
   Maybe it is better to call 'latestSchema' or 'validSchema'?



##########
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/WholeDatabaseMigrationOperator.java:
##########
@@ -0,0 +1,228 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.sink.MultipleSinkOption;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+public class WholeDatabaseMigrationOperator extends AbstractStreamOperator<RecordWithSchema>
+        implements OneInputStreamOperator<RowData, RecordWithSchema> {
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    private final CatalogLoader catalogLoader;
+    private final MultipleSinkOption multipleSinkOption;
+
+    private transient Catalog catalog;
+    private transient SupportsNamespaces asNamespaceCatalog;
+    private transient AbstractDynamicSchemaFormat<JsonNode> dynamicSchemaFormat;
+
+    // record cache, wait schema to consume record
+    private transient Map<TableIdentifier, Queue<RecordWithSchema>> recordQueues;
+
+    // schema cache
+    private transient Map<TableIdentifier, Schema> schemaCache;
+
+    public WholeDatabaseMigrationOperator(CatalogLoader catalogLoader,
+            MultipleSinkOption multipleSinkOption) {
+        this.catalogLoader = catalogLoader;
+        this.multipleSinkOption = multipleSinkOption;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.catalog = catalogLoader.loadCatalog();
+        this.asNamespaceCatalog =
+                catalog instanceof SupportsNamespaces ? (SupportsNamespaces) catalog : null;
+        this.recordQueues = new HashMap<>();
+        this.schemaCache = new HashMap<>();
+        this.dynamicSchemaFormat = DynamicSchemaFormatFactory.getFormat(multipleSinkOption.getFormat());
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (catalog instanceof Closeable) {
+            ((Closeable) catalog).close();
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        String wholeData = element.getValue().getString(0).toString();
+
+        JsonNode jsonNode = objectMapper.readTree(wholeData);
+        boolean isDDL = dynamicSchemaFormat.extractDDLFlag(jsonNode);
+        if (isDDL) {
+            execDDL(jsonNode);
+        } else {
+            execDML(jsonNode);
+        }
+    }
+
+    private void execDDL(JsonNode jsonNode) {
+        // todo:parse ddl sql
+    }
+
+    private void execDML(JsonNode jsonNode) throws IOException {
+        RecordWithSchema record = parseRecord(jsonNode);
+        Schema schema = schemaCache.get(record.getTableId());
+        Schema dataSchema = record.getSchema();
+        recordQueues.compute(record.getTableId(), (k, v) -> {
+            if (v == null) {
+                v = new LinkedList<>();
+            }
+            v.add(record);
+            return v;
+        });
+
+        if (schema == null) {
+            handleTableCreateEventFromOperator(record.getTableId(), dataSchema);
+        } else {
+            handleSchemaInfoEvent(record.getTableId(), schema);
+        }
+    }
+
+
+    // =============== All request/response methods for interacting with the coordinator ===============
+    private void handleSchemaInfoEvent(TableIdentifier tableId, Schema schema) {
+        schemaCache.put(tableId, schema);
+        Schema currentSchema = schemaCache.get(tableId);
+        Queue<RecordWithSchema> queue = recordQueues.get(tableId);
+        while (queue != null && !queue.isEmpty()) {
+            Schema dataSchema = queue.peek().getSchema();
+            // if compatible, the current schema is already the latest schema;
+            // if not, the current schema needs to be updated
+            if (isCompatible(currentSchema, dataSchema)) {
+                RecordWithSchema recordWithSchema = queue.poll();
+                output.collect(new StreamRecord<>(
+                        recordWithSchema
+                                .refreshFieldId(currentSchema)
+                                .refreshRowData((jsonNode, schema1) ->
+                                     dynamicSchemaFormat.extractRowData(jsonNode, FlinkSchemaUtil.convert(schema1))
+                                )));
+            } else {
+                handldAlterSchemaEventFromOperator(tableId, currentSchema, dataSchema);
+            }
+        }
+    }
+
+    // ========================= All methods handled on the coordinator side =========================
+    private void handleTableCreateEventFromOperator(TableIdentifier tableId, Schema schema) {
+        if (!catalog.tableExists(tableId)) {
+            if (asNamespaceCatalog != null && !asNamespaceCatalog.namespaceExists(tableId.namespace())) {
+                try {
+                    asNamespaceCatalog.createNamespace(tableId.namespace());
+                    LOG.info("Auto create Database({}) in Catalog({}).", tableId.namespace(), catalog.name());
+                } catch (AlreadyExistsException e) {
+                    LOG.warn("Database({}) already exist in Catalog({})!", tableId.namespace(), catalog.name());
+                }
+            }
+
+            ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+            properties.put("format-version", "2"); // todo:后续考虑默认参数给哪些,并且将这个默认参数暴露在表参数上
+            properties.put("write.upsert.enabled", "true");
+            // Hive can only query the auto-created table when this property is set
+            properties.put("engine.hive.enabled", "true");
+
+            try {
+                catalog.createTable(tableId, schema, PartitionSpec.unpartitioned(), properties.build());
+                LOG.info("Auto create Table({}) in Database({}) in Catalog({})!",
+                        tableId.name(), tableId.namespace(), catalog.name());
+            } catch (AlreadyExistsException e) {
+                LOG.warn("Table({}) already exist in Database({}) in Catalog({})!",
+                        tableId.name(), tableId.namespace(), catalog.name());
+            }
+        }
+
+        handleSchemaInfoEvent(tableId, catalog.loadTable(tableId).schema());
+    }
+
+    private void handldAlterSchemaEventFromOperator(TableIdentifier tableId, Schema oldSchema, Schema newSchema) {
+        Table table = catalog.loadTable(tableId);
+
+        // Transactionality of the change is guaranteed by comparing the old schema with the table's
+        // current schema.
+        // Whether a change can be applied is decided by schema comparison (currently only adding columns
+        // is supported); any scenario that cannot be changed is treated as a problem with the data.
+        Transaction transaction = table.newTransaction();
+        if (table.schema().sameSchema(oldSchema)) {
+            List<TableChange> tableChanges = TableChange.diffSchema(oldSchema, newSchema);
+            TableChange.applySchemaChanges(transaction.updateSchema(), tableChanges);
+            LOG.info("Schema evolution in table({}) for table change: {}", tableId, tableChanges);
+        }
+        transaction.commitTransaction();
+        handleSchemaInfoEvent(tableId, table.schema());
+    }
+
+    // ========================================= Utility methods =========================================
+    // The way to judge compatibility is whether all the field names in the old schema exist in the new schema
+    private boolean isCompatible(Schema newSchema, Schema oldSchema) {
+        for (NestedField field : oldSchema.columns()) {
+            if (newSchema.findField(field.name()) == null) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    // Parse the schema from the data and convert it to Flink's built-in schema; the conversion is
+    // provided as a plugin interface for each format (canal-json, ogg)
+    private RecordWithSchema parseRecord(JsonNode data) throws IOException {
+        String databaseStr = dynamicSchemaFormat.parse(data, multipleSinkOption.getDatabasePattern());
+        String tableStr = dynamicSchemaFormat.parse(data, multipleSinkOption.getTablePattern());
+        List<String> pkListStr = dynamicSchemaFormat.extractPrimaryKeyNames(data);
+        RowType schema = dynamicSchemaFormat.extractSchema(data, pkListStr);

Review Comment:
   Should compatibility handling for Iceberg-specific data types be done based on the schema, or should a callback function be added to 'dynamicSchemaFormat.extractSchema'?
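
   A rough sketch of the callback idea, purely illustrative — this overload and the mapper parameter do not exist in AbstractDynamicSchemaFormat today, and it would also need java.util.function.Function, java.util.stream.Collectors and LogicalType imports:

       // Hypothetical overload: let the caller map extracted Flink types to
       // Iceberg-compatible ones while the schema is being built.
       public RowType extractSchema(JsonNode data, List<String> pkNames,
               Function<LogicalType, LogicalType> icebergTypeMapper) throws IOException {
           RowType raw = extractSchema(data, pkNames);
           List<RowType.RowField> mapped = raw.getFields().stream()
                   .map(f -> new RowType.RowField(f.getName(), icebergTypeMapper.apply(f.getType())))
                   .collect(Collectors.toList());
           return new RowType(mapped);
       }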



##########
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/WholeDatabaseMigrationOperator.java:
##########
@@ -0,0 +1,228 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.sink.MultipleSinkOption;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+public class WholeDatabaseMigrationOperator extends AbstractStreamOperator<RecordWithSchema>
+        implements OneInputStreamOperator<RowData, RecordWithSchema> {
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    private final CatalogLoader catalogLoader;
+    private final MultipleSinkOption multipleSinkOption;
+
+    private transient Catalog catalog;
+    private transient SupportsNamespaces asNamespaceCatalog;
+    private transient AbstractDynamicSchemaFormat<JsonNode> dynamicSchemaFormat;
+
+    // record cache, wait schema to consume record
+    private transient Map<TableIdentifier, Queue<RecordWithSchema>> recordQueues;
+
+    // schema cache
+    private transient Map<TableIdentifier, Schema> schemaCache;
+
+    public WholeDatabaseMigrationOperator(CatalogLoader catalogLoader,
+            MultipleSinkOption multipleSinkOption) {
+        this.catalogLoader = catalogLoader;
+        this.multipleSinkOption = multipleSinkOption;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.catalog = catalogLoader.loadCatalog();
+        this.asNamespaceCatalog =
+                catalog instanceof SupportsNamespaces ? (SupportsNamespaces) catalog : null;
+        this.recordQueues = new HashMap<>();
+        this.schemaCache = new HashMap<>();
+        this.dynamicSchemaFormat = DynamicSchemaFormatFactory.getFormat(multipleSinkOption.getFormat());
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (catalog instanceof Closeable) {
+            ((Closeable) catalog).close();
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        String wholeData = element.getValue().getString(0).toString();

Review Comment:
   Maybe it can get the data via 'dynamicSchemaFormat.deserialize(rowData.getBinary(0))'?
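
   A minimal sketch of what that could look like, assuming deserialize(byte[]) returns the parsed JsonNode (the exact signature is not shown in this diff):

       @Override
       public void processElement(StreamRecord<RowData> element) throws Exception {
           // Let the format own the parsing instead of getString(0).toString() plus a local ObjectMapper.
           JsonNode jsonNode = dynamicSchemaFormat.deserialize(element.getValue().getBinary(0));
           if (dynamicSchemaFormat.extractDDLFlag(jsonNode)) {
               execDDL(jsonNode);
           } else {
               execDML(jsonNode);
           }
       }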



##########
inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java:
##########
@@ -41,6 +41,7 @@
 import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedHashMap;

Review Comment:
   Why add this line?



##########
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/WholeDatabaseMigrationOperator.java:
##########
@@ -0,0 +1,228 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.sink.MultipleSinkOption;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+public class WholeDatabaseMigrationOperator extends AbstractStreamOperator<RecordWithSchema>

Review Comment:
   Maybe it is better to rename 'WholeDatabaseMigrationOperator' to 'MultipleSinkOperator'?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] thesumery commented on a diff in pull request #6215: [INLONG-6214][Sort] Support multiple sink for IcebergLoadNode

Posted by GitBox <gi...@apache.org>.
thesumery commented on code in PR #6215:
URL: https://github.com/apache/inlong/pull/6215#discussion_r1003202096


##########
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/WholeDatabaseMigrationOperator.java:
##########
@@ -0,0 +1,228 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.sink.MultipleSinkOption;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+public class WholeDatabaseMigrationOperator extends AbstractStreamOperator<RecordWithSchema>
+        implements OneInputStreamOperator<RowData, RecordWithSchema> {
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    private final CatalogLoader catalogLoader;
+    private final MultipleSinkOption multipleSinkOption;
+
+    private transient Catalog catalog;
+    private transient SupportsNamespaces asNamespaceCatalog;
+    private transient AbstractDynamicSchemaFormat<JsonNode> dynamicSchemaFormat;
+
+    // record cache, wait schema to consume record
+    private transient Map<TableIdentifier, Queue<RecordWithSchema>> recordQueues;
+
+    // schema cache
+    private transient Map<TableIdentifier, Schema> schemaCache;
+
+    public WholeDatabaseMigrationOperator(CatalogLoader catalogLoader,
+            MultipleSinkOption multipleSinkOption) {
+        this.catalogLoader = catalogLoader;
+        this.multipleSinkOption = multipleSinkOption;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.catalog = catalogLoader.loadCatalog();
+        this.asNamespaceCatalog =
+                catalog instanceof SupportsNamespaces ? (SupportsNamespaces) catalog : null;
+        this.recordQueues = new HashMap<>();
+        this.schemaCache = new HashMap<>();
+        this.dynamicSchemaFormat = DynamicSchemaFormatFactory.getFormat(multipleSinkOption.getFormat());
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (catalog instanceof Closeable) {
+            ((Closeable) catalog).close();
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        String wholeData = element.getValue().getString(0).toString();
+
+        JsonNode jsonNode = objectMapper.readTree(wholeData);
+        boolean isDDL = dynamicSchemaFormat.extractDDLFlag(jsonNode);
+        if (isDDL) {
+            execDDL(jsonNode);
+        } else {
+            execDML(jsonNode);
+        }
+    }
+
+    private void execDDL(JsonNode jsonNode) {
+        // todo:parse ddl sql
+    }
+
+    private void execDML(JsonNode jsonNode) throws IOException {
+        RecordWithSchema record = parseRecord(jsonNode);
+        Schema schema = schemaCache.get(record.getTableId());
+        Schema dataSchema = record.getSchema();
+        recordQueues.compute(record.getTableId(), (k, v) -> {
+            if (v == null) {
+                v = new LinkedList<>();
+            }
+            v.add(record);
+            return v;
+        });
+
+        if (schema == null) {
+            handleTableCreateEventFromOperator(record.getTableId(), dataSchema);
+        } else {
+            handleSchemaInfoEvent(record.getTableId(), schema);
+        }
+    }
+
+
+    // =============== All request/response methods for interacting with the coordinator ===============
+    private void handleSchemaInfoEvent(TableIdentifier tableId, Schema schema) {
+        schemaCache.put(tableId, schema);
+        Schema currentSchema = schemaCache.get(tableId);
+        Queue<RecordWithSchema> queue = recordQueues.get(tableId);
+        while (queue != null && !queue.isEmpty()) {
+            Schema dataSchema = queue.peek().getSchema();
+            // if compatible, the current schema is already the latest schema;
+            // if not, the current schema needs to be updated
+            if (isCompatible(currentSchema, dataSchema)) {
+                RecordWithSchema recordWithSchema = queue.poll();
+                output.collect(new StreamRecord<>(
+                        recordWithSchema
+                                .refreshFieldId(currentSchema)
+                                .refreshRowData((jsonNode, schema1) ->
+                                     dynamicSchemaFormat.extractRowData(jsonNode, FlinkSchemaUtil.convert(schema1))
+                                )));
+            } else {
+                handldAlterSchemaEventFromOperator(tableId, currentSchema, dataSchema);
+            }
+        }
+    }
+
+    // ========================= All methods handled on the coordinator side =========================
+    private void handleTableCreateEventFromOperator(TableIdentifier tableId, Schema schema) {
+        if (!catalog.tableExists(tableId)) {
+            if (asNamespaceCatalog != null && !asNamespaceCatalog.namespaceExists(tableId.namespace())) {
+                try {
+                    asNamespaceCatalog.createNamespace(tableId.namespace());
+                    LOG.info("Auto create Database({}) in Catalog({}).", tableId.namespace(), catalog.name());
+                } catch (AlreadyExistsException e) {
+                    LOG.warn("Database({}) already exist in Catalog({})!", tableId.namespace(), catalog.name());
+                }
+            }
+
+            ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+            properties.put("format-version", "2"); // todo:后续考虑默认参数给哪些,并且将这个默认参数暴露在表参数上
+            properties.put("write.upsert.enabled", "true");
+            // Hive can only query the auto-created table when this property is set
+            properties.put("engine.hive.enabled", "true");
+
+            try {
+                catalog.createTable(tableId, schema, PartitionSpec.unpartitioned(), properties.build());
+                LOG.info("Auto create Table({}) in Database({}) in Catalog({})!",
+                        tableId.name(), tableId.namespace(), catalog.name());
+            } catch (AlreadyExistsException e) {
+                LOG.warn("Table({}) already exist in Database({}) in Catalog({})!",
+                        tableId.name(), tableId.namespace(), catalog.name());
+            }
+        }
+
+        handleSchemaInfoEvent(tableId, catalog.loadTable(tableId).schema());
+    }
+
+    private void handldAlterSchemaEventFromOperator(TableIdentifier tableId, Schema oldSchema, Schema newSchema) {
+        Table table = catalog.loadTable(tableId);
+
+        // Transactionality of the change is guaranteed by comparing the old schema with the table's
+        // current schema.
+        // Whether a change can be applied is decided by schema comparison (currently only adding columns
+        // is supported); any scenario that cannot be changed is treated as a problem with the data.
+        Transaction transaction = table.newTransaction();
+        if (table.schema().sameSchema(oldSchema)) {
+            List<TableChange> tableChanges = TableChange.diffSchema(oldSchema, newSchema);
+            TableChange.applySchemaChanges(transaction.updateSchema(), tableChanges);
+            LOG.info("Schema evolution in table({}) for table change: {}", tableId, tableChanges);
+        }
+        transaction.commitTransaction();
+        handleSchemaInfoEvent(tableId, table.schema());
+    }
+
+    // ========================================= Utility methods =========================================
+    // The way to judge compatibility is whether all the field names in the old schema exist in the new schema
+    private boolean isCompatible(Schema newSchema, Schema oldSchema) {
+        for (NestedField field : oldSchema.columns()) {
+            if (newSchema.findField(field.name()) == null) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    // Parse the schema from the data and convert it to Flink's built-in schema; the conversion is
+    // provided as a plugin interface for each format (canal-json, ogg)
+    private RecordWithSchema parseRecord(JsonNode data) throws IOException {
+        String databaseStr = dynamicSchemaFormat.parse(data, multipleSinkOption.getDatabasePattern());
+        String tableStr = dynamicSchemaFormat.parse(data, multipleSinkOption.getTablePattern());
+        List<String> pkListStr = dynamicSchemaFormat.extractPrimaryKeyNames(data);
+        RowType schema = dynamicSchemaFormat.extractSchema(data, pkListStr);

Review Comment:
   It is already processed after the schema change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] thesumery commented on a diff in pull request #6215: [INLONG-6214][Sort] Support multiple sink for IcebergLoadNode

Posted by GitBox <gi...@apache.org>.
thesumery commented on code in PR #6215:
URL: https://github.com/apache/inlong/pull/6215#discussion_r1001429267


##########
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/WholeDatabaseMigrationOperator.java:
##########
@@ -0,0 +1,228 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.sink.MultipleSinkOption;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+public class WholeDatabaseMigrationOperator extends AbstractStreamOperator<RecordWithSchema>
+        implements OneInputStreamOperator<RowData, RecordWithSchema> {
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    private final CatalogLoader catalogLoader;
+    private final MultipleSinkOption multipleSinkOption;
+
+    private transient Catalog catalog;
+    private transient SupportsNamespaces asNamespaceCatalog;
+    private transient AbstractDynamicSchemaFormat<JsonNode> dynamicSchemaFormat;
+
+    // record cache, wait schema to consume record
+    private transient Map<TableIdentifier, Queue<RecordWithSchema>> recordQueues;
+
+    // schema cache
+    private transient Map<TableIdentifier, Schema> schemaCache;
+
+    public WholeDatabaseMigrationOperator(CatalogLoader catalogLoader,
+            MultipleSinkOption multipleSinkOption) {
+        this.catalogLoader = catalogLoader;
+        this.multipleSinkOption = multipleSinkOption;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.catalog = catalogLoader.loadCatalog();
+        this.asNamespaceCatalog =
+                catalog instanceof SupportsNamespaces ? (SupportsNamespaces) catalog : null;
+        this.recordQueues = new HashMap<>();
+        this.schemaCache = new HashMap<>();
+        this.dynamicSchemaFormat = DynamicSchemaFormatFactory.getFormat(multipleSinkOption.getFormat());
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (catalog instanceof Closeable) {
+            ((Closeable) catalog).close();
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        String wholeData = element.getValue().getString(0).toString();
+
+        JsonNode jsonNode = objectMapper.readTree(wholeData);
+        boolean isDDL = dynamicSchemaFormat.extractDDLFlag(jsonNode);
+        if (isDDL) {
+            execDDL(jsonNode);
+        } else {
+            execDML(jsonNode);
+        }
+    }
+
+    private void execDDL(JsonNode jsonNode) {
+        // todo:parse ddl sql
+    }
+
+    private void execDML(JsonNode jsonNode) throws IOException {
+        RecordWithSchema record = parseRecord(jsonNode);
+        Schema schema = schemaCache.get(record.getTableId());
+        Schema dataSchema = record.getSchema();
+        recordQueues.compute(record.getTableId(), (k, v) -> {
+            if (v == null) {
+                v = new LinkedList<>();
+            }
+            v.add(record);
+            return v;
+        });
+
+        if (schema == null) {
+            handleTableCreateEventFromOperator(record.getTableId(), dataSchema);
+        } else {
+            handleSchemaInfoEvent(record.getTableId(), schema);
+        }
+    }
+
+
+    // =============== All request/response methods for interacting with the coordinator ===============
+    private void handleSchemaInfoEvent(TableIdentifier tableId, Schema schema) {
+        schemaCache.put(tableId, schema);
+        Schema currentSchema = schemaCache.get(tableId);
+        Queue<RecordWithSchema> queue = recordQueues.get(tableId);
+        while (queue != null && !queue.isEmpty()) {
+            Schema dataSchema = queue.peek().getSchema();
+            // if compatible, the current schema is already the latest schema;
+            // if not, the current schema needs to be updated
+            if (isCompatible(currentSchema, dataSchema)) {
+                RecordWithSchema recordWithSchema = queue.poll();
+                output.collect(new StreamRecord<>(
+                        recordWithSchema
+                                .refreshFieldId(currentSchema)
+                                .refreshRowData((jsonNode, schema1) ->

Review Comment:
   OK, it is better than 'currentSchema'.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] EMsnap merged pull request #6215: [INLONG-6274][Sort] Support multiple sink for IcebergLoadNode

Posted by GitBox <gi...@apache.org>.
EMsnap merged PR #6215:
URL: https://github.com/apache/inlong/pull/6215


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] thesumery commented on a diff in pull request #6215: [INLONG-6214][Sort] Support multiple sink for IcebergLoadNode

Posted by GitBox <gi...@apache.org>.
thesumery commented on code in PR #6215:
URL: https://github.com/apache/inlong/pull/6215#discussion_r1003201193


##########
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/WholeDatabaseMigrationOperator.java:
##########
@@ -0,0 +1,228 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.sink.MultipleSinkOption;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+public class WholeDatabaseMigrationOperator extends AbstractStreamOperator<RecordWithSchema>

Review Comment:
   'MultipleSinkOperator' may not be easy to understand; maybe 'DynamicSchemaHandlerOperator'?



##########
inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java:
##########
@@ -41,6 +41,7 @@
 import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedHashMap;

Review Comment:
   delete it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6215: [INLONG-6214][Sort] Support multiple sink for IcebergLoadNode

Posted by GitBox <gi...@apache.org>.
yunqingmoswu commented on code in PR #6215:
URL: https://github.com/apache/inlong/pull/6215#discussion_r1003953224


##########
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/WholeDatabaseMigrationOperator.java:
##########
@@ -0,0 +1,228 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.sink.MultipleSinkOption;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+public class WholeDatabaseMigrationOperator extends AbstractStreamOperator<RecordWithSchema>

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] EMsnap commented on a diff in pull request #6215: [INLONG-6214][Sort] Support multiple sink for IcebergLoadNode

Posted by GitBox <gi...@apache.org>.
EMsnap commented on code in PR #6215:
URL: https://github.com/apache/inlong/pull/6215#discussion_r1000656733


##########
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java:
##########
@@ -0,0 +1,395 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.runtime.typeutils.SortedMapTypeInfo;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ReplacePartitions;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotUpdate;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.inlong.sort.iceberg.sink.DeltaManifests;
+import org.apache.inlong.sort.iceberg.sink.DeltaManifestsSerializer;
+import org.apache.inlong.sort.iceberg.sink.FlinkManifestUtil;
+import org.apache.inlong.sort.iceberg.sink.ManifestOutputFileFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.SortedMap;
+
+public class IcebergSingleFileCommiter extends IcebergProcessFunction<WriteResult, Void>
+        implements CheckpointedFunction, CheckpointListener {
+    private static final long serialVersionUID = 1L;
+    private static final long INITIAL_CHECKPOINT_ID = -1L;
+    private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
+
+    private static final Logger LOG = LoggerFactory.getLogger(IcebergSingleFileCommiter.class);
+    private static final String FLINK_JOB_ID = "flink.job-id";
+
+    // The max checkpoint id we've committed to iceberg table. As the flink's checkpoint is always increasing, so we
+    // could correctly commit all the data files whose checkpoint id is greater than the max committed one to iceberg
+    // table, for avoiding committing the same data files twice. This id will be attached to iceberg's meta when
+    // committing the iceberg transaction.
+    private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";
+    static final String MAX_CONTINUOUS_EMPTY_COMMITS = "flink.max-continuous-empty-commits";
+
+    // TableLoader to load iceberg table lazily.
+    private final TableLoader tableLoader;
+    private final boolean replacePartitions;
+
+    // A sorted map to maintain the completed data files for each pending checkpointId (which have not been committed
+    // to iceberg table). We need a sorted map here because it's possible that a few checkpoint snapshots failed, for
+    // example: the 1st checkpoint has 2 data files <1, <file0, file1>>, the 2nd checkpoint has 1 data file
+    // <2, <file3>>. Snapshot for checkpoint#1 interrupted because of network/disk failure etc, while we don't expect
+    // any data loss in iceberg table. So we keep the finished files <1, <file0, file1>> in memory and retry to commit
+    // iceberg table when the next checkpoint happen.
+    private final NavigableMap<Long, byte[]> dataFilesPerCheckpoint = Maps.newTreeMap();
+
+    // The completed files cache for current checkpoint. Once the snapshot barrier received, it will be flushed to the
+    // 'dataFilesPerCheckpoint'.
+    private final List<WriteResult> writeResultsOfCurrentCkpt = Lists.newArrayList();
+
+    // It will have a unique identifier for one job.
+    private transient String flinkJobId;
+    private transient Table table;
+    private transient ManifestOutputFileFactory manifestOutputFileFactory;
+    private transient long maxCommittedCheckpointId;
+    private transient int continuousEmptyCheckpoints;
+    private transient int maxContinuousEmptyCommits;
+    // There're two cases that we restore from flink checkpoints: the first case is restoring from snapshot created by
+    // the same flink job; another case is restoring from snapshot created by another different job. For the second
+    // case, we need to maintain the old flink job's id in flink state backend to find the max-committed-checkpoint-id
+    // when traversing iceberg table's snapshots.
+    private final ListStateDescriptor<String> JOB_ID_DESCRIPTOR;
+    private transient ListState<String> jobIdState;
+    // All pending checkpoints states for this function.
+    private final ListStateDescriptor<SortedMap<Long, byte[]>> STATE_DESCRIPTOR;
+    private transient ListState<SortedMap<Long, byte[]>> checkpointsState;
+
+    public IcebergSingleFileCommiter(TableIdentifier tableId, TableLoader tableLoader, boolean replacePartitions) {
+        // Here must distinguish state descriptor with tableId, because all icebergSingleFileCommiter state in
+        // one IcebergMultipleFilesCommiter use same StateStore.
+        this.tableLoader = tableLoader;
+        this.replacePartitions = replacePartitions;
+        this.JOB_ID_DESCRIPTOR = new ListStateDescriptor<>(
+                String.format("iceberg(%s)-flink-job-id", tableId.toString()), BasicTypeInfo.STRING_TYPE_INFO);
+        this.STATE_DESCRIPTOR = buildStateDescriptor(tableId);
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        this.flinkJobId = getRuntimeContext().getJobId().toString();
+
+        // Open the table loader and load the table.
+        this.tableLoader.open();
+        this.table = tableLoader.loadTable();
+
+        maxContinuousEmptyCommits = PropertyUtil.propertyAsInt(table.properties(), MAX_CONTINUOUS_EMPTY_COMMITS, 10);
+        Preconditions.checkArgument(maxContinuousEmptyCommits > 0,
+                MAX_CONTINUOUS_EMPTY_COMMITS + " must be positive");
+
+        int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
+        int attemptId = getRuntimeContext().getAttemptNumber();
+        this.manifestOutputFileFactory = FlinkManifestUtil
+                .createOutputFileFactory(table, flinkJobId, subTaskId, attemptId);
+        this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
+
+        this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
+        this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
+        if (context.isRestored()) {
+            String restoredFlinkJobId = jobIdState.get().iterator().next();
+            Preconditions.checkState(!Strings.isNullOrEmpty(restoredFlinkJobId),
+                    "Flink job id parsed from checkpoint snapshot shouldn't be null or empty");
+
+            // Since flink's checkpoint id will start from the max-committed-checkpoint-id + 1 in the new flink job even
+            // if it's restored from a snapshot created by another different flink job, so it's safe to assign the max
+            // committed checkpoint id from restored flink job to the current flink job.
+            this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, restoredFlinkJobId);
+
+            NavigableMap<Long, byte[]> uncommittedDataFiles = Maps
+                    .newTreeMap(checkpointsState.get().iterator().next())
+                    .tailMap(maxCommittedCheckpointId, false);
+            if (!uncommittedDataFiles.isEmpty()) {
+                // Committed all uncommitted data files from the old flink job to iceberg table.
+                long maxUncommittedCheckpointId = uncommittedDataFiles.lastKey();
+                commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId, maxUncommittedCheckpointId);
+            }
+        }
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        long checkpointId = context.getCheckpointId();
+        LOG.info("Start to flush snapshot state to state backend, table: {}, checkpointId: {}", table, checkpointId);
+
+        // Update the checkpoint state.
+        dataFilesPerCheckpoint.put(checkpointId, writeToManifest(checkpointId));
+        // Reset the snapshot state to the latest state.
+        checkpointsState.clear();
+        checkpointsState.add(dataFilesPerCheckpoint);
+
+        jobIdState.clear();
+        jobIdState.add(flinkJobId);
+
+        // Clear the local buffer for current checkpoint.
+        writeResultsOfCurrentCkpt.clear();
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        // It's possible that we have the following events:
+        //   1. snapshotState(ckpId);
+        //   2. snapshotState(ckpId+1);
+        //   3. notifyCheckpointComplete(ckpId+1);
+        //   4. notifyCheckpointComplete(ckpId);
+        // For step#4, we don't need to commit iceberg table again because in step#3 we've committed all the files,
+        // Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
+        if (checkpointId > maxCommittedCheckpointId) {
+            commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
+            this.maxCommittedCheckpointId = checkpointId;
+        }
+    }
+
+    private void commitUpToCheckpoint(NavigableMap<Long, byte[]> deltaManifestsMap,
+            String newFlinkJobId,
+            long checkpointId) throws IOException {
+        NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);

Review Comment:
   There is a large chunk of code duplicated from IcebergFilesCommitter; any chance to abstract this method?
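
   One possible shape for that refactor, as a sketch only (the helper class name and its parameter list are made up here, not existing code): move the commit loop into a small shared utility that both IcebergFilesCommitter and IcebergSingleFileCommiter delegate to.

       import org.apache.iceberg.Table;

       import java.io.IOException;
       import java.util.NavigableMap;

       /** Hypothetical shared helper holding the commit logic that is currently duplicated. */
       public final class CommitterHelper {

           private CommitterHelper() {
           }

           public static void commitUpToCheckpoint(Table table,
                   NavigableMap<Long, byte[]> deltaManifestsMap,
                   String flinkJobId,
                   long checkpointId,
                   boolean replacePartitions) throws IOException {
               NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);
               // Deserialize the DeltaManifests and commit the pending WriteResults here; the body
               // would be the code that both committers currently carry in their own copies.
           }
       }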



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] thesumery commented on a diff in pull request #6215: [INLONG-6214][Sort] Support multiple sink for IcebergLoadNode

Posted by GitBox <gi...@apache.org>.
thesumery commented on code in PR #6215:
URL: https://github.com/apache/inlong/pull/6215#discussion_r1000718497


##########
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java:
##########
@@ -0,0 +1,395 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.runtime.typeutils.SortedMapTypeInfo;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ReplacePartitions;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotUpdate;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.inlong.sort.iceberg.sink.DeltaManifests;
+import org.apache.inlong.sort.iceberg.sink.DeltaManifestsSerializer;
+import org.apache.inlong.sort.iceberg.sink.FlinkManifestUtil;
+import org.apache.inlong.sort.iceberg.sink.ManifestOutputFileFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.SortedMap;
+
+public class IcebergSingleFileCommiter extends IcebergProcessFunction<WriteResult, Void>
+        implements CheckpointedFunction, CheckpointListener {
+    private static final long serialVersionUID = 1L;
+    private static final long INITIAL_CHECKPOINT_ID = -1L;
+    private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
+
+    private static final Logger LOG = LoggerFactory.getLogger(IcebergSingleFileCommiter.class);
+    private static final String FLINK_JOB_ID = "flink.job-id";
+
+    // The max checkpoint id we've committed to the iceberg table. As flink's checkpoint id is always increasing, we
+    // can correctly commit all the data files whose checkpoint id is greater than the max committed one to the iceberg
+    // table, avoiding committing the same data files twice. This id will be attached to iceberg's metadata when
+    // committing the iceberg transaction.
+    private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";
+    static final String MAX_CONTINUOUS_EMPTY_COMMITS = "flink.max-continuous-empty-commits";
+
+    // TableLoader to load iceberg table lazily.
+    private final TableLoader tableLoader;
+    private final boolean replacePartitions;
+
+    // A sorted map to maintain the completed data files for each pending checkpointId (which have not been committed
+    // to the iceberg table). We need a sorted map here because it's possible that a few checkpoint snapshots failed. For
+    // example: the 1st checkpoint has 2 data files <1, <file0, file1>>, the 2nd checkpoint has 1 data file
+    // <2, <file3>>. The snapshot for checkpoint#1 was interrupted because of a network/disk failure etc., while we don't
+    // expect any data loss in the iceberg table. So we keep the finished files <1, <file0, file1>> in memory and retry
+    // committing to the iceberg table when the next checkpoint happens.
+    private final NavigableMap<Long, byte[]> dataFilesPerCheckpoint = Maps.newTreeMap();
+
+    // The completed files cache for the current checkpoint. Once the snapshot barrier is received, it will be flushed
+    // to 'dataFilesPerCheckpoint'.
+    private final List<WriteResult> writeResultsOfCurrentCkpt = Lists.newArrayList();
+
+    // A unique identifier for the flink job.
+    private transient String flinkJobId;
+    private transient Table table;
+    private transient ManifestOutputFileFactory manifestOutputFileFactory;
+    private transient long maxCommittedCheckpointId;
+    private transient int continuousEmptyCheckpoints;
+    private transient int maxContinuousEmptyCommits;
+    // There are two cases in which we restore from flink checkpoints: the first is restoring from a snapshot created
+    // by the same flink job; the other is restoring from a snapshot created by a different job. For the second case,
+    // we need to keep the old flink job's id in the flink state backend to find the max-committed-checkpoint-id when
+    // traversing the iceberg table's snapshots.
+    private final ListStateDescriptor<String> JOB_ID_DESCRIPTOR;
+    private transient ListState<String> jobIdState;
+    // All pending checkpoints states for this function.
+    private final ListStateDescriptor<SortedMap<Long, byte[]>> STATE_DESCRIPTOR;
+    private transient ListState<SortedMap<Long, byte[]>> checkpointsState;
+
+    public IcebergSingleFileCommiter(TableIdentifier tableId, TableLoader tableLoader, boolean replacePartitions) {
+        // The state descriptors must be distinguished by tableId, because all IcebergSingleFileCommiter instances in
+        // one IcebergMultipleFilesCommiter share the same state store.
+        this.tableLoader = tableLoader;
+        this.replacePartitions = replacePartitions;
+        this.JOB_ID_DESCRIPTOR = new ListStateDescriptor<>(
+                String.format("iceberg(%s)-flink-job-id", tableId.toString()), BasicTypeInfo.STRING_TYPE_INFO);
+        this.STATE_DESCRIPTOR = buildStateDescriptor(tableId);
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        this.flinkJobId = getRuntimeContext().getJobId().toString();
+
+        // Open the table loader and load the table.
+        this.tableLoader.open();
+        this.table = tableLoader.loadTable();
+
+        maxContinuousEmptyCommits = PropertyUtil.propertyAsInt(table.properties(), MAX_CONTINUOUS_EMPTY_COMMITS, 10);
+        Preconditions.checkArgument(maxContinuousEmptyCommits > 0,
+                MAX_CONTINUOUS_EMPTY_COMMITS + " must be positive");
+
+        int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
+        int attemptId = getRuntimeContext().getAttemptNumber();
+        this.manifestOutputFileFactory = FlinkManifestUtil
+                .createOutputFileFactory(table, flinkJobId, subTaskId, attemptId);
+        this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
+
+        this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
+        this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
+        if (context.isRestored()) {
+            String restoredFlinkJobId = jobIdState.get().iterator().next();
+            Preconditions.checkState(!Strings.isNullOrEmpty(restoredFlinkJobId),
+                    "Flink job id parsed from checkpoint snapshot shouldn't be null or empty");
+
+            // Since flink's checkpoint id will start from the max-committed-checkpoint-id + 1 in the new flink job,
+            // even if it's restored from a snapshot created by a different flink job, it's safe to assign the max
+            // committed checkpoint id from the restored flink job to the current flink job.
+            this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, restoredFlinkJobId);
+
+            NavigableMap<Long, byte[]> uncommittedDataFiles = Maps
+                    .newTreeMap(checkpointsState.get().iterator().next())
+                    .tailMap(maxCommittedCheckpointId, false);
+            if (!uncommittedDataFiles.isEmpty()) {
+                // Commit all uncommitted data files from the old flink job to the iceberg table.
+                long maxUncommittedCheckpointId = uncommittedDataFiles.lastKey();
+                commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId, maxUncommittedCheckpointId);
+            }
+        }
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        long checkpointId = context.getCheckpointId();
+        LOG.info("Start to flush snapshot state to state backend, table: {}, checkpointId: {}", table, checkpointId);
+
+        // Update the checkpoint state.
+        dataFilesPerCheckpoint.put(checkpointId, writeToManifest(checkpointId));
+        // Reset the snapshot state to the latest state.
+        checkpointsState.clear();
+        checkpointsState.add(dataFilesPerCheckpoint);
+
+        jobIdState.clear();
+        jobIdState.add(flinkJobId);
+
+        // Clear the local buffer for current checkpoint.
+        writeResultsOfCurrentCkpt.clear();
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        // It's possible that we have the following events:
+        //   1. snapshotState(ckpId);
+        //   2. snapshotState(ckpId+1);
+        //   3. notifyCheckpointComplete(ckpId+1);
+        //   4. notifyCheckpointComplete(ckpId);
+        // For step#4, we don't need to commit the iceberg table again because in step#3 we've already committed all
+        // the files. Besides, we need to keep the max-committed-checkpoint-id increasing.
+        if (checkpointId > maxCommittedCheckpointId) {
+            commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
+            this.maxCommittedCheckpointId = checkpointId;
+        }
+    }
+
+    private void commitUpToCheckpoint(NavigableMap<Long, byte[]> deltaManifestsMap,
+            String newFlinkJobId,
+            long checkpointId) throws IOException {
+        NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);

Review Comment:
   IcebergSingleFileCommiter and IcebergSingleStreamWriter are in fact duplicate code; I will abstract them for the single iceberg sink later, once verified.
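
For reference, a minimal sketch of what that later abstraction could look like, under the assumption that the classic single-table sink simply reuses the per-table committer added here. The factory class `SingleSinkCommitterFactory` and its method name are hypothetical; only the `IcebergSingleFileCommiter` constructor signature is taken from the diff above.

```java
package org.apache.inlong.sort.iceberg.sink.multiple;

import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.TableLoader;

// Sketch only: build the per-table committer for a sink that writes exactly one table,
// instead of keeping a second copy of the commit logic in the single-table sink.
public final class SingleSinkCommitterFactory {

    private SingleSinkCommitterFactory() {
    }

    public static IcebergSingleFileCommiter forFixedTable(TableIdentifier tableId,
                                                          TableLoader tableLoader,
                                                          boolean overwrite) {
        // The committer already keys its operator state by tableId, so a single-sink job
        // can use it unchanged; only the surrounding operator wiring would differ.
        return new IcebergSingleFileCommiter(tableId, tableLoader, overwrite);
    }
}
```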



##########
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java:
##########
@@ -0,0 +1,395 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.runtime.typeutils.SortedMapTypeInfo;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ReplacePartitions;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotUpdate;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.inlong.sort.iceberg.sink.DeltaManifests;
+import org.apache.inlong.sort.iceberg.sink.DeltaManifestsSerializer;
+import org.apache.inlong.sort.iceberg.sink.FlinkManifestUtil;
+import org.apache.inlong.sort.iceberg.sink.ManifestOutputFileFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.SortedMap;
+
+public class IcebergSingleFileCommiter extends IcebergProcessFunction<WriteResult, Void>
+        implements CheckpointedFunction, CheckpointListener {
+    private static final long serialVersionUID = 1L;
+    private static final long INITIAL_CHECKPOINT_ID = -1L;
+    private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
+
+    private static final Logger LOG = LoggerFactory.getLogger(IcebergSingleFileCommiter.class);
+    private static final String FLINK_JOB_ID = "flink.job-id";
+
+    // The max checkpoint id we've committed to the iceberg table. As flink's checkpoint id is always increasing, we
+    // can correctly commit all the data files whose checkpoint id is greater than the max committed one to the iceberg
+    // table, avoiding committing the same data files twice. This id will be attached to iceberg's metadata when
+    // committing the iceberg transaction.
+    private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";
+    static final String MAX_CONTINUOUS_EMPTY_COMMITS = "flink.max-continuous-empty-commits";
+
+    // TableLoader to load iceberg table lazily.
+    private final TableLoader tableLoader;
+    private final boolean replacePartitions;
+
+    // A sorted map to maintain the completed data files for each pending checkpointId (which have not been committed
+    // to the iceberg table). We need a sorted map here because it's possible that a few checkpoint snapshots failed. For
+    // example: the 1st checkpoint has 2 data files <1, <file0, file1>>, the 2nd checkpoint has 1 data file
+    // <2, <file3>>. The snapshot for checkpoint#1 was interrupted because of a network/disk failure etc., while we don't
+    // expect any data loss in the iceberg table. So we keep the finished files <1, <file0, file1>> in memory and retry
+    // committing to the iceberg table when the next checkpoint happens.
+    private final NavigableMap<Long, byte[]> dataFilesPerCheckpoint = Maps.newTreeMap();
+
+    // The completed files cache for the current checkpoint. Once the snapshot barrier is received, it will be flushed
+    // to 'dataFilesPerCheckpoint'.
+    private final List<WriteResult> writeResultsOfCurrentCkpt = Lists.newArrayList();
+
+    // A unique identifier for the flink job.
+    private transient String flinkJobId;
+    private transient Table table;
+    private transient ManifestOutputFileFactory manifestOutputFileFactory;
+    private transient long maxCommittedCheckpointId;
+    private transient int continuousEmptyCheckpoints;
+    private transient int maxContinuousEmptyCommits;
+    // There are two cases in which we restore from flink checkpoints: the first is restoring from a snapshot created
+    // by the same flink job; the other is restoring from a snapshot created by a different job. For the second case,
+    // we need to keep the old flink job's id in the flink state backend to find the max-committed-checkpoint-id when
+    // traversing the iceberg table's snapshots.
+    private final ListStateDescriptor<String> JOB_ID_DESCRIPTOR;
+    private transient ListState<String> jobIdState;
+    // All pending checkpoints states for this function.
+    private final ListStateDescriptor<SortedMap<Long, byte[]>> STATE_DESCRIPTOR;
+    private transient ListState<SortedMap<Long, byte[]>> checkpointsState;
+
+    public IcebergSingleFileCommiter(TableIdentifier tableId, TableLoader tableLoader, boolean replacePartitions) {
+        // The state descriptors must be distinguished by tableId, because all IcebergSingleFileCommiter instances in
+        // one IcebergMultipleFilesCommiter share the same state store.
+        this.tableLoader = tableLoader;
+        this.replacePartitions = replacePartitions;
+        this.JOB_ID_DESCRIPTOR = new ListStateDescriptor<>(
+                String.format("iceberg(%s)-flink-job-id", tableId.toString()), BasicTypeInfo.STRING_TYPE_INFO);
+        this.STATE_DESCRIPTOR = buildStateDescriptor(tableId);
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        this.flinkJobId = getRuntimeContext().getJobId().toString();
+
+        // Open the table loader and load the table.
+        this.tableLoader.open();
+        this.table = tableLoader.loadTable();
+
+        maxContinuousEmptyCommits = PropertyUtil.propertyAsInt(table.properties(), MAX_CONTINUOUS_EMPTY_COMMITS, 10);
+        Preconditions.checkArgument(maxContinuousEmptyCommits > 0,
+                MAX_CONTINUOUS_EMPTY_COMMITS + " must be positive");
+
+        int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
+        int attemptId = getRuntimeContext().getAttemptNumber();
+        this.manifestOutputFileFactory = FlinkManifestUtil
+                .createOutputFileFactory(table, flinkJobId, subTaskId, attemptId);
+        this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
+
+        this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
+        this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
+        if (context.isRestored()) {
+            String restoredFlinkJobId = jobIdState.get().iterator().next();
+            Preconditions.checkState(!Strings.isNullOrEmpty(restoredFlinkJobId),
+                    "Flink job id parsed from checkpoint snapshot shouldn't be null or empty");
+
+            // Since flink's checkpoint id will start from the max-committed-checkpoint-id + 1 in the new flink job,
+            // even if it's restored from a snapshot created by a different flink job, it's safe to assign the max
+            // committed checkpoint id from the restored flink job to the current flink job.
+            this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, restoredFlinkJobId);
+
+            NavigableMap<Long, byte[]> uncommittedDataFiles = Maps
+                    .newTreeMap(checkpointsState.get().iterator().next())
+                    .tailMap(maxCommittedCheckpointId, false);
+            if (!uncommittedDataFiles.isEmpty()) {
+                // Commit all uncommitted data files from the old flink job to the iceberg table.
+                long maxUncommittedCheckpointId = uncommittedDataFiles.lastKey();
+                commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId, maxUncommittedCheckpointId);
+            }
+        }
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        long checkpointId = context.getCheckpointId();
+        LOG.info("Start to flush snapshot state to state backend, table: {}, checkpointId: {}", table, checkpointId);
+
+        // Update the checkpoint state.
+        dataFilesPerCheckpoint.put(checkpointId, writeToManifest(checkpointId));
+        // Reset the snapshot state to the latest state.
+        checkpointsState.clear();
+        checkpointsState.add(dataFilesPerCheckpoint);
+
+        jobIdState.clear();
+        jobIdState.add(flinkJobId);
+
+        // Clear the local buffer for current checkpoint.
+        writeResultsOfCurrentCkpt.clear();
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        // It's possible that we have the following events:
+        //   1. snapshotState(ckpId);
+        //   2. snapshotState(ckpId+1);
+        //   3. notifyCheckpointComplete(ckpId+1);
+        //   4. notifyCheckpointComplete(ckpId);
+        // For step#4, we don't need to commit the iceberg table again because in step#3 we've already committed all
+        // the files. Besides, we need to keep the max-committed-checkpoint-id increasing.
+        if (checkpointId > maxCommittedCheckpointId) {
+            commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
+            this.maxCommittedCheckpointId = checkpointId;
+        }
+    }
+
+    private void commitUpToCheckpoint(NavigableMap<Long, byte[]> deltaManifestsMap,
+            String newFlinkJobId,
+            long checkpointId) throws IOException {
+        NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);

Review Comment:
   `IcebergSingleFileCommiter` and `IcebergSingleStreamWriter` are in fact duplicate code; I will abstract them for the single iceberg sink later, once verified.
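
Related to that planned reuse, one detail from the constructor quoted above is worth illustrating: every `IcebergSingleFileCommiter` inside one multiple-sink operator shares the same operator state store, so the state-descriptor names embed the table identifier. A standalone sketch of that naming follows; the helper class `PerTableStateNames` is hypothetical, and the format string mirrors the one in the diff.

```java
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.iceberg.catalog.TableIdentifier;

// Sketch: Flink keys operator state by descriptor name, so embedding the table id keeps
// the per-table committers' state from colliding in the shared state store.
public final class PerTableStateNames {

    private PerTableStateNames() {
    }

    public static ListStateDescriptor<String> jobIdDescriptor(TableIdentifier tableId) {
        // e.g. "iceberg(db.tbl)-flink-job-id": unique per table and stable across restarts
        return new ListStateDescriptor<>(
                String.format("iceberg(%s)-flink-job-id", tableId),
                BasicTypeInfo.STRING_TYPE_INFO);
    }
}
```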


