You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hugegraph.apache.org by zh...@apache.org on 2022/11/23 06:53:21 UTC
[incubator-hugegraph-computer] 01/01: refactor computer
This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch refactor_computer
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-computer.git
commit 036bdff1c4d8a2aa0d1e47d59f0b2a44a352b6cd
Author: coderzc <zh...@apache.org>
AuthorDate: Wed Nov 23 14:52:40 2022 +0800
refactor computer
---
computer-algorithm/pom.xml | 2 +-
.../betweenness/BetweennessCentralityParams.java | 6 +-
.../closeness/ClosenessCentralityParams.java | 6 +-
.../centrality/closeness/ClosenessMessage.java | 4 +-
.../centrality/degree/DegreeCentralityParams.java | 3 +-
.../centrality/pagerank/PageRankParams.java | 3 +-
.../algorithm/community/kcore/KcoreParams.java | 6 +-
.../algorithm/community/kcore/KcoreValue.java | 6 +-
.../algorithm/community/lpa/LpaParams.java | 8 +-
.../trianglecount/TriangleCountParams.java | 3 +-
.../algorithm/community/wcc/WccParams.java | 8 +-
.../filter/RingsDetectionWithFilterParams.java | 3 +-
computer-api/pom.xml | 9 +
.../computer/algorithm/AlgorithmParams.java | 26 +-
.../computer/core/config/ComputerOptions.java | 43 +-
.../computer/core/config/DefaultConfig.java | 4 +-
.../computer/core/config/EdgeFrequency.java | 0
.../hugegraph/computer/core/config/HotConfig.java | 0
.../computer/core/graph/id/IdFactory.java | 92 +++
.../core/graph/properties/DefaultProperties.java | 0
.../hugegraph/computer/core/graph/value/IdSet.java | 0
.../computer/core/graph/value/MapValue.java | 3 +-
computer-core/pom.xml | 2 +-
.../computer/core/graph/id/IdFactory.java | 49 --
.../computer/core/network/TransportConf.java | 5 +-
.../core/output/AbstractComputerOutput.java | 55 --
.../output/hg/exceptions/WriteBackException.java | 41 --
.../core/output/hg/metrics/LoadMetrics.java | 49 --
.../core/output/hg/metrics/LoadReport.java | 42 --
.../core/output/hg/metrics/LoadSummary.java | 73 ---
.../computer/core/output/hg/metrics/Printer.java | 59 --
.../core/output/hg/task/BatchInsertTask.java | 104 ---
.../computer/core/output/hg/task/InsertTask.java | 80 ---
.../core/output/hg/task/SingleInsertTask.java | 62 --
.../computer/core/output/hg/task/TaskManager.java | 190 ------
computer-dist/src/assembly/travis/build-images.sh | 2 +-
computer-driver/pom.xml | 5 +-
.../computer/driver/config/ComputerOptions.java | 721 ---------------------
.../operator/controller/ComputerJobDeployer.java | 10 +-
.../computer/k8s/config/KubeDriverOptions.java | 4 +-
.../computer/k8s/driver/KubernetesDriver.java | 37 +-
.../computer/driver/ComputerOptionsTest.java | 10 +-
.../hugegraph/computer/driver/DriverTestSuite.java | 10 -
.../hugegraph/computer/k8s/AbstractK8sTest.java | 12 +-
.../computer/k8s/KubernetesDriverTest.java | 8 +-
.../baidu/hugegraph/computer/k8s/MiniKubeTest.java | 15 +-
.../computer/suite/unit/UnitTestBase.java | 3 +-
pom.xml | 5 +
48 files changed, 248 insertions(+), 1640 deletions(-)
diff --git a/computer-algorithm/pom.xml b/computer-algorithm/pom.xml
index 6cf9b137..783656ef 100644
--- a/computer-algorithm/pom.xml
+++ b/computer-algorithm/pom.xml
@@ -31,7 +31,7 @@
<!-- TODO improve structure to only import computer-api -->
<dependency>
<groupId>com.baidu.hugegraph</groupId>
- <artifactId>computer-core</artifactId>
+ <artifactId>computer-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
diff --git a/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/centrality/betweenness/BetweennessCentralityParams.java b/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/centrality/betweenness/BetweennessCentralityParams.java
index f768b1b8..87a26bed 100644
--- a/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/centrality/betweenness/BetweennessCentralityParams.java
+++ b/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/centrality/betweenness/BetweennessCentralityParams.java
@@ -24,9 +24,7 @@ import java.util.Map;
import com.baidu.hugegraph.computer.algorithm.AlgorithmParams;
import com.baidu.hugegraph.computer.algorithm.centrality.closeness.ClosenessCentrality;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
-import com.baidu.hugegraph.computer.core.input.filter.ExtractAllPropertyInputFilter;
import com.baidu.hugegraph.computer.core.master.DefaultMasterComputation;
-import com.baidu.hugegraph.computer.core.output.hg.HugeGraphDoubleOutput;
public class BetweennessCentralityParams implements AlgorithmParams {
@@ -41,9 +39,9 @@ public class BetweennessCentralityParams implements AlgorithmParams {
this.setIfAbsent(params, ComputerOptions.ALGORITHM_MESSAGE_CLASS,
BetweennessMessage.class.getName());
this.setIfAbsent(params, ComputerOptions.OUTPUT_CLASS,
- HugeGraphDoubleOutput.class.getName());
+ HUGEGRAPH_DOUBLE_OUTPUT_CLASS_NAME);
this.setIfAbsent(params, ComputerOptions.INPUT_FILTER_CLASS,
- ExtractAllPropertyInputFilter.class.getName());
+ EXTRACTALLPROPERTYINPUTFILTER_CLASS_NAME);
this.setIfAbsent(params, ClosenessCentrality.OPTION_SAMPLE_RATE,
"0.5D");
}
diff --git a/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/centrality/closeness/ClosenessCentralityParams.java b/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/centrality/closeness/ClosenessCentralityParams.java
index 869ac4b7..9fbf58f9 100644
--- a/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/centrality/closeness/ClosenessCentralityParams.java
+++ b/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/centrality/closeness/ClosenessCentralityParams.java
@@ -23,9 +23,7 @@ import java.util.Map;
import com.baidu.hugegraph.computer.algorithm.AlgorithmParams;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
-import com.baidu.hugegraph.computer.core.input.filter.ExtractAllPropertyInputFilter;
import com.baidu.hugegraph.computer.core.master.DefaultMasterComputation;
-import com.baidu.hugegraph.computer.core.output.hg.HugeGraphDoubleOutput;
public class ClosenessCentralityParams implements AlgorithmParams {
@@ -40,9 +38,9 @@ public class ClosenessCentralityParams implements AlgorithmParams {
this.setIfAbsent(params, ComputerOptions.ALGORITHM_MESSAGE_CLASS,
ClosenessMessage.class.getName());
this.setIfAbsent(params, ComputerOptions.OUTPUT_CLASS,
- HugeGraphDoubleOutput.class.getName());
+ HUGEGRAPH_DOUBLE_OUTPUT_CLASS_NAME);
this.setIfAbsent(params, ComputerOptions.INPUT_FILTER_CLASS,
- ExtractAllPropertyInputFilter.class.getName());
+ EXTRACTALLPROPERTYINPUTFILTER_CLASS_NAME);
this.setIfAbsent(params, ClosenessCentrality.OPTION_SAMPLE_RATE,
"0.5D");
}
diff --git a/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/centrality/closeness/ClosenessMessage.java b/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/centrality/closeness/ClosenessMessage.java
index 103a2b09..7251640e 100644
--- a/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/centrality/closeness/ClosenessMessage.java
+++ b/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/centrality/closeness/ClosenessMessage.java
@@ -23,8 +23,8 @@ import java.io.IOException;
import com.baidu.hugegraph.computer.core.common.ComputerContext;
import com.baidu.hugegraph.computer.core.graph.GraphFactory;
-import com.baidu.hugegraph.computer.core.graph.id.BytesId;
import com.baidu.hugegraph.computer.core.graph.id.Id;
+import com.baidu.hugegraph.computer.core.graph.id.IdFactory;
import com.baidu.hugegraph.computer.core.graph.value.DoubleValue;
import com.baidu.hugegraph.computer.core.graph.value.Value.CustomizeValue;
import com.baidu.hugegraph.computer.core.graph.value.ValueType;
@@ -40,7 +40,7 @@ public class ClosenessMessage implements CustomizeValue<ClosenessMessage> {
private DoubleValue distance;
public ClosenessMessage() {
- this(new BytesId(), new BytesId(), new DoubleValue(0.0D));
+ this(IdFactory.createId(), IdFactory.createId(), new DoubleValue(0.0D));
}
public ClosenessMessage(Id senderId, Id startId, DoubleValue distance) {
diff --git a/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/centrality/degree/DegreeCentralityParams.java b/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/centrality/degree/DegreeCentralityParams.java
index 0935f846..04fc08ef 100644
--- a/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/centrality/degree/DegreeCentralityParams.java
+++ b/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/centrality/degree/DegreeCentralityParams.java
@@ -26,7 +26,6 @@ import com.baidu.hugegraph.computer.core.combiner.DoubleValueSumCombiner;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.graph.value.DoubleValue;
import com.baidu.hugegraph.computer.core.master.DefaultMasterComputation;
-import com.baidu.hugegraph.computer.core.output.hg.HugeGraphDoubleOutput;
public class DegreeCentralityParams implements AlgorithmParams {
@@ -43,6 +42,6 @@ public class DegreeCentralityParams implements AlgorithmParams {
this.setIfAbsent(params, ComputerOptions.WORKER_COMBINER_CLASS,
DoubleValueSumCombiner.class.getName());
this.setIfAbsent(params, ComputerOptions.OUTPUT_CLASS,
- HugeGraphDoubleOutput.class.getName());
+ HUGEGRAPH_DOUBLE_OUTPUT_CLASS_NAME);
}
}
diff --git a/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/centrality/pagerank/PageRankParams.java b/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/centrality/pagerank/PageRankParams.java
index 18c668ab..a10018c7 100644
--- a/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/centrality/pagerank/PageRankParams.java
+++ b/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/centrality/pagerank/PageRankParams.java
@@ -25,7 +25,6 @@ import com.baidu.hugegraph.computer.algorithm.AlgorithmParams;
import com.baidu.hugegraph.computer.core.combiner.DoubleValueSumCombiner;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.graph.value.DoubleValue;
-import com.baidu.hugegraph.computer.core.output.hg.HugeGraphDoubleOutput;
public class PageRankParams implements AlgorithmParams {
@@ -42,6 +41,6 @@ public class PageRankParams implements AlgorithmParams {
this.setIfAbsent(params, ComputerOptions.WORKER_COMBINER_CLASS,
DoubleValueSumCombiner.class.getName());
this.setIfAbsent(params, ComputerOptions.OUTPUT_CLASS,
- HugeGraphDoubleOutput.class.getName());
+ HUGEGRAPH_DOUBLE_OUTPUT_CLASS_NAME);
}
}
diff --git a/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/community/kcore/KcoreParams.java b/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/community/kcore/KcoreParams.java
index 766736d1..bdd4b155 100644
--- a/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/community/kcore/KcoreParams.java
+++ b/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/community/kcore/KcoreParams.java
@@ -24,8 +24,6 @@ import java.util.Map;
import com.baidu.hugegraph.computer.algorithm.AlgorithmParams;
import com.baidu.hugegraph.computer.core.combiner.ValueMinCombiner;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
-import com.baidu.hugegraph.computer.core.graph.id.BytesId;
-import com.baidu.hugegraph.computer.core.output.hg.HugeGraphIdOutput;
public class KcoreParams implements AlgorithmParams {
@@ -36,10 +34,10 @@ public class KcoreParams implements AlgorithmParams {
this.setIfAbsent(params, ComputerOptions.ALGORITHM_RESULT_CLASS,
KcoreValue.class.getName());
this.setIfAbsent(params, ComputerOptions.ALGORITHM_MESSAGE_CLASS,
- BytesId.class.getName());
+ BYTESID_CLASS_NAME);
this.setIfAbsent(params, ComputerOptions.WORKER_COMBINER_CLASS,
ValueMinCombiner.class.getName());
this.setIfAbsent(params, ComputerOptions.OUTPUT_CLASS,
- HugeGraphIdOutput.class.getName());
+ HUGEGRAPH_ID_OUTPUT_CLASS_NAME);
}
}
diff --git a/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/community/kcore/KcoreValue.java b/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/community/kcore/KcoreValue.java
index 45e86fef..04b51626 100644
--- a/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/community/kcore/KcoreValue.java
+++ b/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/community/kcore/KcoreValue.java
@@ -22,13 +22,13 @@ package com.baidu.hugegraph.computer.algorithm.community.kcore;
import java.io.IOException;
import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.hugegraph.util.E;
-import com.baidu.hugegraph.computer.core.graph.id.BytesId;
import com.baidu.hugegraph.computer.core.graph.id.Id;
+import com.baidu.hugegraph.computer.core.graph.id.IdFactory;
import com.baidu.hugegraph.computer.core.graph.value.Value.CustomizeValue;
import com.baidu.hugegraph.computer.core.io.RandomAccessInput;
import com.baidu.hugegraph.computer.core.io.RandomAccessOutput;
-import org.apache.hugegraph.util.E;
public class KcoreValue implements CustomizeValue<Object> {
@@ -37,7 +37,7 @@ public class KcoreValue implements CustomizeValue<Object> {
public KcoreValue() {
this.degree = 0;
- this.core = new BytesId();
+ this.core = IdFactory.createId();
}
public void degree(int degree) {
diff --git a/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/community/lpa/LpaParams.java b/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/community/lpa/LpaParams.java
index 12b3710f..9839f2b2 100644
--- a/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/community/lpa/LpaParams.java
+++ b/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/community/lpa/LpaParams.java
@@ -23,8 +23,6 @@ import java.util.Map;
import com.baidu.hugegraph.computer.algorithm.AlgorithmParams;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
-import com.baidu.hugegraph.computer.core.graph.id.BytesId;
-import com.baidu.hugegraph.computer.core.output.hg.HugeGraphIdOutput;
public class LpaParams implements AlgorithmParams {
@@ -33,10 +31,10 @@ public class LpaParams implements AlgorithmParams {
this.setIfAbsent(params, ComputerOptions.WORKER_COMPUTATION_CLASS,
Lpa.class.getName());
this.setIfAbsent(params, ComputerOptions.ALGORITHM_RESULT_CLASS,
- BytesId.class.getName());
+ BYTESID_CLASS_NAME);
this.setIfAbsent(params, ComputerOptions.ALGORITHM_MESSAGE_CLASS,
- BytesId.class.getName());
+ BYTESID_CLASS_NAME);
this.setIfAbsent(params, ComputerOptions.OUTPUT_CLASS,
- HugeGraphIdOutput.class.getName());
+ HUGEGRAPH_ID_OUTPUT_CLASS_NAME);
}
}
diff --git a/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/community/trianglecount/TriangleCountParams.java b/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/community/trianglecount/TriangleCountParams.java
index 8cdf5f3e..799da979 100644
--- a/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/community/trianglecount/TriangleCountParams.java
+++ b/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/community/trianglecount/TriangleCountParams.java
@@ -25,7 +25,6 @@ import com.baidu.hugegraph.computer.algorithm.AlgorithmParams;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.config.EdgeFrequency;
import com.baidu.hugegraph.computer.core.graph.value.IdList;
-import com.baidu.hugegraph.computer.core.output.hg.HugeGraphIntOutput;
public class TriangleCountParams implements AlgorithmParams {
@@ -38,7 +37,7 @@ public class TriangleCountParams implements AlgorithmParams {
this.setIfAbsent(params, ComputerOptions.ALGORITHM_RESULT_CLASS,
TriangleCountValue.class.getName());
this.setIfAbsent(params, ComputerOptions.OUTPUT_CLASS,
- HugeGraphIntOutput.class.getName());
+ HUGEGRAPH_INT_OUTPUT_CLASS_NAME);
this.setIfAbsent(params, ComputerOptions.INPUT_EDGE_FREQ.name(),
EdgeFrequency.SINGLE.name());
}
diff --git a/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/community/wcc/WccParams.java b/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/community/wcc/WccParams.java
index bc1ca168..192169bd 100644
--- a/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/community/wcc/WccParams.java
+++ b/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/community/wcc/WccParams.java
@@ -24,8 +24,6 @@ import java.util.Map;
import com.baidu.hugegraph.computer.algorithm.AlgorithmParams;
import com.baidu.hugegraph.computer.core.combiner.ValueMinCombiner;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
-import com.baidu.hugegraph.computer.core.graph.id.BytesId;
-import com.baidu.hugegraph.computer.core.output.hg.HugeGraphIdOutput;
public class WccParams implements AlgorithmParams {
@@ -34,12 +32,12 @@ public class WccParams implements AlgorithmParams {
this.setIfAbsent(params, ComputerOptions.WORKER_COMPUTATION_CLASS,
Wcc.class.getName());
this.setIfAbsent(params, ComputerOptions.ALGORITHM_RESULT_CLASS,
- BytesId.class.getName());
+ BYTESID_CLASS_NAME);
this.setIfAbsent(params, ComputerOptions.ALGORITHM_MESSAGE_CLASS,
- BytesId.class.getName());
+ BYTESID_CLASS_NAME);
this.setIfAbsent(params, ComputerOptions.WORKER_COMBINER_CLASS,
ValueMinCombiner.class.getName());
this.setIfAbsent(params, ComputerOptions.OUTPUT_CLASS,
- HugeGraphIdOutput.class.getName());
+ HUGEGRAPH_ID_OUTPUT_CLASS_NAME);
}
}
diff --git a/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/path/rings/filter/RingsDetectionWithFilterParams.java b/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/path/rings/filter/RingsDetectionWithFilterParams.java
index f52cb525..35571243 100644
--- a/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/path/rings/filter/RingsDetectionWithFilterParams.java
+++ b/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/path/rings/filter/RingsDetectionWithFilterParams.java
@@ -25,7 +25,6 @@ import com.baidu.hugegraph.computer.algorithm.AlgorithmParams;
import com.baidu.hugegraph.computer.algorithm.path.rings.RingsDetectionOutput;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.graph.value.IdListList;
-import com.baidu.hugegraph.computer.core.input.filter.ExtractAllPropertyInputFilter;
public class RingsDetectionWithFilterParams implements AlgorithmParams {
@@ -40,6 +39,6 @@ public class RingsDetectionWithFilterParams implements AlgorithmParams {
this.setIfAbsent(params, ComputerOptions.OUTPUT_CLASS,
RingsDetectionOutput.class.getName());
this.setIfAbsent(params, ComputerOptions.INPUT_FILTER_CLASS,
- ExtractAllPropertyInputFilter.class.getName());
+ EXTRACTALLPROPERTYINPUTFILTER_CLASS_NAME);
}
}
diff --git a/computer-api/pom.xml b/computer-api/pom.xml
index 8bf362d1..d55e679e 100644
--- a/computer-api/pom.xml
+++ b/computer-api/pom.xml
@@ -28,6 +28,15 @@
<artifactId>computer-api</artifactId>
<dependencies>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>${commons-lang3-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hugegraph</groupId>
+ <artifactId>hugegraph-rpc</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.hugegraph</groupId>
<artifactId>hugegraph-client</artifactId>
diff --git a/computer-api/src/main/java/com/baidu/hugegraph/computer/algorithm/AlgorithmParams.java b/computer-api/src/main/java/com/baidu/hugegraph/computer/algorithm/AlgorithmParams.java
index 7d531409..782cccaf 100644
--- a/computer-api/src/main/java/com/baidu/hugegraph/computer/algorithm/AlgorithmParams.java
+++ b/computer-api/src/main/java/com/baidu/hugegraph/computer/algorithm/AlgorithmParams.java
@@ -21,15 +21,37 @@ package com.baidu.hugegraph.computer.algorithm;
import java.util.Map;
-import org.slf4j.Logger;
-
import org.apache.hugegraph.config.ConfigOption;
import org.apache.hugegraph.util.Log;
+import org.slf4j.Logger;
public interface AlgorithmParams {
Logger LOG = Log.logger(AlgorithmParams.class);
+ String BYTESID_CLASS_NAME = "com.baidu.hugegraph.computer.core.graph.id.BytesId";
+ String HUGEGRAPH_ID_OUTPUT_CLASS_NAME = "com.baidu.hugegraph.computer.core" +
+ ".output.hg.HugeGraphIdOutput";
+ String LOG_OUTPUT_CLASS_NAME = "com.baidu.hugegraph.computer.core.output.LogOutput";
+ String HUGEGRAPH_DOUBLE_OUTPUT_CLASS_NAME = "com.baidu.hugegraph.computer.core" +
+ ".output.hg.HugeGraphDoubleOutput";
+ String HUGEGRAPH_FLOAT_OUTPUT_CLASS_NAME = "com.baidu.hugegraph.computer.core" +
+ ".output.hg.HugeGraphFloatOutput";
+ String HUGEGRAPH_INT_OUTPUT_CLASS_NAME = "com.baidu.hugegraph.computer.core" +
+ ".output.hg.HugeGraphIntOutput";
+ String HUGEGRAPH_LONG_OUTPUT_CLASS_NAME = "com.baidu.hugegraph.computer.core" +
+ ".output.hg.HugeGraphLongOutput";
+ String HUGEGRAPH_STRING_OUTPUT_CLASS_NAME = "com.baidu.hugegraph.computer.core" +
+ ".output.hg.HugeGraphStringOutput";
+ String HUGEGRAPH_LIST_OUTPUT_CLASS_NAME = "com.baidu.hugegraph.computer.core" +
+ ".output.hg.HugeGraphListOutput";
+ String DEFAULTINPUTFILTER_CLASS_NAME = "com.baidu.hugegraph.computer" +
+ ".core.input.filter" +
+ ".DefaultInputFilter";
+ String EXTRACTALLPROPERTYINPUTFILTER_CLASS_NAME = "com.baidu.hugegraph.computer" +
+ ".core.input.filter" +
+ ".ExtractAllPropertyInputFilter";
+
/**
* set algorithm's specific configuration
* @param params
diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/config/ComputerOptions.java b/computer-api/src/main/java/com/baidu/hugegraph/computer/core/config/ComputerOptions.java
similarity index 97%
rename from computer-core/src/main/java/com/baidu/hugegraph/computer/core/config/ComputerOptions.java
rename to computer-api/src/main/java/com/baidu/hugegraph/computer/core/config/ComputerOptions.java
index 97d5e21c..63c4563e 100644
--- a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/config/ComputerOptions.java
+++ b/computer-api/src/main/java/com/baidu/hugegraph/computer/core/config/ComputerOptions.java
@@ -19,30 +19,32 @@
package com.baidu.hugegraph.computer.core.config;
+import static com.baidu.hugegraph.computer.algorithm.AlgorithmParams.DEFAULTINPUTFILTER_CLASS_NAME;
+import static com.baidu.hugegraph.computer.algorithm.AlgorithmParams.LOG_OUTPUT_CLASS_NAME;
import static org.apache.hugegraph.config.OptionChecker.allowValues;
import static org.apache.hugegraph.config.OptionChecker.disallowEmpty;
import static org.apache.hugegraph.config.OptionChecker.nonNegativeInt;
import static org.apache.hugegraph.config.OptionChecker.positiveInt;
-import com.baidu.hugegraph.computer.core.combiner.OverwritePropertiesCombiner;
-import com.baidu.hugegraph.computer.core.graph.partition.HashPartitioner;
-import com.baidu.hugegraph.computer.core.input.filter.DefaultInputFilter;
-import com.baidu.hugegraph.computer.core.master.DefaultMasterComputation;
-import com.baidu.hugegraph.computer.core.network.TransportConf;
-import com.baidu.hugegraph.computer.core.network.netty.NettyTransportProvider;
-import com.baidu.hugegraph.computer.core.output.LogOutput;
-import org.apache.hugegraph.structure.constant.Direction;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
+
import java.util.Set;
import java.util.concurrent.TimeUnit;
+
import org.apache.hugegraph.config.ConfigConvOption;
import org.apache.hugegraph.config.ConfigListOption;
import org.apache.hugegraph.config.ConfigOption;
import org.apache.hugegraph.config.OptionHolder;
+import org.apache.hugegraph.structure.constant.Direction;
import org.apache.hugegraph.util.Bytes;
+import com.baidu.hugegraph.computer.core.combiner.OverwritePropertiesCombiner;
+import com.baidu.hugegraph.computer.core.master.DefaultMasterComputation;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
public class ComputerOptions extends OptionHolder {
+ public static final int TRANSPORT_DEFAULT_THREADS = 4;
+
private ComputerOptions() {
super();
}
@@ -131,7 +133,7 @@ public class ComputerOptions extends OptionHolder {
"input-filter is used to Filter vertex edges " +
"according to user needs.",
disallowEmpty(),
- DefaultInputFilter.class
+ loadClass(DEFAULTINPUTFILTER_CLASS_NAME)
);
public static final ConfigConvOption<String, Direction>
@@ -205,7 +207,7 @@ public class ComputerOptions extends OptionHolder {
"The class to output the computation result of each " +
"vertex. Be called after iteration computation.",
disallowEmpty(),
- LogOutput.class
+ loadClass(LOG_OUTPUT_CLASS_NAME)
);
public static final ConfigOption<String> OUTPUT_RESULT_NAME =
@@ -494,7 +496,7 @@ public class ComputerOptions extends OptionHolder {
"The partitioner that decides which partition a vertex " +
"should be in, and which worker a partition should be in.",
disallowEmpty(),
- HashPartitioner.class
+ loadClass("com.baidu.hugegraph.computer.core.graph.partition.HashPartitioner")
);
public static final ConfigOption<Class<?>> WORKER_COMPUTATION_CLASS =
@@ -644,7 +646,7 @@ public class ComputerOptions extends OptionHolder {
"transport.server_threads",
"The number of transport threads for server.",
positiveInt(),
- TransportConf.DEFAULT_THREADS
+ TRANSPORT_DEFAULT_THREADS
);
public static final ConfigOption<Integer> TRANSPORT_CLIENT_THREADS =
@@ -652,7 +654,7 @@ public class ComputerOptions extends OptionHolder {
"transport.client_threads",
"The number of transport threads for client.",
positiveInt(),
- TransportConf.DEFAULT_THREADS
+ TRANSPORT_DEFAULT_THREADS
);
public static final ConfigOption<Class<?>> TRANSPORT_PROVIDER_CLASS =
@@ -660,7 +662,8 @@ public class ComputerOptions extends OptionHolder {
"transport.provider_class",
"The transport provider, currently only supports Netty.",
disallowEmpty(),
- NettyTransportProvider.class
+ loadClass("com.baidu.hugegraph.computer.core.network.netty" +
+ ".NettyTransportProvider")
);
public static final ConfigOption<String> TRANSPORT_IO_MODE =
@@ -899,4 +902,12 @@ public class ComputerOptions extends OptionHolder {
public static Set<String> REQUIRED_OPTIONS = ImmutableSet.of(
);
+
+ public static Class<?> loadClass(String className) {
+ try {
+ return Class.forName(className);
+ } catch (ClassNotFoundException e) {
+ return Null.class;
+ }
+ }
}
diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/config/DefaultConfig.java b/computer-api/src/main/java/com/baidu/hugegraph/computer/core/config/DefaultConfig.java
similarity index 100%
rename from computer-core/src/main/java/com/baidu/hugegraph/computer/core/config/DefaultConfig.java
rename to computer-api/src/main/java/com/baidu/hugegraph/computer/core/config/DefaultConfig.java
index ddea27d0..b74b02cb 100644
--- a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/config/DefaultConfig.java
+++ b/computer-api/src/main/java/com/baidu/hugegraph/computer/core/config/DefaultConfig.java
@@ -23,13 +23,13 @@ import java.util.Map;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration2.MapConfiguration;
-
-import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
import org.apache.hugegraph.config.ConfigOption;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.config.TypedOption;
import org.apache.hugegraph.util.E;
+import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
+
public final class DefaultConfig implements Config {
private final HugeConfig allConfig;
diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/config/EdgeFrequency.java b/computer-api/src/main/java/com/baidu/hugegraph/computer/core/config/EdgeFrequency.java
similarity index 100%
rename from computer-core/src/main/java/com/baidu/hugegraph/computer/core/config/EdgeFrequency.java
rename to computer-api/src/main/java/com/baidu/hugegraph/computer/core/config/EdgeFrequency.java
diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/config/HotConfig.java b/computer-api/src/main/java/com/baidu/hugegraph/computer/core/config/HotConfig.java
similarity index 100%
rename from computer-core/src/main/java/com/baidu/hugegraph/computer/core/config/HotConfig.java
rename to computer-api/src/main/java/com/baidu/hugegraph/computer/core/config/HotConfig.java
diff --git a/computer-api/src/main/java/com/baidu/hugegraph/computer/core/graph/id/IdFactory.java b/computer-api/src/main/java/com/baidu/hugegraph/computer/core/graph/id/IdFactory.java
new file mode 100644
index 00000000..e305e839
--- /dev/null
+++ b/computer-api/src/main/java/com/baidu/hugegraph/computer/core/graph/id/IdFactory.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.computer.core.graph.id;
+
+import static com.baidu.hugegraph.computer.algorithm.AlgorithmParams.BYTESID_CLASS_NAME;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.util.UUID;
+
+import com.baidu.hugegraph.computer.core.common.Constants;
+import com.baidu.hugegraph.computer.core.common.SerialEnum;
+import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
+
+public final class IdFactory {
+
+ private static final Constructor<?> BYTES_ID_CONSTRUCTOR;
+ private static final Method BYTES_ID_LONG_METHOD;
+ private static final Method BYTES_ID_STRING_METHOD;
+ private static final Method BYTES_ID_UUID_METHOD;
+
+ private static final UUID EMPTY_UUID = new UUID(0L, 0L);
+
+ static {
+ try {
+ Class<?> bytesIdClass = Class.forName(BYTESID_CLASS_NAME);
+
+ BYTES_ID_CONSTRUCTOR = bytesIdClass.getDeclaredConstructor();
+ BYTES_ID_CONSTRUCTOR.setAccessible(true);
+
+ BYTES_ID_LONG_METHOD = bytesIdClass.getMethod("of", long.class);
+ BYTES_ID_LONG_METHOD.setAccessible(false);
+
+ BYTES_ID_STRING_METHOD = bytesIdClass.getMethod("of", String.class);
+ BYTES_ID_STRING_METHOD.setAccessible(false);
+
+ BYTES_ID_UUID_METHOD = bytesIdClass.getMethod("of", UUID.class);
+ BYTES_ID_UUID_METHOD.setAccessible(false);
+ } catch (Throwable e) {
+ throw new ComputerException("Failed to reflection BytesId method", e);
+ }
+ }
+
+ // TODO: consider reusing Id instances
+ public static Id createId(byte code) {
+ IdType type = SerialEnum.fromCode(IdType.class, code);
+ return createId(type);
+ }
+
+ public static Id createId(IdType type) {
+ try {
+ switch (type) {
+ case LONG:
+ return (Id) BYTES_ID_LONG_METHOD.invoke(null, 0L);
+ case UTF8:
+ return (Id) BYTES_ID_STRING_METHOD.invoke(null, Constants.EMPTY_STR);
+ case UUID:
+ return (Id) BYTES_ID_UUID_METHOD.invoke(null, EMPTY_UUID);
+ default:
+ throw new ComputerException("Can't create Id for %s",
+ type.name());
+ }
+ } catch (Exception e) {
+ throw new ComputerException("Failed to createId", e);
+ }
+ }
+
+ public static Id createId() {
+ try {
+ return (Id) BYTES_ID_CONSTRUCTOR.newInstance();
+ } catch (Exception e) {
+ throw new ComputerException("Can't create Id for %s");
+ }
+ }
+}
diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/graph/properties/DefaultProperties.java b/computer-api/src/main/java/com/baidu/hugegraph/computer/core/graph/properties/DefaultProperties.java
similarity index 100%
rename from computer-core/src/main/java/com/baidu/hugegraph/computer/core/graph/properties/DefaultProperties.java
rename to computer-api/src/main/java/com/baidu/hugegraph/computer/core/graph/properties/DefaultProperties.java
diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/graph/value/IdSet.java b/computer-api/src/main/java/com/baidu/hugegraph/computer/core/graph/value/IdSet.java
similarity index 100%
rename from computer-core/src/main/java/com/baidu/hugegraph/computer/core/graph/value/IdSet.java
rename to computer-api/src/main/java/com/baidu/hugegraph/computer/core/graph/value/IdSet.java
diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/graph/value/MapValue.java b/computer-api/src/main/java/com/baidu/hugegraph/computer/core/graph/value/MapValue.java
similarity index 99%
rename from computer-core/src/main/java/com/baidu/hugegraph/computer/core/graph/value/MapValue.java
rename to computer-api/src/main/java/com/baidu/hugegraph/computer/core/graph/value/MapValue.java
index cf11f49e..d87f02f5 100644
--- a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/graph/value/MapValue.java
+++ b/computer-api/src/main/java/com/baidu/hugegraph/computer/core/graph/value/MapValue.java
@@ -24,13 +24,14 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import org.apache.hugegraph.util.E;
+
import com.baidu.hugegraph.computer.core.common.ComputerContext;
import com.baidu.hugegraph.computer.core.common.SerialEnum;
import com.baidu.hugegraph.computer.core.graph.GraphFactory;
import com.baidu.hugegraph.computer.core.graph.id.Id;
import com.baidu.hugegraph.computer.core.io.RandomAccessInput;
import com.baidu.hugegraph.computer.core.io.RandomAccessOutput;
-import org.apache.hugegraph.util.E;
public class MapValue<T extends Value> implements Value {
diff --git a/computer-core/pom.xml b/computer-core/pom.xml
index a14dd02a..1ce82d34 100644
--- a/computer-core/pom.xml
+++ b/computer-core/pom.xml
@@ -54,7 +54,7 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
- <version>3.9</version>
+ <version>${commons-lang3-version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/graph/id/IdFactory.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/graph/id/IdFactory.java
deleted file mode 100644
index 1767a670..00000000
--- a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/graph/id/IdFactory.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright 2017 HugeGraph Authors
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-
-package com.baidu.hugegraph.computer.core.graph.id;
-
-import java.util.UUID;
-
-import com.baidu.hugegraph.computer.core.common.Constants;
-import com.baidu.hugegraph.computer.core.common.SerialEnum;
-import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
-
-public final class IdFactory {
-
- // Maybe can reuse Id
- public static Id createId(byte code) {
- IdType type = SerialEnum.fromCode(IdType.class, code);
- return createId(type);
- }
-
- public static Id createId(IdType type) {
- switch (type) {
- case LONG:
- return BytesId.of(0L);
- case UTF8:
- return BytesId.of(Constants.EMPTY_STR);
- case UUID:
- return BytesId.of(new UUID(0L, 0L));
- default:
- throw new ComputerException("Can't create Id for %s",
- type.name());
- }
- }
-}
diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/network/TransportConf.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/network/TransportConf.java
index 120b9b00..0968467d 100644
--- a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/network/TransportConf.java
+++ b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/network/TransportConf.java
@@ -22,10 +22,11 @@ package com.baidu.hugegraph.computer.core.network;
import java.net.InetAddress;
import java.util.Locale;
+import org.apache.hugegraph.util.E;
+
import com.baidu.hugegraph.computer.core.common.exception.IllegalArgException;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.config.Config;
-import org.apache.hugegraph.util.E;
import io.netty.channel.epoll.Epoll;
@@ -37,8 +38,6 @@ public class TransportConf {
"transport-netty-client";
private final Config config;
- public static final int DEFAULT_THREADS = 4;
-
public static TransportConf wrapConfig(Config config) {
return new TransportConf(config);
}
diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/AbstractComputerOutput.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/AbstractComputerOutput.java
deleted file mode 100644
index 3d4dd100..00000000
--- a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/AbstractComputerOutput.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Copyright 2017 HugeGraph Authors
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-
-package com.baidu.hugegraph.computer.core.output;
-
-import org.slf4j.Logger;
-
-import com.baidu.hugegraph.computer.core.config.ComputerOptions;
-import com.baidu.hugegraph.computer.core.config.Config;
-import com.baidu.hugegraph.computer.core.worker.Computation;
-import org.apache.hugegraph.util.Log;
-
-public abstract class AbstractComputerOutput implements ComputerOutput {
-
- private static final Logger LOG = Log.logger(ComputerOutput.class);
-
- private String name;
- private int partition;
-
- @Override
- public void init(Config config, int partition) {
- Computation<?> computation = config.createObject(
- ComputerOptions.WORKER_COMPUTATION_CLASS);
- this.name = computation.name();
- this.partition = partition;
-
- LOG.info("Start write back partition {} for {}",
- this.partition(), this.name());
- }
-
- @Override
- public String name() {
- return this.name;
- }
-
- public int partition() {
- return this.partition;
- }
-}
diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/hg/exceptions/WriteBackException.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/hg/exceptions/WriteBackException.java
deleted file mode 100644
index 227f6e3a..00000000
--- a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/hg/exceptions/WriteBackException.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright 2017 HugeGraph Authors
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-
-package com.baidu.hugegraph.computer.core.output.hg.exceptions;
-
-public class WriteBackException extends RuntimeException {
-
- private static final long serialVersionUID = 5504623124963497613L;
-
- public WriteBackException(String message) {
- super(message);
- }
-
- public WriteBackException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public WriteBackException(String message, Object... args) {
- super(String.format(message, args));
- }
-
- public WriteBackException(String message, Throwable cause, Object... args) {
- super(String.format(message, args), cause);
- }
-}
diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/hg/metrics/LoadMetrics.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/hg/metrics/LoadMetrics.java
deleted file mode 100644
index 00b78ccd..00000000
--- a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/hg/metrics/LoadMetrics.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright 2017 HugeGraph Authors
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-
-package com.baidu.hugegraph.computer.core.output.hg.metrics;
-
-import java.util.concurrent.atomic.LongAdder;
-
-public final class LoadMetrics {
-
- private final LongAdder insertSuccess;
- private final LongAdder insertFailure;
-
- public LoadMetrics() {
- this.insertSuccess = new LongAdder();
- this.insertFailure = new LongAdder();
- }
-
- public long insertSuccess() {
- return this.insertSuccess.longValue();
- }
-
- public void plusInsertSuccess(long count) {
- this.insertSuccess.add(count);
- }
-
- public long insertFailure() {
- return this.insertFailure.longValue();
- }
-
- public void increaseInsertFailure() {
- this.insertFailure.increment();
- }
-}
diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/hg/metrics/LoadReport.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/hg/metrics/LoadReport.java
deleted file mode 100644
index 4a3c4dba..00000000
--- a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/hg/metrics/LoadReport.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright 2017 HugeGraph Authors
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-
-package com.baidu.hugegraph.computer.core.output.hg.metrics;
-
-public final class LoadReport {
-
- private long vertexInsertSuccess;
- private long vertexInsertFailure;
-
- public long vertexInsertSuccess() {
- return this.vertexInsertSuccess;
- }
-
- public long vertexInsertFailure() {
- return this.vertexInsertFailure;
- }
-
- public static LoadReport collect(LoadSummary summary) {
- LoadReport report = new LoadReport();
- LoadMetrics metrics = summary.metrics();
- report.vertexInsertSuccess += metrics.insertSuccess();
- report.vertexInsertFailure += metrics.insertFailure();
- return report;
- }
-}
diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/hg/metrics/LoadSummary.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/hg/metrics/LoadSummary.java
deleted file mode 100644
index 936beaa0..00000000
--- a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/hg/metrics/LoadSummary.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Copyright 2017 HugeGraph Authors
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-
-package com.baidu.hugegraph.computer.core.output.hg.metrics;
-
-import java.util.concurrent.atomic.LongAdder;
-
-import org.apache.commons.lang3.time.StopWatch;
-
-public final class LoadSummary {
-
- private final LongAdder vertexLoaded;
- private final StopWatch totalTimer;
- private final LoadMetrics metrics;
-
- public LoadSummary() {
- this.vertexLoaded = new LongAdder();
- this.totalTimer = new StopWatch();
- this.metrics = new LoadMetrics();
- }
-
- public LoadMetrics metrics() {
- return this.metrics;
- }
-
- public long vertexLoaded() {
- return this.vertexLoaded.longValue();
- }
-
- public void plusLoaded(int count) {
- this.vertexLoaded.add(count);
- }
-
- public long totalTime() {
- return this.totalTimer.getTime();
- }
-
- public void startTimer() {
- if (!this.totalTimer.isStarted()) {
- this.totalTimer.start();
- }
- }
-
- public void stopTimer() {
- if (!this.totalTimer.isStopped()) {
- this.totalTimer.stop();
- }
- }
-
- public long loadRate() {
- long totalTime = this.totalTime();
- if (totalTime == 0) {
- return -1;
- }
- return this.vertexLoaded() * 1000 / totalTime;
- }
-}
diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/hg/metrics/Printer.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/hg/metrics/Printer.java
deleted file mode 100644
index d67ea7c3..00000000
--- a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/hg/metrics/Printer.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright 2017 HugeGraph Authors
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-
-package com.baidu.hugegraph.computer.core.output.hg.metrics;
-
-import org.slf4j.Logger;
-
-import org.apache.hugegraph.util.Log;
-import org.apache.hugegraph.util.TimeUtil;
-
-public final class Printer {
-
- private static final Logger LOG = Log.logger(Printer.class);
-
- public static void printSummary(LoadSummary summary) {
- printCountReport(LoadReport.collect(summary));
- printMeterReport(summary);
- }
-
- private static void printCountReport(LoadReport report) {
- log("count metrics");
- log("vertex insert success", report.vertexInsertSuccess());
- log("vertex insert failure", report.vertexInsertFailure());
- }
-
- private static void printMeterReport(LoadSummary summary) {
- log("meter metrics");
- log("total time", TimeUtil.readableTime(summary.totalTime()));
- log("vertex load rate(vertices/s)", summary.loadRate());
- }
-
- private static void log(String message) {
- LOG.info(message);
- }
-
- private static void log(String key, long value) {
- LOG.info(String.format(" %-30s: %-20d", key, value));
- }
-
- private static void log(String key, String value) {
- LOG.info(String.format(" %-30s: %-20s", key, value));
- }
-}
diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/hg/task/BatchInsertTask.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/hg/task/BatchInsertTask.java
deleted file mode 100644
index ad96378f..00000000
--- a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/hg/task/BatchInsertTask.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Copyright 2017 HugeGraph Authors
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-
-package com.baidu.hugegraph.computer.core.output.hg.task;
-
-import java.util.List;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hugegraph.rest.ClientException;
-import org.slf4j.Logger;
-
-import com.baidu.hugegraph.computer.core.config.ComputerOptions;
-import com.baidu.hugegraph.computer.core.config.Config;
-import com.baidu.hugegraph.computer.core.output.hg.metrics.LoadSummary;
-import org.apache.hugegraph.driver.HugeClient;
-import org.apache.hugegraph.exception.ServerException;
-import org.apache.hugegraph.structure.graph.Vertex;
-import org.apache.hugegraph.util.Log;
-
-public class BatchInsertTask extends InsertTask {
-
- private static final Logger LOG = Log.logger(TaskManager.class);
-
- public BatchInsertTask(Config config, HugeClient client,
- List<Vertex> batch, LoadSummary loadSummary) {
- super(config, client, batch, loadSummary);
- }
-
- @Override
- public void run() {
- int retryCount = 0;
- int retryTimes = this.config.get(ComputerOptions.OUTPUT_RETRY_TIMES);
- do {
- try {
- this.insertBatch(this.batch);
- break;
- } catch (ClientException e) {
- LOG.debug("client exception: {}", e.getMessage());
- Throwable cause = e.getCause();
- if (cause != null && cause.getMessage() != null) {
- if (StringUtils.containsAny(cause.getMessage(),
- UNACCEPTABLE_MESSAGES)) {
- throw e;
- }
- }
- retryCount = this.waitThenRetry(retryCount, e);
- } catch (ServerException e) {
- String message = e.getMessage();
- LOG.error("server exception: {}", message);
- if (UNACCEPTABLE_EXCEPTIONS.contains(e.exception())) {
- throw e;
- }
- if (StringUtils.containsAny(message, UNACCEPTABLE_MESSAGES)) {
- throw e;
- }
- retryCount = this.waitThenRetry(retryCount, e);
- }
- } while (retryCount > 0 && retryCount <= retryTimes);
-
- int count = this.batch.size();
- // This metrics just for current element mapping
- this.plusLoadSuccess(count);
- }
-
- private int waitThenRetry(int retryCount, RuntimeException e) {
- int retryTimes = this.config.get(ComputerOptions.OUTPUT_RETRY_TIMES);
- if (retryTimes <= 0) {
- return retryCount;
- }
-
- if (++retryCount > retryTimes) {
- LOG.error("Batch insert has been retried more than {} times",
- retryTimes);
- throw e;
- }
-
- long interval = (1L << retryCount) *
- this.config.get(ComputerOptions.OUTPUT_RETRY_INTERVAL);
- LOG.debug("Batch insert will sleep {} seconds then do the {}th retry",
- interval, retryCount);
- try {
- Thread.sleep(interval * 1000L);
- } catch (InterruptedException ignored) {
- // That's fine, just continue.
- }
- return retryCount;
- }
-}
diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/hg/task/InsertTask.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/hg/task/InsertTask.java
deleted file mode 100644
index f91710fd..00000000
--- a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/hg/task/InsertTask.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Copyright 2017 HugeGraph Authors
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-
-package com.baidu.hugegraph.computer.core.output.hg.task;
-
-import java.util.List;
-import java.util.Set;
-
-import com.baidu.hugegraph.computer.core.config.Config;
-import com.baidu.hugegraph.computer.core.output.hg.metrics.LoadMetrics;
-import com.baidu.hugegraph.computer.core.output.hg.metrics.LoadSummary;
-import org.apache.hugegraph.driver.HugeClient;
-import org.apache.hugegraph.structure.graph.Vertex;
-import com.google.common.collect.ImmutableSet;
-
-public abstract class InsertTask implements Runnable {
-
- public static final Set<String> UNACCEPTABLE_EXCEPTIONS = ImmutableSet.of(
- "class java.lang.IllegalArgumentException"
- );
-
- public static final String[] UNACCEPTABLE_MESSAGES = {
- // org.apache.http.conn.HttpHostConnectException
- "Connection refused",
- "The server is being shutting down",
- "not allowed to insert, because already exist a vertex " +
- "with same id and different label"
- };
-
- protected Config config;
- private HugeClient client;
- protected final List<Vertex> batch;
- private LoadSummary summary;
-
- public InsertTask(Config config, HugeClient client,
- List<Vertex> batch, LoadSummary loadSummary) {
- this.config = config;
- this.client = client;
- this.batch = batch;
- this.summary = loadSummary;
- }
-
- public LoadSummary summary() {
- return this.summary;
- }
-
- public LoadMetrics metrics() {
- return this.summary().metrics();
- }
-
- protected void plusLoadSuccess(int count) {
- LoadMetrics metrics = this.summary().metrics();
- metrics.plusInsertSuccess(count);
- this.summary().plusLoaded(count);
- }
-
- protected void increaseLoadSuccess() {
- this.plusLoadSuccess(1);
- }
-
- protected void insertBatch(List<Vertex> vertices) {
- this.client.graph().addVertices(vertices);
- }
-}
diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/hg/task/SingleInsertTask.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/hg/task/SingleInsertTask.java
deleted file mode 100644
index 4cb893c1..00000000
--- a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/hg/task/SingleInsertTask.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Copyright 2017 HugeGraph Authors
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-
-package com.baidu.hugegraph.computer.core.output.hg.task;
-
-import java.util.List;
-
-import org.slf4j.Logger;
-
-import com.baidu.hugegraph.computer.core.config.Config;
-import com.baidu.hugegraph.computer.core.output.hg.metrics.LoadSummary;
-import org.apache.hugegraph.driver.HugeClient;
-import org.apache.hugegraph.structure.graph.Vertex;
-import org.apache.hugegraph.util.Log;
-import com.google.common.collect.ImmutableList;
-
-public class SingleInsertTask extends InsertTask {
-
- private static final Logger LOG = Log.logger(TaskManager.class);
-
- public SingleInsertTask(Config config, HugeClient client,
- List<Vertex> batch, LoadSummary loadSummary) {
- super(config, client, batch, loadSummary);
- }
-
- @Override
- public void run() {
- for (Vertex vertex : this.batch) {
- try {
- this.insertSingle(vertex);
- this.increaseLoadSuccess();
- } catch (Exception e) {
- this.metrics().increaseInsertFailure();
- this.handleInsertFailure(e);
- }
- }
- }
-
- private void handleInsertFailure(Exception e) {
- LOG.error("Single insert error", e);
- }
-
- private void insertSingle(Vertex vertex) {
- this.insertBatch(ImmutableList.of(vertex));
- }
-}
diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/hg/task/TaskManager.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/hg/task/TaskManager.java
deleted file mode 100644
index c715cee9..00000000
--- a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/hg/task/TaskManager.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Copyright 2017 HugeGraph Authors
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-
-package com.baidu.hugegraph.computer.core.output.hg.task;
-
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-
-import com.baidu.hugegraph.computer.core.config.ComputerOptions;
-import com.baidu.hugegraph.computer.core.config.Config;
-import com.baidu.hugegraph.computer.core.output.hg.exceptions.WriteBackException;
-
-
-import com.baidu.hugegraph.computer.core.output.hg.metrics.LoadSummary;
-import com.baidu.hugegraph.computer.core.output.hg.metrics.Printer;
-import org.apache.hugegraph.driver.HugeClient;
-import org.apache.hugegraph.driver.HugeClientBuilder;
-import org.apache.hugegraph.structure.graph.Vertex;
-import org.apache.hugegraph.util.ExecutorUtil;
-import org.apache.hugegraph.util.Log;
-
-public final class TaskManager {
-
- private static final Logger LOG = Log.logger(TaskManager.class);
-
- public static final String BATCH_WORKER = "batch-worker-%d";
- public static final String SINGLE_WORKER = "single-worker-%d";
-
- private HugeClient client;
- private Config config;
-
- private final Semaphore batchSemaphore;
- private final Semaphore singleSemaphore;
- private final ExecutorService batchService;
- private final ExecutorService singleService;
-
- private LoadSummary loadSummary;
-
- public TaskManager(Config config) {
- this.config = config;
- String url = config.get(ComputerOptions.HUGEGRAPH_URL);
- String graph = config.get(ComputerOptions.HUGEGRAPH_GRAPH_NAME);
- this.client = new HugeClientBuilder(url, graph).build();
- // Try to make all batch threads running and don't wait for producer
- this.batchSemaphore = new Semaphore(this.batchSemaphoreNum());
- /*
- * Let batch threads go forward as far as possible and don't wait for
- * single thread pool
- */
- this.singleSemaphore = new Semaphore(this.singleSemaphoreNum());
- /*
- * In principle, unbounded synchronization queue(which may lead to OOM)
- * should not be used, but there the task manager uses semaphores to
- * limit the number of tasks added. When there are no idle threads in
- * the thread pool, the producer will be blocked, so OOM will not occur.
- */
- this.batchService = ExecutorUtil.newFixedThreadPool(
- config.get(ComputerOptions.OUTPUT_BATCH_THREADS),
- BATCH_WORKER);
- this.singleService = ExecutorUtil.newFixedThreadPool(
- config.get(ComputerOptions.OUTPUT_SINGLE_THREADS),
- SINGLE_WORKER);
-
- this.loadSummary = new LoadSummary();
- this.loadSummary.startTimer();
- }
-
- public HugeClient client() {
- return this.client;
- }
-
- private int batchSemaphoreNum() {
- return 1 + this.config.get(ComputerOptions.OUTPUT_BATCH_THREADS);
- }
-
- private int singleSemaphoreNum() {
- return 2 * this.config.get(ComputerOptions.OUTPUT_SINGLE_THREADS);
- }
-
- public void waitFinished() {
- LOG.info("Waiting for the insert tasks finished");
- try {
- // Wait batch mode task stopped
- this.batchSemaphore.acquire(this.batchSemaphoreNum());
- LOG.info("The batch-mode tasks stopped");
- } catch (InterruptedException e) {
- LOG.error("Interrupted while waiting batch-mode tasks");
- } finally {
- this.batchSemaphore.release(this.batchSemaphoreNum());
- }
-
- try {
- // Wait single mode task stopped
- this.singleSemaphore.acquire(this.singleSemaphoreNum());
- LOG.info("The single-mode tasks stopped");
- } catch (InterruptedException e) {
- LOG.error("Interrupted while waiting single-mode tasks");
- } finally {
- this.singleSemaphore.release(this.singleSemaphoreNum());
- }
- }
-
- public void shutdown() {
- long timeout = this.config.get(
- ComputerOptions.OUTPUT_THREAD_POOL_SHUTDOWN_TIMEOUT);
- try {
- this.batchService.shutdown();
- this.batchService.awaitTermination(timeout, TimeUnit.SECONDS);
- LOG.info("The batch-mode tasks service executor shutdown");
- } catch (InterruptedException e) {
- LOG.error("The batch-mode tasks are interrupted");
- } finally {
- if (!this.batchService.isTerminated()) {
- LOG.error("The unfinished batch-mode tasks will be cancelled");
- }
- this.batchService.shutdownNow();
- }
-
- try {
- this.singleService.shutdown();
- this.singleService.awaitTermination(timeout, TimeUnit.SECONDS);
- LOG.info("The single-mode tasks service executor shutdown");
- } catch (InterruptedException e) {
- LOG.error("The single-mode tasks are interrupted");
- } finally {
- if (!this.singleService.isTerminated()) {
- LOG.error("The unfinished single-mode tasks will be cancelled");
- }
- this.singleService.shutdownNow();
- }
- this.loadSummary.stopTimer();
- Printer.printSummary(this.loadSummary);
-
- this.client.close();
- }
-
- public void submitBatch(List<Vertex> batch) {
- try {
- this.batchSemaphore.acquire();
- } catch (InterruptedException e) {
- throw new WriteBackException(
- "Interrupted while waiting to submit batch", e);
- }
-
- InsertTask task = new BatchInsertTask(this.config, this.client,
- batch, this.loadSummary);
- CompletableFuture.runAsync(task, this.batchService).exceptionally(e -> {
- LOG.warn("Batch insert error, try single insert", e);
- this.submitInSingle(batch);
- return null;
- }).whenComplete((r, e) -> this.batchSemaphore.release());
- }
-
- private void submitInSingle(List<Vertex> batch) {
- try {
- this.singleSemaphore.acquire();
- } catch (InterruptedException e) {
- throw new WriteBackException(
- "Interrupted while waiting to submit single", e);
- }
-
- InsertTask task = new SingleInsertTask(this.config, this.client,
- batch, this.loadSummary);
- CompletableFuture.runAsync(task, this.singleService)
- .whenComplete((r, e) -> {
- this.singleSemaphore.release();
- });
- }
-}
diff --git a/computer-dist/src/assembly/travis/build-images.sh b/computer-dist/src/assembly/travis/build-images.sh
index e57ca429..1dededfe 100755
--- a/computer-dist/src/assembly/travis/build-images.sh
+++ b/computer-dist/src/assembly/travis/build-images.sh
@@ -36,7 +36,7 @@ PROJECT_VERSION=$(mvn -f "${PROJECT_POM_PATH}" -q -N \
docker build -t $1 $CONTEXT_PATH -f $PROJECT_PATH/computer-dist/Dockerfile
echo "FROM $1
-LABEL maintainer='HugeGraph Docker Maintainers <hu...@googlegroups.com>'
+LABEL maintainer='HugeGraph Docker Maintainers <de...@hugegraph.apache.org>'
COPY target/computer-algorithm-*.jar $JAR_FILE_PATH
ENV JAR_FILE_PATH=$JAR_FILE_PATH" | \
docker build -t $2 -f - $PROJECT_PATH/computer-algorithm
diff --git a/computer-driver/pom.xml b/computer-driver/pom.xml
index 6abecae4..abfa30cf 100644
--- a/computer-driver/pom.xml
+++ b/computer-driver/pom.xml
@@ -30,8 +30,9 @@
<dependencies>
<dependency>
- <groupId>org.apache.hugegraph</groupId>
- <artifactId>hugegraph-client</artifactId>
+ <groupId>com.baidu.hugegraph</groupId>
+ <artifactId>computer-api</artifactId>
+ <version>${project.version}</version>
</dependency>
</dependencies>
</project>
diff --git a/computer-driver/src/main/java/com/baidu/hugegraph/computer/driver/config/ComputerOptions.java b/computer-driver/src/main/java/com/baidu/hugegraph/computer/driver/config/ComputerOptions.java
deleted file mode 100644
index 9e367845..00000000
--- a/computer-driver/src/main/java/com/baidu/hugegraph/computer/driver/config/ComputerOptions.java
+++ /dev/null
@@ -1,721 +0,0 @@
-/*
- * Copyright 2017 HugeGraph Authors
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-
-package com.baidu.hugegraph.computer.driver.config;
-
-import static org.apache.hugegraph.config.OptionChecker.allowValues;
-import static org.apache.hugegraph.config.OptionChecker.disallowEmpty;
-import static org.apache.hugegraph.config.OptionChecker.nonNegativeInt;
-import static org.apache.hugegraph.config.OptionChecker.positiveInt;
-import static org.apache.hugegraph.config.OptionChecker.rangeInt;
-
-import java.util.Set;
-
-import org.apache.hugegraph.config.OptionHolder;
-import com.google.common.collect.ImmutableSet;
-
-public class ComputerOptions extends OptionHolder {
-
- private ComputerOptions() {
- super();
- }
-
- private static volatile ComputerOptions INSTANCE;
-
- public static synchronized ComputerOptions instance() {
- if (INSTANCE == null) {
- INSTANCE = new ComputerOptions();
- // Should initialize all static members first, then register.
- INSTANCE.registerOptions();
- }
- return INSTANCE;
- }
-
- public static final DriverConfigOption<String>
- ALGORITHM_RESULT_CLASS =
- new DriverConfigOption<>(
- "algorithm.result_class",
- disallowEmpty(),
- String.class
- );
-
- public static final DriverConfigOption<String>
- ALGORITHM_MESSAGE_CLASS =
- new DriverConfigOption<>(
- "algorithm.message_class",
- disallowEmpty(),
- String.class
- );
-
- public static final DriverConfigOption<String> INPUT_SOURCE_TYPE =
- new DriverConfigOption<>(
- "input.source_type",
- allowValues("hugegraph-server", "hugegraph-loader"),
- String.class
- );
-
- public static final DriverConfigOption<Long> INPUT_SPLITS_SIZE =
- new DriverConfigOption<>(
- "input.split_size",
- positiveInt(),
- Long.class
- );
-
- public static final DriverConfigOption<Integer> INPUT_MAX_SPLITS =
- new DriverConfigOption<>(
- "input.split_max_splits",
- positiveInt(),
- Integer.class
- );
-
- public static final DriverConfigOption<Integer>
- INPUT_SPLIT_PAGE_SIZE =
- new DriverConfigOption<>(
- "input.split_page_size",
- positiveInt(),
- Integer.class
- );
-
- public static final DriverConfigOption<String> INPUT_FILTER_CLASS =
- new DriverConfigOption<>(
- "input.filter_class",
- disallowEmpty(),
- String.class
- );
-
- public static final DriverConfigOption<String>
- INPUT_EDGE_DIRECTION = new DriverConfigOption<>(
- "input.edge_direction",
- allowValues("OUT", "IN", "BOTH"),
- String.class
- );
-
- public static final DriverConfigOption<String> INPUT_EDGE_FREQ =
- new DriverConfigOption<>(
- "input.edge_freq",
- allowValues("SINGLE", "SINGLE_PER_LABEL", "MULTIPLE"),
- String.class
- );
-
- public static final DriverConfigOption<Integer>
- INPUT_MAX_EDGES_IN_ONE_VERTEX = new DriverConfigOption<>(
- "input.max_edges_in_one_vertex",
- positiveInt(),
- Integer.class
- );
-
- public static final DriverConfigOption<Integer> SORT_THREAD_NUMS =
- new DriverConfigOption<>(
- "sort.thread_nums",
- positiveInt(),
- Integer.class
- );
-
- public static final DriverConfigOption<String> OUTPUT_RESULT_NAME =
- new DriverConfigOption<>(
- "output.result_name",
- disallowEmpty(),
- String.class
- );
-
- public static final DriverConfigOption<Boolean>
- OUTPUT_WITH_ADJACENT_EDGES =
- new DriverConfigOption<>(
- "output.with_adjacent_edges",
- allowValues(true, false),
- Boolean.class
- );
-
- public static final DriverConfigOption<Boolean>
- OUTPUT_WITH_VERTEX_PROPERTIES = new DriverConfigOption<>(
- "output.with_vertex_properties",
- allowValues(true, false),
- Boolean.class
- );
-
- public static final DriverConfigOption<Boolean>
- OUTPUT_WITH_EDGE_PROPERTIES = new DriverConfigOption<>(
- "output.with_edge_properties",
- allowValues(true, false),
- Boolean.class
- );
-
- public static final DriverConfigOption<Integer> OUTPUT_BATCH_SIZE =
- new DriverConfigOption<>(
- "output.batch_size",
- positiveInt(),
- Integer.class
- );
-
- public static final DriverConfigOption<Integer> OUTPUT_BATCH_THREADS =
- new DriverConfigOption<>(
- "output.batch_threads",
- positiveInt(),
- Integer.class
- );
-
- public static final DriverConfigOption<Integer> OUTPUT_SINGLE_THREADS =
- new DriverConfigOption<>(
- "output.single_threads",
- positiveInt(),
- Integer.class
- );
-
- public static final DriverConfigOption<Integer>
- OUTPUT_THREAD_POOL_SHUTDOWN_TIMEOUT = new DriverConfigOption<>(
- "output.thread_pool_shutdown_timeout",
- positiveInt(),
- Integer.class
- );
-
- public static final DriverConfigOption<Integer> OUTPUT_RETRY_TIMES =
- new DriverConfigOption<>(
- "output.retry_times",
- positiveInt(),
- Integer.class
- );
-
- public static final DriverConfigOption<Integer> OUTPUT_RETRY_INTERVAL =
- new DriverConfigOption<>(
- "output.retry_interval",
- positiveInt(),
- Integer.class
- );
-
- public static final DriverConfigOption<Integer>
- VERTEX_AVERAGE_DEGREE =
- new DriverConfigOption<>(
- "computer.vertex_average_degree",
- positiveInt(),
- Integer.class
- );
-
- public static final DriverConfigOption<Integer>
- ALLOCATOR_MAX_VERTICES_PER_THREAD = new DriverConfigOption<>(
- "allocator.max_vertices_per_thread",
- positiveInt(),
- Integer.class
- );
-
- public static final DriverConfigOption<String> JOB_ID =
- new DriverConfigOption<>(
- "job.id",
- disallowEmpty(),
- String.class
- );
-
- public static final DriverConfigOption<Integer> JOB_WORKERS_COUNT =
- new DriverConfigOption<>(
- "job.workers_count",
- positiveInt(),
- Integer.class
- );
-
- public static final DriverConfigOption<Integer>
- JOB_PARTITIONS_COUNT = new DriverConfigOption<>(
- "job.partitions_count",
- positiveInt(),
- Integer.class
- );
-
- public static final DriverConfigOption<Integer> BSP_MAX_SUPER_STEP =
- new DriverConfigOption<>(
- "bsp.max_super_step",
- positiveInt(),
- Integer.class
- );
-
- public static final DriverConfigOption<String> BSP_ETCD_ENDPOINTS =
- new DriverConfigOption<>(
- "bsp.etcd_endpoints",
- disallowEmpty(),
- String.class
- );
-
- public static final DriverConfigOption<Long> BSP_REGISTER_TIMEOUT =
- new DriverConfigOption<>(
- "bsp.register_timeout",
- positiveInt(),
- Long.class
- );
-
- public static final DriverConfigOption<Long>
- BSP_WAIT_WORKERS_TIMEOUT =
- new DriverConfigOption<>(
- "bsp.wait_workers_timeout",
- positiveInt(),
- Long.class
- );
-
- public static final DriverConfigOption<Long>
- BSP_WAIT_MASTER_TIMEOUT = new DriverConfigOption<>(
- "bsp.wait_master_timeout",
- positiveInt(),
- Long.class
- );
-
- public static final DriverConfigOption<Long> BSP_LOG_INTERVAL =
- new DriverConfigOption<>(
- "bsp.log_interval",
- positiveInt(),
- Long.class
- );
-
- public static final DriverConfigOption<String> WORKER_PARTITIONER =
- new DriverConfigOption<>(
- "worker.partitioner",
- disallowEmpty(),
- String.class
- );
-
- public static final DriverConfigOption<String>
- WORKER_COMPUTATION_CLASS =
- new DriverConfigOption<>(
- "worker.computation_class",
- disallowEmpty(),
- String.class
- );
-
- public static final DriverConfigOption<String>
- ALGORITHM_PARAMS_CLASS = new DriverConfigOption<>(
- "algorithm.params_class",
- "The class that set algorithms's parameters before " +
- "algorithm been run.",
- disallowEmpty(),
- String.class
- );
-
- public static final DriverConfigOption<String>
- WORKER_COMBINER_CLASS = new DriverConfigOption<>(
- "worker.combiner_class",
- disallowEmpty(),
- String.class
- );
-
- public static final DriverConfigOption<String>
- WORKER_VERTEX_PROPERTIES_COMBINER_CLASS =
- new DriverConfigOption<>(
- "worker.vertex_properties_combiner_class",
- disallowEmpty(),
- String.class
- );
-
- public static final DriverConfigOption<String>
- WORKER_EDGE_PROPERTIES_COMBINER_CLASS =
- new DriverConfigOption<>(
- "worker.edge_properties_combiner_class",
- disallowEmpty(),
- String.class
- );
-
- public static final DriverConfigOption<Long>
- WORKER_RECEIVED_BUFFERS_BYTES_LIMIT =
- new DriverConfigOption<>(
- "worker.received_buffers_bytes_limit",
- positiveInt(),
- Long.class
- );
-
- public static final DriverConfigOption<Long>
- WORKER_WAIT_SORT_TIMEOUT =
- new DriverConfigOption<>(
- "worker.wait_sort_timeout",
- positiveInt(),
- Long.class
- );
-
- public static final DriverConfigOption<Long>
- WORKER_WAIT_FINISH_MESSAGES_TIMEOUT =
- new DriverConfigOption<>(
- "worker.wait_finish_messages_timeout",
- positiveInt(),
- Long.class
- );
-
- public static final DriverConfigOption<String> WORKER_DATA_DIRS =
- new DriverConfigOption<>(
- "worker.data_dirs",
- disallowEmpty(),
- String.class
- );
-
- public static final DriverConfigOption<Integer>
- WORKER_WRITE_BUFFER_THRESHOLD = new DriverConfigOption<>(
- "worker.write_buffer_threshold",
- positiveInt(),
- Integer.class
- );
-
- public static final DriverConfigOption<Integer>
- WORKER_WRITE_BUFFER_INIT_CAPACITY =
- new DriverConfigOption<>(
- "worker.write_buffer_capacity",
- positiveInt(),
- Integer.class
- );
-
- public static final DriverConfigOption<String>
- MASTER_COMPUTATION_CLASS =
- new DriverConfigOption<>(
- "master.computation_class",
- disallowEmpty(),
- String.class
- );
-
- public static final DriverConfigOption<String> HUGEGRAPH_URL =
- new DriverConfigOption<>(
- "hugegraph.url",
- disallowEmpty(),
- String.class
- );
-
- public static final DriverConfigOption<String>
- HUGEGRAPH_GRAPH_NAME = new DriverConfigOption<>(
- "hugegraph.name",
- disallowEmpty(),
- String.class
- );
-
- public static final DriverConfigOption<String>
- TRANSPORT_SERVER_HOST = new DriverConfigOption<>(
- "transport.server_host",
- disallowEmpty(),
- String.class
- );
-
- public static final DriverConfigOption<Integer>
- TRANSPORT_SERVER_PORT =
- new DriverConfigOption<>(
- "transport.server_port",
- nonNegativeInt(),
- Integer.class
- );
-
- public static final DriverConfigOption<Integer>
- TRANSPORT_SERVER_THREADS = new DriverConfigOption<>(
- "transport.server_threads",
- positiveInt(),
- Integer.class
- );
-
- public static final DriverConfigOption<Integer>
- TRANSPORT_CLIENT_THREADS = new DriverConfigOption<>(
- "transport.client_threads",
- positiveInt(),
- Integer.class
- );
-
- public static final DriverConfigOption<String>
- TRANSPORT_PROVIDER_CLASS = new DriverConfigOption<>(
- "transport.provider_class",
- disallowEmpty(),
- String.class
- );
-
- public static final DriverConfigOption<String> TRANSPORT_IO_MODE =
- new DriverConfigOption<>(
- "transport.io_mode",
- disallowEmpty(),
- String.class
- );
-
- public static final DriverConfigOption<Boolean> TRANSPORT_EPOLL_LT =
- new DriverConfigOption<>(
- "transport.transport_epoll_lt",
- allowValues(true, false),
- Boolean.class
- );
-
- public static final DriverConfigOption<Boolean>
- TRANSPORT_TCP_KEEP_ALIVE = new DriverConfigOption<>(
- "transport.transport_tcp_keep_alive",
- allowValues(true, false),
- Boolean.class
- );
-
- public static final DriverConfigOption<Integer>
- TRANSPORT_MAX_SYN_BACKLOG = new DriverConfigOption<>(
- "transport.max_syn_backlog",
- nonNegativeInt(),
- Integer.class
- );
-
- public static final DriverConfigOption<Integer>
- TRANSPORT_RECEIVE_BUFFER_SIZE = new DriverConfigOption<>(
- "transport.receive_buffer_size",
- nonNegativeInt(),
- Integer.class
- );
-
- public static final DriverConfigOption<Integer>
- TRANSPORT_SEND_BUFFER_SIZE =
- new DriverConfigOption<>(
- "transport.send_buffer_size",
- nonNegativeInt(),
- Integer.class
- );
-
- public static final DriverConfigOption<Long>
- TRANSPORT_CLIENT_CONNECT_TIMEOUT = new DriverConfigOption<>(
- "transport.client_connect_timeout",
- positiveInt(),
- Long.class
- );
-
- public static final DriverConfigOption<Long>
- TRANSPORT_CLOSE_TIMEOUT = new DriverConfigOption<>(
- "transport.close_timeout",
- positiveInt(),
- Long.class
- );
-
- public static final DriverConfigOption<Long>
- TRANSPORT_SYNC_REQUEST_TIMEOUT = new DriverConfigOption<>(
- "transport.sync_request_timeout",
- positiveInt(),
- Long.class
- );
-
- public static final DriverConfigOption<Long>
- TRANSPORT_FINISH_SESSION_TIMEOUT = new DriverConfigOption<>(
- "transport.finish_session_timeout",
- nonNegativeInt(),
- Long.class
- );
-
- public static final DriverConfigOption<Long>
- TRANSPORT_WRITE_SOCKET_TIMEOUT = new DriverConfigOption<>(
- "transport.write_socket_timeout",
- positiveInt(),
- Long.class
- );
-
- public static final DriverConfigOption<Integer>
- TRANSPORT_NETWORK_RETRIES = new DriverConfigOption<>(
- "transport.network_retries",
- nonNegativeInt(),
- Integer.class
- );
-
- public static final DriverConfigOption<Integer>
- TRANSPORT_WRITE_BUFFER_HIGH_MARK = new DriverConfigOption<>(
- "transport.write_buffer_high_mark",
- nonNegativeInt(),
- Integer.class
- );
-
- public static final DriverConfigOption<Integer>
- TRANSPORT_WRITE_BUFFER_LOW_MARK = new DriverConfigOption<>(
- "transport.write_buffer_low_mark",
- nonNegativeInt(),
- Integer.class
- );
-
- public static final DriverConfigOption<Integer>
- TRANSPORT_MAX_PENDING_REQUESTS = new DriverConfigOption<>(
- "transport.max_pending_requests",
- positiveInt(),
- Integer.class
- );
-
- public static final DriverConfigOption<Integer>
- TRANSPORT_MIN_PENDING_REQUESTS = new DriverConfigOption<>(
- "transport.min_pending_requests",
- positiveInt(),
- Integer.class
- );
-
- public static final DriverConfigOption<Long>
- TRANSPORT_MIN_ACK_INTERVAL =
- new DriverConfigOption<>(
- "transport.min_ack_interval",
- positiveInt(),
- Long.class
- );
-
- public static final DriverConfigOption<Long>
- TRANSPORT_SERVER_IDLE_TIMEOUT = new DriverConfigOption<>(
- "transport.server_idle_timeout",
- positiveInt(),
- Long.class
- );
-
- public static final DriverConfigOption<Long>
- TRANSPORT_HEARTBEAT_INTERVAL = new DriverConfigOption<>(
- "transport.heartbeat_interval",
- positiveInt(),
- Long.class
- );
-
- public static final DriverConfigOption<Integer>
- TRANSPORT_MAX_TIMEOUT_HEARTBEAT_COUNT =
- new DriverConfigOption<>(
- "transport.max_timeout_heartbeat_count",
- positiveInt(),
- Integer.class
- );
-
- public static final DriverConfigOption<Long> HGKV_MAX_FILE_SIZE =
- new DriverConfigOption<>(
- "hgkv.max_file_size",
- positiveInt(),
- Long.class
- );
-
- public static final DriverConfigOption<Long> HGKV_DATABLOCK_SIZE =
- new DriverConfigOption<>(
- "hgkv.max_data_block_size",
- positiveInt(),
- Long.class
- );
-
- public static final DriverConfigOption<Integer>
- HGKV_MERGE_FILES_NUM = new DriverConfigOption<>(
- "hgkv.max_merge_files",
- positiveInt(),
- Integer.class
- );
-
- public static final DriverConfigOption<String> HGKV_TEMP_DIR =
- new DriverConfigOption<>(
- "hgkv.temp_file_dir",
- disallowEmpty(),
- String.class
- );
-
- public static final DriverConfigOption<String> RPC_SERVER_HOST =
- new DriverConfigOption<>(
- "rpc.server_host",
- null,
- String.class
- );
-
- public static final DriverConfigOption<Integer> RPC_SERVER_PORT =
- new DriverConfigOption<>(
- "rpc.server_port",
- rangeInt(0, Integer.MAX_VALUE),
- Integer.class
- );
-
- public static final DriverConfigOption<String> RPC_REMOTE_URL =
- new DriverConfigOption<>(
- "rpc.remote_url",
- null,
- String.class
- );
-
- public static final DriverConfigOption<Boolean> RPC_ADAPTIVE_PORT =
- new DriverConfigOption<>(
- "rpc.server_adaptive_port",
- disallowEmpty(),
- Boolean.class
- );
-
- public static final DriverConfigOption<Integer> RPC_SERVER_TIMEOUT =
- new DriverConfigOption<>(
- "rpc.server_timeout",
- rangeInt(1, Integer.MAX_VALUE),
- Integer.class
- );
-
- public static final DriverConfigOption<Integer>
- RPC_CLIENT_CONNECT_TIMEOUT = new DriverConfigOption<>(
- "rpc.client_connect_timeout",
- rangeInt(1, Integer.MAX_VALUE),
- Integer.class
- );
-
- public static final DriverConfigOption<Integer>
- RPC_CLIENT_RECONNECT_PERIOD = new DriverConfigOption<>(
- "rpc.client_reconnect_period",
- rangeInt(1, Integer.MAX_VALUE),
- Integer.class
- );
-
- public static final DriverConfigOption<Integer>
- RPC_CLIENT_READ_TIMEOUT =
- new DriverConfigOption<>(
- "rpc.client_read_timeout",
- rangeInt(1, Integer.MAX_VALUE),
- Integer.class
- );
-
- public static final DriverConfigOption<Integer> RPC_CLIENT_RETRIES =
- new DriverConfigOption<>(
- "rpc.client_retries",
- rangeInt(0, Integer.MAX_VALUE),
- Integer.class
- );
-
- public static final DriverConfigOption<String>
- RPC_CLIENT_LOAD_BALANCER =
- new DriverConfigOption<>(
- "rpc.client_load_balancer",
- allowValues("random", "localPref", "roundRobin",
- "consistentHash", "weightRoundRobin"),
- String.class
- );
-
- public static final DriverConfigOption<String> RPC_PROTOCOL =
- new DriverConfigOption<>(
- "rpc.protocol",
- allowValues("bolt", "rest", "dubbo", "h2c", "http"),
- String.class
- );
-
- public static final DriverConfigOption<Integer> RPC_CONFIG_ORDER =
- new DriverConfigOption<>(
- "rpc.config_order",
- rangeInt(1, Integer.MAX_VALUE),
- Integer.class
- );
-
- public static final DriverConfigOption<String> RPC_LOGGER_IMPL =
- new DriverConfigOption<>(
- "rpc.logger_impl",
- disallowEmpty(),
- String.class
- );
-
- public static final DriverConfigOption<Long>
- VALUE_FILE_MAX_SEGMENT_SIZE =
- new DriverConfigOption<>(
- "valuefile.max_segment_size",
- positiveInt(),
- Long.class
- );
-
- public static final Set<String> REQUIRED_INIT_OPTIONS = ImmutableSet.of(
- );
-
- public static final Set<String> K8S_PROHIBIT_USER_SETTINGS =
- ImmutableSet.of(
- HUGEGRAPH_URL.name(),
- BSP_ETCD_ENDPOINTS.name(),
- RPC_SERVER_HOST.name(),
- TRANSPORT_SERVER_HOST.name(),
- JOB_ID.name(),
- JOB_WORKERS_COUNT.name(),
- RPC_REMOTE_URL.name()
- );
-
- public static final Set<String> K8S_REQUIRED_USER_OPTIONS = ImmutableSet.of(
- ALGORITHM_PARAMS_CLASS.name()
- );
-}
diff --git a/computer-k8s-operator/src/main/java/com/baidu/hugegraph/computer/k8s/operator/controller/ComputerJobDeployer.java b/computer-k8s-operator/src/main/java/com/baidu/hugegraph/computer/k8s/operator/controller/ComputerJobDeployer.java
index 76a35b56..47bf92c1 100644
--- a/computer-k8s-operator/src/main/java/com/baidu/hugegraph/computer/k8s/operator/controller/ComputerJobDeployer.java
+++ b/computer-k8s-operator/src/main/java/com/baidu/hugegraph/computer/k8s/operator/controller/ComputerJobDeployer.java
@@ -32,11 +32,12 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hugegraph.config.HugeConfig;
+import org.apache.hugegraph.config.RpcOptions;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;
-import com.baidu.hugegraph.computer.driver.config.ComputerOptions;
+import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.k8s.Constants;
import com.baidu.hugegraph.computer.k8s.crd.model.ComputerJobSpec;
import com.baidu.hugegraph.computer.k8s.crd.model.HugeGraphComputerJob;
@@ -185,7 +186,7 @@ public class ComputerJobDeployer {
String ip = String.format("${%s}", Constants.ENV_POD_IP);
config.put(ComputerOptions.TRANSPORT_SERVER_HOST.name(), ip);
- config.put(ComputerOptions.RPC_SERVER_HOST.name(), ip);
+ config.put(RpcOptions.RPC_SERVER_HOST.name(), ip);
config.putIfAbsent(ComputerOptions.BSP_ETCD_ENDPOINTS.name(),
this.internalEtcdUrl);
@@ -203,11 +204,10 @@ public class ComputerJobDeployer {
.withProtocol(PROTOCOL)
.build();
- String rpcPort = config.get(
- ComputerOptions.RPC_SERVER_PORT.name());
+ String rpcPort = config.get(RpcOptions.RPC_SERVER_PORT.name());
if (StringUtils.isBlank(rpcPort) || RANDOM_PORT.equals(rpcPort)) {
rpcPort = String.valueOf(DEFAULT_RPC_PORT);
- config.put(ComputerOptions.RPC_SERVER_PORT.name(), rpcPort);
+ config.put(RpcOptions.RPC_SERVER_PORT.name(), rpcPort);
}
ContainerPort rpcContainerPort = new ContainerPortBuilder()
.withName(RPC_PORT_NAME)
diff --git a/computer-k8s/src/main/java/com/baidu/hugegraph/computer/k8s/config/KubeDriverOptions.java b/computer-k8s/src/main/java/com/baidu/hugegraph/computer/k8s/config/KubeDriverOptions.java
index ef59cf39..a5c2d588 100644
--- a/computer-k8s/src/main/java/com/baidu/hugegraph/computer/k8s/config/KubeDriverOptions.java
+++ b/computer-k8s/src/main/java/com/baidu/hugegraph/computer/k8s/config/KubeDriverOptions.java
@@ -23,12 +23,12 @@ import static org.apache.hugegraph.config.OptionChecker.allowValues;
import static org.apache.hugegraph.config.OptionChecker.disallowEmpty;
import org.apache.commons.io.FileUtils;
-
-import com.baidu.hugegraph.computer.k8s.Constants;
import org.apache.hugegraph.config.ConfigListOption;
import org.apache.hugegraph.config.ConfigOption;
import org.apache.hugegraph.config.OptionHolder;
+import com.baidu.hugegraph.computer.k8s.Constants;
+
public class KubeDriverOptions extends OptionHolder {
private KubeDriverOptions() {
diff --git a/computer-k8s/src/main/java/com/baidu/hugegraph/computer/k8s/driver/KubernetesDriver.java b/computer-k8s/src/main/java/com/baidu/hugegraph/computer/k8s/driver/KubernetesDriver.java
index 1b392319..9ce2fe6f 100644
--- a/computer-k8s/src/main/java/com/baidu/hugegraph/computer/k8s/driver/KubernetesDriver.java
+++ b/computer-k8s/src/main/java/com/baidu/hugegraph/computer/k8s/driver/KubernetesDriver.java
@@ -44,8 +44,15 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hugegraph.config.ConfigListOption;
+import org.apache.hugegraph.config.HugeConfig;
+import org.apache.hugegraph.config.RpcOptions;
+import org.apache.hugegraph.config.TypedOption;
+import org.apache.hugegraph.util.E;
+import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;
+import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.driver.ComputerDriver;
import com.baidu.hugegraph.computer.driver.ComputerDriverException;
import com.baidu.hugegraph.computer.driver.DefaultJobState;
@@ -53,7 +60,6 @@ import com.baidu.hugegraph.computer.driver.JobObserver;
import com.baidu.hugegraph.computer.driver.JobState;
import com.baidu.hugegraph.computer.driver.JobStatus;
import com.baidu.hugegraph.computer.driver.SuperstepStat;
-import com.baidu.hugegraph.computer.driver.config.ComputerOptions;
import com.baidu.hugegraph.computer.driver.config.DriverConfigOption;
import com.baidu.hugegraph.computer.k8s.Constants;
import com.baidu.hugegraph.computer.k8s.config.KubeDriverOptions;
@@ -63,11 +69,7 @@ import com.baidu.hugegraph.computer.k8s.crd.model.ComputerJobStatus;
import com.baidu.hugegraph.computer.k8s.crd.model.HugeGraphComputerJob;
import com.baidu.hugegraph.computer.k8s.crd.model.HugeGraphComputerJobList;
import com.baidu.hugegraph.computer.k8s.util.KubeUtil;
-import org.apache.hugegraph.config.ConfigListOption;
-import org.apache.hugegraph.config.HugeConfig;
-import org.apache.hugegraph.config.TypedOption;
-import org.apache.hugegraph.util.E;
-import org.apache.hugegraph.util.Log;
+import com.google.common.collect.ImmutableSet;
import io.fabric8.kubernetes.api.model.Event;
import io.fabric8.kubernetes.api.model.LocalObjectReference;
@@ -265,10 +267,9 @@ public class KubernetesDriver implements ComputerDriver {
private void checkComputerConf(Map<String, String> computerConf,
ComputerJobSpec spec) {
- Set<String> requiredOptions = ComputerOptions.K8S_REQUIRED_USER_OPTIONS;
@SuppressWarnings("unchecked")
Collection<String> unSetOptions = CollectionUtils.removeAll(
- requiredOptions,
+ COMPUTER_REQUIRED_USER_OPTIONS,
computerConf.keySet());
E.checkArgument(unSetOptions.isEmpty(),
"The %s options can't be null", unSetOptions);
@@ -468,7 +469,7 @@ public class KubernetesDriver implements ComputerDriver {
params.forEach((k, v) -> {
if (StringUtils.isNotBlank(k) && StringUtils.isNotBlank(v)) {
if (!k.startsWith(Constants.K8S_SPEC_PREFIX) &&
- !ComputerOptions.K8S_PROHIBIT_USER_SETTINGS.contains(k)) {
+ !COMPUTER_PROHIBIT_USER_SETTINGS.contains(k)) {
DriverConfigOption<?> typedOption = (DriverConfigOption<?>)
allOptions.get(k);
if (typedOption != null) {
@@ -493,7 +494,7 @@ public class KubernetesDriver implements ComputerDriver {
if (value != null) {
defaultConf.put(key, String.valueOf(value));
} else {
- boolean required = ComputerOptions.REQUIRED_INIT_OPTIONS
+ boolean required = ComputerOptions.REQUIRED_OPTIONS
.contains(key);
E.checkArgument(!required, "The %s option can't be null", key);
}
@@ -586,4 +587,20 @@ public class KubernetesDriver implements ComputerDriver {
Map<String, Object> specMap = HugeGraphComputerJob.specToMap(spec);
return Collections.unmodifiableMap(specMap);
}
+
+ public static final Set<String> COMPUTER_PROHIBIT_USER_SETTINGS =
+ ImmutableSet.of(
+ ComputerOptions.HUGEGRAPH_URL.name(),
+ ComputerOptions.BSP_ETCD_ENDPOINTS.name(),
+ ComputerOptions.TRANSPORT_SERVER_HOST.name(),
+ ComputerOptions.JOB_ID.name(),
+ ComputerOptions.JOB_WORKERS_COUNT.name(),
+ RpcOptions.RPC_SERVER_HOST.name(),
+ RpcOptions.RPC_SERVER_PORT.name(),
+ RpcOptions.RPC_REMOTE_URL.name()
+ );
+
+ public static final Set<String> COMPUTER_REQUIRED_USER_OPTIONS = ImmutableSet.of(
+ ComputerOptions.ALGORITHM_PARAMS_CLASS.name()
+ );
}
diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/driver/ComputerOptionsTest.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/driver/ComputerOptionsTest.java
index 940b9b1d..17f6dca3 100644
--- a/computer-test/src/main/java/com/baidu/hugegraph/computer/driver/ComputerOptionsTest.java
+++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/driver/ComputerOptionsTest.java
@@ -26,16 +26,16 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.commons.configuration2.MapConfiguration;
+import org.apache.hugegraph.config.ConfigException;
+import org.apache.hugegraph.config.HugeConfig;
+import org.apache.hugegraph.config.TypedOption;
+import org.apache.hugegraph.testutil.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.graph.value.LongValue;
-import com.baidu.hugegraph.computer.driver.config.ComputerOptions;
import com.baidu.hugegraph.computer.driver.config.DriverConfigOption;
-import org.apache.hugegraph.config.ConfigException;
-import org.apache.hugegraph.config.HugeConfig;
-import org.apache.hugegraph.config.TypedOption;
-import org.apache.hugegraph.testutil.Assert;
public class ComputerOptionsTest {
private static Map<String, String> options;
diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/driver/DriverTestSuite.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/driver/DriverTestSuite.java
index c65dfbdd..9dffede0 100644
--- a/computer-test/src/main/java/com/baidu/hugegraph/computer/driver/DriverTestSuite.java
+++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/driver/DriverTestSuite.java
@@ -19,23 +19,13 @@
package com.baidu.hugegraph.computer.driver;
-import org.junit.BeforeClass;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
-import org.apache.hugegraph.config.OptionSpace;
-
@RunWith(Suite.class)
@Suite.SuiteClasses({
DriverTest.class,
ComputerOptionsTest.class,
})
public class DriverTestSuite {
-
- @BeforeClass
- public static void setup() {
- OptionSpace.register("computer-driver",
- "com.baidu.hugegraph.computer.driver.config" +
- ".ComputerOptions");
- }
}
diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/k8s/AbstractK8sTest.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/k8s/AbstractK8sTest.java
index 00cddab0..33c8224a 100644
--- a/computer-test/src/main/java/com/baidu/hugegraph/computer/k8s/AbstractK8sTest.java
+++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/k8s/AbstractK8sTest.java
@@ -30,11 +30,16 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration2.MapConfiguration;
+import org.apache.hugegraph.config.HugeConfig;
+import org.apache.hugegraph.config.OptionSpace;
+import org.apache.hugegraph.testutil.Assert;
+import org.apache.hugegraph.testutil.Whitebox;
+import org.apache.hugegraph.util.ExecutorUtil;
import org.junit.After;
import org.junit.Before;
+import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.graph.value.LongValue;
-import com.baidu.hugegraph.computer.driver.config.ComputerOptions;
import com.baidu.hugegraph.computer.k8s.config.KubeDriverOptions;
import com.baidu.hugegraph.computer.k8s.config.KubeSpecOptions;
import com.baidu.hugegraph.computer.k8s.crd.model.HugeGraphComputerJob;
@@ -44,11 +49,6 @@ import com.baidu.hugegraph.computer.k8s.operator.OperatorEntrypoint;
import com.baidu.hugegraph.computer.k8s.operator.config.OperatorOptions;
import com.baidu.hugegraph.computer.k8s.util.KubeUtil;
import com.baidu.hugegraph.computer.suite.unit.UnitTestBase;
-import org.apache.hugegraph.config.HugeConfig;
-import org.apache.hugegraph.config.OptionSpace;
-import org.apache.hugegraph.testutil.Assert;
-import org.apache.hugegraph.testutil.Whitebox;
-import org.apache.hugegraph.util.ExecutorUtil;
import com.google.common.collect.Lists;
import io.fabric8.kubernetes.api.model.NamespaceBuilder;
diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/k8s/KubernetesDriverTest.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/k8s/KubernetesDriverTest.java
index ddd03e21..cd49051b 100644
--- a/computer-test/src/main/java/com/baidu/hugegraph/computer/k8s/KubernetesDriverTest.java
+++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/k8s/KubernetesDriverTest.java
@@ -33,18 +33,21 @@ import java.util.concurrent.ExecutionException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hugegraph.config.HugeConfig;
+import org.apache.hugegraph.testutil.Assert;
+import org.apache.hugegraph.testutil.Whitebox;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;
+import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.driver.ComputerDriverException;
import com.baidu.hugegraph.computer.driver.DefaultJobState;
import com.baidu.hugegraph.computer.driver.JobObserver;
import com.baidu.hugegraph.computer.driver.JobState;
import com.baidu.hugegraph.computer.driver.JobStatus;
-import com.baidu.hugegraph.computer.driver.config.ComputerOptions;
import com.baidu.hugegraph.computer.k8s.config.KubeDriverOptions;
import com.baidu.hugegraph.computer.k8s.config.KubeSpecOptions;
import com.baidu.hugegraph.computer.k8s.crd.model.ComputerJobSpec;
@@ -52,9 +55,6 @@ import com.baidu.hugegraph.computer.k8s.crd.model.HugeGraphComputerJob;
import com.baidu.hugegraph.computer.k8s.driver.KubernetesDriver;
import com.baidu.hugegraph.computer.k8s.util.KubeUtil;
import com.baidu.hugegraph.computer.suite.unit.UnitTestBase;
-import org.apache.hugegraph.config.HugeConfig;
-import org.apache.hugegraph.testutil.Assert;
-import org.apache.hugegraph.testutil.Whitebox;
import io.fabric8.kubernetes.api.model.NamedCluster;
import io.fabric8.kubernetes.api.model.NamedClusterBuilder;
diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/k8s/MiniKubeTest.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/k8s/MiniKubeTest.java
index c3af273c..95d1590a 100644
--- a/computer-test/src/main/java/com/baidu/hugegraph/computer/k8s/MiniKubeTest.java
+++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/k8s/MiniKubeTest.java
@@ -36,15 +36,19 @@ import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.hugegraph.config.RpcOptions;
+import org.apache.hugegraph.testutil.Assert;
+import org.apache.hugegraph.testutil.Whitebox;
+import org.apache.hugegraph.util.Log;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
+import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.driver.DefaultJobState;
import com.baidu.hugegraph.computer.driver.JobObserver;
import com.baidu.hugegraph.computer.driver.JobStatus;
-import com.baidu.hugegraph.computer.driver.config.ComputerOptions;
import com.baidu.hugegraph.computer.k8s.config.KubeDriverOptions;
import com.baidu.hugegraph.computer.k8s.config.KubeSpecOptions;
import com.baidu.hugegraph.computer.k8s.crd.model.HugeGraphComputerJob;
@@ -52,9 +56,6 @@ import com.baidu.hugegraph.computer.k8s.driver.KubernetesDriver;
import com.baidu.hugegraph.computer.k8s.operator.common.AbstractController;
import com.baidu.hugegraph.computer.k8s.util.KubeUtil;
import com.baidu.hugegraph.computer.suite.unit.UnitTestBase;
-import org.apache.hugegraph.testutil.Assert;
-import org.apache.hugegraph.testutil.Whitebox;
-import org.apache.hugegraph.util.Log;
import com.google.common.collect.Lists;
import io.fabric8.kubernetes.api.model.ConfigMap;
@@ -109,7 +110,7 @@ public class MiniKubeTest extends AbstractK8sTest {
Map<String, String> params = new HashMap<>();
params.put(KubeSpecOptions.WORKER_INSTANCES.name(), "1");
params.put(ComputerOptions.TRANSPORT_SERVER_PORT.name(), "0");
- params.put(ComputerOptions.RPC_SERVER_PORT.name(), "0");
+ params.put(RpcOptions.RPC_SERVER_PORT.name(), "0");
String jobId = this.driver.submitJob(ALGORITHM_NAME, params);
JobObserver jobObserver = Mockito.mock(JobObserver.class);
@@ -140,7 +141,7 @@ public class MiniKubeTest extends AbstractK8sTest {
Map<String, String> params = new HashMap<>();
params.put(KubeSpecOptions.WORKER_INSTANCES.name(), "1");
params.put(ComputerOptions.TRANSPORT_SERVER_PORT.name(), "0");
- params.put(ComputerOptions.RPC_SERVER_PORT.name(), "0");
+ params.put(RpcOptions.RPC_SERVER_PORT.name(), "0");
String jobId = this.driver.submitJob("algorithm123", params);
JobObserver jobObserver = Mockito.mock(JobObserver.class);
@@ -326,7 +327,7 @@ public class MiniKubeTest extends AbstractK8sTest {
Map<String, String> params = new HashMap<>();
params.put(KubeSpecOptions.WORKER_INSTANCES.name(), "1");
params.put(ComputerOptions.TRANSPORT_SERVER_PORT.name(), "0");
- params.put(ComputerOptions.RPC_SERVER_PORT.name(), "0");
+ params.put(RpcOptions.RPC_SERVER_PORT.name(), "0");
String jobId = this.driver.submitJob(ALGORITHM_NAME, params);
JobObserver jobObserver = Mockito.mock(JobObserver.class);
diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/suite/unit/UnitTestBase.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/suite/unit/UnitTestBase.java
index 27d48d0f..fd904a33 100644
--- a/computer-test/src/main/java/com/baidu/hugegraph/computer/suite/unit/UnitTestBase.java
+++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/suite/unit/UnitTestBase.java
@@ -68,7 +68,6 @@ public class UnitTestBase {
private static final String CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" +
"0123456789" +
"abcdefghijklmnopqrstuvxyz";
-
private static String URL;
private static String GRAPH;
private static HugeClient CLIENT = null;
@@ -78,7 +77,7 @@ public class UnitTestBase {
}
@BeforeClass
- public static void step() throws ClassNotFoundException {
+ public static void init() throws ClassNotFoundException {
Runtime.getRuntime().addShutdownHook(new Thread(LogManager::shutdown));
LOG.info("Setup for UnitTestSuite of hugegraph-computer");
diff --git a/pom.xml b/pom.xml
index 1498e0e4..7707774e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,6 +47,7 @@
<shell-executable>bash</shell-executable>
<etcd.version>0.5.4</etcd.version>
<hadoop-version>3.1.2</hadoop-version>
+ <commons-lang3-version>3.12.0</commons-lang3-version>
</properties>
<modules>
@@ -197,6 +198,10 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
+ <exclusion>
+ <artifactId>spark-core_2.12</artifactId>
+ <groupId>org.apache.spark</groupId>
+ </exclusion>
</exclusions>
</dependency>
</dependencies>