Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/10/20 07:31:09 UTC

[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6215: [INLONG-6214][Sort] Support multiple sink for IcebergLoadNode

yunqingmoswu commented on code in PR #6215:
URL: https://github.com/apache/inlong/pull/6215#discussion_r1000226942


##########
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java:
##########
@@ -0,0 +1,227 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.OutputTag;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.inlong.sort.iceberg.sink.IcebergStreamWriter;
+import org.apache.inlong.sort.iceberg.sink.RowDataTaskWriterFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.UPSERT_ENABLED;
+import static org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+
+/**
+ * Iceberg writer that can distinguish between different sink tables and route data to the corresponding IcebergStreamWriter.
+ */
+public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWithSchema, MultipleWriteResult>
+        implements CheckpointedFunction, BoundedOneInput {
+    private static final Logger LOG = LoggerFactory.getLogger(IcebergMultipleStreamWriter.class);
+
+    private final boolean appendMode;
+    // The catalogLoader here could be encapsulated in the tableLoader
+    private final CatalogLoader catalogLoader;
+
+    private transient Catalog catalog;
+    // The keys of the two maps below are the full db.table path
+    private transient Map<TableIdentifier, IcebergSingleStreamWriter<RowData>> multipleWriters;
+    private transient Map<TableIdentifier, Table> multipleTables;
+    private transient Map<TableIdentifier, Schema> multipleSchemas;
+    private transient FunctionInitializationContext functionInitializationContext;
+
+
+    public IcebergMultipleStreamWriter(boolean appendMode, CatalogLoader catalogLoader) {
+        this.appendMode = appendMode;
+        this.catalogLoader = catalogLoader;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        this.catalog = catalogLoader.loadCatalog();
+        this.multipleWriters = new HashMap<>();
+        this.multipleTables = new HashMap<>();
+        this.multipleSchemas = new HashMap<>();
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (catalog instanceof Closeable) {
+            ((Closeable) catalog).close();
+        }
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        for (Entry<TableIdentifier, IcebergSingleStreamWriter<RowData>> entry: multipleWriters.entrySet()) {
+            entry.getValue().endInput();
+        }
+    }
+
+    @Override
+    public void dispose() throws Exception {
+        super.dispose();
+        for (Entry<TableIdentifier, IcebergSingleStreamWriter<RowData>> entry: multipleWriters.entrySet()) {
+            entry.getValue().dispose();
+        }
+        multipleWriters.clear();
+        multipleTables.clear();
+        multipleSchemas.clear();
+    }
+
+    @Override
+    public void processElement(RecordWithSchema recordWithSchema) throws Exception {
+        TableIdentifier tableId = recordWithSchema.getTableId();
+
+        if (isSchemaUpdate(recordWithSchema)) {
+            // When the schema changes, the previously written files should be closed and a new writer started
+            if (multipleTables.get(tableId) == null) {
+                Table table = catalog.loadTable(recordWithSchema.getTableId());
+                multipleTables.put(tableId, table);
+            }
+
+            // refresh some runtime table properties
+            Table table = multipleTables.get(recordWithSchema.getTableId());
+            Map<String, String> tableProperties = table.properties();
+            boolean upsertMode = PropertyUtil.propertyAsBoolean(tableProperties,
+                    UPSERT_ENABLED, UPSERT_ENABLED_DEFAULT);
+            long targetFileSizeBytes = PropertyUtil.propertyAsLong(tableProperties,
+                    WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+            String formatString = tableProperties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
+            FileFormat fileFormat = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
+            List<Integer> equalityFieldIds = recordWithSchema.getPrimaryKeys().stream()
+                    .map(pk -> recordWithSchema.getSchema().findField(pk).fieldId())
+                    .collect(Collectors.toList());
+            // if no physical primary key exists, use all fields as the logical primary key
+            if (equalityFieldIds.isEmpty()) {
+                equalityFieldIds = recordWithSchema.getSchema().columns().stream()
+                        .map(NestedField::fieldId)
+                        .collect(Collectors.toList());
+            }
+
+            TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(
+                    table,
+                    recordWithSchema.getSchema(),
+                    FlinkSchemaUtil.convert(recordWithSchema.getSchema()),
+                    targetFileSizeBytes,
+                    fileFormat,
+                    equalityFieldIds,
+                    upsertMode,
+                    appendMode);
+
+            if (multipleWriters.get(tableId) == null) {
+                IcebergSingleStreamWriter<RowData> writer = new IcebergSingleStreamWriter<>(
+                        tableId.toString(), taskWriterFactory, null, null);  // TODO: consider metric compatibility later
+                writer.setup(getRuntimeContext(),
+                        new CallbackCollector<>(writeResult ->
+                                collector.collect(new MultipleWriteResult(tableId, writeResult))),
+                        context);
+                writer.initializeState(functionInitializationContext);
+                writer.open(new Configuration());
+                multipleWriters.put(tableId, writer);
+            } else {  // the schema only evolves from the second occurrence onward
+                multipleWriters.get(tableId).schemaEvolution(taskWriterFactory);
+            }
+
+        }
+
+        if (multipleWriters.get(tableId) != null) {
+            for (RowData data : recordWithSchema.getData()) {
+                multipleWriters.get(tableId).processElement(data);
+            }
+        } else {
+            LOG.error("Unregistered table schema for {}.", recordWithSchema.getTableId());
+        }
+    }
+
+    @Override
+    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+        for (Entry<TableIdentifier, IcebergSingleStreamWriter<RowData>> entry: multipleWriters.entrySet()) {
+            entry.getValue().prepareSnapshotPreBarrier(checkpointId);
+        }
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        for (Entry<TableIdentifier, IcebergSingleStreamWriter<RowData>> entry: multipleWriters.entrySet()) {
+            entry.getValue().snapshotState(context);
+        }
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        this.functionInitializationContext = context;
+    }
+
+    private boolean isSchemaUpdate(RecordWithSchema recordWithSchema) {
+        TableIdentifier tableId = recordWithSchema.getTableId();
+        recordWithSchema.replaceSchema();
+        if (multipleSchemas.get(tableId) != null
+                && multipleSchemas.get(tableId).sameSchema(recordWithSchema.getSchema())) {
+            return false;
+        }
+        LOG.info("Schema evolution with table {}, old schema: {}, new Schema: {}",
+                tableId, multipleSchemas.get(tableId), recordWithSchema.getSchema());
+        multipleSchemas.put(recordWithSchema.getTableId(), recordWithSchema.getSchema());

Review Comment:
   Why put it in multipleSchemas here?
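
   A hedged sketch of one alternative (a sketch only, not code from this PR): keep isSchemaUpdate() as a pure check and move the cache update into processElement(), after the writer has been created or has evolved, so that a failed writer setup does not leave a stale entry in multipleSchemas.

       // Sketch only: reuses the fields already defined in this PR (multipleSchemas, multipleWriters).
       private boolean isSchemaUpdate(RecordWithSchema recordWithSchema) {
           TableIdentifier tableId = recordWithSchema.getTableId();
           recordWithSchema.replaceSchema();
           Schema cached = multipleSchemas.get(tableId);
           // Pure check: no side effect on the schema cache here.
           return cached == null || !cached.sameSchema(recordWithSchema.getSchema());
       }

       // In processElement(), after the writer for tableId has been created or evolved:
       // multipleSchemas.put(tableId, recordWithSchema.getSchema());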



##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java:
##########
@@ -43,8 +66,39 @@
  */
 public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaFormat<JsonNode> {
 
+    private static final Map<Integer, LogicalType> SQL_TYPE_2_ICEBERG_TYPE_MAPPING =

Review Comment:
   Maybe it is better to call it SQL_TYPE_2_ICEBERG_FLINK_TYPE_MAPPING?



##########
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/WholeDatabaseMigrationOperator.java:
##########
@@ -0,0 +1,228 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.sink.MultipleSinkOption;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+public class WholeDatabaseMigrationOperator extends AbstractStreamOperator<RecordWithSchema>
+        implements OneInputStreamOperator<RowData, RecordWithSchema> {
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    private final CatalogLoader catalogLoader;
+    private final MultipleSinkOption multipleSinkOption;
+
+    private transient Catalog catalog;
+    private transient SupportsNamespaces asNamespaceCatalog;
+    private transient AbstractDynamicSchemaFormat<JsonNode> dynamicSchemaFormat;
+
+    // record cache: records wait here until a schema is available to consume them
+    private transient Map<TableIdentifier, Queue<RecordWithSchema>> recordQueues;
+
+    // schema cache
+    private transient Map<TableIdentifier, Schema> schemaCache;
+
+    public WholeDatabaseMigrationOperator(CatalogLoader catalogLoader,
+            MultipleSinkOption multipleSinkOption) {
+        this.catalogLoader = catalogLoader;
+        this.multipleSinkOption = multipleSinkOption;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.catalog = catalogLoader.loadCatalog();
+        this.asNamespaceCatalog =
+                catalog instanceof SupportsNamespaces ? (SupportsNamespaces) catalog : null;
+        this.recordQueues = new HashMap<>();
+        this.schemaCache = new HashMap<>();
+        this.dynamicSchemaFormat = DynamicSchemaFormatFactory.getFormat(multipleSinkOption.getFormat());
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (catalog instanceof Closeable) {
+            ((Closeable) catalog).close();
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        String wholeData = element.getValue().getString(0).toString();
+
+        JsonNode jsonNode = objectMapper.readTree(wholeData);
+        boolean isDDL = dynamicSchemaFormat.extractDDLFlag(jsonNode);
+        if (isDDL) {
+            execDDL(jsonNode);
+        } else {
+            execDML(jsonNode);
+        }
+    }
+
+    private void execDDL(JsonNode jsonNode) {
+        // todo:parse ddl sql
+    }
+
+    private void execDML(JsonNode jsonNode) throws IOException {
+        RecordWithSchema record = parseRecord(jsonNode);
+        Schema schema = schemaCache.get(record.getTableId());
+        Schema dataSchema = record.getSchema();
+        recordQueues.compute(record.getTableId(), (k, v) -> {
+            if (v == null) {
+                v = new LinkedList<>();
+            }
+            v.add(record);
+            return v;
+        });
+
+        if (schema == null) {
+            handleTableCreateEventFromOperator(record.getTableId(), dataSchema);
+        } else {
+            handleSchemaInfoEvent(record.getTableId(), schema);
+        }
+    }
+
+
+    // ==================== All request/response methods that interact with the coordinator ====================
+    private void handleSchemaInfoEvent(TableIdentifier tableId, Schema schema) {
+        schemaCache.put(tableId, schema);
+        Schema currentSchema = schemaCache.get(tableId);
+        Queue<RecordWithSchema> queue = recordQueues.get(tableId);
+        while (queue != null && !queue.isEmpty()) {
+            Schema dataSchema = queue.peek().getSchema();
+            // if compatible, the current schema is the latest schema;
+            // if not, the current schema needs to be updated
+            if (isCompatible(currentSchema, dataSchema)) {
+                RecordWithSchema recordWithSchema = queue.poll();
+                output.collect(new StreamRecord<>(
+                        recordWithSchema
+                                .refreshFieldId(currentSchema)
+                                .refreshRowData((jsonNode, schema1) ->

Review Comment:
   Maybe it is better to call it 'latestSchema' or 'validSchema'?



##########
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/WholeDatabaseMigrationOperator.java:
##########
@@ -0,0 +1,228 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.sink.MultipleSinkOption;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+public class WholeDatabaseMigrationOperator extends AbstractStreamOperator<RecordWithSchema>
+        implements OneInputStreamOperator<RowData, RecordWithSchema> {
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    private final CatalogLoader catalogLoader;
+    private final MultipleSinkOption multipleSinkOption;
+
+    private transient Catalog catalog;
+    private transient SupportsNamespaces asNamespaceCatalog;
+    private transient AbstractDynamicSchemaFormat<JsonNode> dynamicSchemaFormat;
+
+    // record cache: records wait here until a schema is available to consume them
+    private transient Map<TableIdentifier, Queue<RecordWithSchema>> recordQueues;
+
+    // schema cache
+    private transient Map<TableIdentifier, Schema> schemaCache;
+
+    public WholeDatabaseMigrationOperator(CatalogLoader catalogLoader,
+            MultipleSinkOption multipleSinkOption) {
+        this.catalogLoader = catalogLoader;
+        this.multipleSinkOption = multipleSinkOption;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.catalog = catalogLoader.loadCatalog();
+        this.asNamespaceCatalog =
+                catalog instanceof SupportsNamespaces ? (SupportsNamespaces) catalog : null;
+        this.recordQueues = new HashMap<>();
+        this.schemaCache = new HashMap<>();
+        this.dynamicSchemaFormat = DynamicSchemaFormatFactory.getFormat(multipleSinkOption.getFormat());
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (catalog instanceof Closeable) {
+            ((Closeable) catalog).close();
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        String wholeData = element.getValue().getString(0).toString();
+
+        JsonNode jsonNode = objectMapper.readTree(wholeData);
+        boolean isDDL = dynamicSchemaFormat.extractDDLFlag(jsonNode);
+        if (isDDL) {
+            execDDL(jsonNode);
+        } else {
+            execDML(jsonNode);
+        }
+    }
+
+    private void execDDL(JsonNode jsonNode) {
+        // todo:parse ddl sql
+    }
+
+    private void execDML(JsonNode jsonNode) throws IOException {
+        RecordWithSchema record = parseRecord(jsonNode);
+        Schema schema = schemaCache.get(record.getTableId());
+        Schema dataSchema = record.getSchema();
+        recordQueues.compute(record.getTableId(), (k, v) -> {
+            if (v == null) {
+                v = new LinkedList<>();
+            }
+            v.add(record);
+            return v;
+        });
+
+        if (schema == null) {
+            handleTableCreateEventFromOperator(record.getTableId(), dataSchema);
+        } else {
+            handleSchemaInfoEvent(record.getTableId(), schema);
+        }
+    }
+
+
+    // ==================== All request/response methods that interact with the coordinator ====================
+    private void handleSchemaInfoEvent(TableIdentifier tableId, Schema schema) {
+        schemaCache.put(tableId, schema);
+        Schema currentSchema = schemaCache.get(tableId);
+        Queue<RecordWithSchema> queue = recordQueues.get(tableId);
+        while (queue != null && !queue.isEmpty()) {
+            Schema dataSchema = queue.peek().getSchema();
+            // if compatible, the current schema is the latest schema;
+            // if not, the current schema needs to be updated
+            if (isCompatible(currentSchema, dataSchema)) {
+                RecordWithSchema recordWithSchema = queue.poll();
+                output.collect(new StreamRecord<>(
+                        recordWithSchema
+                                .refreshFieldId(currentSchema)
+                                .refreshRowData((jsonNode, schema1) ->
+                                     dynamicSchemaFormat.extractRowData(jsonNode, FlinkSchemaUtil.convert(schema1))
+                                )));
+            } else {
+                handldAlterSchemaEventFromOperator(tableId, currentSchema, dataSchema);
+            }
+        }
+    }
+
+    // ==================== All methods handled by the coordinator ====================
+    private void handleTableCreateEventFromOperator(TableIdentifier tableId, Schema schema) {
+        if (!catalog.tableExists(tableId)) {
+            if (asNamespaceCatalog != null && !asNamespaceCatalog.namespaceExists(tableId.namespace())) {
+                try {
+                    asNamespaceCatalog.createNamespace(tableId.namespace());
+                    LOG.info("Auto create Database({}) in Catalog({}).", tableId.namespace(), catalog.name());
+                } catch (AlreadyExistsException e) {
+                    LOG.warn("Database({}) already exist in Catalog({})!", tableId.namespace(), catalog.name());
+                }
+            }
+
+            ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+            properties.put("format-version", "2"); // todo:后续考虑默认参数给哪些,并且将这个默认参数暴露在表参数上
+            properties.put("write.upsert.enabled", "true");
+            // This property must be set so that Hive can query the auto-created table
+            properties.put("engine.hive.enabled", "true");
+
+            try {
+                catalog.createTable(tableId, schema, PartitionSpec.unpartitioned(), properties.build());
+                LOG.info("Auto create Table({}) in Database({}) in Catalog({})!",
+                        tableId.name(), tableId.namespace(), catalog.name());
+            } catch (AlreadyExistsException e) {
+                LOG.warn("Table({}) already exist in Database({}) in Catalog({})!",
+                        tableId.name(), tableId.namespace(), catalog.name());
+            }
+        }
+
+        handleSchemaInfoEvent(tableId, catalog.loadTable(tableId).schema());
+    }
+
+    private void handldAlterSchemaEventFromOperator(TableIdentifier tableId, Schema oldSchema, Schema newSchema) {
+        Table table = catalog.loadTable(tableId);
+
+        // The transactionality of changes is guaranteed by comparing the old schema with the current schema of the
+        // table.
+        // Judging whether changes can be made by schema comparison (currently only column additions are supported),
+        // for scenarios that cannot be changed, it is always considered that there is a problem with the data.
+        Transaction transaction = table.newTransaction();
+        if (table.schema().sameSchema(oldSchema)) {
+            List<TableChange> tableChanges = TableChange.diffSchema(oldSchema, newSchema);
+            TableChange.applySchemaChanges(transaction.updateSchema(), tableChanges);
+            LOG.info("Schema evolution in table({}) for table change: {}", tableId, tableChanges);
+        }
+        transaction.commitTransaction();
+        handleSchemaInfoEvent(tableId, table.schema());
+    }
+
+    // ==================== Utility methods ====================
+    // The way to judge compatibility is whether all the field names in the old schema exist in the new schema
+    private boolean isCompatible(Schema newSchema, Schema oldSchema) {
+        for (NestedField field : oldSchema.columns()) {
+            if (newSchema.findField(field.name()) == null) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    // Parse the schema from the data and convert it to Flink's built-in schema; the conversion for each format (canal-json, ogg) is provided via a plugin interface
+    private RecordWithSchema parseRecord(JsonNode data) throws IOException {
+        String databaseStr = dynamicSchemaFormat.parse(data, multipleSinkOption.getDatabasePattern());
+        String tableStr = dynamicSchemaFormat.parse(data, multipleSinkOption.getTablePattern());
+        List<String> pkListStr = dynamicSchemaFormat.extractPrimaryKeyNames(data);
+        RowType schema = dynamicSchemaFormat.extractSchema(data, pkListStr);

Review Comment:
   Should compatibility processing for Iceberg-specific data types be handled based on the schema, or should a callback function be added to 'dynamicSchemaFormat.extractSchema'?
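
   One way to read this suggestion, as a hypothetical sketch (the overload and the typeAdjuster parameter below are assumptions, not an existing InLong API): let the caller pass a callback that post-processes each extracted field type, so Iceberg-specific compatibility rules stay out of the generic format code.

       // Hypothetical overload of extractSchema; assumes java.util.function.UnaryOperator,
       // java.util.stream.Collectors and the Flink logical type classes are imported.
       public RowType extractSchema(JsonNode data, List<String> pkNames, UnaryOperator<LogicalType> typeAdjuster) {
           RowType raw = extractSchema(data, pkNames);
           List<RowType.RowField> adjusted = raw.getFields().stream()
                   .map(f -> new RowType.RowField(f.getName(), typeAdjuster.apply(f.getType())))
                   .collect(Collectors.toList());
           return new RowType(adjusted);
       }

   A caller such as parseRecord() could then adjust types that need special handling for Iceberg, for example (illustration only):

       RowType schema = dynamicSchemaFormat.extractSchema(data, pkListStr,
               t -> (t instanceof TinyIntType || t instanceof SmallIntType) ? new IntType(t.isNullable()) : t);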



##########
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/WholeDatabaseMigrationOperator.java:
##########
@@ -0,0 +1,228 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.sink.MultipleSinkOption;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+public class WholeDatabaseMigrationOperator extends AbstractStreamOperator<RecordWithSchema>
+        implements OneInputStreamOperator<RowData, RecordWithSchema> {
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    private final CatalogLoader catalogLoader;
+    private final MultipleSinkOption multipleSinkOption;
+
+    private transient Catalog catalog;
+    private transient SupportsNamespaces asNamespaceCatalog;
+    private transient AbstractDynamicSchemaFormat<JsonNode> dynamicSchemaFormat;
+
+    // record cache: records wait here until a schema is available to consume them
+    private transient Map<TableIdentifier, Queue<RecordWithSchema>> recordQueues;
+
+    // schema cache
+    private transient Map<TableIdentifier, Schema> schemaCache;
+
+    public WholeDatabaseMigrationOperator(CatalogLoader catalogLoader,
+            MultipleSinkOption multipleSinkOption) {
+        this.catalogLoader = catalogLoader;
+        this.multipleSinkOption = multipleSinkOption;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.catalog = catalogLoader.loadCatalog();
+        this.asNamespaceCatalog =
+                catalog instanceof SupportsNamespaces ? (SupportsNamespaces) catalog : null;
+        this.recordQueues = new HashMap<>();
+        this.schemaCache = new HashMap<>();
+        this.dynamicSchemaFormat = DynamicSchemaFormatFactory.getFormat(multipleSinkOption.getFormat());
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (catalog instanceof Closeable) {
+            ((Closeable) catalog).close();
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        String wholeData = element.getValue().getString(0).toString();

Review Comment:
   Maybe it could get the data via 'dynamicSchemaFormat.deserialize(rowData.getBinary(0))'?
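
   A minimal sketch of that suggestion in processElement(), assuming the deserialize(byte[]) method mentioned above is available on the format and that the payload is stored as bytes in field 0:

       @Override
       public void processElement(StreamRecord<RowData> element) throws Exception {
           // Let the format own the decoding instead of assuming a UTF-8 string at index 0.
           JsonNode jsonNode = dynamicSchemaFormat.deserialize(element.getValue().getBinary(0));
           if (dynamicSchemaFormat.extractDDLFlag(jsonNode)) {
               execDDL(jsonNode);
           } else {
               execDML(jsonNode);
           }
       }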



##########
inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java:
##########
@@ -41,6 +41,7 @@
 import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedHashMap;

Review Comment:
   Why add this line?



##########
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/WholeDatabaseMigrationOperator.java:
##########
@@ -0,0 +1,228 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.sink.MultipleSinkOption;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+public class WholeDatabaseMigrationOperator extends AbstractStreamOperator<RecordWithSchema>

Review Comment:
   Maybe it is better to rename 'WholeDatabaseMigrationOperator' to 'MultipleSinkOperator'?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org