You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@hugegraph.apache.org by ji...@apache.org on 2023/05/29 14:31:34 UTC

[incubator-hugegraph-computer] branch master updated: feat: implement parallel send data in load graph step (#248)

This is an automated email from the ASF dual-hosted git repository.

jin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-computer.git


The following commit(s) were added to refs/heads/master by this push:
     new 7df3b430 feat: implement parallel send data in load graph step (#248)
7df3b430 is described below

commit 7df3b430387d7a4ae556969995d3c660855eac40
Author: Aaron Wang <wa...@gmail.com>
AuthorDate: Mon May 29 22:31:28 2023 +0800

    feat: implement parallel send data in load graph step (#248)
    
    * fix: "Filesystem closed" error caused by caching only one HDFS client
    
    * improve: reuse fetcher instance for vertex and edge sending
    
    ---------
    
    Co-authored-by: imbajin <ji...@apache.org>
---
 .../computer/core/config/ComputerOptions.java      |  8 +++
 .../computer/core/compute/ComputeManager.java      |  2 +-
 .../computer/core/input/WorkerInputManager.java    | 69 ++++++++++++++++++----
 .../computer/core/worker/load/LoadService.java     | 53 +++++++++++------
 4 files changed, 103 insertions(+), 29 deletions(-)

diff --git a/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java b/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java
index f28f0208..765f8adb 100644
--- a/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java
+++ b/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java
@@ -198,6 +198,14 @@ public class ComputerOptions extends OptionHolder {
                     ""
             );
 
+    public static final ConfigOption<Integer> INPUT_SEND_THREAD_NUMS =
+            new ConfigOption<>(
+                    "input.send_thread_nums",
+                    "The number of threads for parallel sending vertex or edge.",
+                    positiveInt(),
+                    4
+            );
+
     public static final ConfigOption<Integer> SORT_THREAD_NUMS =
             new ConfigOption<>(
                     "sort.thread_nums",
diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManager.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManager.java
index 685f47b9..4898abe6 100644
--- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManager.java
+++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManager.java
@@ -64,7 +64,7 @@ public class ComputeManager {
         int computeThreadNum = this.partitionComputeThreadNum(context.config());
         this.computeExecutor = ExecutorUtil.newFixedThreadPool(
                                computeThreadNum, PREFIX);
-        LOG.info("Created partition compute thread poll, thread num: {}",
+        LOG.info("Created partition compute thread pool, thread num: {}",
                  computeThreadNum);
     }
 
diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/input/WorkerInputManager.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/input/WorkerInputManager.java
index 34a1352d..c5e4a766 100644
--- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/input/WorkerInputManager.java
+++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/input/WorkerInputManager.java
@@ -17,9 +17,17 @@
 
 package org.apache.hugegraph.computer.core.input;
 
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
 
 import org.apache.hugegraph.computer.core.common.ComputerContext;
+import org.apache.hugegraph.computer.core.common.exception.ComputerException;
+import org.apache.hugegraph.computer.core.config.ComputerOptions;
 import org.apache.hugegraph.computer.core.config.Config;
 import org.apache.hugegraph.computer.core.graph.vertex.Vertex;
 import org.apache.hugegraph.computer.core.manager.Manager;
@@ -27,9 +35,15 @@ import org.apache.hugegraph.computer.core.network.message.MessageType;
 import org.apache.hugegraph.computer.core.rpc.InputSplitRpcService;
 import org.apache.hugegraph.computer.core.sender.MessageSendManager;
 import org.apache.hugegraph.computer.core.worker.load.LoadService;
+import org.apache.hugegraph.util.ExecutorUtil;
+import org.apache.hugegraph.util.Log;
+import org.slf4j.Logger;
 
 public class WorkerInputManager implements Manager {
 
+    private static final Logger LOG = Log.logger(WorkerInputManager.class);
+    private static final String PREFIX = "input-send-executor-%s";
+
     public static final String NAME = "worker_input";
 
     /*
@@ -37,6 +51,10 @@ public class WorkerInputManager implements Manager {
      * to computer-vertices and computer-edges
      */
     private final LoadService loadService;
+
+    private final ExecutorService sendExecutor;
+    private final int sendThreadNum;
+
     /*
      * Send vertex/edge or message to target worker
      */
@@ -44,8 +62,18 @@ public class WorkerInputManager implements Manager {
 
     public WorkerInputManager(ComputerContext context,
                               MessageSendManager sendManager) {
-        this.loadService = new LoadService(context);
         this.sendManager = sendManager;
+
+        this.sendThreadNum = this.inputSendThreadNum(context.config());
+        this.sendExecutor = ExecutorUtil.newFixedThreadPool(this.sendThreadNum, PREFIX);
+        LOG.info("Created parallel sending thread pool, thread num: {}",
+                sendThreadNum);
+
+        this.loadService = new LoadService(context);
+    }
+
+    private Integer inputSendThreadNum(Config config) {
+        return config.get(ComputerOptions.INPUT_SEND_THREAD_NUMS);
     }
 
     @Override
@@ -63,6 +91,7 @@ public class WorkerInputManager implements Manager {
     public void close(Config config) {
         this.loadService.close();
         this.sendManager.close(config);
+        this.sendExecutor.shutdown();
     }
 
     public void service(InputSplitRpcService rpcService) {
@@ -70,26 +99,46 @@ public class WorkerInputManager implements Manager {
     }
 
     /**
-     * TODO: Load vertices and edges parallel.
      * When this method finish, it means that all vertices and edges are sent,
      * but there is no guarantee that all of them has been received.
      */
     public void loadGraph() {
+        List<CompletableFuture<?>> futures = new ArrayList<>();
+        CompletableFuture<?> future;
         this.sendManager.startSend(MessageType.VERTEX);
-        Iterator<Vertex> iterator = this.loadService.createIteratorFromVertex();
-        while (iterator.hasNext()) {
-            Vertex vertex = iterator.next();
-            this.sendManager.sendVertex(vertex);
+        for (int i = 0; i < this.sendThreadNum; i++) {
+            future = send(this.sendManager::sendVertex, this.loadService::createIteratorFromVertex);
+            futures.add(future);
         }
+        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).exceptionally(e -> {
+            throw new ComputerException("An exception occurred during parallel " +
+                                        "sending vertices", e);
+        }).join();
         this.sendManager.finishSend(MessageType.VERTEX);
 
+        futures.clear();
+
         this.sendManager.startSend(MessageType.EDGE);
-        iterator = this.loadService.createIteratorFromEdge();
-        while (iterator.hasNext()) {
-            Vertex vertex = iterator.next();
-            this.sendManager.sendEdge(vertex);
+        for (int i = 0; i < this.sendThreadNum; i++) {
+            future = send(this.sendManager::sendEdge, this.loadService::createIteratorFromEdge);
+            futures.add(future);
         }
+        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).exceptionally(e -> {
+            throw new ComputerException("An exception occurred during parallel " +
+                                        "sending edges", e);
+        }).join();
         this.sendManager.finishSend(MessageType.EDGE);
         this.sendManager.clearBuffer();
     }
+
+    private CompletableFuture<?> send(Consumer<Vertex> sendConsumer,
+                                      Supplier<Iterator<Vertex>> iteratorSupplier) {
+        return CompletableFuture.runAsync(() -> {
+            Iterator<Vertex> iterator = iteratorSupplier.get();
+            while (iterator.hasNext()) {
+                Vertex vertex = iterator.next();
+                sendConsumer.accept(vertex);
+            }
+        }, this.sendExecutor);
+    }
 }
diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/load/LoadService.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/load/LoadService.java
index 230f3eb0..237b02d5 100644
--- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/load/LoadService.java
+++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/load/LoadService.java
@@ -19,6 +19,7 @@ package org.apache.hugegraph.computer.core.worker.load;
 
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hugegraph.computer.core.common.ComputerContext;
 import org.apache.hugegraph.computer.core.config.ComputerOptions;
@@ -43,33 +44,43 @@ public class LoadService {
 
     private final GraphFactory graphFactory;
     private final Config config;
+
+    // Service proxy on the client
+    private InputSplitRpcService rpcService;
+    private final InputFilter inputFilter;
+
+    private final int fetcherNum;
     /*
      * GraphFetcher include:
      *   VertexFetcher vertexFetcher;
      *   EdgeFetcher edgeFetcher;
      */
-    private GraphFetcher fetcher;
-    // Service proxy on the client
-    private InputSplitRpcService rpcService;
-    private final InputFilter inputFilter;
+    private final GraphFetcher[] fetchers;
+    private final AtomicInteger fetcherIdx;
 
     public LoadService(ComputerContext context) {
         this.graphFactory = context.graphFactory();
         this.config = context.config();
-        this.fetcher = null;
         this.rpcService = null;
         this.inputFilter = context.config().createObject(
                 ComputerOptions.INPUT_FILTER_CLASS);
+        this.fetcherNum = this.config.get(ComputerOptions.INPUT_SEND_THREAD_NUMS);
+        this.fetchers = new GraphFetcher[this.fetcherNum];
+        this.fetcherIdx = new AtomicInteger(0);
     }
 
     public void init() {
         assert this.rpcService != null;
-        this.fetcher = InputSourceFactory.createGraphFetcher(this.config,
-                                                             this.rpcService);
+        // provide different GraphFetcher for each sending thread
+        for (int i = 0; i < this.fetcherNum; i++) {
+            this.fetchers[i] = InputSourceFactory.createGraphFetcher(this.config, this.rpcService);
+        }
     }
 
     public void close() {
-        this.fetcher.close();
+        for (GraphFetcher fetcher : this.fetchers) {
+            fetcher.close();
+        }
     }
 
     public void rpcService(InputSplitRpcService rpcService) {
@@ -78,30 +89,34 @@ public class LoadService {
     }
 
     public Iterator<Vertex> createIteratorFromVertex() {
-        return new IteratorFromVertex();
+        int currentIdx = this.fetcherIdx.getAndIncrement() % this.fetcherNum;
+        return new IteratorFromVertex(this.fetchers[currentIdx]);
     }
 
     public Iterator<Vertex> createIteratorFromEdge() {
-        return new IteratorFromEdge();
+        int currentIdx = this.fetcherIdx.getAndIncrement() % this.fetcherNum;
+        return new IteratorFromEdge(this.fetchers[currentIdx]);
     }
 
     private class IteratorFromVertex implements Iterator<Vertex> {
 
         private InputSplit currentSplit;
+        private GraphFetcher fetcher;
 
-        public IteratorFromVertex() {
+        public IteratorFromVertex(GraphFetcher fetcher) {
             this.currentSplit = null;
+            this.fetcher = fetcher;
         }
 
         @Override
         public boolean hasNext() {
-            VertexFetcher vertexFetcher = fetcher.vertexFetcher();
+            VertexFetcher vertexFetcher = this.fetcher.vertexFetcher();
             while (this.currentSplit == null || !vertexFetcher.hasNext()) {
                 /*
                  * The first time or the current split is complete,
                  * need to fetch next input split meta
                  */
-                this.currentSplit = fetcher.nextVertexInputSplit();
+                this.currentSplit = this.fetcher.nextVertexInputSplit();
                 if (this.currentSplit.equals(InputSplit.END_SPLIT)) {
                     return false;
                 }
@@ -116,7 +131,7 @@ public class LoadService {
                 throw new NoSuchElementException();
             }
             org.apache.hugegraph.structure.graph.Vertex hugeVertex;
-            hugeVertex = fetcher.vertexFetcher().next();
+            hugeVertex = this.fetcher.vertexFetcher().next();
             return this.convert(hugeVertex);
         }
 
@@ -145,13 +160,15 @@ public class LoadService {
         private final int maxEdges;
         private InputSplit currentSplit;
         private Vertex currentVertex;
+        private GraphFetcher fetcher;
 
-        public IteratorFromEdge() {
+        public IteratorFromEdge(GraphFetcher fetcher) {
             // this.direction = config.get(ComputerOptions.EDGE_DIRECTION);
             this.maxEdges = config.get(
                             ComputerOptions.INPUT_MAX_EDGES_IN_ONE_VERTEX);
             this.currentSplit = null;
             this.currentVertex = null;
+            this.fetcher = fetcher;
         }
 
         @Override
@@ -159,13 +176,13 @@ public class LoadService {
             if (InputSplit.END_SPLIT.equals(this.currentSplit)) {
                 return this.currentVertex != null;
             }
-            EdgeFetcher edgeFetcher = fetcher.edgeFetcher();
+            EdgeFetcher edgeFetcher = this.fetcher.edgeFetcher();
             while (this.currentSplit == null || !edgeFetcher.hasNext()) {
                 /*
                  * The first time or the current split is complete,
                  * need to fetch next input split meta
                  */
-                this.currentSplit = fetcher.nextEdgeInputSplit();
+                this.currentSplit = this.fetcher.nextEdgeInputSplit();
                 if (this.currentSplit.equals(InputSplit.END_SPLIT)) {
                     return this.currentVertex != null;
                 }
@@ -181,7 +198,7 @@ public class LoadService {
             }
 
             org.apache.hugegraph.structure.graph.Edge hugeEdge;
-            EdgeFetcher edgeFetcher = fetcher.edgeFetcher();
+            EdgeFetcher edgeFetcher = this.fetcher.edgeFetcher();
             while (edgeFetcher.hasNext()) {
                 hugeEdge = edgeFetcher.next();
                 Edge edge = this.convert(hugeEdge);