You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/12/19 21:31:32 UTC
tinkerpop git commit: came up with a much cleaner GiraphGraphComputer usage around workers.
Repository: tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1564 9d12dae58 -> 2cb9258d4
came up with a much cleaner GiraphGraphComputer usage around workers.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/2cb9258d
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/2cb9258d
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/2cb9258d
Branch: refs/heads/TINKERPOP-1564
Commit: 2cb9258d4d6a1a222f3cbbe8a90c70a7c2e4acc4
Parents: 9d12dae
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Dec 19 14:31:29 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Dec 19 14:31:29 2016 -0700
----------------------------------------------------------------------
.../process/computer/GiraphGraphComputer.java | 44 +++++++++-----------
1 file changed, 20 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/2cb9258d/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index 7d3e236..21c8877 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -57,7 +57,6 @@ import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
-import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.io.Storage;
import org.apache.tinkerpop.gremlin.util.Gremlin;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
@@ -79,26 +78,14 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
protected GiraphConfiguration giraphConfiguration = new GiraphConfiguration();
private MapMemory memory = new MapMemory();
- private boolean useWorkerThreadsInConfiguration;
private Set<String> vertexProgramConfigurationKeys = new HashSet<>();
public GiraphGraphComputer(final HadoopGraph hadoopGraph) {
- this(hadoopGraph.configuration());
+ this(hadoopGraph.configuration());
}
private GiraphGraphComputer(final Configuration configuration) {
super(configuration);
- this.giraphConfiguration.setMasterComputeClass(GiraphMemory.class);
- this.giraphConfiguration.setVertexClass(GiraphVertex.class);
- this.giraphConfiguration.setComputationClass(GiraphComputation.class);
- this.giraphConfiguration.setWorkerContextClass(GiraphWorkerContext.class);
- this.giraphConfiguration.setOutEdgesClass(EmptyOutEdges.class);
- this.giraphConfiguration.setClass(GiraphConstants.VERTEX_ID_CLASS.getKey(), ObjectWritable.class, ObjectWritable.class);
- this.giraphConfiguration.setClass(GiraphConstants.VERTEX_VALUE_CLASS.getKey(), VertexWritable.class, VertexWritable.class);
- this.giraphConfiguration.setBoolean(GiraphConstants.STATIC_GRAPH.getKey(), true);
- this.giraphConfiguration.setVertexInputFormatClass(GiraphVertexInputFormat.class);
- this.giraphConfiguration.setVertexOutputFormatClass(GiraphVertexOutputFormat.class);
- this.useWorkerThreadsInConfiguration = this.giraphConfiguration.getInt(GiraphConstants.MAX_WORKERS, -666) != -666 || this.giraphConfiguration.getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), -666) != -666;
}
public static GiraphGraphComputer open(final org.apache.commons.configuration.Configuration configuration) {
@@ -106,14 +93,6 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
}
@Override
- public Future<ComputerResult> submit(final Graph graph) {
- final Configuration configuration = graph.configuration();
- this.configuration.copy(configuration);
- configuration.getKeys().forEachRemaining(key -> this.giraphConfiguration.set(key, configuration.getProperty(key).toString()));
- return this.submit();
- }
-
- @Override
public GraphComputer program(final VertexProgram vertexProgram) {
super.program(vertexProgram);
this.memory.addVertexProgramMemoryComputeKeys(this.vertexProgram);
@@ -127,6 +106,13 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
}
@Override
+ public GraphComputer workers(final int workers) {
+ this.configuration.clearProperty(GiraphConstants.MAX_WORKERS);
+ this.configuration.clearProperty(GiraphConstants.NUM_COMPUTE_THREADS.getKey());
+ return super.workers(workers);
+ }
+
+ @Override
public Future<ComputerResult> submit() {
super.validateStatePriorToExecution();
return ComputerSubmissionHelper.runWithBackgroundThread(this::submitWithExecutor, "GiraphSubmitter");
@@ -135,7 +121,16 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
private Future<ComputerResult> submitWithExecutor(final Executor exec) {
final long startTime = System.currentTimeMillis();
this.configuration.getKeys().forEachRemaining(key -> this.giraphConfiguration.set(key, this.configuration.getProperty(key).toString()));
- this.useWorkerThreadsInConfiguration = this.giraphConfiguration.getInt(GiraphConstants.MAX_WORKERS, -666) != -666 || this.giraphConfiguration.getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), -666) != -666;
+ this.giraphConfiguration.setMasterComputeClass(GiraphMemory.class);
+ this.giraphConfiguration.setVertexClass(GiraphVertex.class);
+ this.giraphConfiguration.setComputationClass(GiraphComputation.class);
+ this.giraphConfiguration.setWorkerContextClass(GiraphWorkerContext.class);
+ this.giraphConfiguration.setOutEdgesClass(EmptyOutEdges.class);
+ this.giraphConfiguration.setClass(GiraphConstants.VERTEX_ID_CLASS.getKey(), ObjectWritable.class, ObjectWritable.class);
+ this.giraphConfiguration.setClass(GiraphConstants.VERTEX_VALUE_CLASS.getKey(), VertexWritable.class, VertexWritable.class);
+ this.giraphConfiguration.setBoolean(GiraphConstants.STATIC_GRAPH.getKey(), true);
+ this.giraphConfiguration.setVertexInputFormatClass(GiraphVertexInputFormat.class);
+ this.giraphConfiguration.setVertexOutputFormatClass(GiraphVertexOutputFormat.class);
final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(this.giraphConfiguration);
ConfigurationUtils.copy(this.configuration, apacheConfiguration);
return CompletableFuture.<ComputerResult>supplyAsync(() -> {
@@ -179,7 +174,8 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
if (!this.vertexProgram.getMessageCombiner().isPresent())
this.giraphConfiguration.unset(GiraphConstants.MESSAGE_COMBINER_CLASS.getKey());
// split required workers across system (open map slots + max threads per machine = total amount of TinkerPop workers)
- if (!this.useWorkerThreadsInConfiguration) {
+ if (!(this.giraphConfiguration.getInt(GiraphConstants.MAX_WORKERS, -666) != -666 ||
+ this.giraphConfiguration.getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), -666) != -666)) {
final Cluster cluster = new Cluster(GiraphGraphComputer.this.giraphConfiguration);
int totalMappers = cluster.getClusterStatus().getMapSlotCapacity() - 1; // 1 is needed for master
cluster.close();