You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/04 13:59:04 UTC

[46/50] [abbrv] tinkerpop git commit: came up with a much cleaner GiraphGraphComputer usage around workers.

came up with a much cleaner GiraphGraphComputer usage around workers.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/ef2fabdf
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/ef2fabdf
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/ef2fabdf

Branch: refs/heads/TINKERPOP-1564
Commit: ef2fabdf9066ef3188834a0c37491434e4975c84
Parents: c580fa2
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Dec 19 14:31:29 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Jan 4 05:15:19 2017 -0700

----------------------------------------------------------------------
 .../process/computer/GiraphGraphComputer.java   | 44 +++++++++-----------
 1 file changed, 20 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ef2fabdf/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index db4d6da..b316220 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -58,7 +58,6 @@ import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
-import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.io.Storage;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
 import org.apache.tinkerpop.gremlin.util.Gremlin;
@@ -81,26 +80,14 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
 
     protected GiraphConfiguration giraphConfiguration = new GiraphConfiguration();
     private MapMemory memory = new MapMemory();
-    private boolean useWorkerThreadsInConfiguration;
     private Set<String> vertexProgramConfigurationKeys = new HashSet<>();
 
     public GiraphGraphComputer(final HadoopGraph hadoopGraph) {
-       this(hadoopGraph.configuration());
+        this(hadoopGraph.configuration());
     }
 
     private GiraphGraphComputer(final Configuration configuration) {
         super(configuration);
-        this.giraphConfiguration.setMasterComputeClass(GiraphMemory.class);
-        this.giraphConfiguration.setVertexClass(GiraphVertex.class);
-        this.giraphConfiguration.setComputationClass(GiraphComputation.class);
-        this.giraphConfiguration.setWorkerContextClass(GiraphWorkerContext.class);
-        this.giraphConfiguration.setOutEdgesClass(EmptyOutEdges.class);
-        this.giraphConfiguration.setClass(GiraphConstants.VERTEX_ID_CLASS.getKey(), ObjectWritable.class, ObjectWritable.class);
-        this.giraphConfiguration.setClass(GiraphConstants.VERTEX_VALUE_CLASS.getKey(), VertexWritable.class, VertexWritable.class);
-        this.giraphConfiguration.setBoolean(GiraphConstants.STATIC_GRAPH.getKey(), true);
-        this.giraphConfiguration.setVertexInputFormatClass(GiraphVertexInputFormat.class);
-        this.giraphConfiguration.setVertexOutputFormatClass(GiraphVertexOutputFormat.class);
-        this.useWorkerThreadsInConfiguration = this.giraphConfiguration.getInt(GiraphConstants.MAX_WORKERS, -666) != -666 || this.giraphConfiguration.getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), -666) != -666;
     }
 
     public static GiraphGraphComputer open(final org.apache.commons.configuration.Configuration configuration) {
@@ -108,14 +95,6 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
     }
 
     @Override
-    public Future<ComputerResult> submit(final Graph graph) {
-        final Configuration configuration = graph.configuration();
-        this.configuration.copy(configuration);
-        configuration.getKeys().forEachRemaining(key -> this.giraphConfiguration.set(key, configuration.getProperty(key).toString()));
-        return this.submit();
-    }
-
-    @Override
     public GraphComputer program(final VertexProgram vertexProgram) {
         super.program(vertexProgram);
         this.memory.addVertexProgramMemoryComputeKeys(this.vertexProgram);
@@ -129,6 +108,13 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
     }
 
     @Override
+    public GraphComputer workers(final int workers) {
+        this.configuration.clearProperty(GiraphConstants.MAX_WORKERS);
+        this.configuration.clearProperty(GiraphConstants.NUM_COMPUTE_THREADS.getKey());
+        return super.workers(workers);
+    }
+
+    @Override
     public Future<ComputerResult> submit() {
         super.validateStatePriorToExecution();
         return ComputerSubmissionHelper.runWithBackgroundThread(this::submitWithExecutor, "GiraphSubmitter");
@@ -137,7 +123,16 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
     private Future<ComputerResult> submitWithExecutor(final Executor exec) {
         final long startTime = System.currentTimeMillis();
         this.configuration.getKeys().forEachRemaining(key -> this.giraphConfiguration.set(key, this.configuration.getProperty(key).toString()));
-        this.useWorkerThreadsInConfiguration = this.giraphConfiguration.getInt(GiraphConstants.MAX_WORKERS, -666) != -666 || this.giraphConfiguration.getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), -666) != -666;
+        this.giraphConfiguration.setMasterComputeClass(GiraphMemory.class);
+        this.giraphConfiguration.setVertexClass(GiraphVertex.class);
+        this.giraphConfiguration.setComputationClass(GiraphComputation.class);
+        this.giraphConfiguration.setWorkerContextClass(GiraphWorkerContext.class);
+        this.giraphConfiguration.setOutEdgesClass(EmptyOutEdges.class);
+        this.giraphConfiguration.setClass(GiraphConstants.VERTEX_ID_CLASS.getKey(), ObjectWritable.class, ObjectWritable.class);
+        this.giraphConfiguration.setClass(GiraphConstants.VERTEX_VALUE_CLASS.getKey(), VertexWritable.class, VertexWritable.class);
+        this.giraphConfiguration.setBoolean(GiraphConstants.STATIC_GRAPH.getKey(), true);
+        this.giraphConfiguration.setVertexInputFormatClass(GiraphVertexInputFormat.class);
+        this.giraphConfiguration.setVertexOutputFormatClass(GiraphVertexOutputFormat.class);
         final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(this.giraphConfiguration);
         ConfigurationUtils.copy(this.configuration, apacheConfiguration);
         return CompletableFuture.<ComputerResult>supplyAsync(() -> {
@@ -181,7 +176,8 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
                 if (!this.vertexProgram.getMessageCombiner().isPresent())
                     this.giraphConfiguration.unset(GiraphConstants.MESSAGE_COMBINER_CLASS.getKey());
                 // split required workers across system (open map slots + max threads per machine = total amount of TinkerPop workers)
-                if (!this.useWorkerThreadsInConfiguration) {
+                if (!(this.giraphConfiguration.getInt(GiraphConstants.MAX_WORKERS, -666) != -666 ||
+                        this.giraphConfiguration.getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), -666) != -666)) {
                     final Cluster cluster = new Cluster(GiraphGraphComputer.this.giraphConfiguration);
                     int totalMappers = cluster.getClusterStatus().getMapSlotCapacity() - 1; // 1 is needed for master
                     cluster.close();