You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/10/18 14:33:22 UTC

[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6202: [INLONG-6054][Manager] Kafka sink supports all migration

yunqingmoswu commented on code in PR #6202:
URL: https://github.com/apache/inlong/pull/6202#discussion_r998312729


##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkRequest.java:
##########
@@ -56,4 +56,16 @@ public class KafkaSinkRequest extends SinkRequest {
     @ApiModelProperty("Primary key is required when serializationType is json, avro")
     private String primaryKey;
 
+    @ApiModelProperty("the topic mapping rule")
+    private String topicPattern;
+
+    @ApiModelProperty(value = "Automatically create kafka topic or not. values: true, false.")
+    private String autoCreateTopics;

Review Comment:
   It is not necessary to put this parameter in the manager.



##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java:
##########
@@ -162,29 +166,58 @@ public static KafkaLoadNode createLoadNode(KafkaSink kafkaSink, List<FieldInfo>
                 format = new JsonFormat();
                 break;
             case CANAL:
-                format = new CanalJsonFormat();
+                format = new RawFormat();
+                innerFormat = new CanalJsonFormat();
                 break;
             case DEBEZIUM_JSON:
-                format = new DebeziumJsonFormat();
+                format = new RawFormat();
+                innerFormat = new DebeziumJsonFormat();
                 break;
             default:
                 throw new IllegalArgumentException(String.format("Unsupported dataType=%s for Kafka", dataType));
         }
 
+        String sinkPartitioner = null;
+        if (dataType == DataTypeEnum.CANAL || dataType == DataTypeEnum.DEBEZIUM_JSON) {
+            sinkPartitioner = kafkaSink.getPartitionStrategy() == null ? null : RAW_HASH;
+        }
+
+        if (StringUtils.isNotEmpty(kafkaSink.getTopicName())) {
+            return new KafkaLoadNode(
+                    kafkaSink.getSinkName(),
+                    kafkaSink.getSinkName(),
+                    fieldInfos,
+                    fieldRelations,
+                    Lists.newArrayList(),
+                    null,
+                    kafkaSink.getTopicName(),
+                    kafkaSink.getBootstrapServers(),
+                    format,
+                    sinkParallelism,
+                    properties,
+                    kafkaSink.getPrimaryKey()
+            );
+        }
+
         return new KafkaLoadNode(
                 kafkaSink.getSinkName(),
                 kafkaSink.getSinkName(),
                 fieldInfos,
                 fieldRelations,
                 Lists.newArrayList(),
                 null,
-                kafkaSink.getTopicName(),
+                "mock_topic",

Review Comment:
   Maybe it is not necessary to add a default for the topic?



##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkDTO.java:
##########
@@ -54,9 +54,21 @@ public class KafkaSinkDTO {
             notes = "including earliest, latest (the default), none")
     private String autoOffsetReset;
 
+    @ApiModelProperty(value = "Automatically create kafka topic or not. values: true, false.")
+    private String autoCreateTopics;

Review Comment:
   It is not necessary to put this parameter in the manager.



##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java:
##########
@@ -162,29 +166,58 @@ public static KafkaLoadNode createLoadNode(KafkaSink kafkaSink, List<FieldInfo>
                 format = new JsonFormat();
                 break;
             case CANAL:
-                format = new CanalJsonFormat();
+                format = new RawFormat();
+                innerFormat = new CanalJsonFormat();
                 break;
             case DEBEZIUM_JSON:
-                format = new DebeziumJsonFormat();
+                format = new RawFormat();
+                innerFormat = new DebeziumJsonFormat();
                 break;
             default:
                 throw new IllegalArgumentException(String.format("Unsupported dataType=%s for Kafka", dataType));
         }
 
+        String sinkPartitioner = null;
+        if (dataType == DataTypeEnum.CANAL || dataType == DataTypeEnum.DEBEZIUM_JSON) {
+            sinkPartitioner = kafkaSink.getPartitionStrategy() == null ? null : RAW_HASH;

Review Comment:
   This logic is incorrect.



##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java:
##########
@@ -162,29 +166,58 @@ public static KafkaLoadNode createLoadNode(KafkaSink kafkaSink, List<FieldInfo>
                 format = new JsonFormat();
                 break;
             case CANAL:
-                format = new CanalJsonFormat();
+                format = new RawFormat();
+                innerFormat = new CanalJsonFormat();
                 break;
             case DEBEZIUM_JSON:
-                format = new DebeziumJsonFormat();
+                format = new RawFormat();

Review Comment:
   This logic is incorrect.



##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java:
##########
@@ -162,29 +166,58 @@ public static KafkaLoadNode createLoadNode(KafkaSink kafkaSink, List<FieldInfo>
                 format = new JsonFormat();
                 break;
             case CANAL:
-                format = new CanalJsonFormat();
+                format = new RawFormat();

Review Comment:
   This logic is incorrect.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org