You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/12/19 21:31:32 UTC

tinkerpop git commit: came up with a much cleaner GiraphGraphComputer usage around workers.

Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1564 9d12dae58 -> 2cb9258d4


came up with a much cleaner GiraphGraphComputer usage around workers.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/2cb9258d
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/2cb9258d
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/2cb9258d

Branch: refs/heads/TINKERPOP-1564
Commit: 2cb9258d4d6a1a222f3cbbe8a90c70a7c2e4acc4
Parents: 9d12dae
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Dec 19 14:31:29 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Dec 19 14:31:29 2016 -0700

----------------------------------------------------------------------
 .../process/computer/GiraphGraphComputer.java   | 44 +++++++++-----------
 1 file changed, 20 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/2cb9258d/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index 7d3e236..21c8877 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -57,7 +57,6 @@ import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
-import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.io.Storage;
 import org.apache.tinkerpop.gremlin.util.Gremlin;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
@@ -79,26 +78,14 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
 
     protected GiraphConfiguration giraphConfiguration = new GiraphConfiguration();
     private MapMemory memory = new MapMemory();
-    private boolean useWorkerThreadsInConfiguration;
     private Set<String> vertexProgramConfigurationKeys = new HashSet<>();
 
     public GiraphGraphComputer(final HadoopGraph hadoopGraph) {
-       this(hadoopGraph.configuration());
+        this(hadoopGraph.configuration());
     }
 
     private GiraphGraphComputer(final Configuration configuration) {
         super(configuration);
-        this.giraphConfiguration.setMasterComputeClass(GiraphMemory.class);
-        this.giraphConfiguration.setVertexClass(GiraphVertex.class);
-        this.giraphConfiguration.setComputationClass(GiraphComputation.class);
-        this.giraphConfiguration.setWorkerContextClass(GiraphWorkerContext.class);
-        this.giraphConfiguration.setOutEdgesClass(EmptyOutEdges.class);
-        this.giraphConfiguration.setClass(GiraphConstants.VERTEX_ID_CLASS.getKey(), ObjectWritable.class, ObjectWritable.class);
-        this.giraphConfiguration.setClass(GiraphConstants.VERTEX_VALUE_CLASS.getKey(), VertexWritable.class, VertexWritable.class);
-        this.giraphConfiguration.setBoolean(GiraphConstants.STATIC_GRAPH.getKey(), true);
-        this.giraphConfiguration.setVertexInputFormatClass(GiraphVertexInputFormat.class);
-        this.giraphConfiguration.setVertexOutputFormatClass(GiraphVertexOutputFormat.class);
-        this.useWorkerThreadsInConfiguration = this.giraphConfiguration.getInt(GiraphConstants.MAX_WORKERS, -666) != -666 || this.giraphConfiguration.getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), -666) != -666;
     }
 
     public static GiraphGraphComputer open(final org.apache.commons.configuration.Configuration configuration) {
@@ -106,14 +93,6 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
     }
 
     @Override
-    public Future<ComputerResult> submit(final Graph graph) {
-        final Configuration configuration = graph.configuration();
-        this.configuration.copy(configuration);
-        configuration.getKeys().forEachRemaining(key -> this.giraphConfiguration.set(key, configuration.getProperty(key).toString()));
-        return this.submit();
-    }
-
-    @Override
     public GraphComputer program(final VertexProgram vertexProgram) {
         super.program(vertexProgram);
         this.memory.addVertexProgramMemoryComputeKeys(this.vertexProgram);
@@ -127,6 +106,13 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
     }
 
     @Override
+    public GraphComputer workers(final int workers) {
+        this.configuration.clearProperty(GiraphConstants.MAX_WORKERS);
+        this.configuration.clearProperty(GiraphConstants.NUM_COMPUTE_THREADS.getKey());
+        return super.workers(workers);
+    }
+
+    @Override
     public Future<ComputerResult> submit() {
         super.validateStatePriorToExecution();
         return ComputerSubmissionHelper.runWithBackgroundThread(this::submitWithExecutor, "GiraphSubmitter");
@@ -135,7 +121,16 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
     private Future<ComputerResult> submitWithExecutor(final Executor exec) {
         final long startTime = System.currentTimeMillis();
         this.configuration.getKeys().forEachRemaining(key -> this.giraphConfiguration.set(key, this.configuration.getProperty(key).toString()));
-        this.useWorkerThreadsInConfiguration = this.giraphConfiguration.getInt(GiraphConstants.MAX_WORKERS, -666) != -666 || this.giraphConfiguration.getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), -666) != -666;
+        this.giraphConfiguration.setMasterComputeClass(GiraphMemory.class);
+        this.giraphConfiguration.setVertexClass(GiraphVertex.class);
+        this.giraphConfiguration.setComputationClass(GiraphComputation.class);
+        this.giraphConfiguration.setWorkerContextClass(GiraphWorkerContext.class);
+        this.giraphConfiguration.setOutEdgesClass(EmptyOutEdges.class);
+        this.giraphConfiguration.setClass(GiraphConstants.VERTEX_ID_CLASS.getKey(), ObjectWritable.class, ObjectWritable.class);
+        this.giraphConfiguration.setClass(GiraphConstants.VERTEX_VALUE_CLASS.getKey(), VertexWritable.class, VertexWritable.class);
+        this.giraphConfiguration.setBoolean(GiraphConstants.STATIC_GRAPH.getKey(), true);
+        this.giraphConfiguration.setVertexInputFormatClass(GiraphVertexInputFormat.class);
+        this.giraphConfiguration.setVertexOutputFormatClass(GiraphVertexOutputFormat.class);
         final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(this.giraphConfiguration);
         ConfigurationUtils.copy(this.configuration, apacheConfiguration);
         return CompletableFuture.<ComputerResult>supplyAsync(() -> {
@@ -179,7 +174,8 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
                 if (!this.vertexProgram.getMessageCombiner().isPresent())
                     this.giraphConfiguration.unset(GiraphConstants.MESSAGE_COMBINER_CLASS.getKey());
                 // split required workers across system (open map slots + max threads per machine = total amount of TinkerPop workers)
-                if (!this.useWorkerThreadsInConfiguration) {
+                if (!(this.giraphConfiguration.getInt(GiraphConstants.MAX_WORKERS, -666) != -666 ||
+                        this.giraphConfiguration.getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), -666) != -666)) {
                     final Cluster cluster = new Cluster(GiraphGraphComputer.this.giraphConfiguration);
                     int totalMappers = cluster.getClusterStatus().getMapSlotCapacity() - 1; // 1 is needed for master
                     cluster.close();