You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hugegraph.apache.org by ji...@apache.org on 2023/05/29 14:31:34 UTC
[incubator-hugegraph-computer] branch master updated: feat: implement parallel send data in load graph step (#248)
This is an automated email from the ASF dual-hosted git repository.
jin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-computer.git
The following commit(s) were added to refs/heads/master by this push:
new 7df3b430 feat: implement parallel send data in load graph step (#248)
7df3b430 is described below
commit 7df3b430387d7a4ae556969995d3c660855eac40
Author: Aaron Wang <wa...@gmail.com>
AuthorDate: Mon May 29 22:31:28 2023 +0800
feat: implement parallel send data in load graph step (#248)
* fix: "Filesystem closed" error caused by caching only one HDFS client
* improve: reuse fetcher instance for vertex and edge sending
---------
Co-authored-by: imbajin <ji...@apache.org>
---
.../computer/core/config/ComputerOptions.java | 8 +++
.../computer/core/compute/ComputeManager.java | 2 +-
.../computer/core/input/WorkerInputManager.java | 69 ++++++++++++++++++----
.../computer/core/worker/load/LoadService.java | 53 +++++++++++------
4 files changed, 103 insertions(+), 29 deletions(-)
diff --git a/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java b/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java
index f28f0208..765f8adb 100644
--- a/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java
+++ b/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java
@@ -198,6 +198,14 @@ public class ComputerOptions extends OptionHolder {
""
);
+ public static final ConfigOption<Integer> INPUT_SEND_THREAD_NUMS =
+ new ConfigOption<>(
+ "input.send_thread_nums",
+ "The number of threads for parallel sending vertex or edge.",
+ positiveInt(),
+ 4
+ );
+
public static final ConfigOption<Integer> SORT_THREAD_NUMS =
new ConfigOption<>(
"sort.thread_nums",
diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManager.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManager.java
index 685f47b9..4898abe6 100644
--- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManager.java
+++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManager.java
@@ -64,7 +64,7 @@ public class ComputeManager {
int computeThreadNum = this.partitionComputeThreadNum(context.config());
this.computeExecutor = ExecutorUtil.newFixedThreadPool(
computeThreadNum, PREFIX);
- LOG.info("Created partition compute thread poll, thread num: {}",
+ LOG.info("Created partition compute thread pool, thread num: {}",
computeThreadNum);
}
diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/input/WorkerInputManager.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/input/WorkerInputManager.java
index 34a1352d..c5e4a766 100644
--- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/input/WorkerInputManager.java
+++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/input/WorkerInputManager.java
@@ -17,9 +17,17 @@
package org.apache.hugegraph.computer.core.input;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
import org.apache.hugegraph.computer.core.common.ComputerContext;
+import org.apache.hugegraph.computer.core.common.exception.ComputerException;
+import org.apache.hugegraph.computer.core.config.ComputerOptions;
import org.apache.hugegraph.computer.core.config.Config;
import org.apache.hugegraph.computer.core.graph.vertex.Vertex;
import org.apache.hugegraph.computer.core.manager.Manager;
@@ -27,9 +35,15 @@ import org.apache.hugegraph.computer.core.network.message.MessageType;
import org.apache.hugegraph.computer.core.rpc.InputSplitRpcService;
import org.apache.hugegraph.computer.core.sender.MessageSendManager;
import org.apache.hugegraph.computer.core.worker.load.LoadService;
+import org.apache.hugegraph.util.ExecutorUtil;
+import org.apache.hugegraph.util.Log;
+import org.slf4j.Logger;
public class WorkerInputManager implements Manager {
+ private static final Logger LOG = Log.logger(WorkerInputManager.class);
+ private static final String PREFIX = "input-send-executor-%s";
+
public static final String NAME = "worker_input";
/*
@@ -37,6 +51,10 @@ public class WorkerInputManager implements Manager {
* to computer-vertices and computer-edges
*/
private final LoadService loadService;
+
+ private final ExecutorService sendExecutor;
+ private final int sendThreadNum;
+
/*
* Send vertex/edge or message to target worker
*/
@@ -44,8 +62,18 @@ public class WorkerInputManager implements Manager {
public WorkerInputManager(ComputerContext context,
MessageSendManager sendManager) {
- this.loadService = new LoadService(context);
this.sendManager = sendManager;
+
+ this.sendThreadNum = this.inputSendThreadNum(context.config());
+ this.sendExecutor = ExecutorUtil.newFixedThreadPool(this.sendThreadNum, PREFIX);
+ LOG.info("Created parallel sending thread pool, thread num: {}",
+ sendThreadNum);
+
+ this.loadService = new LoadService(context);
+ }
+
+ private Integer inputSendThreadNum(Config config) {
+ return config.get(ComputerOptions.INPUT_SEND_THREAD_NUMS);
}
@Override
@@ -63,6 +91,7 @@ public class WorkerInputManager implements Manager {
public void close(Config config) {
this.loadService.close();
this.sendManager.close(config);
+ this.sendExecutor.shutdown();
}
public void service(InputSplitRpcService rpcService) {
@@ -70,26 +99,46 @@ public class WorkerInputManager implements Manager {
}
/**
- * TODO: Load vertices and edges parallel.
* When this method finish, it means that all vertices and edges are sent,
* but there is no guarantee that all of them has been received.
*/
public void loadGraph() {
+ List<CompletableFuture<?>> futures = new ArrayList<>();
+ CompletableFuture<?> future;
this.sendManager.startSend(MessageType.VERTEX);
- Iterator<Vertex> iterator = this.loadService.createIteratorFromVertex();
- while (iterator.hasNext()) {
- Vertex vertex = iterator.next();
- this.sendManager.sendVertex(vertex);
+ for (int i = 0; i < this.sendThreadNum; i++) {
+ future = send(this.sendManager::sendVertex, this.loadService::createIteratorFromVertex);
+ futures.add(future);
}
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).exceptionally(e -> {
+ throw new ComputerException("An exception occurred during parallel " +
+ "sending vertices", e);
+ }).join();
this.sendManager.finishSend(MessageType.VERTEX);
+ futures.clear();
+
this.sendManager.startSend(MessageType.EDGE);
- iterator = this.loadService.createIteratorFromEdge();
- while (iterator.hasNext()) {
- Vertex vertex = iterator.next();
- this.sendManager.sendEdge(vertex);
+ for (int i = 0; i < this.sendThreadNum; i++) {
+ future = send(this.sendManager::sendEdge, this.loadService::createIteratorFromEdge);
+ futures.add(future);
}
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).exceptionally(e -> {
+ throw new ComputerException("An exception occurred during parallel " +
+ "sending edges", e);
+ }).join();
this.sendManager.finishSend(MessageType.EDGE);
this.sendManager.clearBuffer();
}
+
+ private CompletableFuture<?> send(Consumer<Vertex> sendConsumer,
+ Supplier<Iterator<Vertex>> iteratorSupplier) {
+ return CompletableFuture.runAsync(() -> {
+ Iterator<Vertex> iterator = iteratorSupplier.get();
+ while (iterator.hasNext()) {
+ Vertex vertex = iterator.next();
+ sendConsumer.accept(vertex);
+ }
+ }, this.sendExecutor);
+ }
}
diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/load/LoadService.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/load/LoadService.java
index 230f3eb0..237b02d5 100644
--- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/load/LoadService.java
+++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/load/LoadService.java
@@ -19,6 +19,7 @@ package org.apache.hugegraph.computer.core.worker.load;
import java.util.Iterator;
import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hugegraph.computer.core.common.ComputerContext;
import org.apache.hugegraph.computer.core.config.ComputerOptions;
@@ -43,33 +44,43 @@ public class LoadService {
private final GraphFactory graphFactory;
private final Config config;
+
+ // Service proxy on the client
+ private InputSplitRpcService rpcService;
+ private final InputFilter inputFilter;
+
+ private final int fetcherNum;
/*
* GraphFetcher include:
* VertexFetcher vertexFetcher;
* EdgeFetcher edgeFetcher;
*/
- private GraphFetcher fetcher;
- // Service proxy on the client
- private InputSplitRpcService rpcService;
- private final InputFilter inputFilter;
+ private final GraphFetcher[] fetchers;
+ private final AtomicInteger fetcherIdx;
public LoadService(ComputerContext context) {
this.graphFactory = context.graphFactory();
this.config = context.config();
- this.fetcher = null;
this.rpcService = null;
this.inputFilter = context.config().createObject(
ComputerOptions.INPUT_FILTER_CLASS);
+ this.fetcherNum = this.config.get(ComputerOptions.INPUT_SEND_THREAD_NUMS);
+ this.fetchers = new GraphFetcher[this.fetcherNum];
+ this.fetcherIdx = new AtomicInteger(0);
}
public void init() {
assert this.rpcService != null;
- this.fetcher = InputSourceFactory.createGraphFetcher(this.config,
- this.rpcService);
+ // provide different GraphFetcher for each sending thread
+ for (int i = 0; i < this.fetcherNum; i++) {
+ this.fetchers[i] = InputSourceFactory.createGraphFetcher(this.config, this.rpcService);
+ }
}
public void close() {
- this.fetcher.close();
+ for (GraphFetcher fetcher : this.fetchers) {
+ fetcher.close();
+ }
}
public void rpcService(InputSplitRpcService rpcService) {
@@ -78,30 +89,34 @@ public class LoadService {
}
public Iterator<Vertex> createIteratorFromVertex() {
- return new IteratorFromVertex();
+ int currentIdx = this.fetcherIdx.getAndIncrement() % this.fetcherNum;
+ return new IteratorFromVertex(this.fetchers[currentIdx]);
}
public Iterator<Vertex> createIteratorFromEdge() {
- return new IteratorFromEdge();
+ int currentIdx = this.fetcherIdx.getAndIncrement() % this.fetcherNum;
+ return new IteratorFromEdge(this.fetchers[currentIdx]);
}
private class IteratorFromVertex implements Iterator<Vertex> {
private InputSplit currentSplit;
+ private GraphFetcher fetcher;
- public IteratorFromVertex() {
+ public IteratorFromVertex(GraphFetcher fetcher) {
this.currentSplit = null;
+ this.fetcher = fetcher;
}
@Override
public boolean hasNext() {
- VertexFetcher vertexFetcher = fetcher.vertexFetcher();
+ VertexFetcher vertexFetcher = this.fetcher.vertexFetcher();
while (this.currentSplit == null || !vertexFetcher.hasNext()) {
/*
* The first time or the current split is complete,
* need to fetch next input split meta
*/
- this.currentSplit = fetcher.nextVertexInputSplit();
+ this.currentSplit = this.fetcher.nextVertexInputSplit();
if (this.currentSplit.equals(InputSplit.END_SPLIT)) {
return false;
}
@@ -116,7 +131,7 @@ public class LoadService {
throw new NoSuchElementException();
}
org.apache.hugegraph.structure.graph.Vertex hugeVertex;
- hugeVertex = fetcher.vertexFetcher().next();
+ hugeVertex = this.fetcher.vertexFetcher().next();
return this.convert(hugeVertex);
}
@@ -145,13 +160,15 @@ public class LoadService {
private final int maxEdges;
private InputSplit currentSplit;
private Vertex currentVertex;
+ private GraphFetcher fetcher;
- public IteratorFromEdge() {
+ public IteratorFromEdge(GraphFetcher fetcher) {
// this.direction = config.get(ComputerOptions.EDGE_DIRECTION);
this.maxEdges = config.get(
ComputerOptions.INPUT_MAX_EDGES_IN_ONE_VERTEX);
this.currentSplit = null;
this.currentVertex = null;
+ this.fetcher = fetcher;
}
@Override
@@ -159,13 +176,13 @@ public class LoadService {
if (InputSplit.END_SPLIT.equals(this.currentSplit)) {
return this.currentVertex != null;
}
- EdgeFetcher edgeFetcher = fetcher.edgeFetcher();
+ EdgeFetcher edgeFetcher = this.fetcher.edgeFetcher();
while (this.currentSplit == null || !edgeFetcher.hasNext()) {
/*
* The first time or the current split is complete,
* need to fetch next input split meta
*/
- this.currentSplit = fetcher.nextEdgeInputSplit();
+ this.currentSplit = this.fetcher.nextEdgeInputSplit();
if (this.currentSplit.equals(InputSplit.END_SPLIT)) {
return this.currentVertex != null;
}
@@ -181,7 +198,7 @@ public class LoadService {
}
org.apache.hugegraph.structure.graph.Edge hugeEdge;
- EdgeFetcher edgeFetcher = fetcher.edgeFetcher();
+ EdgeFetcher edgeFetcher = this.fetcher.edgeFetcher();
while (edgeFetcher.hasNext()) {
hugeEdge = edgeFetcher.next();
Edge edge = this.convert(hugeEdge);