You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by "TyrantLucifer (via GitHub)" <gi...@apache.org> on 2023/04/11 13:58:54 UTC

[GitHub] [incubator-seatunnel] TyrantLucifer commented on a diff in pull request #4524: [Improve][connector][kafka] Set default value for partition option

TyrantLucifer commented on code in PR #4524:
URL: https://github.com/apache/incubator-seatunnel/pull/4524#discussion_r1162861689


##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -174,16 +175,24 @@ private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(
         }
 
         String topic = pluginConfig.get(TOPIC);
-        if (pluginConfig.get(PARTITION) != null) {
-            return DefaultSeaTunnelRowSerializer.create(
-                    topic, pluginConfig.get(PARTITION), seaTunnelRowType, messageFormat, delimiter);
-        } else {
+        if (pluginConfig.get(PARTITION_KEY_FIELDS) != null && pluginConfig.get(PARTITION) != null) {
+            throw new KafkaConnectorException(
+                    KafkaConnectorErrorCode.GET_TRANSACTIONMANAGER_FAILED,
+                    "Cannot select both `partiton` and `partition_key_fields`. You can configure only one of them");
+        }
+        if (pluginConfig.get(PARTITION_KEY_FIELDS) != null) {
             return DefaultSeaTunnelRowSerializer.create(
                     topic,
                     getPartitionKeyFields(pluginConfig, seaTunnelRowType),
                     seaTunnelRowType,
                     messageFormat,
                     delimiter);
+        } else if (pluginConfig.get(PARTITION) != null) {

Review Comment:
   ```suggestion
   if (pluginConfig.get(PARTITION_KEY_FIELDS) != null) {
       return xxxx
   }
   if (pluginConfig.get(PARTITION) != null) {
       return xxxx
   }
   return xxxxx;
   
   
   ```
   }



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java:
##########
@@ -396,6 +410,28 @@ private Map<String, String> getKafkaConsumerData(String topicName) {
         return data;
     }
 
+    private List<String> getKafkaConsumerListData(String topicName) {
+        List<String> data = new ArrayList<>();
+        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig())) {
+            consumer.subscribe(Arrays.asList(topicName));
+            Map<TopicPartition, Long> offsets =
+                    consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0)));
+            Long endOffset = offsets.entrySet().iterator().next().getValue();
+            Long lastProcessedOffset = -1L;
+
+            do {

Review Comment:
   `for each` maybe better?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org