Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/10/18 11:05:06 UTC

[GitHub] [inlong] healchow commented on a diff in pull request #6202: [INLONG-6054][Manager] Kafka sink supports all migration

healchow commented on code in PR #6202:
URL: https://github.com/apache/inlong/pull/6202#discussion_r998049974


##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java:
##########
@@ -162,29 +164,58 @@ public static KafkaLoadNode createLoadNode(KafkaSink kafkaSink, List<FieldInfo>
                 format = new JsonFormat();
                 break;
             case CANAL:
-                format = new CanalJsonFormat();
+                format = new RawFormat();
+                innerFormat = new CanalJsonFormat();
                 break;
             case DEBEZIUM_JSON:
-                format = new DebeziumJsonFormat();
+                format = new RawFormat();
+                innerFormat = new DebeziumJsonFormat();
                 break;
             default:
                 throw new IllegalArgumentException(String.format("Unsupported dataType=%s for Kafka", dataType));
         }
 
+        String sinkPartitioner = null;
+        if (dataType == DataTypeEnum.CANAL || dataType == DataTypeEnum.DEBEZIUM_JSON) {
+            sinkPartitioner = kafkaSink.getPartitionStrategy() == null ? null : "raw_hash";
+        }
+
+        if (StringUtils.isNotEmpty(kafkaSink.getTopicName())) {
+            return new KafkaLoadNode(
+                    kafkaSink.getSinkName(),          //id

Review Comment:
   There is no need to add comments for each parameter.
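For readers following the first hunk: CANAL and DEBEZIUM_JSON no longer map directly to their JSON formats. They now get a `RawFormat` outer format, and the CDC format is kept as `innerFormat`. The Java sketch below mirrors that switch under stated assumptions, and it is not the PR's code. `DataType`, `FormatPair`, and the lowercase format names are hypothetical stand-ins for InLong's `DataTypeEnum` and `Format` classes. The `JSON` case label is assumed because the hunk only shows that case's body, and `UNKNOWN` exists only to exercise the error path.

```java
import java.util.Objects;

public class KafkaFormatSelection {

    // Hypothetical subset of data types; UNKNOWN only exercises the error path.
    enum DataType { JSON, CANAL, DEBEZIUM_JSON, UNKNOWN }

    // Outer format written to Kafka plus an optional inner format (null when unused).
    static final class FormatPair {
        final String outer;
        final String inner;

        FormatPair(String outer, String inner) {
            this.outer = outer;
            this.inner = inner;
        }

        @Override
        public String toString() {
            return "outer=" + outer + ", inner=" + inner;
        }
    }

    static FormatPair select(DataType dataType) {
        Objects.requireNonNull(dataType, "dataType");
        switch (dataType) {
            case JSON:
                return new FormatPair("json", null);
            case CANAL:
                // Per the diff: raw outer format, Canal JSON kept as the inner format.
                return new FormatPair("raw", "canal-json");
            case DEBEZIUM_JSON:
                // Per the diff: raw outer format, Debezium JSON kept as the inner format.
                return new FormatPair("raw", "debezium-json");
            default:
                throw new IllegalArgumentException(
                        String.format("Unsupported dataType=%s for Kafka", dataType));
        }
    }

    public static void main(String[] args) {
        System.out.println(select(DataType.CANAL)); // outer=raw, inner=canal-json
    }
}
```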



##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java:
##########
@@ -162,29 +164,58 @@ public static KafkaLoadNode createLoadNode(KafkaSink kafkaSink, List<FieldInfo>
                 format = new JsonFormat();
                 break;
             case CANAL:
-                format = new CanalJsonFormat();
+                format = new RawFormat();
+                innerFormat = new CanalJsonFormat();
                 break;
             case DEBEZIUM_JSON:
-                format = new DebeziumJsonFormat();
+                format = new RawFormat();
+                innerFormat = new DebeziumJsonFormat();
                 break;
             default:
                 throw new IllegalArgumentException(String.format("Unsupported dataType=%s for Kafka", dataType));
         }
 
+        String sinkPartitioner = null;
+        if (dataType == DataTypeEnum.CANAL || dataType == DataTypeEnum.DEBEZIUM_JSON) {
+            sinkPartitioner = kafkaSink.getPartitionStrategy() == null ? null : "raw_hash";

Review Comment:
   1. Why is `sinkPartitioner` set to `raw_hash` whenever the partition strategy is non-null?
   2. If `raw_hash` is needed, please extract it into a String constant.
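One way to act on the second point is to keep the literal in a single named constant. This is a minimal sketch, not the PR's code. The class, method, and `DataType` names and the `PRIMARY_KEY` strategy value are hypothetical; only the `raw_hash` literal and the CANAL/DEBEZIUM_JSON condition come from the diff. The sketch deliberately preserves the diff's behavior (any non-null strategy maps to `raw_hash`), which is exactly what the first question asks the author to justify.

```java
public class SinkPartitionerResolver {

    // Hypothetical subset of data types.
    enum DataType { JSON, CANAL, DEBEZIUM_JSON }

    // Partitioner name for CDC data; the literal value is taken from the diff.
    static final String RAW_HASH_PARTITIONER = "raw_hash";

    /**
     * Returns the sink partitioner, or null when none should be set.
     * Mirrors the diff: only CDC data types with a non-null strategy get raw_hash.
     */
    static String resolve(DataType dataType, String partitionStrategy) {
        boolean isCdc = dataType == DataType.CANAL || dataType == DataType.DEBEZIUM_JSON;
        if (!isCdc || partitionStrategy == null) {
            return null;
        }
        return RAW_HASH_PARTITIONER;
    }

    public static void main(String[] args) {
        System.out.println(resolve(DataType.CANAL, "PRIMARY_KEY")); // raw_hash
        System.out.println(resolve(DataType.JSON, "PRIMARY_KEY"));  // null
    }
}
```

Keeping the value in one constant means a later rename of the partitioner touches a single line, and the constant's comment is a natural place to answer the first question.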



##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSink.java:
##########
@@ -61,6 +61,18 @@ public class KafkaSink extends StreamSink {
     @ApiModelProperty("Primary key is required when serializationType is json, avro")
     private String primaryKey;
 
+    @ApiModelProperty("the database-table mapping rule")

Review Comment:
   What does `database-table` mean in the context of Kafka?



##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSink.java:
##########
@@ -61,6 +61,18 @@ public class KafkaSink extends StreamSink {
     @ApiModelProperty("Primary key is required when serializationType is json, avro")
     private String primaryKey;
 
+    @ApiModelProperty("the database-table mapping rule")
+    private String topicPattern;
+
+    @ApiModelProperty(value = "Automatically create kafka topic or not, a note for users.")

Review Comment:
   1. Suggest removing `, a note for users.`.
   2. What are the allowed values for `autoCreateTopics`? Could you please document them in the `ApiModelProperty`?
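Because `autoCreateTopics` is declared as a `String`, documenting its allowed values matters: `Boolean.parseBoolean` silently treats anything other than a case-insensitive "true" as `false`. Below is a minimal sketch of a strict parser. It assumes, hypothetically, that the accepted values are "true" and "false" and that an unset field means no auto-creation. The actual values are what the reviewer is asking the author to specify, and the class and method names are illustrative only.

```java
import java.util.Locale;

public class AutoCreateTopicsFlag {

    /**
     * Strictly parses the (assumed) "true"/"false" values of autoCreateTopics.
     * Unlike Boolean.parseBoolean, an unexpected value fails loudly.
     */
    static boolean parse(String value) {
        if (value == null) {
            return false; // assumption: unset means "do not create topics"
        }
        switch (value.trim().toLowerCase(Locale.ROOT)) {
            case "true":
                return true;
            case "false":
                return false;
            default:
                throw new IllegalArgumentException(
                        "autoCreateTopics must be 'true' or 'false', got: " + value);
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("true"));            // true
        System.out.println(Boolean.parseBoolean("yes")); // false: the lenient parser hides the typo
    }
}
```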



##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSink.java:
##########
@@ -61,6 +61,18 @@ public class KafkaSink extends StreamSink {
     @ApiModelProperty("Primary key is required when serializationType is json, avro")
     private String primaryKey;
 
+    @ApiModelProperty("the database-table mapping rule")
+    private String topicPattern;
+
+    @ApiModelProperty(value = "Automatically create kafka topic or not, a note for users.")
+    private String autoCreateTopics;
+
+    @ApiModelProperty(value = "the partition strategy for kafka")

Review Comment:
   ```suggestion
       @ApiModelProperty(value = "Partition strategy")
   ```



##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSink.java:
##########
@@ -61,6 +61,18 @@ public class KafkaSink extends StreamSink {
     @ApiModelProperty("Primary key is required when serializationType is json, avro")
     private String primaryKey;
 
+    @ApiModelProperty("the database-table mapping rule")
+    private String topicPattern;
+
+    @ApiModelProperty(value = "Automatically create kafka topic or not, a note for users.")
+    private String autoCreateTopics;
+
+    @ApiModelProperty(value = "the partition strategy for kafka")
+    private String partitionStrategy;
+
+    @ApiModelProperty("data multiple format,only applicable when outer format is raw")

Review Comment:
   What does `data multiple format` mean?
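The annotation text says the field applies only when the outer format is raw. If that constraint is kept, it could be enforced with a check like the following sketch. The field itself is not visible in the hunk, so the method takes plain strings. The class and method names, and `raw` as the outer-format name, are assumptions.

```java
public class InnerFormatCheck {

    /**
     * Rejects an inner format unless the outer format is raw, as the
     * ApiModelProperty text in the diff states. A null inner format is always allowed.
     */
    static void validate(String outerFormat, String innerFormat) {
        if (innerFormat != null && !"raw".equalsIgnoreCase(outerFormat)) {
            throw new IllegalArgumentException(String.format(
                    "inner format %s is only applicable when the outer format is raw, got %s",
                    innerFormat, outerFormat));
        }
    }

    // Convenience wrapper: true if validate() accepts the combination.
    static boolean isValid(String outerFormat, String innerFormat) {
        try {
            validate(outerFormat, innerFormat);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValid("raw", "canal-json"));  // true
        System.out.println(isValid("json", "canal-json")); // false
    }
}
```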



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
