You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/06/24 03:35:11 UTC

[incubator-seatunnel] branch api-draft updated: [bugfix][api-draft] Fix KafkaSource parallel mode failure (#2039)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 500799d8b [bugfix][api-draft] Fix KafkaSource parallel mode failure (#2039)
500799d8b is described below

commit 500799d8bce41ad3203987556c528f9e6a0e0a22
Author: ic4y <83...@users.noreply.github.com>
AuthorDate: Fri Jun 24 11:35:05 2022 +0800

    [bugfix][api-draft] Fix KafkaSource parallel mode failure (#2039)
    
    * [bugfix] Fix KafkaSource parallel mode failure
    
    * delete unused parameter
    
    * delete unused parameter
---
 .../seatunnel/kafka/source/KafkaSourceReader.java         |  5 +++--
 .../kafka/source/KafkaSourceSplitEnumerator.java          | 15 +++++++--------
 2 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
index 93153b204..d4272e4a3 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -147,10 +147,11 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
 
     private KafkaConsumer<byte[], byte[]> initConsumer(String bootstrapServer, String consumerGroup,
                                                        Properties properties, boolean autoCommit) {
-        Properties props = new Properties(properties);
+        Properties props = new Properties();
+        properties.forEach((key, value) -> props.setProperty(String.valueOf(key), String.valueOf(value)));
         props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
-        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_PREFIX + "-enumerator-consumer");
+        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_PREFIX + "-enumerator-consumer-" + this.hashCode());
 
         props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                 ByteArrayDeserializer.class.getName());
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index 90bdc2b72..a596dfc54 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -30,7 +30,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -73,7 +72,7 @@ public class KafkaSourceSplitEnumerator implements SourceSplitEnumerator<KafkaSo
     @Override
     public void run() throws ExecutionException, InterruptedException {
         pendingSplit = getTopicInfo();
-        assignSplit(context.registeredReaders());
+        assignSplit();
     }
 
     @Override
@@ -87,7 +86,7 @@ public class KafkaSourceSplitEnumerator implements SourceSplitEnumerator<KafkaSo
     public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
         if (!splits.isEmpty()) {
             pendingSplit.addAll(splits);
-            assignSplit(Collections.singletonList(subtaskId));
+            assignSplit();
         }
     }
 
@@ -104,7 +103,7 @@ public class KafkaSourceSplitEnumerator implements SourceSplitEnumerator<KafkaSo
     @Override
     public void registerReader(int subtaskId) {
         if (!pendingSplit.isEmpty()) {
-            assignSplit(Collections.singletonList(subtaskId));
+            assignSplit();
         }
     }
 
@@ -121,7 +120,7 @@ public class KafkaSourceSplitEnumerator implements SourceSplitEnumerator<KafkaSo
     private AdminClient initAdminClient(Properties properties) {
         Properties props = new Properties(properties);
         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.metadata.getBootstrapServer());
-        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_PREFIX + "-enumerator-admin-client");
+        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_PREFIX + "-enumerator-admin-client-" + this.hashCode());
         return AdminClient.create(props);
     }
 
@@ -145,12 +144,12 @@ public class KafkaSourceSplitEnumerator implements SourceSplitEnumerator<KafkaSo
                 }).collect(Collectors.toSet());
     }
 
-    private void assignSplit(Collection<Integer> taskIDList) {
+    private void assignSplit() {
         Map<Integer, List<KafkaSourceSplit>> readySplit = new HashMap<>(Common.COLLECTION_SIZE);
-        for (int taskID : taskIDList) {
+        for (int taskID = 0;  taskID < context.currentParallelism(); taskID++) {
             readySplit.computeIfAbsent(taskID, id -> new ArrayList<>());
         }
-        pendingSplit.forEach(s -> readySplit.get(getSplitOwner(s.getTopicPartition(), taskIDList.size()))
+        pendingSplit.forEach(s -> readySplit.get(getSplitOwner(s.getTopicPartition(), context.currentParallelism()))
                 .add(s));
 
         readySplit.forEach(context::assignSplit);