You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/09/21 07:20:35 UTC
[incubator-seatunnel] branch dev updated: [Bug] [Connector-V2] Fix hive source connector parallelism not work (#2823)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 9f21d4c76 [Bug] [Connector-V2] Fix hive source connector parallelism not work (#2823)
9f21d4c76 is described below
commit 9f21d4c76905b195bd565492b26e0f770e7e61b0
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Wed Sep 21 15:20:28 2022 +0800
[Bug] [Connector-V2] Fix hive source connector parallelism not work (#2823)
* [Bug][Connector-V2] Hive Source Connector parallelism not work
* [Bug][Connector-V2] Fix code style
* [Bug][Connector-V2] Fix assigned splits
---
.../file/source/BaseFileSourceReader.java | 5 --
.../source/split/FileSourceSplitEnumerator.java | 65 ++++++++++++----------
2 files changed, 36 insertions(+), 34 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java
index fa13ec66c..3b2088b4d 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java
@@ -31,7 +31,6 @@ import java.util.List;
import java.util.Set;
public class BaseFileSourceReader implements SourceReader<SeaTunnelRow, FileSourceSplit> {
- private static final long THREAD_WAIT_TIME = 500L;
private final ReadStrategy readStrategy;
private final HadoopConf hadoopConf;
private final SourceReader.Context context;
@@ -56,10 +55,6 @@ public class BaseFileSourceReader implements SourceReader<SeaTunnelRow, FileSour
@Override
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
- if (sourceSplits.isEmpty()) {
- Thread.sleep(THREAD_WAIT_TIME);
- return;
- }
sourceSplits.forEach(source -> {
try {
readStrategy.read(source.splitId(), output);
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java
index fa3cebd28..49bcb0a04 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java
@@ -18,19 +18,18 @@
package org.apache.seatunnel.connectors.seatunnel.file.source.split;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.connectors.seatunnel.file.source.state.FileSourceState;
+import lombok.extern.slf4j.Slf4j;
+
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
+@Slf4j
public class FileSourceSplitEnumerator implements SourceSplitEnumerator<FileSourceSplit, FileSourceState> {
private final Context<FileSourceSplit> context;
private Set<FileSourceSplit> pendingSplit;
@@ -40,6 +39,7 @@ public class FileSourceSplitEnumerator implements SourceSplitEnumerator<FileSour
public FileSourceSplitEnumerator(SourceSplitEnumerator.Context<FileSourceSplit> context, List<String> filePaths) {
this.context = context;
this.filePaths = filePaths;
+ this.assignedSplit = new HashSet<>();
}
public FileSourceSplitEnumerator(SourceSplitEnumerator.Context<FileSourceSplit> context, List<String> filePaths,
@@ -50,51 +50,59 @@ public class FileSourceSplitEnumerator implements SourceSplitEnumerator<FileSour
@Override
public void open() {
- this.assignedSplit = new HashSet<>();
this.pendingSplit = new HashSet<>();
}
@Override
public void run() {
- pendingSplit = getHiveFileSplit();
- assignSplit(context.registeredReaders());
+ // do nothing
}
- private Set<FileSourceSplit> getHiveFileSplit() {
- Set<FileSourceSplit> hiveSourceSplits = new HashSet<>();
- filePaths.forEach(k -> hiveSourceSplits.add(new FileSourceSplit(k)));
- return hiveSourceSplits;
-
+ private Set<FileSourceSplit> getFileSplit() {
+ Set<FileSourceSplit> fileSourceSplits = new HashSet<>();
+ filePaths.forEach(k -> fileSourceSplits.add(new FileSourceSplit(k)));
+ return fileSourceSplits;
}
@Override
public void close() throws IOException {
-
+ // do nothing
}
@Override
public void addSplitsBack(List<FileSourceSplit> splits, int subtaskId) {
if (!splits.isEmpty()) {
pendingSplit.addAll(splits);
- assignSplit(Collections.singletonList(subtaskId));
+ assignSplit(subtaskId);
}
}
- private void assignSplit(Collection<Integer> taskIDList) {
- Map<Integer, List<FileSourceSplit>> readySplit = new HashMap<>(Common.COLLECTION_SIZE);
- for (int taskID : taskIDList) {
- readySplit.computeIfAbsent(taskID, id -> new ArrayList<>());
+ private void assignSplit(int taskId) {
+ ArrayList<FileSourceSplit> currentTaskSplits = new ArrayList<>();
+ if (context.currentParallelism() == 1) {
+ // if parallelism == 1, assign all the pending splits to the single reader
+ currentTaskSplits.addAll(pendingSplit);
+ } else {
+ // if parallelism > 1, use the hashCode of each split's id to decide whether that split is assigned to the current task
+ for (FileSourceSplit fileSourceSplit : pendingSplit) {
+ int splitOwner = getSplitOwner(fileSourceSplit.splitId(), context.currentParallelism());
+ if (splitOwner == taskId) {
+ currentTaskSplits.add(fileSourceSplit);
+ }
+ }
}
-
- pendingSplit.forEach(s -> readySplit.get(getSplitOwner(s.splitId(), taskIDList.size()))
- .add(s));
- readySplit.forEach(context::assignSplit);
- assignedSplit.addAll(pendingSplit);
- pendingSplit.clear();
+ // assign splits
+ context.assignSplit(taskId, currentTaskSplits);
+ // save the state of assigned splits
+ assignedSplit.addAll(currentTaskSplits);
+ // remove the assigned splits from pending splits
+ currentTaskSplits.forEach(split -> pendingSplit.remove(split));
+ log.info("SubTask {} is assigned to [{}]", taskId, currentTaskSplits.stream().map(FileSourceSplit::splitId).collect(Collectors.joining(",")));
+ context.signalNoMoreSplits(taskId);
}
private static int getSplitOwner(String tp, int numReaders) {
- return tp.hashCode() % numReaders;
+ return Math.abs(tp.hashCode()) % numReaders;
}
@Override
@@ -104,9 +112,8 @@ public class FileSourceSplitEnumerator implements SourceSplitEnumerator<FileSour
@Override
public void registerReader(int subtaskId) {
- if (!pendingSplit.isEmpty()) {
- assignSplit(Collections.singletonList(subtaskId));
- }
+ pendingSplit = getFileSplit();
+ assignSplit(subtaskId);
}
@Override