You are viewing a plain-text version of this content. The link to the canonical version (originally labeled "here") was not preserved in this copy.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/12/19 19:51:48 UTC

tinkerpop git commit: got new GraphComputer.configuration() model working for SparkGraphComputer and GiraphGraphComputer.

Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1564 70b0b576f -> 9d12dae58


got new GraphComputer.configuration() model working for SparkGraphComputer and GiraphGraphComputer.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/9d12dae5
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/9d12dae5
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/9d12dae5

Branch: refs/heads/TINKERPOP-1564
Commit: 9d12dae58d0c7d0d46e857c6f2477310e7d2758e
Parents: 70b0b57
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Dec 19 12:51:41 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Dec 19 12:51:41 2016 -0700

----------------------------------------------------------------------
 .../process/computer/GiraphGraphComputer.java   | 37 +++++++-------------
 .../computer/AbstractHadoopGraphComputer.java   | 33 +++++++++++++----
 .../process/computer/SparkGraphComputer.java    | 28 +++++----------
 3 files changed, 48 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/9d12dae5/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index f4c7958..7d3e236 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -83,9 +83,11 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
     private Set<String> vertexProgramConfigurationKeys = new HashSet<>();
 
     public GiraphGraphComputer(final HadoopGraph hadoopGraph) {
-        super(hadoopGraph);
-        final Configuration configuration = hadoopGraph.configuration();
-        configuration.getKeys().forEachRemaining(key -> this.giraphConfiguration.set(key, configuration.getProperty(key).toString()));
+       this(hadoopGraph.configuration());
+    }
+
+    private GiraphGraphComputer(final Configuration configuration) {
+        super(configuration);
         this.giraphConfiguration.setMasterComputeClass(GiraphMemory.class);
         this.giraphConfiguration.setVertexClass(GiraphVertex.class);
         this.giraphConfiguration.setComputationClass(GiraphComputation.class);
@@ -100,31 +102,18 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
     }
 
     public static GiraphGraphComputer open(final org.apache.commons.configuration.Configuration configuration) {
-        return HadoopGraph.open(configuration).compute(GiraphGraphComputer.class);
+        return new GiraphGraphComputer(configuration);
     }
 
     @Override
     public Future<ComputerResult> submit(final Graph graph) {
-        this.hadoopGraph = (HadoopGraph)graph;
-        final Configuration configuration = this.hadoopGraph.configuration();
+        final Configuration configuration = graph.configuration();
+        this.configuration.copy(configuration);
         configuration.getKeys().forEachRemaining(key -> this.giraphConfiguration.set(key, configuration.getProperty(key).toString()));
         return this.submit();
     }
 
     @Override
-    public GraphComputer workers(final int workers) {
-        this.useWorkerThreadsInConfiguration = false;
-        return super.workers(workers);
-    }
-
-    @Override
-    public GraphComputer configure(final String key, final Object value) {
-        this.giraphConfiguration.set(key, value.toString());
-        this.useWorkerThreadsInConfiguration = this.giraphConfiguration.getInt(GiraphConstants.MAX_WORKERS, -666) != -666 || this.giraphConfiguration.getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), -666) != -666;
-        return this;
-    }
-
-    @Override
     public GraphComputer program(final VertexProgram vertexProgram) {
         super.program(vertexProgram);
         this.memory.addVertexProgramMemoryComputeKeys(this.vertexProgram);
@@ -143,14 +132,12 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
         return ComputerSubmissionHelper.runWithBackgroundThread(this::submitWithExecutor, "GiraphSubmitter");
     }
 
-    @Override
-    public org.apache.commons.configuration.Configuration configuration() {
-        return ConfUtil.makeApacheConfiguration(this.giraphConfiguration);
-    }
-
     private Future<ComputerResult> submitWithExecutor(final Executor exec) {
         final long startTime = System.currentTimeMillis();
+        this.configuration.getKeys().forEachRemaining(key -> this.giraphConfiguration.set(key, this.configuration.getProperty(key).toString()));
+        this.useWorkerThreadsInConfiguration = this.giraphConfiguration.getInt(GiraphConstants.MAX_WORKERS, -666) != -666 || this.giraphConfiguration.getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), -666) != -666;
         final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(this.giraphConfiguration);
+        ConfigurationUtils.copy(this.configuration, apacheConfiguration);
         return CompletableFuture.<ComputerResult>supplyAsync(() -> {
             try {
                 this.loadJars(giraphConfiguration);
@@ -183,7 +170,7 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
             if (null != this.vertexProgram) {
                 // a way to verify in Giraph whether the traversal will go over the wire or not
                 try {
-                    VertexProgram.createVertexProgram(this.hadoopGraph, ConfUtil.makeApacheConfiguration(this.giraphConfiguration));
+                    VertexProgram.createVertexProgram(HadoopGraph.open(this.configuration), ConfUtil.makeApacheConfiguration(this.giraphConfiguration));
                 } catch (final IllegalStateException e) {
                     if (e.getCause() instanceof NumberFormatException)
                         throw new NotSerializableException("The provided traversal is not serializable and thus, can not be distributed across the cluster");

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/9d12dae5/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
index 344f04e..b95fb7e 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
@@ -63,7 +64,7 @@ public abstract class AbstractHadoopGraphComputer implements GraphComputer {
             Pattern.compile(File.pathSeparator.equals(":") ? "([^:]|://)+" : ("[^" + File.pathSeparator + "]"));
 
     protected final Logger logger;
-    protected HadoopGraph hadoopGraph;
+    protected HadoopConfiguration configuration;
     protected boolean executed = false;
     protected final Set<MapReduce> mapReducers = new HashSet<>();
     protected VertexProgram<Object> vertexProgram;
@@ -75,32 +76,40 @@ public abstract class AbstractHadoopGraphComputer implements GraphComputer {
     protected GraphFilter graphFilter = new GraphFilter();
 
     public AbstractHadoopGraphComputer(final HadoopGraph hadoopGraph) {
-        this.hadoopGraph = hadoopGraph;
+        this(hadoopGraph.configuration());
+    }
+
+    protected AbstractHadoopGraphComputer(final org.apache.commons.configuration.Configuration configuration) {
+        this.configuration = new HadoopConfiguration(configuration);
         this.logger = LoggerFactory.getLogger(this.getClass());
-        //GraphComputerHelper.configure(this, this.hadoopGraph.configuration());
+        GraphComputerHelper.configure(this, this.configuration);
     }
 
     @Override
     public GraphComputer vertices(final Traversal<Vertex, Vertex> vertexFilter) {
         this.graphFilter.setVertexFilter(vertexFilter);
+        this.configuration.setProperty(VERTICES, vertexFilter);
         return this;
     }
 
     @Override
     public GraphComputer edges(final Traversal<Vertex, Edge> edgeFilter) {
         this.graphFilter.setEdgeFilter(edgeFilter);
+        this.configuration.setProperty(EDGES, edgeFilter);
         return this;
     }
 
     @Override
     public GraphComputer result(final ResultGraph resultGraph) {
         this.resultGraph = resultGraph;
+        this.configuration.setProperty(RESULT, resultGraph.name());
         return this;
     }
 
     @Override
     public GraphComputer persist(final Persist persist) {
         this.persist = persist;
+        this.configuration.setProperty(PERSIST, persist.name());
         return this;
     }
 
@@ -119,16 +128,28 @@ public abstract class AbstractHadoopGraphComputer implements GraphComputer {
     @Override
     public GraphComputer workers(final int workers) {
         this.workers = workers;
+        this.configuration.setProperty(WORKERS, workers);
         return this;
     }
 
     @Override
     public Future<ComputerResult> submit(final Graph graph) {
-        this.hadoopGraph = (HadoopGraph) graph;
+        ConfigurationUtils.copy(graph.configuration(), this.configuration);
         return this.submit();
     }
 
     @Override
+    public GraphComputer configure(final String key, final Object value) {
+        this.configuration.setProperty(key,value);
+        return this;
+    }
+
+    @Override
+    public org.apache.commons.configuration.Configuration configuration() {
+        return this.configuration;
+    }
+
+    @Override
     public String toString() {
         return StringFactory.graphComputerString(this);
     }
@@ -239,8 +260,8 @@ public abstract class AbstractHadoopGraphComputer implements GraphComputer {
 
         @Override
         public boolean supportsResultGraphPersistCombination(final ResultGraph resultGraph, final Persist persist) {
-            if (hadoopGraph.configuration().containsKey(Constants.GREMLIN_HADOOP_GRAPH_WRITER)) {
-                final Object writer = ReflectionUtils.newInstance(hadoopGraph.configuration().getGraphWriter(), ConfUtil.makeHadoopConfiguration(hadoopGraph.configuration()));
+            if (configuration().containsKey(Constants.GREMLIN_HADOOP_GRAPH_WRITER)) {
+                final Object writer = ReflectionUtils.newInstance(configuration.getGraphWriter(), ConfUtil.makeHadoopConfiguration(configuration));
                 if (writer instanceof PersistResultGraphAware)
                     return ((PersistResultGraphAware) writer).supportsResultGraphPersistCombination(resultGraph, persist);
                 else {

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/9d12dae5/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 59497e0..c72454b 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -86,7 +86,6 @@ import java.util.concurrent.ThreadFactory;
  */
 public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
 
-    private final org.apache.commons.configuration.Configuration sparkConfiguration;
     private boolean workersSet = false;
     private final ThreadFactory threadFactoryBoss = new BasicThreadFactory.Builder().namingPattern(SparkGraphComputer.class.getSimpleName() + "-boss").build();
 
@@ -105,15 +104,17 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
 
     public SparkGraphComputer(final HadoopGraph hadoopGraph) {
         super(hadoopGraph);
-        this.sparkConfiguration = new HadoopConfiguration();
-        ConfigurationUtils.copy(this.hadoopGraph.configuration(), this.sparkConfiguration);
+    }
+
+    private SparkGraphComputer(final org.apache.commons.configuration.Configuration configuration) {
+        super(configuration);
     }
 
     @Override
     public GraphComputer workers(final int workers) {
         super.workers(workers);
-        if (this.sparkConfiguration.containsKey(SparkLauncher.SPARK_MASTER) && this.sparkConfiguration.getString(SparkLauncher.SPARK_MASTER).startsWith("local")) {
-            this.sparkConfiguration.setProperty(SparkLauncher.SPARK_MASTER, "local[" + this.workers + "]");
+        if (this.configuration.containsKey(SparkLauncher.SPARK_MASTER) && this.configuration.getString(SparkLauncher.SPARK_MASTER).startsWith("local")) {
+            this.configuration.setProperty(SparkLauncher.SPARK_MASTER, "local[" + this.workers + "]");
         }
         this.workersSet = true;
         return this;
@@ -121,7 +122,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
 
     @Override
     public GraphComputer configure(final String key, final Object value) {
-        this.sparkConfiguration.setProperty(key, value);
+        this.configuration.setProperty(key, value);
         return this;
     }
 
@@ -131,19 +132,8 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
         return ComputerSubmissionHelper.runWithBackgroundThread(this::submitWithExecutor, "SparkSubmitter");
     }
 
-    @Override
-    public Future<ComputerResult> submit(final Graph graph) {
-        ConfigurationUtils.copy(graph.configuration(), this.sparkConfiguration);
-        return this.submit();
-    }
-
-    @Override
-    public org.apache.commons.configuration.Configuration configuration() {
-        return new HadoopConfiguration(this.sparkConfiguration);
-    }
-
     public static SparkGraphComputer open(final org.apache.commons.configuration.Configuration configuration) {
-        return new SparkGraphComputer(HadoopGraph.open(configuration));
+        return new SparkGraphComputer(configuration);
     }
 
     private Future<ComputerResult> submitWithExecutor(Executor exec) {
@@ -151,7 +141,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
         return computerService.submit(() -> {
             final long startTime = System.currentTimeMillis();
             // apache and hadoop configurations that are used throughout the graph computer computation
-            final org.apache.commons.configuration.Configuration graphComputerConfiguration = new HadoopConfiguration(this.sparkConfiguration);
+            final org.apache.commons.configuration.Configuration graphComputerConfiguration = new HadoopConfiguration(this.configuration);
             if (!graphComputerConfiguration.containsKey(Constants.SPARK_SERIALIZER))
                 graphComputerConfiguration.setProperty(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
             graphComputerConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES));