Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/10/10 08:21:34 UTC

[GitHub] [inlong] yunqingmoswu opened a new pull request, #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

yunqingmoswu opened a new pull request, #6123:
URL: https://github.com/apache/inlong/pull/6123

   ### Prepare a Pull Request
   Title: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode
   
   Fixes #6116 
   
   ### Motivation
   
   Support dynamic topics for KafkaLoadNode when the Kafka format is raw and 'key.fields' is not specified.
   This mainly targets whole-database migration scenarios. We assume the upstream input mixes the schemas of all migrated tables, so we ignore the real schema for now and receive each record in its entirety as raw binary data. The Kafka sink then fetches and parses the schema and data, and dynamically writes each record to the related topic according to values in the data.
   Dynamic topic writing has some limitations:
   1. The upstream data must be in raw format with a fixed inner format; only [canal-json|debezium-json] are supported for now
   2. 'key.fields' is not specified
   3. 'topic-pattern' and 'inner.format' must be specified so that the topic can be extracted dynamically from the data
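
The topic resolution described above can be sketched roughly as follows. This is a minimal illustration, not the PR's implementation: the class `TopicPatternSketch`, its `resolve` method, and the flat `Map` standing in for a parsed record are hypothetical, and only the regular expression is taken from `AbstractDynamicSchemaFormat.PATTERN` in this PR. Throwing on an unknown key is an assumption; the actual behavior is not shown in this thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not part of InLong): resolves a topic pattern against flat key/value data. */
final class TopicPatternSketch {

    // Same expression as AbstractDynamicSchemaFormat.PATTERN in this PR: matches ${key}.
    static final Pattern PATTERN =
            Pattern.compile("\\$\\{\\s*([\\w.-]+)\\s*}", Pattern.CASE_INSENSITIVE);

    static String resolve(String pattern, Map<String, String> values) {
        Matcher matcher = PATTERN.matcher(pattern);
        StringBuffer sb = new StringBuffer();
        while (matcher.find()) {
            String key = matcher.group(1);
            String value = values.get(key);
            if (value == null) {
                // Assumption: fail fast on unknown keys.
                throw new IllegalArgumentException("No value for key: " + key);
            }
            // quoteReplacement keeps '$' and '\' in the value from being treated specially.
            matcher.appendReplacement(sb, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(sb);
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> record = new LinkedHashMap<>();
        record.put("database", "inventory"); // illustrative values
        record.put("table", "orders");
        System.out.println(resolve("prefix_${database}_${table}", record));
    }
}
```

With a 'topic-pattern' such as `prefix_${database}_${table}`, the example prints `prefix_inventory_orders`. Note that, with this expression, every placeholder needs its own `${...}`; a bare `{table}` is left as literal text.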
   
   ### Modifications
   
   1. Add dynamic schema format
   2. Add dynamic topic support for DynamicKafkaSerializationSchema
   3. Add dynamic topic support for KafkaLoadNode
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [x] This change is already covered by existing tests, such as:
     *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
     *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (10MB)*
     - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
     - If a feature is not applicable for documentation, explain why?
     - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

Posted by GitBox <gi...@apache.org>.
yunqingmoswu commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991207109


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.format;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Debezium json dynamic format
+ */
+public class DebeziumJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat {
+
+    private static final String IDENTIFIER = "debezium-json";
+
+    private static final DebeziumJsonDynamicSchemaFormat FORMAT = new DebeziumJsonDynamicSchemaFormat();
+
+    private DebeziumJsonDynamicSchemaFormat() {
+
+    }
+
+    @SuppressWarnings("rawtypes")
+    public static AbstractDynamicSchemaFormat getInstance() {
+        return FORMAT;
+    }
+
+    @Override
+    protected JsonNode getPhysicalData(JsonNode root) {
+        return root.get("after");

Review Comment:
   done





[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

Posted by GitBox <gi...@apache.org>.
yunqingmoswu commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991911525


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.format;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+
+/**
+ * Json dynamic format class
+ * This class main handle:
+ * 1. deserialize data from byte array
+ * 2. parse pattern and get the real value from the raw data(contains meta data and physical data)
+ * Such as:
+ * 1). give a pattern "${a}{b}{c}" and the root Node contains the keys(a: '1', b: '2', c: '3')
+ * the result of pared will be '123'
+ * 2). give a pattern "${a}_{b}_{c}" and the root Node contains the keys(a: '1', b: '2', c: '3')
+ * the result of pared will be '1_2_3'
+ * 3). give a pattern "prefix_${a}_{b}_{c}_suffix" and the root Node contains the keys(a: '1', b: '2', c: '3')
+ * the result of pared will be 'prefix_1_2_3_suffix'
+ */
+public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaFormat<JsonNode> {
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    /**
+     * Extract value by key from the raw data
+     *
+     * @param message The byte array of raw data
+     * @param keys The key list that will be used to extract
+     * @return The value list maps the keys
+     * @throws IOException The exceptions may throws when extract
+     */
+    @Override
+    public List<String> extract(byte[] message, String... keys) throws IOException {
+        if (keys == null || keys.length == 0) {
+            return new ArrayList<>();
+        }
+        final JsonNode root = deserialize(message);
+        JsonNode physicalNode = getPhysicalData(root);
+        List<String> values = new ArrayList<>(keys.length);
+        if (physicalNode == null) {
+            for (String key : keys) {
+                values.add(extract(root, key));
+            }
+            return values;
+        }
+        for (String key : keys) {
+            String value = extract(physicalNode, key);
+            if (value == null) {
+                value = extract(root, key);
+            }
+            values.add(value);
+        }
+        return values;
+    }
+
+    /**
+     * Extract value by key from ${@link JsonNode}
+     *
+     * @param jsonNode The json node
+     * @param key The key that will be used to extract
+     * @return The value maps the key in the json node
+     */
+    @Override
+    public String extract(JsonNode jsonNode, String key) {
+        if (jsonNode == null || key == null) {
+            return null;
+        }
+        JsonNode value = jsonNode.get(key);
+        if (value != null) {
+            return value.asText();
+        }
+        int index = key.indexOf(".");
+        if (index > 0 && index + 1 < key.length()) {
+            return extract(jsonNode.get(key.substring(0, index)), key.substring(index + 1));
+        }
+        return null;
+    }
+
+    /**
+     * Deserialize from byte array and return a ${@link JsonNode}
+     *
+     * @param message The byte array of raw data
+     * @return The JsonNode
+     * @throws IOException The exceptions may throws when deserialize
+     */
+    @Override
+    public JsonNode deserialize(byte[] message) throws IOException {
+        return objectMapper.readTree(message);
+    }
+
+    /**
+     * Parse msg and replace the value by key from meta data and physical.
+     * See details {@link JsonDynamicSchemaFormat#parse(JsonNode, String)}
+     *
+     * @param message The source of data rows format by bytes
+     * @param pattern The pattern value
+     * @return The result of parsed
+     * @throws IOException The exception will throws
+     */
+    @Override
+    public String parse(byte[] message, String pattern) throws IOException {
+        return parse(deserialize(message), pattern);
+    }
+
+    /**
+     * Parse msg and replace the value by key from meta data and physical.
+     * Such as:
+     * 1. give a pattern "${a}{b}{c}" and the root Node contains the keys(a: '1', b: '2', c: '3')
+     * the result of pared will be '123'
+     * 2. give a pattern "${a}_{b}_{c}" and the root Node contains the keys(a: '1', b: '2', c: '3')
+     * the result of pared will be '1_2_3'
+     * 3. give a pattern "prefix_${a}_{b}_{c}_suffix" and the root Node contains the keys(a: '1', b: '2', c: '3')
+     * the result of pared will be 'prefix_1_2_3_suffix'
+     *
+     * @param rootNode The root node of json
+     * @param pattern The pattern value
+     * @return The result of parsed
+     * @throws IOException The exception will throws
+     */
+    @Override
+    public String parse(JsonNode rootNode, String pattern) throws IOException {

Review Comment:
   Pattern extraction is, in essence, extraction from the data. It seems reasonable to distinguish between physical fields and metadata fields; if a key exists in both, the physical field is preferred.
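
The lookup order described in this comment can be sketched as below. This is a simplified model under stated assumptions: plain maps stand in for the `JsonNode` trees, and the class `LookupOrderSketch` and its `lookup` method are hypothetical names. In the quoted diff, `extract(byte[], String...)` follows the same order, trying the node returned by `getPhysicalData` first and falling back to the root node.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of the lookup order, using plain maps instead of JsonNode. */
final class LookupOrderSketch {

    // Returns the physical value when present, otherwise falls back to the metadata value.
    static String lookup(Map<String, String> physical, Map<String, String> metadata, String key) {
        String value = physical == null ? null : physical.get(key);
        return value != null ? value : metadata.get(key);
    }

    public static void main(String[] args) {
        Map<String, String> metadata = new HashMap<>();
        metadata.put("database", "inventory");
        metadata.put("table", "orders");

        Map<String, String> physical = new HashMap<>();
        physical.put("id", "42");
        // A column that happens to be named "database" shadows the metadata field.
        physical.put("database", "shadowing_value");

        System.out.println(lookup(physical, metadata, "table"));    // orders
        System.out.println(lookup(physical, metadata, "database")); // shadowing_value
    }
}
```

The second call shows the trade-off of this order: a physical column that shares a name with a metadata field wins, so a pattern such as `${database}` would resolve to the column value rather than the source database name.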





[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

Posted by GitBox <gi...@apache.org>.
yunqingmoswu commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991772033


##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java:
##########
@@ -176,6 +189,17 @@ public void setPartitions(int[] partitions) {
 
     @Override
     public String getTargetTopic(RowData element) {
+        // Only support dynamic topic when the topicPattern is specified
+        //      and the valueSerialization is RawFormatSerializationSchema
+        if (valueSerialization instanceof RawFormatSerializationSchema && StringUtils.isNotBlank(topicPattern)) {
+            try {
+                return DynamicSchemaFormatFactory.getFormat(innerValueDecodingFormat)
+                        .parse(element.getBinary(0), topicPattern);

Review Comment:
   It is OK, but this is determined by the raw format: "The Raw format allows to read and write raw (byte based) values as a single column" (https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/raw/).
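
The decision made in the hunk above can be sketched in isolation as follows. The condition (raw value format plus a non-blank topic pattern) comes from the quoted diff; everything else is hypothetical: the class `TargetTopicSketch`, the `resolver` parameter standing in for `DynamicSchemaFormatFactory.getFormat(...).parse(...)`, and the fallback to a fixed topic, which is an assumption about the part of `getTargetTopic` not shown here.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.BiFunction;

/** Hypothetical sketch of the topic decision; not the InLong implementation. */
final class TargetTopicSketch {

    static String targetTopic(boolean rawValueFormat, String topicPattern, String fixedTopic,
            byte[] rawRecord, BiFunction<byte[], String, String> resolver) {
        // Roughly equivalent to StringUtils.isNotBlank(topicPattern).
        boolean hasPattern = topicPattern != null && !topicPattern.trim().isEmpty();
        if (rawValueFormat && hasPattern) {
            // With the raw format the whole record is a single binary column,
            // which is why the diff reads element.getBinary(0).
            return resolver.apply(rawRecord, topicPattern);
        }
        // Assumption: otherwise the statically configured topic is used.
        return fixedTopic;
    }

    public static void main(String[] args) {
        // Stand-in resolver: treats the raw bytes as the table name.
        BiFunction<byte[], String, String> resolver =
                (bytes, pattern) -> pattern.replace("${table}", new String(bytes, StandardCharsets.UTF_8));
        byte[] raw = "orders".getBytes(StandardCharsets.UTF_8);
        System.out.println(targetTopic(true, "topic_${table}", "fixed_topic", raw, resolver)); // topic_orders
        System.out.println(targetTopic(true, "", "fixed_topic", raw, resolver));               // fixed_topic
    }
}
```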





[GitHub] [inlong] EMsnap commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

Posted by GitBox <gi...@apache.org>.
EMsnap commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991758374


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.format;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Abstact dynamic format class
+ * This class main handle:
+ * 1. deserialize data from byte array to get raw data
+ * 2. parse pattern and get the real value from the raw data
+ * Such as:
+ * 1). give a pattern "${a}{b}{c}" and the raw data contains the keys(a: '1', b: '2', c: '3')
+ * the result of pared will be '123'
+ * 2). give a pattern "${a}_{b}_{c}" and the raw data contains the keys(a: '1', b: '2', c: '3')
+ * the result of pared will be '1_2_3'
+ * 3). give a pattern "prefix_${a}_{b}_{c}_suffix" and the raw Node contains the keys(a: '1', b: '2', c: '3')
+ * the result of pared will be 'prefix_1_2_3_suffix'
+ */
+public abstract class AbstractDynamicSchemaFormat<T> {
+
+    public static final Pattern PATTERN = Pattern.compile("\\$\\{\\s*([\\w.-]+)\\s*}", Pattern.CASE_INSENSITIVE);
+
+    /**
+     * Extract value by key from the raw data
+     *
+     * @param message The byte array of raw data
+     * @param keys The key list that will used to extract

Review Comment:
   will be used





[GitHub] [inlong] EMsnap commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

Posted by GitBox <gi...@apache.org>.
EMsnap commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991179290


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DynamicSchemaFormatFactory.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.format;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Dynamic schema format factory
+ */
+public class DynamicSchemaFormatFactory {
+
+    private static final List<AbstractDynamicSchemaFormat<?>> SUPPORT_FORMATS =
+            new ArrayList<AbstractDynamicSchemaFormat<?>>() {
+
+                private static final long serialVersionUID = 1L;
+
+                {
+                    add(CanalJsonDynamicSchemaFormat.getInstance());
+                    add(DebeziumJsonDynamicSchemaFormat.getInstance());
+                }
+            };
+
+    /**
+     * Get format from the format name, only supports [canal-json|debezium-json] at now

Review Comment:
   at now -> for now 



##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.format;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+
+/**
+ * Json dynamic format class
+ * This class main handle:
+ * 1. deserialize data from byte array
+ * 2. parse pattern and get the real value from the raw data(contains meta data and physical data)
+ * Such as:
+ * 1). give a pattern "${a}{b}{c}" and the root Node contains the keys(a: '1', b: '2', c: '3')
+ * the result of pared will be '123'
+ * 2). give a pattern "${a}_{b}_{c}" and the root Node contains the keys(a: '1', b: '2', c: '3')
+ * the result of pared will be '1_2_3'
+ * 3). give a pattern "prefix_${a}_{b}_{c}_suffix" and the root Node contains the keys(a: '1', b: '2', c: '3')
+ * the result of pared will be 'prefix_1_2_3_suffix'
+ */
+public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaFormat<JsonNode> {
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    /**
+     * Extract value by key from the raw data
+     *
+     * @param message The btye array of raw data

Review Comment:
   btye -> byte





[GitHub] [inlong] thesumery commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

Posted by GitBox <gi...@apache.org>.
thesumery commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991869898


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.format;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+
+/**
+ * Json dynamic format class
+ * This class main handle:
+ * 1. deserialize data from byte array
+ * 2. parse pattern and get the real value from the raw data(contains meta data and physical data)
+ * Such as:
+ * 1). give a pattern "${a}{b}{c}" and the root Node contains the keys(a: '1', b: '2', c: '3')
+ * the result of pared will be '123'
+ * 2). give a pattern "${a}_{b}_{c}" and the root Node contains the keys(a: '1', b: '2', c: '3')
+ * the result of pared will be '1_2_3'
+ * 3). give a pattern "prefix_${a}_{b}_{c}_suffix" and the root Node contains the keys(a: '1', b: '2', c: '3')
+ * the result of pared will be 'prefix_1_2_3_suffix'
+ */
+public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaFormat<JsonNode> {
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    /**
+     * Extract value by key from the raw data
+     *
+     * @param message The byte array of raw data
+     * @param keys The key list that will be used to extract
+     * @return The value list maps the keys
+     * @throws IOException The exceptions may throws when extract
+     */
+    @Override
+    public List<String> extract(byte[] message, String... keys) throws IOException {
+        if (keys == null || keys.length == 0) {
+            return new ArrayList<>();
+        }
+        final JsonNode root = deserialize(message);
+        JsonNode physicalNode = getPhysicalData(root);
+        List<String> values = new ArrayList<>(keys.length);
+        if (physicalNode == null) {
+            for (String key : keys) {
+                values.add(extract(root, key));
+            }
+            return values;
+        }
+        for (String key : keys) {
+            String value = extract(physicalNode, key);
+            if (value == null) {
+                value = extract(root, key);
+            }
+            values.add(value);
+        }
+        return values;
+    }
+
+    /**
+     * Extract value by key from ${@link JsonNode}
+     *
+     * @param jsonNode The json node
+     * @param key The key that will be used to extract
+     * @return The value maps the key in the json node
+     */
+    @Override
+    public String extract(JsonNode jsonNode, String key) {
+        if (jsonNode == null || key == null) {
+            return null;
+        }
+        JsonNode value = jsonNode.get(key);
+        if (value != null) {
+            return value.asText();
+        }
+        int index = key.indexOf(".");
+        if (index > 0 && index + 1 < key.length()) {
+            return extract(jsonNode.get(key.substring(0, index)), key.substring(index + 1));
+        }
+        return null;
+    }
+
+    /**
+     * Deserialize from byte array and return a ${@link JsonNode}
+     *
+     * @param message The byte array of raw data
+     * @return The JsonNode
+     * @throws IOException The exceptions may throws when deserialize
+     */
+    @Override
+    public JsonNode deserialize(byte[] message) throws IOException {
+        return objectMapper.readTree(message);
+    }
+
+    /**
+     * Parse msg and replace the value by key from meta data and physical.
+     * See details {@link JsonDynamicSchemaFormat#parse(JsonNode, String)}
+     *
+     * @param message The source of data rows format by bytes
+     * @param pattern The pattern value
+     * @return The result of parsed
+     * @throws IOException The exception will throws
+     */
+    @Override
+    public String parse(byte[] message, String pattern) throws IOException {
+        return parse(deserialize(message), pattern);
+    }
+
+    /**
+     * Parse msg and replace the value by key from meta data and physical.
+     * Such as:
+     * 1. give a pattern "${a}{b}{c}" and the root Node contains the keys(a: '1', b: '2', c: '3')
+     * the result of pared will be '123'
+     * 2. give a pattern "${a}_{b}_{c}" and the root Node contains the keys(a: '1', b: '2', c: '3')
+     * the result of pared will be '1_2_3'
+     * 3. give a pattern "prefix_${a}_{b}_{c}_suffix" and the root Node contains the keys(a: '1', b: '2', c: '3')
+     * the result of pared will be 'prefix_1_2_3_suffix'
+     *
+     * @param rootNode The root node of json
+     * @param pattern The pattern value
+     * @return The result of parsed
+     * @throws IOException The exception will throws
+     */
+    @Override
+    public String parse(JsonNode rootNode, String pattern) throws IOException {

Review Comment:
   Search in the `data` JsonNode first, then in the `root` JsonNode second? It is a little bit strange to get the sink table from physicalData. What if there is a field named `database` in physicalData? (╯﹏╰)b
   





[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

Posted by GitBox <gi...@apache.org>.
yunqingmoswu commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991939878


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java:
##########
@@ -86,25 +86,30 @@ public final class Constants {
     public static final String INLONG_METRIC_STATE_NAME = "inlong-metric-states";
 
     public static final ConfigOption<String> INLONG_METRIC =
-        ConfigOptions.key("inlong.metric.labels")
-            .stringType()
-            .noDefaultValue()
-            .withDescription("INLONG metric labels, format is 'key1=value1&key2=value2',"
-                    + "default is 'groupId=xxx&streamId=xxx&nodeId=xxx'");
+            ConfigOptions.key("inlong.metric.labels")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("INLONG metric labels, format is 'key1=value1&key2=value2',"
+                            + "default is 'groupId=xxx&streamId=xxx&nodeId=xxx'");
 
     public static final ConfigOption<String> INLONG_AUDIT =
-        ConfigOptions.key("metrics.audit.proxy.hosts")
-            .stringType()
-            .noDefaultValue()
-            .withDescription("Audit proxy host address for reporting audit metrics. \n"
-                    + "e.g. 127.0.0.1:10081,0.0.0.1:10081");
+            ConfigOptions.key("metrics.audit.proxy.hosts")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Audit proxy host address for reporting audit metrics. \n"
+                            + "e.g. 127.0.0.1:10081,0.0.0.1:10081");
 
     public static final ConfigOption<Boolean> IGNORE_ALL_CHANGELOG =
             ConfigOptions.key("sink.ignore.changelog")
                     .booleanType()
                     .defaultValue(false)
                     .withDescription("Regard upsert delete as insert kind.");
 
+    public static final ConfigOption<String> INNER_FORMAT =
+            ConfigOptions.key("inner.format")

Review Comment:
   Good idea





[GitHub] [inlong] thesumery commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

Posted by GitBox <gi...@apache.org>.
thesumery commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991850958


##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java:
##########
@@ -82,7 +81,8 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
     /**
      * Optional format for encoding keys to Kafka.
      */
-    protected final @Nullable EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat;
+    protected final @Nullable
+    EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat;

Review Comment:
   format problem?



##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java:
##########
@@ -98,19 +98,22 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
     /**
      * Prefix that needs to be removed from fields when constructing the physical data type.
      */
-    protected final @Nullable String keyPrefix;
+    protected final @Nullable

Review Comment:
   format problem?



##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java:
##########
@@ -98,19 +98,22 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
     /**
      * Prefix that needs to be removed from fields when constructing the physical data type.
      */
-    protected final @Nullable String keyPrefix;
+    protected final @Nullable
+    String keyPrefix;
     /**
      * The Kafka topic to write to.
      */
     protected final String topic;
+    protected final String topicPattern;
     /**
      * Properties for the Kafka producer.
      */
     protected final Properties properties;
     /**
      * Partitioner to select Kafka partition for each item.
      */
-    protected final @Nullable FlinkKafkaPartitioner<RowData> partitioner;
+    protected final @Nullable

Review Comment:
   Is this an unintended formatting change? `@Nullable` is now split from the field type.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] yunqingmoswu merged pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

Posted by GitBox <gi...@apache.org>.
yunqingmoswu merged PR #6123:
URL: https://github.com/apache/inlong/pull/6123




[GitHub] [inlong] EMsnap commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

Posted by GitBox <gi...@apache.org>.
EMsnap commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991866539


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.format;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+
+/**
+ * Json dynamic format class
+ * This class main handle:
+ * 1. deserialize data from byte array
+ * 2. parse pattern and get the real value from the raw data(contains meta data and physical data)
+ * Such as:
+ * 1). give a pattern "${a}{b}{c}" and the root Node contains the keys(a: '1', b: '2', c: '3')
+ * the result of pared will be '123'
+ * 2). give a pattern "${a}_{b}_{c}" and the root Node contains the keys(a: '1', b: '2', c: '3')
+ * the result of pared will be '1_2_3'
+ * 3). give a pattern "prefix_${a}_{b}_{c}_suffix" and the root Node contains the keys(a: '1', b: '2', c: '3')
+ * the result of pared will be 'prefix_1_2_3_suffix'
+ */
+public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaFormat<JsonNode> {
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    /**
+     * Extract value by key from the raw data
+     *
+     * @param message The byte array of raw data
+     * @param keys The key list that will be used to extract
+     * @return The value list maps the keys
+     * @throws IOException The exceptions may throws when extract
+     */
+    @Override
+    public List<String> extract(byte[] message, String... keys) throws IOException {
+        if (keys == null || keys.length == 0) {
+            return new ArrayList<>();
+        }
+        final JsonNode root = deserialize(message);
+        JsonNode physicalNode = getPhysicalData(root);
+        List<String> values = new ArrayList<>(keys.length);
+        if (physicalNode == null) {
+            for (String key : keys) {
+                values.add(extract(root, key));
+            }
+            return values;
+        }
+        for (String key : keys) {
+            String value = extract(physicalNode, key);
+            if (value == null) {
+                value = extract(root, key);
+            }
+            values.add(value);
+        }
+        return values;
+    }
+
+    /**
+     * Extract value by key from ${@link JsonNode}
+     *
+     * @param jsonNode The json node
+     * @param key The key that will be used to extract
+     * @return The value maps the key in the json node
+     */
+    @Override
+    public String extract(JsonNode jsonNode, String key) {
+        if (jsonNode == null || key == null) {
+            return null;
+        }
+        JsonNode value = jsonNode.get(key);

Review Comment:
   This returns the first match. What if the Canal JSON data contains a key named `database`? The method would then return the data value rather than the `database` metadata field.
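
The collision raised here can be reproduced with a minimal sketch. It assumes the lookup order shown in the quoted `extract(byte[], String...)` (physical data first, then the root node) and uses plain `java.util.Map` objects as a stand-in for `JsonNode`. The class name, helper names, and the simplified record shape are hypothetical and not part of the PR.

```java
import java.util.HashMap;
import java.util.Map;

final class LookupOrderSketch {

    /** Same order as the quoted extract(): try the physical row first, then the root. */
    static String lookup(Map<String, Object> root, Map<String, Object> physical, String key) {
        Object value = physical == null ? null : physical.get(key);
        if (value == null) {
            value = root.get(key);
        }
        return value == null ? null : value.toString();
    }

    /** Hypothetical row that happens to have a column named "database". */
    static Map<String, Object> row() {
        Map<String, Object> row = new HashMap<>();
        row.put("database", "column_value");
        return row;
    }

    /** Hypothetical, simplified record: metadata at the root, the row under "data". */
    static Map<String, Object> root(Map<String, Object> row) {
        Map<String, Object> root = new HashMap<>();
        root.put("database", "inventory");
        root.put("table", "orders");
        root.put("data", row);
        return root;
    }
}
```

With this order, `lookup(root, row, "database")` yields the column value, so a pattern such as `${database}` would resolve to row data instead of the metadata field whenever the names collide.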





[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

Posted by GitBox <gi...@apache.org>.
yunqingmoswu commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991938239


##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java:
##########
@@ -289,9 +336,11 @@ public DynamicTableSink createDynamicTableSink(Context context) {
         final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat =
                 getValueEncodingFormat(helper);
 
+        final String innerValueDecodingFormat = getInnerDecodingFormat(helper);

Review Comment:
   Good idea.





[GitHub] [inlong] thesumery commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

Posted by GitBox <gi...@apache.org>.
thesumery commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991849455


##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java:
##########
@@ -289,9 +336,11 @@ public DynamicTableSink createDynamicTableSink(Context context) {
         final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat =
                 getValueEncodingFormat(helper);
 
+        final String innerValueDecodingFormat = getInnerDecodingFormat(helper);

Review Comment:
   Once `innerValueDecodingFormat` is used here for the raw data type, should `physicalDataType` be validated?





[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

Posted by GitBox <gi...@apache.org>.
yunqingmoswu commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991866312


##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java:
##########
@@ -325,7 +374,9 @@ public DynamicTableSink createDynamicTableSink(Context context) {
                 getSinkSemantic(tableOptions),
                 parallelism,
                 inlongMetric,
-                auditHostAndPorts);
+                auditHostAndPorts,
+                innerValueDecodingFormat,
+                tableOptions.getOptional(TOPIC_PATTERN).orElse(null));

Review Comment:
   Yes, I reused 'TOPIC_PATTERN' for the Kafka sink. Maybe you can define some common option names for other multiple-sink connectors, such as 'database-pattern', 'table-pattern', and so on.





[GitHub] [inlong] thesumery commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

Posted by GitBox <gi...@apache.org>.
thesumery commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991862654


##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java:
##########
@@ -325,7 +374,9 @@ public DynamicTableSink createDynamicTableSink(Context context) {
                 getSinkSemantic(tableOptions),
                 parallelism,
                 inlongMetric,
-                auditHostAndPorts);
+                auditHostAndPorts,
+                innerValueDecodingFormat,
+                tableOptions.getOptional(TOPIC_PATTERN).orElse(null));

Review Comment:
   Is the source option TOPIC_PATTERN reused for the sink? How can it be extended to other multiple-sink connectors?





[GitHub] [inlong] EMsnap commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

Posted by GitBox <gi...@apache.org>.
EMsnap commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991757209


##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java:
##########
@@ -176,6 +189,17 @@ public void setPartitions(int[] partitions) {
 
     @Override
     public String getTargetTopic(RowData element) {
+        // Only support dynamic topic when the topicPattern is specified
+        //      and the valueSerialization is RawFormatSerializationSchema
+        if (valueSerialization instanceof RawFormatSerializationSchema && StringUtils.isNotBlank(topicPattern)) {
+            try {
+                return DynamicSchemaFormatFactory.getFormat(innerValueDecodingFormat)
+                        .parse(element.getBinary(0), topicPattern);

Review Comment:
   It would be better to extract the index 0 into a constant and add a comment explaining why it must be 0, since people may be confused here.
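
One possible shape for that suggestion is sketched below. It assumes that a row serialized with the raw format carries the whole record in a single binary column, which is why position 0 is read. `BinaryRow` is a hypothetical stand-in for Flink's `RowData`, and the constant name is illustrative only.

```java
final class RawValueIndexSketch {

    /**
     * Assumption: with the raw format the row has exactly one physical column,
     * and that column holds the complete serialized record.
     */
    static final int RAW_VALUE_FIELD_INDEX = 0;

    /** Hypothetical stand-in for the getBinary(int) accessor used in the diff. */
    interface BinaryRow {
        byte[] getBinary(int pos);
    }

    /** Reads the raw record through the named constant instead of a bare 0. */
    static byte[] rawRecord(BinaryRow row) {
        return row.getBinary(RAW_VALUE_FIELD_INDEX);
    }
}
```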





[GitHub] [inlong] EMsnap commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

Posted by GitBox <gi...@apache.org>.
EMsnap commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991776722


##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java:
##########
@@ -176,6 +189,17 @@ public void setPartitions(int[] partitions) {
 
     @Override
     public String getTargetTopic(RowData element) {
+        // Only support dynamic topic when the topicPattern is specified
+        //      and the valueSerialization is RawFormatSerializationSchema
+        if (valueSerialization instanceof RawFormatSerializationSchema && StringUtils.isNotBlank(topicPattern)) {
+            try {
+                return DynamicSchemaFormatFactory.getFormat(innerValueDecodingFormat)
+                        .parse(element.getBinary(0), topicPattern);

Review Comment:
   noted, thx





[GitHub] [inlong] gong commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

Posted by GitBox <gi...@apache.org>.
gong commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991139744


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.format;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Debezium json dynamic format
+ */
+public class DebeziumJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat {
+
+    private static final String IDENTIFIER = "debezium-json";
+
+    private static final DebeziumJsonDynamicSchemaFormat FORMAT = new DebeziumJsonDynamicSchemaFormat();
+
+    private DebeziumJsonDynamicSchemaFormat() {
+
+    }
+
+    @SuppressWarnings("rawtypes")
+    public static AbstractDynamicSchemaFormat getInstance() {
+        return FORMAT;
+    }
+
+    @Override
+    protected JsonNode getPhysicalData(JsonNode root) {
+        return root.get("after");

Review Comment:
   How is the topic name obtained when `op` is a delete?
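
For context, a Debezium delete event carries the old row in `before` and a null `after`, so a pattern that references a physical column has nothing to read from `after`. One way to cover that case is to fall back to `before`, sketched below with plain maps as a stand-in for `JsonNode`. This fallback is an assumption for illustration, not what the PR implements, and the class and helper names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class DebeziumDeleteSketch {

    /** Prefer "after"; fall back to "before" when "after" is missing or null. */
    @SuppressWarnings("unchecked")
    static Map<String, Object> getPhysicalData(Map<String, Object> root) {
        Object after = root.get("after");
        Object row = after != null ? after : root.get("before");
        return (Map<String, Object>) row;
    }

    /** Builds a simplified, hypothetical change event envelope. */
    static Map<String, Object> event(String op, Map<String, Object> before, Map<String, Object> after) {
        Map<String, Object> root = new HashMap<>();
        root.put("op", op);
        root.put("before", before);
        root.put("after", after);
        return root;
    }
}
```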





[GitHub] [inlong] thesumery commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

Posted by GitBox <gi...@apache.org>.
thesumery commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991847775


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java:
##########
@@ -86,25 +86,30 @@ public final class Constants {
     public static final String INLONG_METRIC_STATE_NAME = "inlong-metric-states";
 
     public static final ConfigOption<String> INLONG_METRIC =
-        ConfigOptions.key("inlong.metric.labels")
-            .stringType()
-            .noDefaultValue()
-            .withDescription("INLONG metric labels, format is 'key1=value1&key2=value2',"
-                    + "default is 'groupId=xxx&streamId=xxx&nodeId=xxx'");
+            ConfigOptions.key("inlong.metric.labels")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("INLONG metric labels, format is 'key1=value1&key2=value2',"
+                            + "default is 'groupId=xxx&streamId=xxx&nodeId=xxx'");
 
     public static final ConfigOption<String> INLONG_AUDIT =
-        ConfigOptions.key("metrics.audit.proxy.hosts")
-            .stringType()
-            .noDefaultValue()
-            .withDescription("Audit proxy host address for reporting audit metrics. \n"
-                    + "e.g. 127.0.0.1:10081,0.0.0.1:10081");
+            ConfigOptions.key("metrics.audit.proxy.hosts")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Audit proxy host address for reporting audit metrics. \n"
+                            + "e.g. 127.0.0.1:10081,0.0.0.1:10081");
 
     public static final ConfigOption<Boolean> IGNORE_ALL_CHANGELOG =
             ConfigOptions.key("sink.ignore.changelog")
                     .booleanType()
                     .defaultValue(false)
                     .withDescription("Regard upsert delete as insert kind.");
 
+    public static final ConfigOption<String> INNER_FORMAT =
+            ConfigOptions.key("inner.format")

Review Comment:
   How about 'sink.multiple.format'?





[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

Posted by GitBox <gi...@apache.org>.
yunqingmoswu commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991875873


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.format;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+
+/**
+ * Json dynamic format class
+ * This class main handle:
+ * 1. deserialize data from byte array
+ * 2. parse pattern and get the real value from the raw data(contains meta data and physical data)
+ * Such as:
+ * 1). give a pattern "${a}{b}{c}" and the root Node contains the keys(a: '1', b: '2', c: '3')
+ * the result of pared will be '123'
+ * 2). give a pattern "${a}_{b}_{c}" and the root Node contains the keys(a: '1', b: '2', c: '3')
+ * the result of pared will be '1_2_3'
+ * 3). give a pattern "prefix_${a}_{b}_{c}_suffix" and the root Node contains the keys(a: '1', b: '2', c: '3')
+ * the result of pared will be 'prefix_1_2_3_suffix'
+ */
+public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaFormat<JsonNode> {
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    /**
+     * Extract value by key from the raw data
+     *
+     * @param message The byte array of raw data
+     * @param keys The key list that will be used to extract
+     * @return The value list maps the keys
+     * @throws IOException The exceptions may throws when extract
+     */
+    @Override
+    public List<String> extract(byte[] message, String... keys) throws IOException {
+        if (keys == null || keys.length == 0) {
+            return new ArrayList<>();
+        }
+        final JsonNode root = deserialize(message);
+        JsonNode physicalNode = getPhysicalData(root);
+        List<String> values = new ArrayList<>(keys.length);
+        if (physicalNode == null) {
+            for (String key : keys) {
+                values.add(extract(root, key));
+            }
+            return values;
+        }
+        for (String key : keys) {
+            String value = extract(physicalNode, key);
+            if (value == null) {
+                value = extract(root, key);
+            }
+            values.add(value);
+        }
+        return values;
+    }
+
+    /**
+     * Extract value by key from ${@link JsonNode}
+     *
+     * @param jsonNode The json node
+     * @param key The key that will be used to extract
+     * @return The value maps the key in the json node
+     */
+    @Override
+    public String extract(JsonNode jsonNode, String key) {
+        if (jsonNode == null || key == null) {
+            return null;
+        }
+        JsonNode value = jsonNode.get(key);

Review Comment:
   We can do this first, and then expose a parameter to decide whether to prefer the physical field or the metadata field
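
The follow-up parameter mentioned here could look roughly like the sketch below. It is a hypothetical illustration only: the `preferPhysical` flag, the class name, and the use of plain maps in place of `JsonNode` are all assumptions, and no such option exists in the quoted diff.

```java
import java.util.Map;

final class FieldPreferenceSketch {

    /**
     * Looks the key up in the preferred source first and falls back to the other one.
     * preferPhysical = true reproduces the order in the quoted extract().
     */
    static String extract(Map<String, String> metadata, Map<String, String> physical,
                          String key, boolean preferPhysical) {
        Map<String, String> first = preferPhysical ? physical : metadata;
        Map<String, String> second = preferPhysical ? metadata : physical;
        String value = first == null ? null : first.get(key);
        if (value == null && second != null) {
            value = second.get(key);
        }
        return value;
    }
}
```

Passing `false` would let `${database}` resolve to the metadata field even when a column of the same name exists.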





[GitHub] [inlong] gong commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

Posted by GitBox <gi...@apache.org>.
gong commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991137398


##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java:
##########
@@ -176,6 +186,17 @@ public void setPartitions(int[] partitions) {
 
     @Override
     public String getTargetTopic(RowData element) {
+        // Only support dymic topic when the dymicTopic is true
+        //      and the valueSerialization is RawFormatSerializationSchema
+        if (valueSerialization instanceof RawFormatSerializationSchema && StringUtils.isNotBlank(topicPattern)) {
+            try {
+                return DynamicSchemaFormatFactory.getFormat(innerValueDecodingFormat)
+                        .parse(element.getBinary(0), topicPattern);
+            } catch (IOException e) {
+                // Ignore the parse error and it will return the default topic final.
+                e.printStackTrace();
+            }
+        }

Review Comment:
   1. Change e.printStackTrace() to log.warn().
   2. `dymic` is a spelling error.
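
A minimal sketch of the first point follows. To stay self-contained it uses `java.util.logging` rather than whatever logger the project provides, so `LOG.log(Level.WARNING, ...)` stands in for the suggested `log.warn()`. `TopicParser` is a hypothetical stand-in for the format's `parse(byte[], String)` call, and the method signature is simplified.

```java
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

final class TargetTopicSketch {

    private static final Logger LOG = Logger.getLogger(TargetTopicSketch.class.getName());

    /** Hypothetical stand-in for the dynamic schema format's parse call. */
    interface TopicParser {
        String parse(byte[] rawRecord, String topicPattern) throws IOException;
    }

    /** Resolves the dynamic topic; logs a warning and returns the default topic on failure. */
    static String getTargetTopic(TopicParser parser, byte[] rawRecord,
                                 String topicPattern, String defaultTopic) {
        if (topicPattern != null && !topicPattern.trim().isEmpty()) {
            try {
                return parser.parse(rawRecord, topicPattern);
            } catch (IOException e) {
                // Keep the stack trace in the log instead of printing it to stderr.
                LOG.log(Level.WARNING, "Failed to resolve topic from pattern '" + topicPattern
                        + "', falling back to '" + defaultTopic + "'", e);
            }
        }
        return defaultTopic;
    }
}
```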





[GitHub] [inlong] EMsnap commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

Posted by GitBox <gi...@apache.org>.
EMsnap commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991926032


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.format;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Abstact dynamic format class
+ * This class main handle:
+ * 1. deserialize data from byte array to get raw data
+ * 2. parse pattern and get the real value from the raw data
+ * Such as:
+ * 1). give a pattern "${a}{b}{c}" and the raw data contains the keys(a: '1', b: '2', c: '3')
+ * the result of pared will be '123'
+ * 2). give a pattern "${a}_{b}_{c}" and the raw data contains the keys(a: '1', b: '2', c: '3')
+ * the result of pared will be '1_2_3'
+ * 3). give a pattern "prefix_${a}_{b}_{c}_suffix" and the raw Node contains the keys(a: '1', b: '2', c: '3')
+ * the result of pared will be 'prefix_1_2_3_suffix'
+ */
+public abstract class AbstractDynamicSchemaFormat<T> {
+
+    public static final Pattern PATTERN = Pattern.compile("\\$\\{\\s*([\\w.-]+)\\s*}", Pattern.CASE_INSENSITIVE);
+
+    /**
+     * Extract value by key from the raw data
+     *
+     * @param message The byte array of raw data
+     * @param keys The key list that will be used to extract
+     * @return The value list maps the keys
+     * @throws IOException The exceptions may throws when extract
+     */
+    public List<String> extract(byte[] message, String... keys) throws IOException {

Review Comment:
   Is this method ever used?





[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

Posted by GitBox <gi...@apache.org>.
yunqingmoswu commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991941629


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.format;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Abstact dynamic format class
+ * This class main handle:
+ * 1. deserialize data from byte array to get raw data
+ * 2. parse pattern and get the real value from the raw data
+ * Such as:
+ * 1). give a pattern "${a}{b}{c}" and the raw data contains the keys(a: '1', b: '2', c: '3')
+ * the result of pared will be '123'
+ * 2). give a pattern "${a}_{b}_{c}" and the raw data contains the keys(a: '1', b: '2', c: '3')
+ * the result of pared will be '1_2_3'
+ * 3). give a pattern "prefix_${a}_{b}_{c}_suffix" and the raw Node contains the keys(a: '1', b: '2', c: '3')
+ * the result of pared will be 'prefix_1_2_3_suffix'
+ */
+public abstract class AbstractDynamicSchemaFormat<T> {
+
+    public static final Pattern PATTERN = Pattern.compile("\\$\\{\\s*([\\w.-]+)\\s*}", Pattern.CASE_INSENSITIVE);
+
+    /**
+     * Extract value by key from the raw data
+     *
+     * @param message The byte array of raw data
+     * @param keys The key list that will be used to extract
+     * @return The value list maps the keys
+     * @throws IOException The exceptions may throws when extract
+     */
+    public List<String> extract(byte[] message, String... keys) throws IOException {

Review Comment:
   This method is not used yet, but it may be used in the future, for example to extract the values of multiple keys in one call.
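
For readers following the thread, the placeholder substitution described in the quoted Javadoc can be sketched as below. The regular expression is copied from `PATTERN` in the diff; everything else is an assumption, including the class name, the use of a flat `Map` in place of the parsed record, and the choice to leave unknown placeholders untouched. The real `parse` method is not shown in the quoted hunk and may behave differently.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class TopicPatternSketch {

    /** Same regular expression as PATTERN in the quoted diff. */
    static final Pattern PATTERN =
            Pattern.compile("\\$\\{\\s*([\\w.-]+)\\s*}", Pattern.CASE_INSENSITIVE);

    /** Replaces each ${key} placeholder with the value found in the record. */
    static String parse(Map<String, String> record, String pattern) {
        Matcher matcher = PATTERN.matcher(pattern);
        StringBuffer result = new StringBuffer();
        while (matcher.find()) {
            String value = record.get(matcher.group(1));
            // Assumption: a placeholder with no matching key is kept as written.
            String replacement = value == null ? matcher.group() : value;
            matcher.appendReplacement(result, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(result);
        return result.toString();
    }
}
```

Note that this expression only matches placeholders written with a leading `$`, so under this sketch a pattern like `${a}_${b}_${c}` is fully substituted while `{b}` on its own is left as literal text.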


