You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/04 15:42:13 UTC
[18/20] incubator-tinkerpop git commit: fixed an iteration offset bug in SparkGraphComputer.
fixed an iteration offset bug in SparkGraphComputer.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/3b3ddb4d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/3b3ddb4d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/3b3ddb4d
Branch: refs/heads/master
Commit: 3b3ddb4d4a8f6aba0e93cb7c725067c15feab92a
Parents: 406dd68
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Mar 4 07:24:49 2015 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Mar 4 07:24:49 2015 -0700
----------------------------------------------------------------------
.../tinkerpop/gremlin/hadoop/Constants.java | 2 ++
.../computer/spark/SparkGraphComputer.java | 22 ++++++++++++++------
.../process/computer/spark/SparkMemory.java | 15 ++++++++-----
.../process/computer/TinkerGraphComputer.java | 4 ++--
4 files changed, 30 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3b3ddb4d/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
index 60ef636..697cab0 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
@@ -49,6 +49,8 @@ public class Constants {
public static final String GREMLIN_HADOOP_HALT = "gremlin.hadoop.halt";
public static final String MAP_MEMORY = "gremlin.hadoop.mapMemory";
+ public static final String MAPRED_INPUT_DIR = "mapred.input.dir";
+
public static final String SEQUENCE_WARNING = "The " + Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT
+ " is not " + SequenceFileOutputFormat.class.getCanonicalName()
+ " and thus, graph computer memory can not be converted to Java objects";
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3b3ddb4d/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index c089c57..6df82e4 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -129,7 +129,7 @@ public final class SparkGraphComputer implements GraphComputer {
sparkConfiguration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + this.vertexProgram);
hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class)))
- hadoopConfiguration.set("mapred.input.dir", hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION)); // necessary for Spark and newAPIHadoopRDD
+ hadoopConfiguration.set(Constants.MAPRED_INPUT_DIR, hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION)); // necessary for Spark and newAPIHadoopRDD
final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration);
SparkGraphComputer.loadJars(sparkContext, hadoopConfiguration);
///
@@ -143,18 +143,25 @@ public final class SparkGraphComputer implements GraphComputer {
// set up the vertex program and wire up configurations
memory = new SparkMemory(this.vertexProgram, this.mapReducers, sparkContext);
- this.vertexProgram.setup(memory);
+ this.vertexProgram.setup(memory); // TODO: setup variables are not being broadcasted on first call
final SApacheConfiguration vertexProgramConfiguration = new SApacheConfiguration();
this.vertexProgram.storeState(vertexProgramConfiguration);
ConfigurationUtils.copy(vertexProgramConfiguration, apacheConfiguration);
ConfUtil.mergeApacheIntoHadoopConfiguration(vertexProgramConfiguration, hadoopConfiguration);
+
// execute the vertex program
- do {
+ while (true) {
+ memory.setInTask(true);
graphRDD = SparkHelper.executeStep(graphRDD, this.vertexProgram, memory, vertexProgramConfiguration);
graphRDD.foreachPartition(iterator -> doNothing()); // TODO: i think this is a fast way to execute the rdd
graphRDD.cache(); // TODO: learn about persistence and caching
- memory.incrIteration();
- } while (!this.vertexProgram.terminate(memory));
+ memory.setInTask(false);
+ if (this.vertexProgram.terminate(memory)) {
+ memory.incrIteration();
+ break;
+ } else
+ memory.incrIteration();
+ }
// write the output graph back to disk
SparkHelper.saveVertexProgramRDD(graphRDD, hadoopConfiguration);
@@ -175,7 +182,10 @@ public final class SparkGraphComputer implements GraphComputer {
sparkConfiguration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + mapReduce);
hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class)))
- hadoopConfiguration.set("mapred.input.dir", hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + Constants.SYSTEM_G);
+ hadoopConfiguration.set(Constants.MAPRED_INPUT_DIR, null == this.vertexProgram ?
+ hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION) : // if no vertex program grab the graph from the input location
+ hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + Constants.SYSTEM_G);
+
final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration);
SparkGraphComputer.loadJars(sparkContext, hadoopConfiguration);
// execute the map reduce job
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3b3ddb4d/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
index 402f2d3..b21f752 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
@@ -46,6 +46,7 @@ public final class SparkMemory implements Memory.Admin, Serializable {
private final AtomicInteger iteration = new AtomicInteger(0);
private final AtomicLong runtime = new AtomicLong(0l);
private final Map<String, Accumulator<Rule>> memory = new HashMap<>();
+ private boolean inTask = false;
public SparkMemory(final VertexProgram<?> vertexProgram, final Set<MapReduce> mapReducers, final JavaSparkContext sparkContext) {
if (null != vertexProgram) {
@@ -104,7 +105,7 @@ public final class SparkMemory implements Memory.Admin, Serializable {
@Override
public <R> R get(final String key) throws IllegalArgumentException {
- final R r = (R) this.memory.get(key).value().object;
+ final R r = (R) (this.inTask ? this.memory.get(key).localValue() : this.memory.get(key).value()).object;
if (null == r)
throw Memory.Exceptions.memoryDoesNotExist(key);
else
@@ -115,27 +116,27 @@ public final class SparkMemory implements Memory.Admin, Serializable {
public long incr(final String key, final long delta) {
checkKeyValue(key, delta);
this.memory.get(key).add(new Rule(Rule.Operation.INCR, delta));
- return (Long) this.memory.get(key).localValue().object + delta;
+ return (Long) (this.inTask ? this.memory.get(key).localValue() : this.memory.get(key).value()).object + delta;
}
@Override
public boolean and(final String key, final boolean bool) {
checkKeyValue(key, bool);
this.memory.get(key).add(new Rule(Rule.Operation.AND, bool));
- return (Boolean) this.memory.get(key).localValue().object && bool;
+ return (Boolean) (this.inTask ? this.memory.get(key).localValue() : this.memory.get(key).value()).object && bool;
}
@Override
public boolean or(final String key, final boolean bool) {
checkKeyValue(key, bool);
this.memory.get(key).add(new Rule(Rule.Operation.OR, bool));
- return (Boolean) this.memory.get(key).localValue().object || bool;
+ return (Boolean) (this.inTask ? this.memory.get(key).localValue() : this.memory.get(key).value()).object || bool;
}
@Override
public void set(final String key, final Object value) {
checkKeyValue(key, value);
- this.memory.get(key).add(new Rule(Rule.Operation.SET, value));
+ this.memory.get(key).setValue(new Rule(Rule.Operation.SET, value));
}
@Override
@@ -143,6 +144,10 @@ public final class SparkMemory implements Memory.Admin, Serializable {
return StringFactory.memoryString(this);
}
+ public void setInTask(final boolean inTask) {
+ this.inTask = inTask;
+ }
+
private void checkKeyValue(final String key, final Object value) {
if (!this.memoryKeys.contains(key))
throw GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey(key);
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3b3ddb4d/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index 4132dd9..013bd36 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -125,8 +125,8 @@ public class TinkerGraphComputer implements GraphComputer {
this.memory.completeSubRound();
}
}
- } catch (Exception ex) {
- throw new RuntimeException(ex);
+ } catch (final Exception ex) {
+ throw new IllegalStateException(ex.getMessage(), ex);
}
}