Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/10/10 10:28:30 UTC

[GitHub] [inlong] gong commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

gong commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991137398


##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java:
##########
@@ -176,6 +186,17 @@ public void setPartitions(int[] partitions) {
 
     @Override
     public String getTargetTopic(RowData element) {
+        // Only support dymic topic when the dymicTopic is true
+        //      and the valueSerialization is RawFormatSerializationSchema
+        if (valueSerialization instanceof RawFormatSerializationSchema && StringUtils.isNotBlank(topicPattern)) {
+            try {
+                return DynamicSchemaFormatFactory.getFormat(innerValueDecodingFormat)
+                        .parse(element.getBinary(0), topicPattern);
+            } catch (IOException e) {
+                // Ignore the parse error and it will return the default topic final.
+                e.printStackTrace();
+            }
+        }

Review Comment:
   1. Change `e.printStackTrace()` to `log.warn()` so the parse failure goes through the logger.
   2. `dymic` is a spelling error; it should be `dynamic`.
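
   A minimal sketch of what point 1 could look like, written as a standalone class so it compiles on its own. Everything here is illustrative: `TopicResolver` and `TopicParser` are hypothetical stand-ins for the schema class and the format parser in the diff, and `java.util.logging` is swapped in for whatever logger the project actually uses (so `LOG.log(Level.WARNING, ...)` plays the role of `log.warn(...)`).

```java
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical, self-contained stand-in for the class under review.
public class TopicResolver {

    // java.util.logging is an assumption; the project logger may differ.
    private static final Logger LOG = Logger.getLogger(TopicResolver.class.getName());

    // Hypothetical replacement for the dynamic format parser used in the diff.
    public interface TopicParser {
        String parse(byte[] data, String pattern) throws IOException;
    }

    private final TopicParser parser;
    private final String topicPattern;
    private final String defaultTopic;

    public TopicResolver(TopicParser parser, String topicPattern, String defaultTopic) {
        this.parser = parser;
        this.topicPattern = topicPattern;
        this.defaultTopic = defaultTopic;
    }

    public String getTargetTopic(byte[] payload) {
        // Only resolve a dynamic topic when a non-blank pattern is configured.
        if (topicPattern != null && !topicPattern.trim().isEmpty()) {
            try {
                return parser.parse(payload, topicPattern);
            } catch (IOException e) {
                // Log the failure with its stack trace instead of printing to stderr,
                // then fall through to the default topic.
                LOG.log(Level.WARNING,
                        "Failed to parse dynamic topic with pattern '" + topicPattern
                                + "', falling back to default topic '" + defaultTopic + "'", e);
            }
        }
        return defaultTopic;
    }
}
```

   Passing the exception as the last argument keeps the stack trace in the log output, which is the main thing `printStackTrace()` was providing.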



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org