Posted to commits@hugegraph.apache.org by va...@apache.org on 2023/04/06 10:37:03 UTC

[incubator-hugegraph-toolchain] branch zy_dev created (now 8f1bfe33)

This is an automated email from the ASF dual-hosted git repository.

vaughn pushed a change to branch zy_dev
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-toolchain.git


      at 8f1bfe33 chore: improve spark parallel

This branch includes the following new commits:

     new 8f1bfe33 chore: improve spark parallel

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-hugegraph-toolchain] 01/01: chore: improve spark parallel

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vaughn pushed a commit to branch zy_dev
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-toolchain.git

commit 8f1bfe33981cb476f7d6416b795b1dff20389290
Author: vaughn.zhang <va...@zoom.us>
AuthorDate: Thu Apr 6 18:30:03 2023 +0800

    chore: improve spark parallel
---
 .../loader/spark/HugeGraphSparkLoader.java         | 72 ++++++++++++++++---------
 1 file changed, 45 insertions(+), 27 deletions(-)

diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/spark/HugeGraphSparkLoader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/spark/HugeGraphSparkLoader.java
index 60c7837f..b26003b0 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/spark/HugeGraphSparkLoader.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/spark/HugeGraphSparkLoader.java
@@ -63,6 +63,10 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import scala.collection.JavaConverters;
 
@@ -77,11 +81,10 @@ public class HugeGraphSparkLoader implements Serializable {
         HugeGraphSparkLoader loader;
         try {
             loader = new HugeGraphSparkLoader(args);
+            loader.load();
         } catch (Throwable e) {
             Printer.printError("Failed to start loading", e);
-            return;
         }
-        loader.load();
     }
 
     public HugeGraphSparkLoader(String[] args) {
@@ -89,7 +92,7 @@ public class HugeGraphSparkLoader implements Serializable {
         this.builders = new HashMap<>();
     }
 
-    public void load() {
+    public void load() throws ExecutionException, InterruptedException {
         LoadMapping mapping = LoadMapping.of(this.loadOptions.file);
         List<InputStruct> structs = mapping.structs();
         boolean sinkType = this.loadOptions.sinkType;
@@ -123,35 +126,50 @@
         SparkContext sc = session.sparkContext();
 
         LongAccumulator totalInsertSuccess = sc.longAccumulator("totalInsertSuccess");
+        // One shared executor so the per-struct load tasks run in parallel
+        ExecutorService executor = Executors.newCachedThreadPool();
+        List<Future<Void>> futures = new ArrayList<>(structs.size());
+
         for (InputStruct struct : structs) {
-            LOG.info("\n Initializes the accumulator corresponding to the  {} ",
-                     struct.input().asFileSource().path());
-            LoadDistributeMetrics loadDistributeMetrics = new LoadDistributeMetrics(struct);
-            loadDistributeMetrics.init(sc);
-            LOG.info("\n  Start to load data, data info is: \t {} ",
-                     struct.input().asFileSource().path());
-            Dataset<Row> ds = read(session, struct);
-            if (sinkType) {
-                LOG.info("\n  Start to load data using spark apis  \n");
-                ds.foreachPartition((Iterator<Row> p) -> {
-                    LoadContext context = initPartition(this.loadOptions, struct);
-                    p.forEachRemaining((Row row) -> {
-                        loadRow(struct, row, p, context);
+            Future<Void> future = executor.submit(() -> {
+                LOG.info("\n Initializes the accumulator corresponding to the  {} ",
+                        struct.input().asFileSource().path());
+                LoadDistributeMetrics loadDistributeMetrics = new LoadDistributeMetrics(struct);
+                loadDistributeMetrics.init(sc);
+                LOG.info("\n  Start to load data, data info is: \t {} ",
+                        struct.input().asFileSource().path());
+                Dataset<Row> ds = read(session, struct);
+                if (sinkType) {
+                    LOG.info("\n  Start to load data using spark apis  \n");
+                    ds.foreachPartition((Iterator<Row> p) -> {
+                        LoadContext context = initPartition(this.loadOptions, struct);
+                        p.forEachRemaining((Row row) -> {
+                            loadRow(struct, row, p, context);
+                        });
+                        context.close();
                     });
-                    context.close();
-                });
 
-            } else {
-                LOG.info("\n Start to load data using spark bulkload \n");
-                // gen-hfile
-                HBaseDirectLoader directLoader = new HBaseDirectLoader(loadOptions, struct,
-                                                                       loadDistributeMetrics);
-                directLoader.bulkload(ds);
+                } else {
+                    LOG.info("\n Start to load data using spark bulkload \n");
+                    // gen-hfile
+                    HBaseDirectLoader directLoader = new HBaseDirectLoader(loadOptions, struct,
+                            loadDistributeMetrics);
+                    directLoader.bulkload(ds);
 
-            }
-            collectLoadMetrics(loadDistributeMetrics, totalInsertSuccess);
-            LOG.info("\n Finished  load {}  data ", struct.input().asFileSource().path());
+                }
+                collectLoadMetrics(loadDistributeMetrics, totalInsertSuccess);
+                LOG.info("\n Finished  load {}  data ", struct.input().asFileSource().path());
+            });
+            futures.add(future);
         }
+        // Block until every struct's load task completes (or rethrows a failure)
+        for (Future<Void> future : futures) {
+            future.get();
+        }
+        executor.shutdown();
+
         Long totalInsertSuccessCnt = totalInsertSuccess.value();
         LOG.info("\n ------------The data load task is complete-------------------\n" +
                  "\n insertSuccessCnt:\t {} \n ---------------------------------------------\n",