Posted to issues@hugegraph.apache.org by "simon824 (via GitHub)" <gi...@apache.org> on 2023/04/06 11:29:30 UTC

[GitHub] [incubator-hugegraph-toolchain] simon824 commented on a diff in pull request #450: chore: improve spark parallel

simon824 commented on code in PR #450:
URL: https://github.com/apache/incubator-hugegraph-toolchain/pull/450#discussion_r1159675232


##########
hugegraph-loader/src/main/java/org/apache/hugegraph/loader/spark/HugeGraphSparkLoader.java:
##########
@@ -123,35 +125,44 @@ public void load() {
         SparkContext sc = session.sparkContext();
 
         LongAccumulator totalInsertSuccess = sc.longAccumulator("totalInsertSuccess");
+        List<Future<?>> futures = new ArrayList<>(structs.size());
+
         for (InputStruct struct : structs) {
-            LOG.info("\n Initializes the accumulator corresponding to the  {} ",
-                     struct.input().asFileSource().path());
-            LoadDistributeMetrics loadDistributeMetrics = new LoadDistributeMetrics(struct);
-            loadDistributeMetrics.init(sc);
-            LOG.info("\n  Start to load data, data info is: \t {} ",
-                     struct.input().asFileSource().path());
-            Dataset<Row> ds = read(session, struct);
-            if (sinkType) {
-                LOG.info("\n  Start to load data using spark apis  \n");
-                ds.foreachPartition((Iterator<Row> p) -> {
-                    LoadContext context = initPartition(this.loadOptions, struct);
-                    p.forEachRemaining((Row row) -> {
-                        loadRow(struct, row, p, context);
+            Future<?> future = Executors.newCachedThreadPool().submit(() -> {
+                LOG.info("\n Initializes the accumulator corresponding to the  {} ",
+                         struct.input().asFileSource().path());
+                LoadDistributeMetrics loadDistributeMetrics = new LoadDistributeMetrics(struct);
+                loadDistributeMetrics.init(sc);
+                LOG.info("\n  Start to load data, data info is: \t {} ",
+                         struct.input().asFileSource().path());
+                Dataset<Row> ds = read(session, struct);
+                if (sinkType) {
+                    LOG.info("\n  Start to load data using spark apis  \n");
+                    ds.foreachPartition((Iterator<Row> p) -> {
+                        LoadContext context = initPartition(this.loadOptions, struct);
+                        p.forEachRemaining((Row row) -> {
+                            loadRow(struct, row, p, context);
+                        });
+                        context.close();
                     });
-                    context.close();
-                });
 
-            } else {
-                LOG.info("\n Start to load data using spark bulkload \n");
-                // gen-hfile
-                HBaseDirectLoader directLoader = new HBaseDirectLoader(loadOptions, struct,
-                                                                       loadDistributeMetrics);
-                directLoader.bulkload(ds);
+                } else {
+                    LOG.info("\n Start to load data using spark bulkload \n");
+                    // gen-hfile
+                    HBaseDirectLoader directLoader = new HBaseDirectLoader(loadOptions, struct,
+                                                                           loadDistributeMetrics);

Review Comment:
   LoadDistributeMetrics uses Spark's accumulators internally, so the values from the executors can be aggregated back to the driver.
   https://github.com/apache/incubator-hugegraph-toolchain/blob/master/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/metrics/LoadDistributeMetrics.java#L54
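
   For context, here is a minimal, self-contained sketch (not the project's code; the class name AccumulatorSketch, the sample data, and the local-mode session are illustrative assumptions) of how a Spark LongAccumulator registered on the driver collects counts added inside executor tasks, which is the mechanism LoadDistributeMetrics builds on:

       import java.util.Arrays;

       import org.apache.spark.sql.Encoders;
       import org.apache.spark.sql.SparkSession;
       import org.apache.spark.util.LongAccumulator;

       public class AccumulatorSketch {
           public static void main(String[] args) {
               SparkSession session = SparkSession.builder()
                                                  .appName("accumulator-sketch")
                                                  .master("local[2]") // assumption: local mode, just for the demo
                                                  .getOrCreate();

               // Registered on the driver via the SparkContext, just like
               // LoadDistributeMetrics.init(sc) registers its counters.
               LongAccumulator insertSuccess =
                       session.sparkContext().longAccumulator("totalInsertSuccess");

               // Each task adds to its local copy on the executor; Spark merges
               // the partial sums back into the driver-side accumulator.
               session.createDataset(Arrays.asList(1, 2, 3, 4, 5), Encoders.INT())
                      .foreach(x -> insertSuccess.add(1L));

               // Safe to read on the driver once the action has completed.
               System.out.println("insert success = " + insertSuccess.value()); // 5

               session.stop();
           }
       }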



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@hugegraph.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

