You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hugegraph.apache.org by ji...@apache.org on 2023/06/08 11:20:10 UTC

[incubator-hugegraph-computer] branch master updated: feat(core): isolate namespace for different input data source (#252)

This is an automated email from the ASF dual-hosted git repository.

jin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-computer.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b3c0b40 feat(core): isolate namespace for different input data source (#252)
0b3c0b40 is described below

commit 0b3c0b40721a697ac7098e59ec9005b772eaed15
Author: Aaron Wang <wa...@gmail.com>
AuthorDate: Thu Jun 8 19:20:04 2023 +0800

    feat(core): isolate namespace for different input data source (#252)
    
    * set default job namespace to empty string
    
    * use StringUtils.isEmpty
---
 .../hugegraph/computer/core/config/ComputerOptions.java    |  9 +++++++++
 .../org/apache/hugegraph/computer/core/bsp/BspBase.java    | 14 +++++++++++---
 2 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java b/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java
index 765f8adb..33fba83d 100644
--- a/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java
+++ b/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java
@@ -420,6 +420,14 @@ public class ComputerOptions extends OptionHolder {
                     10000
             );
 
+    public static final ConfigOption<String> JOB_NAMESPACE =
+            new ConfigOption<>(
+                    "job.namespace",
+                    "The job namespace can seperate different data source.",
+                    null,
+                    ""
+            );
+
     public static final ConfigOption<String> JOB_ID =
             new ConfigOption<>(
                     "job.id",
@@ -921,6 +929,7 @@ public class ComputerOptions extends OptionHolder {
                     ComputerOptions.BSP_ETCD_ENDPOINTS.name(),
                     ComputerOptions.TRANSPORT_SERVER_HOST.name(),
                     ComputerOptions.TRANSPORT_SERVER_PORT.name(),
+                    ComputerOptions.JOB_NAMESPACE.name(),
                     ComputerOptions.JOB_ID.name(),
                     ComputerOptions.JOB_WORKERS_COUNT.name(),
                     ComputerOptions.RPC_SERVER_HOST_NAME,
diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/BspBase.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/BspBase.java
index 66a3fd93..23d01f4e 100644
--- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/BspBase.java
+++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/BspBase.java
@@ -17,6 +17,7 @@
 
 package org.apache.hugegraph.computer.core.bsp;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hugegraph.computer.core.common.exception.ComputerException;
 import org.apache.hugegraph.computer.core.config.ComputerOptions;
 import org.apache.hugegraph.computer.core.config.Config;
@@ -30,6 +31,7 @@ public abstract class BspBase {
     private final Config config;
 
     private final String jobId;
+    private final String jobNamespace;
     private final int workerCount;
     private final long registerTimeout;
     private final long barrierOnMasterTimeout;
@@ -42,6 +44,7 @@ public abstract class BspBase {
         this.config = config;
 
         this.jobId = config.get(ComputerOptions.JOB_ID);
+        this.jobNamespace = config.get(ComputerOptions.JOB_NAMESPACE);
         this.workerCount = this.config.get(ComputerOptions.JOB_WORKERS_COUNT);
         this.registerTimeout = this.config.get(
              ComputerOptions.BSP_REGISTER_TIMEOUT);
@@ -59,7 +62,10 @@ public abstract class BspBase {
      */
     private BspClient init() {
         BspClient bspClient = this.createBspClient();
-        bspClient.init(this.jobId);
+        String namespace = StringUtils.isEmpty(this.jobNamespace) ?
+                           this.constructPath(null, this.jobId) :
+                           this.constructPath(null, this.jobNamespace, this.jobId);
+        bspClient.init(namespace);
         LOG.info("Init {} BSP connection to '{}' for job '{}'",
                  bspClient.type(), bspClient.endpoint(), this.jobId);
         return bspClient;
@@ -123,8 +129,10 @@ public abstract class BspBase {
      */
     protected String constructPath(BspEvent event, Object... paths) {
         StringBuilder sb = new StringBuilder();
-        // TODO: replace event.code() with event.name()
-        sb.append(event.name());
+        if (event != null) {
+            // TODO: replace event.code() with event.name()
+            sb.append(event.name());
+        }
         for (Object path : paths) {
             sb.append("/").append(path.toString());
         }