You are viewing a plain-text version of this content. The canonical version is the original mailing-list archive page; its hyperlink was lost in this plain-text copy.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/09/21 07:20:35 UTC

[incubator-seatunnel] branch dev updated: [Bug] [Connector-V2] Fix hive source connector parallelism not work (#2823)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9f21d4c76 [Bug] [Connector-V2] Fix hive source connector parallelism not work (#2823)
9f21d4c76 is described below

commit 9f21d4c76905b195bd565492b26e0f770e7e61b0
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Wed Sep 21 15:20:28 2022 +0800

    [Bug] [Connector-V2] Fix hive source connector parallelism not work (#2823)
    
    * [Bug][Connector-V2] Hive Source Connector parallelism not work
    
    * [Bug][Connector-V2] Fix code style
    
    * [Bug][Connector-V2] Fix assigned splits
---
 .../file/source/BaseFileSourceReader.java          |  5 --
 .../source/split/FileSourceSplitEnumerator.java    | 65 ++++++++++++----------
 2 files changed, 36 insertions(+), 34 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java
index fa13ec66c..3b2088b4d 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java
@@ -31,7 +31,6 @@ import java.util.List;
 import java.util.Set;
 
 public class BaseFileSourceReader implements SourceReader<SeaTunnelRow, FileSourceSplit> {
-    private static final long THREAD_WAIT_TIME = 500L;
     private final ReadStrategy readStrategy;
     private final HadoopConf hadoopConf;
     private final SourceReader.Context context;
@@ -56,10 +55,6 @@ public class BaseFileSourceReader implements SourceReader<SeaTunnelRow, FileSour
 
     @Override
     public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
-        if (sourceSplits.isEmpty()) {
-            Thread.sleep(THREAD_WAIT_TIME);
-            return;
-        }
         sourceSplits.forEach(source -> {
             try {
                 readStrategy.read(source.splitId(), output);
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java
index fa3cebd28..49bcb0a04 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java
@@ -18,19 +18,18 @@
 package org.apache.seatunnel.connectors.seatunnel.file.source.split;
 
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.connectors.seatunnel.file.source.state.FileSourceState;
 
+import lombok.extern.slf4j.Slf4j;
+
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
+@Slf4j
 public class FileSourceSplitEnumerator implements SourceSplitEnumerator<FileSourceSplit, FileSourceState> {
     private final Context<FileSourceSplit> context;
     private Set<FileSourceSplit> pendingSplit;
@@ -40,6 +39,7 @@ public class FileSourceSplitEnumerator implements SourceSplitEnumerator<FileSour
     public FileSourceSplitEnumerator(SourceSplitEnumerator.Context<FileSourceSplit> context, List<String> filePaths) {
         this.context = context;
         this.filePaths = filePaths;
+        this.assignedSplit = new HashSet<>();
     }
 
     public FileSourceSplitEnumerator(SourceSplitEnumerator.Context<FileSourceSplit> context, List<String> filePaths,
@@ -50,51 +50,59 @@ public class FileSourceSplitEnumerator implements SourceSplitEnumerator<FileSour
 
     @Override
     public void open() {
-        this.assignedSplit = new HashSet<>();
         this.pendingSplit = new HashSet<>();
     }
 
     @Override
     public void run() {
-        pendingSplit = getHiveFileSplit();
-        assignSplit(context.registeredReaders());
+        // do nothing
     }
 
-    private Set<FileSourceSplit> getHiveFileSplit() {
-        Set<FileSourceSplit> hiveSourceSplits = new HashSet<>();
-        filePaths.forEach(k -> hiveSourceSplits.add(new FileSourceSplit(k)));
-        return hiveSourceSplits;
-
+    private Set<FileSourceSplit> getFileSplit() {
+        Set<FileSourceSplit> fileSourceSplits = new HashSet<>();
+        filePaths.forEach(k -> fileSourceSplits.add(new FileSourceSplit(k)));
+        return fileSourceSplits;
     }
 
     @Override
     public void close() throws IOException {
-
+        // do nothing
     }
 
     @Override
     public void addSplitsBack(List<FileSourceSplit> splits, int subtaskId) {
         if (!splits.isEmpty()) {
             pendingSplit.addAll(splits);
-            assignSplit(Collections.singletonList(subtaskId));
+            assignSplit(subtaskId);
         }
     }
 
-    private void assignSplit(Collection<Integer> taskIDList) {
-        Map<Integer, List<FileSourceSplit>> readySplit = new HashMap<>(Common.COLLECTION_SIZE);
-        for (int taskID : taskIDList) {
-            readySplit.computeIfAbsent(taskID, id -> new ArrayList<>());
+    private void assignSplit(int taskId) {
+        ArrayList<FileSourceSplit> currentTaskSplits = new ArrayList<>();
+        if (context.currentParallelism() == 1) {
+            // if parallelism == 1, we should assign all the splits to reader
+            currentTaskSplits.addAll(pendingSplit);
+        } else {
+            // if parallelism > 1, according to hashCode of split's id to determine whether to allocate the current task
+            for (FileSourceSplit fileSourceSplit : pendingSplit) {
+                int splitOwner = getSplitOwner(fileSourceSplit.splitId(), context.currentParallelism());
+                if (splitOwner == taskId) {
+                    currentTaskSplits.add(fileSourceSplit);
+                }
+            }
         }
-
-        pendingSplit.forEach(s -> readySplit.get(getSplitOwner(s.splitId(), taskIDList.size()))
-                .add(s));
-        readySplit.forEach(context::assignSplit);
-        assignedSplit.addAll(pendingSplit);
-        pendingSplit.clear();
+        // assign splits
+        context.assignSplit(taskId, currentTaskSplits);
+        // save the state of assigned splits
+        assignedSplit.addAll(currentTaskSplits);
+        // remove the assigned splits from pending splits
+        currentTaskSplits.forEach(split -> pendingSplit.remove(split));
+        log.info("SubTask {} is assigned to [{}]", taskId, currentTaskSplits.stream().map(FileSourceSplit::splitId).collect(Collectors.joining(",")));
+        context.signalNoMoreSplits(taskId);
     }
 
     private static int getSplitOwner(String tp, int numReaders) {
-        return tp.hashCode() % numReaders;
+        return Math.abs(tp.hashCode()) % numReaders;
     }
 
     @Override
@@ -104,9 +112,8 @@ public class FileSourceSplitEnumerator implements SourceSplitEnumerator<FileSour
 
     @Override
     public void registerReader(int subtaskId) {
-        if (!pendingSplit.isEmpty()) {
-            assignSplit(Collections.singletonList(subtaskId));
-        }
+        pendingSplit = getFileSplit();
+        assignSplit(subtaskId);
     }
 
     @Override