You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/06/24 03:35:11 UTC
[incubator-seatunnel] branch api-draft updated: [bugfix][api-draft] Fix KafkaSource parallel mode failure (#2039)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 500799d8b [bugfix][api-draft] Fix KafkaSource parallel mode failure (#2039)
500799d8b is described below
commit 500799d8bce41ad3203987556c528f9e6a0e0a22
Author: ic4y <83...@users.noreply.github.com>
AuthorDate: Fri Jun 24 11:35:05 2022 +0800
[bugfix][api-draft] Fix KafkaSource parallel mode failure (#2039)
* [bugfix] Fix KafkaSource parallel mode failure
* delete unused parameter
* delete unused parameter
---
.../seatunnel/kafka/source/KafkaSourceReader.java | 5 +++--
.../kafka/source/KafkaSourceSplitEnumerator.java | 15 +++++++--------
2 files changed, 10 insertions(+), 10 deletions(-)
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
index 93153b204..d4272e4a3 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -147,10 +147,11 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
private KafkaConsumer<byte[], byte[]> initConsumer(String bootstrapServer, String consumerGroup,
Properties properties, boolean autoCommit) {
- Properties props = new Properties(properties);
+ Properties props = new Properties();
+ properties.forEach((key, value) -> props.setProperty(String.valueOf(key), String.valueOf(value)));
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
- props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_PREFIX + "-enumerator-consumer");
+ props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_PREFIX + "-enumerator-consumer-" + this.hashCode());
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index 90bdc2b72..a596dfc54 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -30,7 +30,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -73,7 +72,7 @@ public class KafkaSourceSplitEnumerator implements SourceSplitEnumerator<KafkaSo
@Override
public void run() throws ExecutionException, InterruptedException {
pendingSplit = getTopicInfo();
- assignSplit(context.registeredReaders());
+ assignSplit();
}
@Override
@@ -87,7 +86,7 @@ public class KafkaSourceSplitEnumerator implements SourceSplitEnumerator<KafkaSo
public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
if (!splits.isEmpty()) {
pendingSplit.addAll(splits);
- assignSplit(Collections.singletonList(subtaskId));
+ assignSplit();
}
}
@@ -104,7 +103,7 @@ public class KafkaSourceSplitEnumerator implements SourceSplitEnumerator<KafkaSo
@Override
public void registerReader(int subtaskId) {
if (!pendingSplit.isEmpty()) {
- assignSplit(Collections.singletonList(subtaskId));
+ assignSplit();
}
}
@@ -121,7 +120,7 @@ public class KafkaSourceSplitEnumerator implements SourceSplitEnumerator<KafkaSo
private AdminClient initAdminClient(Properties properties) {
Properties props = new Properties(properties);
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.metadata.getBootstrapServer());
- props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_PREFIX + "-enumerator-admin-client");
+ props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_PREFIX + "-enumerator-admin-client-" + this.hashCode());
return AdminClient.create(props);
}
@@ -145,12 +144,12 @@ public class KafkaSourceSplitEnumerator implements SourceSplitEnumerator<KafkaSo
}).collect(Collectors.toSet());
}
- private void assignSplit(Collection<Integer> taskIDList) {
+ private void assignSplit() {
Map<Integer, List<KafkaSourceSplit>> readySplit = new HashMap<>(Common.COLLECTION_SIZE);
- for (int taskID : taskIDList) {
+ for (int taskID = 0; taskID < context.currentParallelism(); taskID++) {
readySplit.computeIfAbsent(taskID, id -> new ArrayList<>());
}
- pendingSplit.forEach(s -> readySplit.get(getSplitOwner(s.getTopicPartition(), taskIDList.size()))
+ pendingSplit.forEach(s -> readySplit.get(getSplitOwner(s.getTopicPartition(), context.currentParallelism()))
.add(s));
readySplit.forEach(context::assignSplit);