Posted to issues@paimon.apache.org by "zhuangchong (via GitHub)" <gi...@apache.org> on 2023/11/17 11:27:55 UTC

[PR] [cdc] Added pulsar database cdc synchronization action and database synchronization base class [incubator-paimon]

zhuangchong opened a new pull request, #2345:
URL: https://github.com/apache/incubator-paimon/pull/2345

   <!-- Please specify the module before the PR name: [core] ... or [flink] ... -->
   
   ### Purpose
   
   <!-- Linking this pull request to the issue -->
   Added a Pulsar database CDC synchronization action and a database synchronization base class.
   
   <!-- What is the purpose of the change -->
   
   ### Tests
   
   <!-- List UT and IT cases to verify this change -->
   
   ### API and Format
   
   <!-- Does this change affect API or storage format -->
   
   ### Documentation
   
   <!-- Does this change introduce a new feature -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [cdc] Added pulsar database cdc synchronization action [incubator-paimon]

Posted by "zhuangchong (via GitHub)" <gi...@apache.org>.
zhuangchong commented on PR #2345:
URL: https://github.com/apache/incubator-paimon/pull/2345#issuecomment-1832952229

   > Thanks for contributing. Left some comments.
   
   Thanks @yuzelin. Done.




Re: [PR] [cdc] Added pulsar database cdc synchronization action [incubator-paimon]

Posted by "yuzelin (via GitHub)" <gi...@apache.org>.
yuzelin commented on code in PR #2345:
URL: https://github.com/apache/incubator-paimon/pull/2345#discussion_r1408865880


##########
docs/layouts/shortcodes/generated/pulsar_sync_database.html:
##########
@@ -0,0 +1,77 @@
+{{/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/}}
+{{ $ref := ref . "maintenance/configurations.md" }}
+<table class="configuration table table-bordered">
+    <thead>
+    <tr>
+        <th class="text-left" style="width: 15%">Configuration</th>
+        <th class="text-left" style="width: 85%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+        <td><h5>--warehouse</h5></td>
+        <td>The path to Paimon warehouse.</td>
+    </tr>
+    <tr>
+        <td><h5>--database</h5></td>
+        <td>The database name in Paimon catalog.</td>
+    </tr>
+    <tr>
+        <td><h5>--schema-init-max-read</h5></td>
+        <td>If your tables are all from a topic, you can set this parameter to initialize the number of tables to be synchronized. The default value is 1000.</td>

Review Comment:
   This option is no longer supported. Delete it here, and remove the same option from Kafka CDC as well.



##########
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java:
##########
@@ -324,8 +324,7 @@ static DataFormat getDataFormat(Configuration pulsarConfig) {
     /** Referenced to {@link PulsarPartitionSplitReader#createPulsarConsumer}. */
     static MessageQueueSchemaUtils.ConsumerWrapper createPulsarConsumer(
             Configuration pulsarConfig, String topic) throws PulsarClientException {
-        SourceConfiguration pulsarSourceConfiguration =
-                toSourceConfiguration(preprocessPulsarConfig(pulsarConfig));

Review Comment:
   Why modify this?



##########
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java:
##########
@@ -214,17 +211,6 @@ public void build() throws Exception {
         sinkBuilder.build();
     }
 
-    protected DataStreamSource<String> buildDataStreamSource(Object source) {
-        if (source instanceof Source) {
-            return env.fromSource(
-                    (Source<String, ?, ?>) source, WatermarkStrategy.noWatermarks(), sourceName());
-        }
-        if (source instanceof SourceFunction) {
-            return env.addSource((SourceFunction<String>) source, sourceName());
-        }
-        throw new UnsupportedOperationException("Unrecognized source type");
-    }
-

Review Comment:
   Why not keep this method?



##########
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncDatabaseActionBase.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.catalog.AbstractCatalog;
+import org.apache.paimon.flink.action.Action;
+import org.apache.paimon.flink.action.ActionBase;
+import org.apache.paimon.flink.action.MultiTablesSinkMode;
+import org.apache.paimon.flink.action.cdc.format.DataFormat;
+import org.apache.paimon.flink.action.cdc.format.RecordParser;
+import org.apache.paimon.flink.sink.cdc.EventParser;
+import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder;
+import org.apache.paimon.flink.sink.cdc.NewTableSchemaBuilder;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * An {@link Action} which synchronizes multiple message queue topics into one Paimon database.
+ *
+ * <p>For each table to be synchronized from the message queue topics, if the corresponding Paimon
+ * table does not exist, this action will automatically create the table, and its schema will be
+ * derived from all specified message queue topic tables. If the Paimon table already exists and its
+ * schema differs from the one parsed from the message queue records, this action will try to
+ * perform schema evolution.
+ *
+ * <p>This action supports a limited number of schema changes. Currently, the framework cannot drop
+ * columns, so `DROP` is ignored and `RENAME` adds a new column. The currently supported schema
+ * changes include:
+ *
+ * <ul>
+ *   <li>Adding columns.
+ *   <li>Altering column types. More specifically,
+ *       <ul>
+ *         <li>altering from a string type (char, varchar, text) to another string type with longer
+ *             length,
+ *         <li>altering from a binary type (binary, varbinary, blob) to another binary type with
+ *             longer length,
+ *         <li>altering from an integer type (tinyint, smallint, int, bigint) to another integer
+ *             type with wider range,
+ *         <li>altering from a floating-point type (float, double) to another floating-point type
+ *             with wider range,
+ *       </ul>
+ *       are supported.
+ * </ul>
+ *
+ * <p>To automatically synchronize new tables, this action creates a single sink for all Paimon
+ * tables to be written. See {@link MultiTablesSinkMode#COMBINED}.
+ */
+public abstract class MessageQueueSyncDatabaseActionBase extends ActionBase {
+
+    protected final String database;
+    protected final Configuration cdcSourceConfig;
+
+    private Map<String, String> tableConfig = new HashMap<>();
+    private String tablePrefix = "";
+    private String tableSuffix = "";
+    private String includingTables = ".*";
+    @Nullable String excludingTables;
+    private TypeMapping typeMapping = TypeMapping.defaultMapping();
+
+    public MessageQueueSyncDatabaseActionBase(
+            String warehouse,
+            String database,
+            Map<String, String> catalogConfig,
+            Map<String, String> mqConfig) {
+        super(warehouse, catalogConfig);
+        this.database = database;
+        this.cdcSourceConfig = Configuration.fromMap(mqConfig);
+    }
+
+    public MessageQueueSyncDatabaseActionBase withTableConfig(Map<String, String> tableConfig) {
+        this.tableConfig = tableConfig;
+        return this;
+    }
+
+    public MessageQueueSyncDatabaseActionBase withTablePrefix(@Nullable String tablePrefix) {
+        if (tablePrefix != null) {
+            this.tablePrefix = tablePrefix;
+        }
+        return this;
+    }
+
+    public MessageQueueSyncDatabaseActionBase withTableSuffix(@Nullable String tableSuffix) {
+        if (tableSuffix != null) {
+            this.tableSuffix = tableSuffix;
+        }
+        return this;
+    }
+
+    public MessageQueueSyncDatabaseActionBase includingTables(@Nullable String includingTables) {
+        if (includingTables != null) {
+            this.includingTables = includingTables;
+        }
+        return this;
+    }
+
+    public MessageQueueSyncDatabaseActionBase excludingTables(@Nullable String excludingTables) {
+        this.excludingTables = excludingTables;
+        return this;
+    }
+
+    public MessageQueueSyncDatabaseActionBase withTypeMapping(TypeMapping typeMapping) {
+        this.typeMapping = typeMapping;
+        return this;
+    }
+
+    protected abstract DataStreamSource<String> buildSource() throws Exception;
+
+    protected abstract String sourceName();
+
+    protected abstract DataFormat getDataFormat();
+
+    protected abstract String jobName();
+
+    @Override
+    public void build() throws Exception {
+        boolean caseSensitive = catalog.caseSensitive();
+
+        validateCaseInsensitive(caseSensitive);
+
+        catalog.createDatabase(database, true);
+
+        DataFormat format = getDataFormat();
+        RecordParser recordParser =
+                format.createParser(caseSensitive, typeMapping, Collections.emptyList());
+        NewTableSchemaBuilder schemaBuilder = new NewTableSchemaBuilder(tableConfig, caseSensitive);
+        Pattern includingPattern = Pattern.compile(includingTables);
+        Pattern excludingPattern =
+                excludingTables == null ? null : Pattern.compile(excludingTables);
+        TableNameConverter tableNameConverter =
+                new TableNameConverter(caseSensitive, true, tablePrefix, tableSuffix);
+        EventParser.Factory<RichCdcMultiplexRecord> parserFactory =
+                () ->
+                        new RichCdcMultiplexRecordEventParser(
+                                schemaBuilder,
+                                includingPattern,
+                                excludingPattern,
+                                tableNameConverter);
+
+        new FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord>()
+                .withInput(buildSource().flatMap(recordParser).name("Parse"))
+                .withParserFactory(parserFactory)
+                .withCatalogLoader(catalogLoader())
+                .withDatabase(database)
+                .withMode(MultiTablesSinkMode.COMBINED)
+                .withTableOptions(tableConfig)
+                .build();
+    }
+
+    private void validateCaseInsensitive(boolean caseSensitive) {
+        AbstractCatalog.validateCaseInsensitive(caseSensitive, "Database", database);
+        AbstractCatalog.validateCaseInsensitive(caseSensitive, "Table prefix", tablePrefix);
+        AbstractCatalog.validateCaseInsensitive(caseSensitive, "Table suffix", tableSuffix);
+    }
+
+    @VisibleForTesting
+    public Map<String, String> tableConfig() {
+        return tableConfig;
+    }
+
+    // ------------------------------------------------------------------------
+    //  Flink run methods
+    // ------------------------------------------------------------------------
+
+    @Override
+    public void run() throws Exception {
+        build();
+        execute(String.format("KAFKA-Paimon Database Sync: %s", database));

Review Comment:
   `execute(jobName());`
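A minimal, Flink-free sketch of the suggested fix: the base class's `run()` delegates the job name to the abstract `jobName()` instead of hard-coding the "KAFKA-Paimon" prefix. The classes are simplified stand-ins, and the Pulsar job-name string is a hypothetical example, not the merged code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, Flink-free sketch of the template-method fix suggested in the review. */
public class JobNameSketch {

    /** Stand-in for MessageQueueSyncDatabaseActionBase; only the run/jobName wiring is modeled. */
    abstract static class SyncDatabaseActionBase {
        protected final String database;
        // Records the names passed to execute() so the behavior is observable.
        final List<String> executedJobs = new ArrayList<>();

        SyncDatabaseActionBase(String database) {
            this.database = database;
        }

        /** Each message-queue subclass supplies its own job name. */
        protected abstract String jobName();

        /** Stand-in for ActionBase#execute(String): here it only records the name. */
        protected void execute(String name) {
            executedJobs.add(name);
        }

        public void run() {
            // build() omitted; the base class no longer hard-codes a "KAFKA-Paimon" name.
            execute(jobName());
        }
    }

    /** Hypothetical Pulsar subclass; the exact job-name format is an assumption. */
    static class PulsarSyncDatabaseAction extends SyncDatabaseActionBase {
        PulsarSyncDatabaseAction(String database) {
            super(database);
        }

        @Override
        protected String jobName() {
            return String.format("Pulsar-Paimon Database Sync: %s", database);
        }
    }

    public static void main(String[] args) {
        PulsarSyncDatabaseAction action = new PulsarSyncDatabaseAction("ods");
        action.run();
        System.out.println(action.executedJobs); // [Pulsar-Paimon Database Sync: ods]
    }
}
```

With this shape, a Kafka subclass only overrides `jobName()` to get its own name, and the base class stays source-agnostic.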

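The schema changes listed in the `MessageQueueSyncDatabaseActionBase` Javadoc amount to a widening-only rule within each type family. The sketch below is a hypothetical, simplified model of that rule (a family plus a single width: length for strings and binaries, bit width for numbers); it is not Paimon's actual type-casting logic, which may accept other conversions.

```java
/** Hypothetical, simplified model of the widening-only type changes listed in the Javadoc. */
public class TypeWideningSketch {

    /** Type families named in the Javadoc; changes across families are rejected. */
    enum Family { STRING, BINARY, INTEGER, FLOATING_POINT }

    /** A column type reduced to its family and a width (length, or bit width for numbers). */
    static final class ColumnType {
        final Family family;
        final int width;

        ColumnType(Family family, int width) {
            this.family = family;
            this.width = width;
        }
    }

    static final ColumnType TINYINT = new ColumnType(Family.INTEGER, 8);
    static final ColumnType SMALLINT = new ColumnType(Family.INTEGER, 16);
    static final ColumnType INT = new ColumnType(Family.INTEGER, 32);
    static final ColumnType BIGINT = new ColumnType(Family.INTEGER, 64);
    static final ColumnType FLOAT = new ColumnType(Family.FLOATING_POINT, 32);
    static final ColumnType DOUBLE = new ColumnType(Family.FLOATING_POINT, 64);

    static ColumnType varchar(int length) {
        return new ColumnType(Family.STRING, length);
    }

    static ColumnType varbinary(int length) {
        return new ColumnType(Family.BINARY, length);
    }

    /** Allowed only when both types share a family and the target is strictly wider. */
    static boolean isSupportedChange(ColumnType from, ColumnType to) {
        return from.family == to.family && to.width > from.width;
    }

    public static void main(String[] args) {
        System.out.println(isSupportedChange(INT, BIGINT)); // true
        System.out.println(isSupportedChange(varchar(10), varchar(32))); // true
        System.out.println(isSupportedChange(BIGINT, INT)); // false (narrowing)
        System.out.println(isSupportedChange(INT, DOUBLE)); // false (cross-family)
    }
}
```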


##########
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseAction.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.pulsar;
+
+import org.apache.paimon.flink.action.cdc.MessageQueueSyncDatabaseActionBase;
+import org.apache.paimon.flink.action.cdc.format.DataFormat;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+
+import java.util.Map;
+
+/** Synchronize database from Pulsar. */
+public class PulsarSyncDatabaseAction extends MessageQueueSyncDatabaseActionBase {
+
+    public PulsarSyncDatabaseAction(
+            String warehouse,
+            String database,
+            Map<String, String> catalogConfig,
+            Map<String, String> kafkaConfig) {

Review Comment:
   Rename `kafkaConfig`.





Re: [PR] [cdc] Added pulsar database cdc synchronization action [incubator-paimon]

Posted by "zhuangchong (via GitHub)" <gi...@apache.org>.
zhuangchong commented on code in PR #2345:
URL: https://github.com/apache/incubator-paimon/pull/2345#discussion_r1410148011


##########
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java:
##########
@@ -214,17 +211,6 @@ public void build() throws Exception {
         sinkBuilder.build();
     }
 
-    protected DataStreamSource<String> buildDataStreamSource(Object source) {
-        if (source instanceof Source) {
-            return env.fromSource(
-                    (Source<String, ?, ?>) source, WatermarkStrategy.noWatermarks(), sourceName());
-        }
-        if (source instanceof SourceFunction) {
-            return env.addSource((SourceFunction<String>) source, sourceName());
-        }
-        throw new UnsupportedOperationException("Unrecognized source type");
-    }
-

Review Comment:
   done.





Re: [PR] [cdc] Added pulsar database cdc synchronization action [incubator-paimon]

Posted by "zhuangchong (via GitHub)" <gi...@apache.org>.
zhuangchong commented on code in PR #2345:
URL: https://github.com/apache/incubator-paimon/pull/2345#discussion_r1410042056


##########
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java:
##########
@@ -324,8 +324,7 @@ static DataFormat getDataFormat(Configuration pulsarConfig) {
     /** Referenced to {@link PulsarPartitionSplitReader#createPulsarConsumer}. */
     static MessageQueueSchemaUtils.ConsumerWrapper createPulsarConsumer(
             Configuration pulsarConfig, String topic) throws PulsarClientException {
-        SourceConfiguration pulsarSourceConfiguration =
-                toSourceConfiguration(preprocessPulsarConfig(pulsarConfig));

Review Comment:
   `toSourceConfiguration` already calls `preprocessPulsarConfig` internally, so the explicit call here was redundant and I removed it:
   
   ```java
       static SourceConfiguration toSourceConfiguration(Configuration rawConfig) {
           return new SourceConfiguration(preprocessPulsarConfig(rawConfig));
       }
   ```
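To illustrate why dropping the outer call is safe, here is a hypothetical sketch in which a converter preprocesses its input internally. Both call shapes give the same result only because the stand-in `preprocess` is idempotent; that property of the real `preprocessPulsarConfig` is assumed here, not verified, and the config key is made up.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: when the converter preprocesses internally, an outer call is redundant. */
public class PreprocessSketch {

    /** Stand-in for preprocessPulsarConfig: adds a default without overriding user-set values. */
    static Map<String, String> preprocess(Map<String, String> raw) {
        Map<String, String> processed = new HashMap<>(raw);
        processed.putIfAbsent("example.default.key", "default-value"); // hypothetical key
        return processed;
    }

    /** Stand-in for toSourceConfiguration, which already calls preprocess on its input. */
    static Map<String, String> toSourceConfiguration(Map<String, String> raw) {
        return preprocess(raw);
    }

    public static void main(String[] args) {
        Map<String, String> raw = Map.of("topic", "orders");
        Map<String, String> before = toSourceConfiguration(preprocess(raw)); // old: preprocess twice
        Map<String, String> after = toSourceConfiguration(raw); // new: preprocess once
        System.out.println(before.equals(after)); // true, because preprocess is idempotent
    }
}
```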





Re: [PR] [cdc] Added pulsar database cdc synchronization action [incubator-paimon]

Posted by "yuzelin (via GitHub)" <gi...@apache.org>.
yuzelin merged PR #2345:
URL: https://github.com/apache/incubator-paimon/pull/2345




Re: [PR] [cdc] Added pulsar database cdc synchronization action [incubator-paimon]

Posted by "zhuangchong (via GitHub)" <gi...@apache.org>.
zhuangchong commented on code in PR #2345:
URL: https://github.com/apache/incubator-paimon/pull/2345#discussion_r1410042331


##########
docs/layouts/shortcodes/generated/pulsar_sync_database.html:
##########
@@ -0,0 +1,77 @@
+        <td>If your tables are all from a topic, you can set this parameter to initialize the number of tables to be synchronized. The default value is 1000.</td>

Review Comment:
   done.



##########
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseAction.java:
##########
@@ -0,0 +1,62 @@
+/** Synchronize database from Pulsar. */
+public class PulsarSyncDatabaseAction extends MessageQueueSyncDatabaseActionBase {
+
+    public PulsarSyncDatabaseAction(
+            String warehouse,
+            String database,
+            Map<String, String> catalogConfig,
+            Map<String, String> kafkaConfig) {

Review Comment:
   done.



##########
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncDatabaseActionBase.java:
##########
@@ -0,0 +1,197 @@
+    @Override
+    public void run() throws Exception {
+        build();
+        execute(String.format("KAFKA-Paimon Database Sync: %s", database));

Review Comment:
   done.
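For reference, the table selection in `build()` (the `includingTables`/`excludingTables` regexes and the `tablePrefix`/`tableSuffix` naming) can be sketched as follows. This is a hypothetical simplification: it assumes full-string regex matching on the source table name and plain prefix + name + suffix concatenation, and it ignores the case-sensitivity flag passed to `TableNameConverter`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical sketch of the including/excluding table filter and prefix/suffix naming. */
public class TableFilterSketch {

    private final Pattern including;
    private final Pattern excluding; // null means nothing is excluded
    private final String prefix;
    private final String suffix;

    TableFilterSketch(String includingTables, String excludingTables, String prefix, String suffix) {
        this.including = Pattern.compile(includingTables);
        this.excluding = excludingTables == null ? null : Pattern.compile(excludingTables);
        this.prefix = prefix;
        this.suffix = suffix;
    }

    /** A table is synced if it fully matches the include regex and not the exclude regex. */
    boolean shouldSync(String sourceTable) {
        boolean included = including.matcher(sourceTable).matches();
        boolean excluded = excluding != null && excluding.matcher(sourceTable).matches();
        return included && !excluded;
    }

    /** Paimon table name = prefix + source table name + suffix. */
    String toPaimonTableName(String sourceTable) {
        return prefix + sourceTable + suffix;
    }

    /** Returns the Paimon table names for the source tables that pass the filter. */
    List<String> plan(List<String> sourceTables) {
        List<String> result = new ArrayList<>();
        for (String table : sourceTables) {
            if (shouldSync(table)) {
                result.add(toPaimonTableName(table));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Same defaults as the action: include ".*", exclude nothing, empty prefix and suffix.
        TableFilterSketch defaults = new TableFilterSketch(".*", null, "", "");
        System.out.println(defaults.plan(List.of("orders", "users"))); // [orders, users]

        TableFilterSketch custom =
                new TableFilterSketch("orders.*|users", "orders_tmp", "ods_", "_rt");
        System.out.println(custom.plan(List.of("orders", "orders_2023", "orders_tmp", "users", "logs")));
        // [ods_orders_rt, ods_orders_2023_rt, ods_users_rt]
    }
}
```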





Re: [PR] [cdc] Added pulsar database cdc synchronization action [incubator-paimon]

Posted by "zhuangchong (via GitHub)" <gi...@apache.org>.
zhuangchong commented on code in PR #2345:
URL: https://github.com/apache/incubator-paimon/pull/2345#discussion_r1410045607


##########
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java:
##########
@@ -214,17 +211,6 @@ public void build() throws Exception {
         sinkBuilder.build();
     }
 
-    protected DataStreamSource<String> buildDataStreamSource(Object source) {
-        if (source instanceof Source) {
-            return env.fromSource(
-                    (Source<String, ?, ?>) source, WatermarkStrategy.noWatermarks(), sourceName());
-        }
-        if (source instanceof SourceFunction) {
-            return env.addSource((SourceFunction<String>) source, sourceName());
-        }
-        throw new UnsupportedOperationException("Unrecognized source type");
-    }
-

Review Comment:
   I think this code is better placed in each implementation class. That way, a table action added later can call `env.addSource()`, `env.fromSource()`, or another method directly and explicitly, for example:
   
   https://github.com/apache/incubator-paimon/blob/1571f8f33ff74bcc9a8a0b6301da349c1780dae2/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java#L119-L125
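The two designs discussed here can be compared with a Flink-free sketch. `NewStyleSource` and `LegacySourceFunction` are hypothetical stand-ins for Flink's `Source` and `SourceFunction`, and the returned strings only name which `env` method would be called.

```java
/** Hypothetical, Flink-free sketch of the two designs discussed for building the source stream. */
public class SourceBuildingSketch {

    // Stand-ins for Flink's new-style Source and legacy SourceFunction.
    interface NewStyleSource {}

    interface LegacySourceFunction {}

    /** Option 1 (shared helper): the base class dispatches on the runtime type of an Object. */
    static String buildDataStreamSource(Object source) {
        if (source instanceof NewStyleSource) {
            return "fromSource"; // would be env.fromSource(...) in Flink
        }
        if (source instanceof LegacySourceFunction) {
            return "addSource"; // would be env.addSource(...) in Flink
        }
        throw new UnsupportedOperationException("Unrecognized source type");
    }

    /** Option 2 (per implementation): each action states its source API explicitly. */
    abstract static class SyncTableAction {
        abstract String buildSource();
    }

    static class NewStyleAction extends SyncTableAction {
        @Override
        String buildSource() {
            return "fromSource"; // explicit choice, no instanceof checks
        }
    }

    public static void main(String[] args) {
        System.out.println(buildDataStreamSource(new NewStyleSource() {})); // fromSource
        System.out.println(buildDataStreamSource(new LegacySourceFunction() {})); // addSource
        System.out.println(new NewStyleAction().buildSource()); // fromSource
    }
}
```

The shared helper keeps the dispatch in one place but takes `Object` and relies on unchecked casts in the real code; per-class construction is explicit and checked at compile time.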





Re: [PR] [cdc] Added pulsar database cdc synchronization action [incubator-paimon]

Posted by "yuzelin (via GitHub)" <gi...@apache.org>.
yuzelin commented on code in PR #2345:
URL: https://github.com/apache/incubator-paimon/pull/2345#discussion_r1410117155


##########
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java:
##########
@@ -214,17 +211,6 @@ public void build() throws Exception {
         sinkBuilder.build();
     }
 
-    protected DataStreamSource<String> buildDataStreamSource(Object source) {
-        if (source instanceof Source) {
-            return env.fromSource(
-                    (Source<String, ?, ?>) source, WatermarkStrategy.noWatermarks(), sourceName());
-        }
-        if (source instanceof SourceFunction) {
-            return env.addSource((SourceFunction<String>) source, sourceName());
-        }
-        throw new UnsupportedOperationException("Unrecognized source type");
-    }
-

Review Comment:
   I think we can keep this method. We can modify it in the future if needed.


