You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/04 15:42:13 UTC

[18/20] incubator-tinkerpop git commit: fixed an iteration offset bug in SparkGraphComputer.

fixed an iteration offset bug in SparkGraphComputer.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/3b3ddb4d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/3b3ddb4d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/3b3ddb4d

Branch: refs/heads/master
Commit: 3b3ddb4d4a8f6aba0e93cb7c725067c15feab92a
Parents: 406dd68
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Mar 4 07:24:49 2015 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Mar 4 07:24:49 2015 -0700

----------------------------------------------------------------------
 .../tinkerpop/gremlin/hadoop/Constants.java     |  2 ++
 .../computer/spark/SparkGraphComputer.java      | 22 ++++++++++++++------
 .../process/computer/spark/SparkMemory.java     | 15 ++++++++-----
 .../process/computer/TinkerGraphComputer.java   |  4 ++--
 4 files changed, 30 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3b3ddb4d/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
index 60ef636..697cab0 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
@@ -49,6 +49,8 @@ public class Constants {
     public static final String GREMLIN_HADOOP_HALT = "gremlin.hadoop.halt";
     public static final String MAP_MEMORY = "gremlin.hadoop.mapMemory";
 
+    public static final String MAPRED_INPUT_DIR = "mapred.input.dir";
+
     public static final String SEQUENCE_WARNING = "The " + Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT
             + " is not " + SequenceFileOutputFormat.class.getCanonicalName()
             + " and thus, graph computer memory can not be converted to Java objects";

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3b3ddb4d/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index c089c57..6df82e4 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -129,7 +129,7 @@ public final class SparkGraphComputer implements GraphComputer {
                         sparkConfiguration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + this.vertexProgram);
                         hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
                         if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class)))
-                            hadoopConfiguration.set("mapred.input.dir", hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION)); // necessary for Spark and newAPIHadoopRDD
+                            hadoopConfiguration.set(Constants.MAPRED_INPUT_DIR, hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION)); // necessary for Spark and newAPIHadoopRDD
                         final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration);
                         SparkGraphComputer.loadJars(sparkContext, hadoopConfiguration);
                         ///
@@ -143,18 +143,25 @@ public final class SparkGraphComputer implements GraphComputer {
 
                             // set up the vertex program and wire up configurations
                             memory = new SparkMemory(this.vertexProgram, this.mapReducers, sparkContext);
-                            this.vertexProgram.setup(memory);
+                            this.vertexProgram.setup(memory); // TODO: setup variables are not being broadcasted on first call
                             final SApacheConfiguration vertexProgramConfiguration = new SApacheConfiguration();
                             this.vertexProgram.storeState(vertexProgramConfiguration);
                             ConfigurationUtils.copy(vertexProgramConfiguration, apacheConfiguration);
                             ConfUtil.mergeApacheIntoHadoopConfiguration(vertexProgramConfiguration, hadoopConfiguration);
+
                             // execute the vertex program
-                            do {
+                            while (true) {
+                                memory.setInTask(true);
                                 graphRDD = SparkHelper.executeStep(graphRDD, this.vertexProgram, memory, vertexProgramConfiguration);
                                 graphRDD.foreachPartition(iterator -> doNothing()); // TODO: i think this is a fast way to execute the rdd
                                 graphRDD.cache(); // TODO: learn about persistence and caching
-                                memory.incrIteration();
-                            } while (!this.vertexProgram.terminate(memory));
+                                memory.setInTask(false);
+                                if (this.vertexProgram.terminate(memory)) {
+                                    memory.incrIteration();
+                                    break;
+                                } else
+                                    memory.incrIteration();
+                            }
 
                             // write the output graph back to disk
                             SparkHelper.saveVertexProgramRDD(graphRDD, hadoopConfiguration);
@@ -175,7 +182,10 @@ public final class SparkGraphComputer implements GraphComputer {
                         sparkConfiguration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + mapReduce);
                         hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
                         if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class)))
-                            hadoopConfiguration.set("mapred.input.dir", hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + Constants.SYSTEM_G);
+                            hadoopConfiguration.set(Constants.MAPRED_INPUT_DIR, null == this.vertexProgram ?
+                                    hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION) : // if no vertex program grab the graph from the input location
+                                    hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + Constants.SYSTEM_G);
+
                         final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration);
                         SparkGraphComputer.loadJars(sparkContext, hadoopConfiguration);
                         // execute the map reduce job

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3b3ddb4d/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
index 402f2d3..b21f752 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
@@ -46,6 +46,7 @@ public final class SparkMemory implements Memory.Admin, Serializable {
     private final AtomicInteger iteration = new AtomicInteger(0);
     private final AtomicLong runtime = new AtomicLong(0l);
     private final Map<String, Accumulator<Rule>> memory = new HashMap<>();
+    private boolean inTask = false;
 
     public SparkMemory(final VertexProgram<?> vertexProgram, final Set<MapReduce> mapReducers, final JavaSparkContext sparkContext) {
         if (null != vertexProgram) {
@@ -104,7 +105,7 @@ public final class SparkMemory implements Memory.Admin, Serializable {
 
     @Override
     public <R> R get(final String key) throws IllegalArgumentException {
-        final R r = (R) this.memory.get(key).value().object;
+        final R r = (R) (this.inTask ? this.memory.get(key).localValue() : this.memory.get(key).value()).object;
         if (null == r)
             throw Memory.Exceptions.memoryDoesNotExist(key);
         else
@@ -115,27 +116,27 @@ public final class SparkMemory implements Memory.Admin, Serializable {
     public long incr(final String key, final long delta) {
         checkKeyValue(key, delta);
         this.memory.get(key).add(new Rule(Rule.Operation.INCR, delta));
-        return (Long) this.memory.get(key).localValue().object + delta;
+        return (Long) (this.inTask ? this.memory.get(key).localValue() : this.memory.get(key).value()).object + delta;
     }
 
     @Override
     public boolean and(final String key, final boolean bool) {
         checkKeyValue(key, bool);
         this.memory.get(key).add(new Rule(Rule.Operation.AND, bool));
-        return (Boolean) this.memory.get(key).localValue().object && bool;
+        return (Boolean) (this.inTask ? this.memory.get(key).localValue() : this.memory.get(key).value()).object && bool;
     }
 
     @Override
     public boolean or(final String key, final boolean bool) {
         checkKeyValue(key, bool);
         this.memory.get(key).add(new Rule(Rule.Operation.OR, bool));
-        return (Boolean) this.memory.get(key).localValue().object || bool;
+        return (Boolean) (this.inTask ? this.memory.get(key).localValue() : this.memory.get(key).value()).object || bool;
     }
 
     @Override
     public void set(final String key, final Object value) {
         checkKeyValue(key, value);
-        this.memory.get(key).add(new Rule(Rule.Operation.SET, value));
+        this.memory.get(key).setValue(new Rule(Rule.Operation.SET, value));
     }
 
     @Override
@@ -143,6 +144,10 @@ public final class SparkMemory implements Memory.Admin, Serializable {
         return StringFactory.memoryString(this);
     }
 
+    public void setInTask(final boolean inTask) {
+        this.inTask = inTask;
+    }
+
     private void checkKeyValue(final String key, final Object value) {
         if (!this.memoryKeys.contains(key))
             throw GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey(key);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3b3ddb4d/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index 4132dd9..013bd36 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -125,8 +125,8 @@ public class TinkerGraphComputer implements GraphComputer {
                             this.memory.completeSubRound();
                         }
                     }
-                } catch (Exception ex) {
-                    throw new RuntimeException(ex);
+                } catch (final Exception ex) {
+                    throw new IllegalStateException(ex.getMessage(), ex);
                 }
             }