Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/10/18 03:55:57 UTC

[GitHub] [inlong] Yizhou-Yang opened a new pull request, #6202: [INLONG-6054] [Manager] edit parameters of kafka/mysql/doris source and sink to support whole database migration #6054

Yizhou-Yang opened a new pull request, #6202:
URL: https://github.com/apache/inlong/pull/6202

   [INLONG-6054](https://github.com/apache/inlong/issues/6054)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] Yizhou-Yang closed pull request #6202: [INLONG-6054][Manager] Kafka sink supports all migration

Posted by GitBox <gi...@apache.org>.
Yizhou-Yang closed pull request #6202: [INLONG-6054][Manager] Kafka sink supports all migration
URL: https://github.com/apache/inlong/pull/6202




[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6202: [INLONG-6054][Manager] Kafka sink supports all migration

Posted by GitBox <gi...@apache.org>.
yunqingmoswu commented on code in PR #6202:
URL: https://github.com/apache/inlong/pull/6202#discussion_r998312729


##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkRequest.java:
##########
@@ -56,4 +56,16 @@ public class KafkaSinkRequest extends SinkRequest {
     @ApiModelProperty("Primary key is required when serializationType is json, avro")
     private String primaryKey;
 
+    @ApiModelProperty("the topic mapping rule")
+    private String topicPattern;
+
+    @ApiModelProperty(value = "Automatically create kafka topic or not. values: true, false.")
+    private String autoCreateTopics;

Review Comment:
   It is not necessary to put this parameter in the manager.



##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java:
##########
@@ -162,29 +166,58 @@ public static KafkaLoadNode createLoadNode(KafkaSink kafkaSink, List<FieldInfo>
                 format = new JsonFormat();
                 break;
             case CANAL:
-                format = new CanalJsonFormat();
+                format = new RawFormat();
+                innerFormat = new CanalJsonFormat();
                 break;
             case DEBEZIUM_JSON:
-                format = new DebeziumJsonFormat();
+                format = new RawFormat();
+                innerFormat = new DebeziumJsonFormat();
                 break;
             default:
                 throw new IllegalArgumentException(String.format("Unsupported dataType=%s for Kafka", dataType));
         }
 
+        String sinkPartitioner = null;
+        if (dataType == DataTypeEnum.CANAL || dataType == DataTypeEnum.DEBEZIUM_JSON) {
+            sinkPartitioner = kafkaSink.getPartitionStrategy() == null ? null : RAW_HASH;
+        }
+
+        if (StringUtils.isNotEmpty(kafkaSink.getTopicName())) {
+            return new KafkaLoadNode(
+                    kafkaSink.getSinkName(),
+                    kafkaSink.getSinkName(),
+                    fieldInfos,
+                    fieldRelations,
+                    Lists.newArrayList(),
+                    null,
+                    kafkaSink.getTopicName(),
+                    kafkaSink.getBootstrapServers(),
+                    format,
+                    sinkParallelism,
+                    properties,
+                    kafkaSink.getPrimaryKey()
+            );
+        }
+
         return new KafkaLoadNode(
                 kafkaSink.getSinkName(),
                 kafkaSink.getSinkName(),
                 fieldInfos,
                 fieldRelations,
                 Lists.newArrayList(),
                 null,
-                kafkaSink.getTopicName(),
+                "mock_topic",

Review Comment:
   Maybe it is not necessary to add a default for the topic?



##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkDTO.java:
##########
@@ -54,9 +54,21 @@ public class KafkaSinkDTO {
             notes = "including earliest, latest (the default), none")
     private String autoOffsetReset;
 
+    @ApiModelProperty(value = "Automatically create kafka topic or not. values: true, false.")
+    private String autoCreateTopics;

Review Comment:
   It is not necessary to put this parameter in the manager.



##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java:
##########
@@ -162,29 +166,58 @@ public static KafkaLoadNode createLoadNode(KafkaSink kafkaSink, List<FieldInfo>
                 format = new JsonFormat();
                 break;
             case CANAL:
-                format = new CanalJsonFormat();
+                format = new RawFormat();
+                innerFormat = new CanalJsonFormat();
                 break;
             case DEBEZIUM_JSON:
-                format = new DebeziumJsonFormat();
+                format = new RawFormat();
+                innerFormat = new DebeziumJsonFormat();
                 break;
             default:
                 throw new IllegalArgumentException(String.format("Unsupported dataType=%s for Kafka", dataType));
         }
 
+        String sinkPartitioner = null;
+        if (dataType == DataTypeEnum.CANAL || dataType == DataTypeEnum.DEBEZIUM_JSON) {
+            sinkPartitioner = kafkaSink.getPartitionStrategy() == null ? null : RAW_HASH;

Review Comment:
   This logic is incorrect.



##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java:
##########
@@ -162,29 +166,58 @@ public static KafkaLoadNode createLoadNode(KafkaSink kafkaSink, List<FieldInfo>
                 format = new JsonFormat();
                 break;
             case CANAL:
-                format = new CanalJsonFormat();
+                format = new RawFormat();
+                innerFormat = new CanalJsonFormat();
                 break;
             case DEBEZIUM_JSON:
-                format = new DebeziumJsonFormat();
+                format = new RawFormat();

Review Comment:
   This logic is incorrect.



##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java:
##########
@@ -162,29 +166,58 @@ public static KafkaLoadNode createLoadNode(KafkaSink kafkaSink, List<FieldInfo>
                 format = new JsonFormat();
                 break;
             case CANAL:
-                format = new CanalJsonFormat();
+                format = new RawFormat();

Review Comment:
   This logic is incorrect.





[GitHub] [inlong] healchow commented on a diff in pull request #6202: [INLONG-6054][Manager] Kafka sink supports all migration

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #6202:
URL: https://github.com/apache/inlong/pull/6202#discussion_r998472042


##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSink.java:
##########
@@ -61,6 +61,15 @@ public class KafkaSink extends StreamSink {
     @ApiModelProperty("Primary key is required when serializationType is json, avro")
     private String primaryKey;
 
+    @ApiModelProperty("the topic mapping rule")

Review Comment:
   `the topic mapping rule` -> `Topic mapping rule`.





[GitHub] [inlong] healchow commented on a diff in pull request #6202: [INLONG-6054][Manager] Kafka sink supports all migration

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #6202:
URL: https://github.com/apache/inlong/pull/6202#discussion_r998472504


##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSink.java:
##########
@@ -61,6 +61,18 @@ public class KafkaSink extends StreamSink {
     @ApiModelProperty("Primary key is required when serializationType is json, avro")
     private String primaryKey;
 
+    @ApiModelProperty("the database-table mapping rule")
+    private String topicPattern;
+
+    @ApiModelProperty(value = "Automatically create kafka topic or not, a note for users.")
+    private String autoCreateTopics;
+
+    @ApiModelProperty(value = "the partition strategy for kafka")
+    private String partitionStrategy;
+
+    @ApiModelProperty("data multiple format,only applicable when outer format is raw")

Review Comment:
   What is the outer format? I didn't see it in KafkaSink.





[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #6202: [INLONG-6054][Manager] Kafka sink supports all migration

Posted by GitBox <gi...@apache.org>.
Yizhou-Yang commented on code in PR #6202:
URL: https://github.com/apache/inlong/pull/6202#discussion_r998339208


##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java:
##########
@@ -162,29 +166,58 @@ public static KafkaLoadNode createLoadNode(KafkaSink kafkaSink, List<FieldInfo>
                 format = new JsonFormat();
                 break;
             case CANAL:
-                format = new CanalJsonFormat();
+                format = new RawFormat();
+                innerFormat = new CanalJsonFormat();
                 break;
             case DEBEZIUM_JSON:
-                format = new DebeziumJsonFormat();
+                format = new RawFormat();
+                innerFormat = new DebeziumJsonFormat();
                 break;
             default:
                 throw new IllegalArgumentException(String.format("Unsupported dataType=%s for Kafka", dataType));
         }
 
+        String sinkPartitioner = null;
+        if (dataType == DataTypeEnum.CANAL || dataType == DataTypeEnum.DEBEZIUM_JSON) {
+            sinkPartitioner = kafkaSink.getPartitionStrategy() == null ? null : RAW_HASH;
+        }
+
+        if (StringUtils.isNotEmpty(kafkaSink.getTopicName())) {
+            return new KafkaLoadNode(
+                    kafkaSink.getSinkName(),
+                    kafkaSink.getSinkName(),
+                    fieldInfos,
+                    fieldRelations,
+                    Lists.newArrayList(),
+                    null,
+                    kafkaSink.getTopicName(),
+                    kafkaSink.getBootstrapServers(),
+                    format,
+                    sinkParallelism,
+                    properties,
+                    kafkaSink.getPrimaryKey()
+            );
+        }
+
         return new KafkaLoadNode(
                 kafkaSink.getSinkName(),
                 kafkaSink.getSinkName(),
                 fieldInfos,
                 fieldRelations,
                 Lists.newArrayList(),
                 null,
-                kafkaSink.getTopicName(),
+                "mock_topic",

Review Comment:
   The topic name is a non-null parameter, so this must not be null.





[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #6202: [INLONG-6054][Manager] Kafka sink supports all migration

Posted by GitBox <gi...@apache.org>.
Yizhou-Yang commented on code in PR #6202:
URL: https://github.com/apache/inlong/pull/6202#discussion_r998066796


##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSink.java:
##########
@@ -61,6 +61,18 @@ public class KafkaSink extends StreamSink {
     @ApiModelProperty("Primary key is required when serializationType is json, avro")
     private String primaryKey;
 
+    @ApiModelProperty("the database-table mapping rule")
+    private String topicPattern;
+
+    @ApiModelProperty(value = "Automatically create kafka topic or not, a note for users.")
+    private String autoCreateTopics;
+
+    @ApiModelProperty(value = "the partition strategy for kafka")
+    private String partitionStrategy;
+
+    @ApiModelProperty("data multiple format,only applicable when outer format is raw")

Review Comment:
   For Debezium and Canal, the outer format is "raw", and sinkmultipleformat is then "debezium" or "canal".
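
The outer/inner split described in this reply can be sketched as follows. This is only an illustration under stated assumptions: `FormatPairSketch`, `DataType`, `FormatPair`, and `resolve` are hypothetical names, not InLong classes, and plain strings stand in for the real `Format` objects. Leaving the inner format null for JSON is also an assumption.

```java
import java.util.Objects;

// Hypothetical illustration; none of these types exist in InLong.
final class FormatPairSketch {

    enum DataType { JSON, CANAL, DEBEZIUM_JSON }

    // Holds the outer format and the optional inner format carried inside it.
    static final class FormatPair {
        final String outer;
        final String inner; // null when the outer format is used directly

        FormatPair(String outer, String inner) {
            this.outer = Objects.requireNonNull(outer);
            this.inner = inner;
        }
    }

    // Canal and Debezium are wrapped in a "raw" outer format; JSON is not.
    static FormatPair resolve(DataType dataType) {
        switch (dataType) {
            case JSON:
                return new FormatPair("json", null);
            case CANAL:
                return new FormatPair("raw", "canal");
            case DEBEZIUM_JSON:
                return new FormatPair("raw", "debezium");
            default:
                throw new IllegalArgumentException("Unsupported dataType=" + dataType);
        }
    }
}
```

With this shape, a reader of the sink only needs to check whether `inner` is set to know that the payload is a wrapped change-log format.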





[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #6202: [INLONG-6054][Manager] Kafka sink supports all migration

Posted by GitBox <gi...@apache.org>.
Yizhou-Yang commented on code in PR #6202:
URL: https://github.com/apache/inlong/pull/6202#discussion_r998105336


##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSink.java:
##########
@@ -61,6 +61,18 @@ public class KafkaSink extends StreamSink {
     @ApiModelProperty("Primary key is required when serializationType is json, avro")
     private String primaryKey;
 
+    @ApiModelProperty("the database-table mapping rule")

Review Comment:
   changed



##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSink.java:
##########
@@ -61,6 +61,18 @@ public class KafkaSink extends StreamSink {
     @ApiModelProperty("Primary key is required when serializationType is json, avro")
     private String primaryKey;
 
+    @ApiModelProperty("the database-table mapping rule")
+    private String topicPattern;
+
+    @ApiModelProperty(value = "Automatically create kafka topic or not, a note for users.")

Review Comment:
   changed





[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #6202: [INLONG-6054][Manager] Kafka sink supports all migration

Posted by GitBox <gi...@apache.org>.
Yizhou-Yang commented on code in PR #6202:
URL: https://github.com/apache/inlong/pull/6202#discussion_r998099205


##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSink.java:
##########
@@ -61,6 +61,18 @@ public class KafkaSink extends StreamSink {
     @ApiModelProperty("Primary key is required when serializationType is json, avro")
     private String primaryKey;
 
+    @ApiModelProperty("the database-table mapping rule")
+    private String topicPattern;
+
+    @ApiModelProperty(value = "Automatically create kafka topic or not, a note for users.")
+    private String autoCreateTopics;
+
+    @ApiModelProperty(value = "the partition strategy for kafka")
+    private String partitionStrategy;
+
+    @ApiModelProperty("data multiple format,only applicable when outer format is raw")

Review Comment:
   It is used by the sort connector to parse Debezium/Canal JSON.





[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6202: [INLONG-6054][Manager] Kafka sink supports all migration

Posted by GitBox <gi...@apache.org>.
yunqingmoswu commented on code in PR #6202:
URL: https://github.com/apache/inlong/pull/6202#discussion_r999388571


##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java:
##########
@@ -162,29 +166,58 @@ public static KafkaLoadNode createLoadNode(KafkaSink kafkaSink, List<FieldInfo>
                 format = new JsonFormat();
                 break;
             case CANAL:
-                format = new CanalJsonFormat();
+                format = new RawFormat();
+                innerFormat = new CanalJsonFormat();
                 break;
             case DEBEZIUM_JSON:
-                format = new DebeziumJsonFormat();
+                format = new RawFormat();
+                innerFormat = new DebeziumJsonFormat();
                 break;
             default:
                 throw new IllegalArgumentException(String.format("Unsupported dataType=%s for Kafka", dataType));
         }
 
+        String sinkPartitioner = null;
+        if (dataType == DataTypeEnum.CANAL || dataType == DataTypeEnum.DEBEZIUM_JSON) {
+            sinkPartitioner = kafkaSink.getPartitionStrategy() == null ? null : RAW_HASH;
+        }
+
+        if (StringUtils.isNotEmpty(kafkaSink.getTopicName())) {
+            return new KafkaLoadNode(
+                    kafkaSink.getSinkName(),
+                    kafkaSink.getSinkName(),
+                    fieldInfos,
+                    fieldRelations,
+                    Lists.newArrayList(),
+                    null,
+                    kafkaSink.getTopicName(),
+                    kafkaSink.getBootstrapServers(),
+                    format,
+                    sinkParallelism,
+                    properties,
+                    kafkaSink.getPrimaryKey()
+            );
+        }
+
         return new KafkaLoadNode(
                 kafkaSink.getSinkName(),
                 kafkaSink.getSinkName(),
                 fieldInfos,
                 fieldRelations,
                 Lists.newArrayList(),
                 null,
-                kafkaSink.getTopicName(),
+                "mock_topic",

Review Comment:
   You can do it in upstream services for now.





[GitHub] [inlong] healchow commented on a diff in pull request #6202: [INLONG-6054][Manager] Kafka sink supports all migration

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #6202:
URL: https://github.com/apache/inlong/pull/6202#discussion_r998049974


##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java:
##########
@@ -162,29 +164,58 @@ public static KafkaLoadNode createLoadNode(KafkaSink kafkaSink, List<FieldInfo>
                 format = new JsonFormat();
                 break;
             case CANAL:
-                format = new CanalJsonFormat();
+                format = new RawFormat();
+                innerFormat = new CanalJsonFormat();
                 break;
             case DEBEZIUM_JSON:
-                format = new DebeziumJsonFormat();
+                format = new RawFormat();
+                innerFormat = new DebeziumJsonFormat();
                 break;
             default:
                 throw new IllegalArgumentException(String.format("Unsupported dataType=%s for Kafka", dataType));
         }
 
+        String sinkPartitioner = null;
+        if (dataType == DataTypeEnum.CANAL || dataType == DataTypeEnum.DEBEZIUM_JSON) {
+            sinkPartitioner = kafkaSink.getPartitionStrategy() == null ? null : "raw_hash";
+        }
+
+        if (StringUtils.isNotEmpty(kafkaSink.getTopicName())) {
+            return new KafkaLoadNode(
+                    kafkaSink.getSinkName(),          //id

Review Comment:
   There is no need to add comments for each parameter.



##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java:
##########
@@ -162,29 +164,58 @@ public static KafkaLoadNode createLoadNode(KafkaSink kafkaSink, List<FieldInfo>
                 format = new JsonFormat();
                 break;
             case CANAL:
-                format = new CanalJsonFormat();
+                format = new RawFormat();
+                innerFormat = new CanalJsonFormat();
                 break;
             case DEBEZIUM_JSON:
-                format = new DebeziumJsonFormat();
+                format = new RawFormat();
+                innerFormat = new DebeziumJsonFormat();
                 break;
             default:
                 throw new IllegalArgumentException(String.format("Unsupported dataType=%s for Kafka", dataType));
         }
 
+        String sinkPartitioner = null;
+        if (dataType == DataTypeEnum.CANAL || dataType == DataTypeEnum.DEBEZIUM_JSON) {
+            sinkPartitioner = kafkaSink.getPartitionStrategy() == null ? null : "raw_hash";

Review Comment:
   1. Why set the non-null value to `raw_hash`?
   2. If the `raw_hash` is needed, please extract it to a String constant.
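
A minimal sketch of the second point, assuming the literal is kept: it moves into a named constant and the condition becomes a small helper. `PartitionerSketch`, `DataType`, and `resolvePartitioner` are hypothetical names, not the PR's code, and the rule shown simply mirrors the diff above; it does not settle the first question of whether that rule is correct.

```java
// Hypothetical illustration; not the actual LoadNodeUtils code.
final class PartitionerSketch {

    // Named constant instead of an inline string literal.
    static final String RAW_HASH = "raw_hash";

    enum DataType { JSON, CANAL, DEBEZIUM_JSON }

    // Mirrors the diff: only Canal/Debezium sinks with a configured
    // partition strategy get the raw-hash partitioner; otherwise null.
    static String resolvePartitioner(DataType dataType, String partitionStrategy) {
        boolean rawWrapped = dataType == DataType.CANAL || dataType == DataType.DEBEZIUM_JSON;
        if (!rawWrapped || partitionStrategy == null) {
            return null;
        }
        return RAW_HASH;
    }
}
```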



##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSink.java:
##########
@@ -61,6 +61,18 @@ public class KafkaSink extends StreamSink {
     @ApiModelProperty("Primary key is required when serializationType is json, avro")
     private String primaryKey;
 
+    @ApiModelProperty("the database-table mapping rule")

Review Comment:
   What does `database-table` mean in Kafka?



##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSink.java:
##########
@@ -61,6 +61,18 @@ public class KafkaSink extends StreamSink {
     @ApiModelProperty("Primary key is required when serializationType is json, avro")
     private String primaryKey;
 
+    @ApiModelProperty("the database-table mapping rule")
+    private String topicPattern;
+
+    @ApiModelProperty(value = "Automatically create kafka topic or not, a note for users.")

Review Comment:
   1. Suggest removing the `, a note for users.`.
   2. What are the allowed values for `autoCreateTopics`? Could you please document them in the `ApiModelProperty`?



##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSink.java:
##########
@@ -61,6 +61,18 @@ public class KafkaSink extends StreamSink {
     @ApiModelProperty("Primary key is required when serializationType is json, avro")
     private String primaryKey;
 
+    @ApiModelProperty("the database-table mapping rule")
+    private String topicPattern;
+
+    @ApiModelProperty(value = "Automatically create kafka topic or not, a note for users.")
+    private String autoCreateTopics;
+
+    @ApiModelProperty(value = "the partition strategy for kafka")

Review Comment:
   ```suggestion
       @ApiModelProperty(value = "Partition strategy")
   ```



##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSink.java:
##########
@@ -61,6 +61,18 @@ public class KafkaSink extends StreamSink {
     @ApiModelProperty("Primary key is required when serializationType is json, avro")
     private String primaryKey;
 
+    @ApiModelProperty("the database-table mapping rule")
+    private String topicPattern;
+
+    @ApiModelProperty(value = "Automatically create kafka topic or not, a note for users.")
+    private String autoCreateTopics;
+
+    @ApiModelProperty(value = "the partition strategy for kafka")
+    private String partitionStrategy;
+
+    @ApiModelProperty("data multiple format,only applicable when outer format is raw")

Review Comment:
   What does `data multiple format` mean?





[GitHub] [inlong] healchow commented on a diff in pull request #6202: [INLONG-6054][Manager] Kafka sink supports all migration

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #6202:
URL: https://github.com/apache/inlong/pull/6202#discussion_r998474289


##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java:
##########
@@ -167,24 +170,64 @@ public static KafkaLoadNode createLoadNode(KafkaSink kafkaSink, List<FieldInfo>
             case DEBEZIUM_JSON:
                 format = new DebeziumJsonFormat();
                 break;
+            case RAW:
+                format = new RawFormat();
+                break;
             default:
                 throw new IllegalArgumentException(String.format("Unsupported dataType=%s for Kafka", dataType));
         }
 
+        DataTypeEnum innerDataType = DataTypeEnum.forName(kafkaSink.getInnerFormat());
+        Format innerFormat = null;
+        String sinkPartitioner = null;
+        if (dataType == DataTypeEnum.RAW) {
+            switch (innerDataType) {
+                case DEBEZIUM_JSON:
+                    innerFormat = new DebeziumJsonFormat();
+                    break;
+                case CANAL:
+                    innerFormat = new CanalJsonFormat();
+                    break;

Review Comment:
   Please add a default statement for `switch`.
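
A sketch of what the requested default branch could look like, assuming an unsupported inner format should fail the same way an unsupported outer data type does. `InnerFormatSketch`, its `DataType` enum, and `innerFormatFor` are stand-ins, and the returned strings replace the real `Format` objects.

```java
// Hypothetical illustration; not the actual LoadNodeUtils code.
final class InnerFormatSketch {

    enum DataType { CSV, CANAL, DEBEZIUM_JSON }

    // Maps the inner data type to a format name; anything else is rejected
    // explicitly instead of silently leaving the inner format unset.
    static String innerFormatFor(DataType innerDataType) {
        switch (innerDataType) {
            case DEBEZIUM_JSON:
                return "debezium-json";
            case CANAL:
                return "canal-json";
            default:
                throw new IllegalArgumentException(
                        String.format("Unsupported innerFormat=%s for Kafka", innerDataType));
        }
    }
}
```

Failing fast here keeps a misconfigured sink from producing a load node whose inner format is null.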





[GitHub] [inlong] healchow commented on a diff in pull request #6202: [INLONG-6054][Manager] Kafka sink supports all migration

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #6202:
URL: https://github.com/apache/inlong/pull/6202#discussion_r998834228


##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkRequest.java:
##########
@@ -56,4 +56,13 @@ public class KafkaSinkRequest extends SinkRequest {
     @ApiModelProperty("Primary key is required when serializationType is json, avro")
     private String primaryKey;
 
+    @ApiModelProperty("Topic mapping rule")
+    private String topicPattern;
+
+    @ApiModelProperty("Partition strategy")
+    private String partitionStrategy;
+
+    @ApiModelProperty("data multiple format,only applicable when outer format is raw")

Review Comment:
   ```suggestion
       @ApiModelProperty("Multiple formats for data content, only needed when the format is 'raw'")
   ```


