Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/05/27 19:58:05 UTC

[1/2] incubator-tinkerpop git commit: Added OutputRDD for Spark. TinkerGraphView is now tested with dropView() within GraphComputerTest. GraphComputerTest now verifies the semantics of ResultGraph and Persist.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 057503e69 -> ec9365a21


Added OutputRDD for Spark. TinkerGraphView is now tested with dropView() within GraphComputerTest. GraphComputerTest now verifies the semantics of ResultGraph and Persist.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/8a027df7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/8a027df7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/8a027df7

Branch: refs/heads/master
Commit: 8a027df7ab3827232f63f875ea189e52e3d2beca
Parents: 057503e
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed May 27 10:00:10 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed May 27 10:00:10 2015 -0600

----------------------------------------------------------------------
 docs/src/the-traversal.asciidoc                 |   2 +-
 .../process/computer/GraphComputerTest.java     |  95 +++++++++-
 .../tinkerpop/gremlin/hadoop/Constants.java     |   1 +
 .../process/computer/spark/SparkExecutor.java   |  49 -----
 .../computer/spark/SparkGraphComputer.java      | 178 +++++++++++--------
 .../computer/spark/io/OutputFormatRDD.java      |  51 ++++++
 .../process/computer/spark/io/OutputRDD.java    |  34 ++++
 .../process/computer/TinkerGraphComputer.java   |  10 +-
 .../process/computer/TinkerGraphView.java       |  17 +-
 .../tinkergraph/structure/TinkerGraph.java      |   5 +-
 .../tinkergraph/structure/TinkerHelper.java     |  11 +-
 11 files changed, 306 insertions(+), 147 deletions(-)
----------------------------------------------------------------------
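The new GraphComputerTest cases in this commit exercise the ResultGraph/Persist contract end to end. As orientation, the pattern they establish looks roughly like the sketch below; `program` stands for an arbitrary VertexProgram instance, and the snippet simply mirrors the feature check used in the tests rather than any additional API:

    final GraphComputer computer = graph.compute();
    if (computer.features().supportsResultGraphPersistCombination(GraphComputer.ResultGraph.NEW, GraphComputer.Persist.VERTEX_PROPERTIES)) {
        final ComputerResult result = computer.program(program)
                .result(GraphComputer.ResultGraph.NEW)             // where the computed properties materialize
                .persist(GraphComputer.Persist.VERTEX_PROPERTIES)  // what survives the computation
                .submit().get();
        result.graph().traversal().V().valueMap();                 // computed keys are visible on the result graph
    }

Unsupported combinations are expected to throw Exceptions.resultGraphPersistCombinationNotSupported, which is what TinkerGraphComputer now does in its updated submit() validation.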


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8a027df7/docs/src/the-traversal.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/the-traversal.asciidoc b/docs/src/the-traversal.asciidoc
index d81a364..d9ffa6f 100644
--- a/docs/src/the-traversal.asciidoc
+++ b/docs/src/the-traversal.asciidoc
@@ -1359,7 +1359,7 @@ It is important to see how the paths of all the emanating traversers are united
 
 image::tree-step2.png[width=500]
 
-The resultant tree data structure can then be manipulated (see link:http://www.tinkerpop.com/javadocs/current/org/apache/tinkerpop/gremlin/process/graph/step/util/Tree.html[Tree JavaDoc]). For the sake of demonstration, a post-processing lambda is applied in the running example below.
+The resultant tree data structure can then be manipulated (see `Tree` JavaDoc).
 
 [gremlin-groovy,modern]
 ----

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8a027df7/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
index c1dc796..a364ed0 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
@@ -28,7 +28,6 @@ import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.VertexProperty;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -489,7 +488,7 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
 
         @Override
         public GraphComputer.Persist getPreferredPersist() {
-            return GraphComputer.Persist.EDGES;
+            return GraphComputer.Persist.NOTHING;
         }
     }
     /////////////////////////////////////////////
@@ -994,7 +993,7 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         MapReduceI.TIME_KEEPER.set(-1l);
         MapReduceI.WORKER_START.clear();
         MapReduceI.WORKER_END.clear();
-        assertEquals(3,graph.compute(graphComputerClass.get()).program(new VertexProgramJ()).mapReduce(new MapReduceI()).submit().get().memory().<Integer>get("a").intValue());
+        assertEquals(3, graph.compute(graphComputerClass.get()).program(new VertexProgramJ()).mapReduce(new MapReduceI()).submit().get().memory().<Integer>get("a").intValue());
         assertEquals(Long.MIN_VALUE, VertexProgramJ.TIME_KEEPER.get());
         if (MapReduceI.WORKER_START.size() == 2) {
             assertEquals(2, MapReduceI.WORKER_START.size());
@@ -1066,11 +1065,6 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         }
 
         @Override
-        public Set<String> getElementComputeKeys() {
-            return Collections.emptySet();
-        }
-
-        @Override
         public Set<String> getMemoryComputeKeys() {
             return new HashSet<>(Arrays.asList("test"));
         }
@@ -1189,4 +1183,89 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         }
     }
 
+    /////////////////////////////////////////////
+
+    /////////////////////////////////////////////
+    @Test
+    @LoadGraphWith(MODERN)
+    public void shouldWriteNewGraphResultWithEdges() throws Exception {
+        final GraphComputer computer = graph.compute(graphComputerClass.get());
+        if (computer.features().supportsResultGraphPersistCombination(GraphComputer.ResultGraph.NEW, GraphComputer.Persist.EDGES)) {
+            final ComputerResult result = computer.program(new VertexProgramK()).submit().get();
+            assertEquals(Double.valueOf(28.0d), result.graph().traversal().V().values("money").sum().next());
+            assertEquals(Long.valueOf(6l), result.graph().traversal().E().count().next());
+            assertEquals(Long.valueOf(18l), result.graph().traversal().V().values().count().next());
+        }
+    }
+
+    @Test
+    @LoadGraphWith(MODERN)
+    public void shouldWriteNewGraphResultWithVertexProperties() throws Exception {
+        final GraphComputer computer = graph.compute(graphComputerClass.get());
+        if (computer.features().supportsResultGraphPersistCombination(GraphComputer.ResultGraph.NEW, GraphComputer.Persist.VERTEX_PROPERTIES)) {
+            final ComputerResult result = computer.program(new VertexProgramK()).result(GraphComputer.ResultGraph.NEW).persist(GraphComputer.Persist.VERTEX_PROPERTIES).submit().get();
+            assertEquals(Double.valueOf(28.0d), result.graph().traversal().V().values("money").sum().next());
+            assertEquals(Long.valueOf(0l), result.graph().traversal().E().count().next());
+            assertEquals(Long.valueOf(18l), result.graph().traversal().V().values().count().next());
+        }
+    }
+
+    @Test
+    @LoadGraphWith(MODERN)
+    public void shouldWriteOriginalGraphResultWithVertexProperties() throws Exception {
+        final GraphComputer computer = graph.compute(graphComputerClass.get());
+        if (computer.features().supportsResultGraphPersistCombination(GraphComputer.ResultGraph.ORIGINAL, GraphComputer.Persist.VERTEX_PROPERTIES)) {
+            final ComputerResult result = computer.program(new VertexProgramK()).result(GraphComputer.ResultGraph.ORIGINAL).persist(GraphComputer.Persist.VERTEX_PROPERTIES).submit().get();
+            assertEquals(Double.valueOf(28.0d), graph.traversal().V().values("money").sum().next());
+            assertEquals(Long.valueOf(6l), graph.traversal().E().count().next());
+            assertEquals(Long.valueOf(18l), graph.traversal().V().values().count().next());
+            ///
+            assertEquals(Double.valueOf(28.0d), result.graph().traversal().V().values("money").sum().next());
+            assertEquals(Long.valueOf(6l), result.graph().traversal().E().count().next());
+            result.graph().traversal().V().valueMap().forEachRemaining(System.out::println);
+            assertEquals(Long.valueOf(18l), result.graph().traversal().V().values().count().next());
+        }
+    }
+
+    public static class VertexProgramK extends StaticVertexProgram {
+
+
+        @Override
+        public void setup(final Memory memory) {
+
+        }
+
+        @Override
+        public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
+            vertex.property("money", vertex.<String>value("name").length());
+        }
+
+        @Override
+        public boolean terminate(final Memory memory) {
+            return true;
+        }
+
+        @Override
+        public Set<String> getElementComputeKeys() {
+            return Collections.singleton("money");
+        }
+
+        @Override
+        public Set<MessageScope> getMessageScopes(Memory memory) {
+            return Collections.emptySet();
+        }
+
+        @Override
+        public GraphComputer.ResultGraph getPreferredResultGraph() {
+            return GraphComputer.ResultGraph.NEW;
+        }
+
+        @Override
+        public GraphComputer.Persist getPreferredPersist() {
+            return GraphComputer.Persist.EDGES;
+        }
+    }
+
+    /////////////////////////////////////////////
+
 }
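For readers checking the new assertions against the toy "modern" graph, the expected numbers are derivable by hand; the comment block below is explanatory only and is not part of the committed test:

    // modern graph: 6 vertices (marko, vadas, lop, josh, ripple, peter) and 6 edges
    // money sum = 28       -> VertexProgramK stores each name's length: 5 + 5 + 3 + 4 + 6 + 5 = 28
    // values() count = 18  -> every vertex ends up with name + (age or lang) + money = 3 properties; 3 * 6 = 18
    // E().count()          -> 6 when edges are kept (Persist.EDGES, or the ORIGINAL graph), 0 on a NEW graph with VERTEX_PROPERTIES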

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8a027df7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
index 06885a5..b2e1b0a 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
@@ -59,6 +59,7 @@ public final class Constants {
 
     // spark based constants
     public static final String GREMLIN_HADOOP_INPUT_RDD = "gremlin.hadoop.inputRDD";
+    public static final String GREMLIN_HADOOP_OUTPUT_RDD = "gremlin.hadoop.outputRDD";
 
     public static final String SEQUENCE_WARNING = "The " + Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT
             + " is not " + SequenceFileOutputFormat.class.getCanonicalName()

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8a027df7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index d0297b5..783cf64 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -20,10 +20,7 @@ package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
 
 import com.google.common.base.Optional;
 import org.apache.commons.configuration.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -44,8 +41,6 @@ import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
-import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.util.Attachable;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
@@ -208,37 +203,6 @@ public final class SparkExecutor {
     // Input/Output //
     //////////////////
 
-    public static void deleteOutputLocation(final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
-        final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
-        if (null != outputLocation) {
-            try {
-                FileSystem.get(hadoopConfiguration).delete(new Path(outputLocation), true);
-            } catch (final IOException e) {
-                throw new IllegalStateException(e.getMessage(), e);
-            }
-        }
-    }
-
-    public static String getInputLocation(final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
-        try {
-            return FileSystem.get(hadoopConfiguration).getFileStatus(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION))).getPath().toString();
-        } catch (final IOException e) {
-            throw new IllegalStateException(e.getMessage(), e);
-        }
-    }
-
-    public static void saveGraphRDD(final JavaPairRDD<Object, VertexWritable> graphRDD, final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
-        final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
-        if (null != outputLocation) {
-            // map back to a <nullwritable,vertexwritable> stream for output
-            graphRDD.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), tuple._2()))
-                    .saveAsNewAPIHadoopFile(outputLocation + "/" + Constants.HIDDEN_G,
-                            NullWritable.class,
-                            VertexWritable.class,
-                            (Class<OutputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class), hadoopConfiguration);
-        }
-    }
-
     public static void saveMapReduceRDD(final JavaPairRDD<Object, Object> mapReduceRDD, final MapReduce mapReduce, final Memory.Admin memory, final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
         final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
         if (null != outputLocation) {
@@ -259,17 +223,4 @@ public final class SparkExecutor {
             }
         }
     }
-
-    /*public static void saveMemory(final Memory.Admin memory, final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
-        final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
-        if (null != outputLocation) {
-            try {
-                final SequenceFile.Writer writer = SequenceFile.createWriter(FileSystem.get(hadoopConfiguration), hadoopConfiguration, new Path(outputLocation + "/" + Constants.HIDDEN_MEMORY), ObjectWritable.class, ObjectWritable.class);
-                writer.append(new ObjectWritable<>(MapReduce.NullObject.instance()), new ObjectWritable<>(new MapMemory(memory)));
-                writer.close();
-            } catch (final Exception e) {
-                throw new IllegalStateException(e.getMessage(), e);
-            }
-        }
-    }*/
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8a027df7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index 1a097ff..b6f6968 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -22,6 +22,8 @@ import org.apache.commons.configuration.ConfigurationUtils;
 import org.apache.commons.configuration.FileConfiguration;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.spark.SparkConf;
@@ -31,6 +33,8 @@ import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io.InputFormatRDD;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io.InputRDD;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io.OutputFormatRDD;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io.OutputRDD;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload.ViewIncomingPayload;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
@@ -45,6 +49,7 @@ import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 import java.util.stream.Stream;
@@ -61,25 +66,36 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
     @Override
     public Future<ComputerResult> submit() {
         super.validateStatePriorToExecution();
-        // apache and hadoop configurations that are used throughout
+        // apache and hadoop configurations that are used throughout the graph computer computation
         final org.apache.commons.configuration.Configuration apacheConfiguration = new HadoopConfiguration(this.hadoopGraph.configuration());
         apacheConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, this.persist.get().equals(Persist.EDGES));
         final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(apacheConfiguration);
         if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class))) {
-            final String inputLocation = SparkExecutor.getInputLocation(hadoopConfiguration);
-            apacheConfiguration.setProperty(Constants.MAPRED_INPUT_DIR, inputLocation);
-            hadoopConfiguration.set(Constants.MAPRED_INPUT_DIR, inputLocation);
+            try {
+                final String inputLocation = FileSystem.get(hadoopConfiguration).getFileStatus(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION))).getPath().toString();
+                apacheConfiguration.setProperty(Constants.MAPRED_INPUT_DIR, inputLocation);
+                hadoopConfiguration.set(Constants.MAPRED_INPUT_DIR, inputLocation);
+            } catch (final IOException e) {
+                throw new IllegalStateException(e.getMessage(), e);
+            }
         }
 
         // create the completable future
         return CompletableFuture.<ComputerResult>supplyAsync(() -> {
-                    final long startTime = System.currentTimeMillis();
-                    SparkMemory memory = null;
-                    SparkExecutor.deleteOutputLocation(hadoopConfiguration);
-
-                    // wire up a spark context
-                    final SparkConf sparkConfiguration = new SparkConf();
-                    sparkConfiguration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + (null == this.vertexProgram ? "No VertexProgram" : this.vertexProgram) + "[" + this.mapReducers + "]");
+            final long startTime = System.currentTimeMillis();
+            SparkMemory memory = null;
+            // delete output location
+            final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
+            if (null != outputLocation) {
+                try {
+                    FileSystem.get(hadoopConfiguration).delete(new Path(outputLocation), true);
+                } catch (final IOException e) {
+                    throw new IllegalStateException(e.getMessage(), e);
+                }
+            }
+            // wire up a spark context
+            final SparkConf sparkConfiguration = new SparkConf();
+            sparkConfiguration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + (null == this.vertexProgram ? "No VertexProgram" : this.vertexProgram) + "[" + this.mapReducers + "]");
                     /*final List<Class> classes = new ArrayList<>();
                     classes.addAll(IOClasses.getGryoClasses(GryoMapper.build().create()));
                     classes.addAll(IOClasses.getSharedHadoopClasses());
@@ -89,82 +105,88 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                     classes.add(ViewOutgoingPayload.class);
                     sparkConfiguration.registerKryoClasses(classes.toArray(new Class[classes.size()]));*/ // TODO: fix for user submitted jars in Spark 1.3.0
 
-                    hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
-                    // execute the vertex program and map reducers and if there is a failure, auto-close the spark context
-                    try (final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration)) {
-                        // add the project jars to the cluster
-                        this.loadJars(sparkContext, hadoopConfiguration);
-                        // create a message-passing friendly rdd from the hadoop input format
-                        final JavaPairRDD<Object, VertexWritable> graphRDD;
+            // create the spark configuration from the graph computer configuration
+            hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
+            // execute the vertex program and map reducers and if there is a failure, auto-close the spark context
+            try (final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration)) {
+                // add the project jars to the cluster
+                this.loadJars(sparkContext, hadoopConfiguration);
+                // create a message-passing friendly rdd from the input rdd
+                final JavaPairRDD<Object, VertexWritable> graphRDD;
+                try {
+                    graphRDD = hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_INPUT_RDD, InputFormatRDD.class, InputRDD.class)
+                            .newInstance()
+                            .readGraphRDD(apacheConfiguration, sparkContext)
+                            .setName("graphRDD")
+                            .cache();
+                } catch (final InstantiationException | IllegalAccessException e) {
+                    throw new IllegalStateException(e.getMessage(), e);
+                }
+                JavaPairRDD<Object, ViewIncomingPayload<Object>> viewIncomingRDD = null;
+
+                ////////////////////////////////
+                // process the vertex program //
+                ////////////////////////////////
+                if (null != this.vertexProgram) {
+                    // set up the vertex program and wire up configurations
+                    memory = new SparkMemory(this.vertexProgram, this.mapReducers, sparkContext);
+                    this.vertexProgram.setup(memory);
+                    memory.broadcastMemory(sparkContext);
+                    final HadoopConfiguration vertexProgramConfiguration = new HadoopConfiguration();
+                    this.vertexProgram.storeState(vertexProgramConfiguration);
+                    ConfigurationUtils.copy(vertexProgramConfiguration, apacheConfiguration);
+                    ConfUtil.mergeApacheIntoHadoopConfiguration(vertexProgramConfiguration, hadoopConfiguration);
+
+                    // execute the vertex program
+                    while (true) {
+                        memory.setInTask(true);
+                        viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(graphRDD, viewIncomingRDD, memory, vertexProgramConfiguration);
+                        memory.setInTask(false);
+                        if (this.vertexProgram.terminate(memory))
+                            break;
+                        else {
+                            memory.incrIteration();
+                            memory.broadcastMemory(sparkContext);
+                        }
+                    }
+                    // write the graph rdd using the output rdd
+                    if (!this.persist.get().equals(Persist.NOTHING)) {
                         try {
-                            graphRDD = hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_INPUT_RDD, InputFormatRDD.class, InputRDD.class)
+                            hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_OUTPUT_RDD, OutputFormatRDD.class, OutputRDD.class)
                                     .newInstance()
-                                    .readGraphRDD(apacheConfiguration, sparkContext)
-                                    .setName("graphRDD")
-                                    .cache();
+                                    .writeGraphRDD(apacheConfiguration, graphRDD);
                         } catch (final InstantiationException | IllegalAccessException e) {
                             throw new IllegalStateException(e.getMessage(), e);
                         }
-                        JavaPairRDD<Object, ViewIncomingPayload<Object>> viewIncomingRDD = null;
-
-                        ////////////////////////////////
-                        // process the vertex program //
-                        ////////////////////////////////
-                        if (null != this.vertexProgram) {
-                            // set up the vertex program and wire up configurations
-                            memory = new SparkMemory(this.vertexProgram, this.mapReducers, sparkContext);
-                            this.vertexProgram.setup(memory);
-                            memory.broadcastMemory(sparkContext);
-                            final HadoopConfiguration vertexProgramConfiguration = new HadoopConfiguration();
-                            this.vertexProgram.storeState(vertexProgramConfiguration);
-                            ConfigurationUtils.copy(vertexProgramConfiguration, apacheConfiguration);
-                            ConfUtil.mergeApacheIntoHadoopConfiguration(vertexProgramConfiguration, hadoopConfiguration);
-
-                            // execute the vertex program
-                            while (true) {
-                                memory.setInTask(true);
-                                viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(graphRDD, viewIncomingRDD, memory, vertexProgramConfiguration);
-                                memory.setInTask(false);
-                                if (this.vertexProgram.terminate(memory))
-                                    break;
-                                else {
-                                    memory.incrIteration();
-                                    memory.broadcastMemory(sparkContext);
-                                }
-                            }
-                            // write the output graph back to disk
-                            if (!this.persist.get().equals(Persist.NOTHING))
-                                SparkExecutor.saveGraphRDD(graphRDD, hadoopConfiguration);
-                        }
+                    }
+                }
 
-                        final Memory.Admin finalMemory = null == memory ? new MapMemory() : new MapMemory(memory);
+                final Memory.Admin finalMemory = null == memory ? new MapMemory() : new MapMemory(memory);
 
-                        //////////////////////////////
-                        // process the map reducers //
-                        //////////////////////////////
-                        if (!this.mapReducers.isEmpty()) {
-                            final String[] elementComputeKeys = this.vertexProgram == null ? new String[0] : this.vertexProgram.getElementComputeKeys().toArray(new String[this.vertexProgram.getElementComputeKeys().size()]);
-                            final JavaPairRDD<Object, VertexWritable> mapReduceGraphRDD = SparkExecutor.prepareGraphRDDForMapReduce(graphRDD, viewIncomingRDD, elementComputeKeys).setName("mapReduceGraphRDD").cache();
-                            for (final MapReduce mapReduce : this.mapReducers) {
-                                // execute the map reduce job
-                                final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
-                                mapReduce.storeState(newApacheConfiguration);
-                                // map
-                                final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceGraphRDD, mapReduce, newApacheConfiguration).setName("mapRDD");
-                                // combine TODO: is this really needed
-                                // reduce
-                                final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkExecutor.executeReduce(mapRDD, mapReduce, newApacheConfiguration).setName("reduceRDD") : null;
-                                // write the map reduce output back to disk (memory)
-                                SparkExecutor.saveMapReduceRDD(null == reduceRDD ? mapRDD : reduceRDD, mapReduce, finalMemory, hadoopConfiguration);
-                            }
-                        }
-                        // update runtime and return the newly computed graph
-                        finalMemory.setRuntime(System.currentTimeMillis() - startTime);
-                        //SparkExecutor.saveMemory(finalMemory, hadoopConfiguration);
-                        return new DefaultComputerResult(HadoopHelper.getOutputGraph(this.hadoopGraph, this.resultGraph.get(), this.persist.get()), finalMemory.asImmutable());
+                //////////////////////////////
+                // process the map reducers //
+                //////////////////////////////
+                if (!this.mapReducers.isEmpty()) {
+                    final String[] elementComputeKeys = this.vertexProgram == null ? new String[0] : this.vertexProgram.getElementComputeKeys().toArray(new String[this.vertexProgram.getElementComputeKeys().size()]);
+                    final JavaPairRDD<Object, VertexWritable> mapReduceGraphRDD = SparkExecutor.prepareGraphRDDForMapReduce(graphRDD, viewIncomingRDD, elementComputeKeys).setName("mapReduceGraphRDD").cache();
+                    for (final MapReduce mapReduce : this.mapReducers) {
+                        // execute the map reduce job
+                        final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
+                        mapReduce.storeState(newApacheConfiguration);
+                        // map
+                        final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceGraphRDD, mapReduce, newApacheConfiguration).setName("mapRDD");
+                        // combine TODO: is this really needed
+                        // reduce
+                        final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkExecutor.executeReduce(mapRDD, mapReduce, newApacheConfiguration).setName("reduceRDD") : null;
+                        // write the map reduce output back to disk (memory)
+                        SparkExecutor.saveMapReduceRDD(null == reduceRDD ? mapRDD : reduceRDD, mapReduce, finalMemory, hadoopConfiguration);
                     }
                 }
-        );
+                // update runtime and return the newly computed graph
+                finalMemory.setRuntime(System.currentTimeMillis() - startTime);
+                return new DefaultComputerResult(HadoopHelper.getOutputGraph(this.hadoopGraph, this.resultGraph.get(), this.persist.get()), finalMemory.asImmutable());
+            }
+        });
     }
 
     /////////////////

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8a027df7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputFormatRDD.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputFormatRDD.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputFormatRDD.java
new file mode 100644
index 0000000..f792749
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputFormatRDD.java
@@ -0,0 +1,51 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import scala.Tuple2;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class OutputFormatRDD implements OutputRDD {
+
+    @Override
+    public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
+        final org.apache.hadoop.conf.Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(configuration);
+        final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
+        if (null != outputLocation) {
+            // map back to a <nullwritable,vertexwritable> stream for output
+            graphRDD.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), tuple._2()))
+                    .saveAsNewAPIHadoopFile(outputLocation + "/" + Constants.HIDDEN_G,
+                            NullWritable.class,
+                            VertexWritable.class,
+                            (Class<OutputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class), hadoopConfiguration);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8a027df7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputRDD.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputRDD.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputRDD.java
new file mode 100644
index 0000000..0d7613c
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputRDD.java
@@ -0,0 +1,34 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface OutputRDD {
+
+    public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD);
+}
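OutputRDD is the Spark-side counterpart of InputRDD: SparkGraphComputer resolves the implementation from the new gremlin.hadoop.outputRDD property and falls back to OutputFormatRDD. As an illustration only (the class and package below are hypothetical, not part of this commit), a provider could bypass Hadoop OutputFormats entirely and persist the computed graph through Spark's own serialization, which the job already relies on when shuffling VertexWritable:

    package com.example.spark.io;  // hypothetical package

    import org.apache.commons.configuration.Configuration;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.tinkerpop.gremlin.hadoop.Constants;
    import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io.OutputRDD;
    import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;

    public final class ObjectFileOutputRDD implements OutputRDD {

        @Override
        public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
            final String outputLocation = configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
            if (null != outputLocation) {
                // write the <id, vertexWritable> pairs directly as a Spark object file -- no OutputFormat involved
                graphRDD.saveAsObjectFile(outputLocation + "/" + Constants.HIDDEN_G);
            }
        }
    }

Such a writer would be selected with gremlin.hadoop.outputRDD=com.example.spark.io.ObjectFileOutputRDD in the job properties; when the property is absent, OutputFormatRDD reproduces the behavior of the removed SparkExecutor.saveGraphRDD().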

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8a027df7/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index b92e9c1..f8693b0 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -108,9 +108,8 @@ public final class TinkerGraphComputer implements GraphComputer {
             this.persist = Optional.of(null == this.vertexProgram ? Persist.NOTHING : this.vertexProgram.getPreferredPersist());
         if (!this.resultGraph.isPresent())
             this.resultGraph = Optional.of(null == this.vertexProgram ? ResultGraph.ORIGINAL : this.vertexProgram.getPreferredResultGraph());
-        if (this.resultGraph.get().equals(ResultGraph.ORIGINAL))
-            if (!this.persist.get().equals(Persist.NOTHING))
-                throw GraphComputer.Exceptions.resultGraphPersistCombinationNotSupported(this.resultGraph.get(), this.persist.get());
+        if (!this.features().supportsResultGraphPersistCombination(this.resultGraph.get(), this.persist.get()))
+            throw GraphComputer.Exceptions.resultGraphPersistCombinationNotSupported(this.resultGraph.get(), this.persist.get());
 
         this.memory = new TinkerMemory(this.vertexProgram, this.mapReducers);
         return CompletableFuture.<ComputerResult>supplyAsync(() -> {
@@ -192,6 +191,9 @@ public final class TinkerGraphComputer implements GraphComputer {
                 // update runtime and return the newly computed graph
                 this.memory.setRuntime(System.currentTimeMillis() - time);
                 this.memory.complete();
+                if (Persist.NOTHING != this.persist.get() && ResultGraph.ORIGINAL == this.resultGraph.get()) {
+                    TinkerHelper.getGraphView(this.graph).addPropertiesToOriginalGraph();
+                }
                 return new TinkerComputerResult(this.graph, this.memory.asImmutable());
 
             } catch (Exception ex) {
@@ -251,7 +253,7 @@ public final class TinkerGraphComputer implements GraphComputer {
             }
 
             public boolean supportsResultGraphPersistCombination(final ResultGraph resultGraph, final Persist persist) {
-                return persist == Persist.NOTHING || resultGraph != ResultGraph.ORIGINAL;
+                return persist == Persist.NOTHING || (persist != Persist.EDGES && (persist != Persist.VERTEX_PROPERTIES && resultGraph == ResultGraph.NEW));
             }
 
             public boolean supportsDirectObjects() {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8a027df7/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphView.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphView.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphView.java
index d30b142..607fde6 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphView.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphView.java
@@ -24,6 +24,7 @@ import org.apache.tinkerpop.gremlin.structure.Property;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.VertexProperty;
 import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
 import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerHelper;
 import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerVertex;
 import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerVertexProperty;
@@ -42,10 +43,12 @@ import java.util.stream.Stream;
  */
 public final class TinkerGraphView {
 
+    private final TinkerGraph graph;
     protected final Set<String> computeKeys;
     private Map<Element, Map<String, List<VertexProperty>>> computeProperties;
 
-    public TinkerGraphView(final Set<String> computeKeys) {
+    public TinkerGraphView(final TinkerGraph graph, final Set<String> computeKeys) {
+        this.graph = graph;
         this.computeKeys = computeKeys;
         this.computeProperties = new ConcurrentHashMap<>();
     }
@@ -121,4 +124,16 @@ public final class TinkerGraphView {
     public boolean isComputeKey(final String key) {
         return this.computeKeys.contains(key);
     }
+
+    public void addPropertiesToOriginalGraph() {
+        TinkerHelper.dropGraphView(this.graph);
+        this.computeProperties.forEach((element, properties) -> {
+            properties.forEach((key, vertexProperties) -> {
+                vertexProperties.forEach(vertexProperty -> {
+                    element.property(vertexProperty.key(), vertexProperty.value()); // TODO: meta properties and ids
+                });
+            });
+        });
+        this.computeProperties.clear();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8a027df7/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/structure/TinkerGraph.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/structure/TinkerGraph.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/structure/TinkerGraph.java
index 561a69e..f7c87f6 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/structure/TinkerGraph.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/structure/TinkerGraph.java
@@ -104,10 +104,6 @@ public final class TinkerGraph implements Graph {
         vertexPropertyIdManager = selectIdManager(configuration, CONFIG_VERTEX_PROPERTY_ID, VertexProperty.class);
     }
 
-    public static TinkerGraph empty() {
-        return EMPTY_GRAPH;
-    }
-
     /**
      * Open a new {@link TinkerGraph} instance.
      * <p/>
@@ -190,6 +186,7 @@ public final class TinkerGraph implements Graph {
         this.currentId.set(-1l);
         this.vertexIndex = null;
         this.edgeIndex = null;
+        this.graphView = null;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8a027df7/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/structure/TinkerHelper.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/structure/TinkerHelper.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/structure/TinkerHelper.java
index 21e3beb..a64cb4d 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/structure/TinkerHelper.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/structure/TinkerHelper.java
@@ -18,7 +18,6 @@
  */
 package org.apache.tinkerpop.gremlin.tinkergraph.structure;
 
-import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Graph;
@@ -102,7 +101,15 @@ public final class TinkerHelper {
     }
 
     public static TinkerGraphView createGraphView(final TinkerGraph graph, final Set<String> computeKeys) {
-        return graph.graphView = new TinkerGraphView(computeKeys);
+        return graph.graphView = new TinkerGraphView(graph, computeKeys);
+    }
+
+    public static TinkerGraphView getGraphView(final TinkerGraph graph) {
+        return graph.graphView;
+    }
+
+    public static void dropGraphView(final TinkerGraph graph) {
+        graph.graphView = null;
     }
 
     public static Map<String, List<VertexProperty>> getProperties(final TinkerVertex vertex) {



[2/2] incubator-tinkerpop git commit: GiraphGraphComputer no longer requires another BSP round to compute its memory. This removed a lot of extra code, an extra MapReduce, and numerous Constants. Added more tests to GraphComputerTest that verify Persist and ResultGraph options.

Posted by ok...@apache.org.
GiraphGraphComputer no longer requires another BSP round to compute its memory. This removed a lot of extra code, an extra MapReduce, and numerous Constants. Added more tests to GraphComputerTest that verify Persist and ResultGraph options.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/ec9365a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/ec9365a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/ec9365a2

Branch: refs/heads/master
Commit: ec9365a21f38fc44f791f40c6949328c55a6f0ca
Parents: 8a027df
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed May 27 11:58:13 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed May 27 11:58:13 2015 -0600

----------------------------------------------------------------------
 docs/src/implementations.asciidoc               |   1 -
 .../ranking/PageRankVertexProgramTest.java      |  55 +++++----
 hadoop-gremlin/conf/hadoop-graphson.properties  |   2 -
 hadoop-gremlin/conf/hadoop-gryo.properties      |   1 -
 hadoop-gremlin/conf/hadoop-script.properties    |   1 -
 .../tinkerpop/gremlin/hadoop/Constants.java     |   7 --
 .../computer/giraph/GiraphComputeVertex.java    |  16 +--
 .../computer/giraph/GiraphGraphComputer.java    |  32 ++---
 .../process/computer/giraph/GiraphMemory.java   |  58 ++++++---
 .../computer/giraph/GiraphWorkerContext.java    |   7 --
 .../process/computer/util/MemoryMapReduce.java  | 119 -------------------
 .../gremlin/hadoop/HadoopGraphProvider.java     |   2 +-
 .../process/computer/spark/io/InputRDDTest.java |   2 +-
 13 files changed, 92 insertions(+), 211 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ec9365a2/docs/src/implementations.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/implementations.asciidoc b/docs/src/implementations.asciidoc
index 990a170..5201f00 100644
--- a/docs/src/implementations.asciidoc
+++ b/docs/src/implementations.asciidoc
@@ -571,7 +571,6 @@ gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
 gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
 gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat
 gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
-gremlin.hadoop.deriveMemory=false
 gremlin.hadoop.jarsInDistributedCache=true
 gremlin.hadoop.inputLocation=tinkerpop-modern-vertices.kryo
 gremlin.hadoop.outputLocation=output

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ec9365a2/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/PageRankVertexProgramTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/PageRankVertexProgramTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/PageRankVertexProgramTest.java
index d4611f1..df65c7b 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/PageRankVertexProgramTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/PageRankVertexProgramTest.java
@@ -20,13 +20,14 @@ package org.apache.tinkerpop.gremlin.process.computer.ranking;
 
 import org.apache.tinkerpop.gremlin.LoadGraphWith;
 import org.apache.tinkerpop.gremlin.process.AbstractGremlinProcessTest;
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalEngine;
 import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
 import org.junit.Test;
 
 import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.MODERN;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -36,30 +37,32 @@ public class PageRankVertexProgramTest extends AbstractGremlinProcessTest {
     @Test
     @LoadGraphWith(MODERN)
     public void shouldExecutePageRank() throws Exception {
-        final ComputerResult result = graph.compute(g.getGraphComputer().get().getClass()).program(PageRankVertexProgram.build().create(graph)).submit().get();
-        result.graph().traversal().V().forEachRemaining(v -> {
-            assertTrue(v.keys().contains("name"));
-            assertTrue(v.keys().contains(PageRankVertexProgram.PAGE_RANK));
-            final String name = v.value("name");
-            final Double pageRank = v.value(PageRankVertexProgram.PAGE_RANK);
-            //System.out.println(name + "-----" + pageRank);
-            if (name.equals("marko"))
-                assertTrue(pageRank > 0.14 && pageRank < 0.16);
-            else if (name.equals("vadas"))
-                assertTrue(pageRank > 0.19 && pageRank < 0.20);
-            else if (name.equals("lop"))
-                assertTrue(pageRank > 0.40 && pageRank < 0.41);
-            else if (name.equals("josh"))
-                assertTrue(pageRank > 0.19 && pageRank < 0.20);
-            else if (name.equals("ripple"))
-                assertTrue(pageRank > 0.23 && pageRank < 0.24);
-            else if (name.equals("peter"))
-                assertTrue(pageRank > 0.14 && pageRank < 0.16);
-            else
-                throw new IllegalStateException("The following vertex should not exist in the graph: " + name);
-        });
-        assertEquals(result.memory().getIteration(), 30);
-        assertEquals(result.memory().asMap().size(), 0);
+        if (g.getGraphComputer().get().features().supportsResultGraphPersistCombination(GraphComputer.ResultGraph.NEW, GraphComputer.Persist.VERTEX_PROPERTIES)) {
+            final ComputerResult result = graph.compute(g.getGraphComputer().get().getClass()).program(PageRankVertexProgram.build().create(graph)).submit().get();
+            result.graph().traversal().V().forEachRemaining(v -> {
+                assertTrue(v.keys().contains("name"));
+                assertTrue(v.keys().contains(PageRankVertexProgram.PAGE_RANK));
+                final String name = v.value("name");
+                final Double pageRank = v.value(PageRankVertexProgram.PAGE_RANK);
+                //System.out.println(name + "-----" + pageRank);
+                if (name.equals("marko"))
+                    assertTrue(pageRank > 0.14 && pageRank < 0.16);
+                else if (name.equals("vadas"))
+                    assertTrue(pageRank > 0.19 && pageRank < 0.20);
+                else if (name.equals("lop"))
+                    assertTrue(pageRank > 0.40 && pageRank < 0.41);
+                else if (name.equals("josh"))
+                    assertTrue(pageRank > 0.19 && pageRank < 0.20);
+                else if (name.equals("ripple"))
+                    assertTrue(pageRank > 0.23 && pageRank < 0.24);
+                else if (name.equals("peter"))
+                    assertTrue(pageRank > 0.14 && pageRank < 0.16);
+                else
+                    throw new IllegalStateException("The following vertex should not exist in the graph: " + name);
+            });
+            assertEquals(result.memory().getIteration(), 30);
+            assertEquals(result.memory().asMap().size(), 0);
+        }
     }
 
     /*@Test

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ec9365a2/hadoop-gremlin/conf/hadoop-graphson.properties
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/conf/hadoop-graphson.properties b/hadoop-gremlin/conf/hadoop-graphson.properties
index 146a015..2013b7b 100644
--- a/hadoop-gremlin/conf/hadoop-graphson.properties
+++ b/hadoop-gremlin/conf/hadoop-graphson.properties
@@ -24,8 +24,6 @@ gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.TextOut
 # i/o locations
 gremlin.hadoop.inputLocation=tinkerpop-modern.json
 gremlin.hadoop.outputLocation=output
-# deriving a complete view of the memory requires an extra mapreduce job and thus, if not needed, should be avoided
-gremlin.hadoop.deriveMemory=false
 # if the job jars are not on the classpath of every hadoop node, then they must be provided to the distributed cache at runtime
 gremlin.hadoop.jarsInDistributedCache=true
 # the vertex program to execute

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ec9365a2/hadoop-gremlin/conf/hadoop-gryo.properties
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/conf/hadoop-gryo.properties b/hadoop-gremlin/conf/hadoop-gryo.properties
index 13a1e41..2eba291 100644
--- a/hadoop-gremlin/conf/hadoop-gryo.properties
+++ b/hadoop-gremlin/conf/hadoop-gryo.properties
@@ -18,7 +18,6 @@ gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
 gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
 gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat
 gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
-gremlin.hadoop.deriveMemory=false
 gremlin.hadoop.jarsInDistributedCache=true
 
 gremlin.hadoop.inputLocation=tinkerpop-modern.kryo

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ec9365a2/hadoop-gremlin/conf/hadoop-script.properties
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/conf/hadoop-script.properties b/hadoop-gremlin/conf/hadoop-script.properties
index 0388801..0792023 100644
--- a/hadoop-gremlin/conf/hadoop-script.properties
+++ b/hadoop-gremlin/conf/hadoop-script.properties
@@ -18,7 +18,6 @@ gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
 gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.script.ScriptInputFormat
 gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONOutputFormat
 gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
-gremlin.hadoop.deriveMemory=false
 gremlin.hadoop.jarsInDistributedCache=true
 
 gremlin.hadoop.inputLocation=tinkerpop-classic.txt

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ec9365a2/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
index b2e1b0a..de744b7 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
@@ -36,7 +36,6 @@ public final class Constants {
     public static final String GREMLIN_HADOOP_GRAPH_INPUT_FORMAT_HAS_EDGES = "gremlin.hadoop.graphInputFormat.hasEdges";
     public static final String GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES = "gremlin.hadoop.graphOutputFormat.hasEdges";
     public static final String GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT = "gremlin.hadoop.memoryOutputFormat";
-    // public static final String GREMLIN_HADOOP_DEFAULT_GRAPH_COMPUTER = "gremlin.hadoop.defaultGraphComputer";
 
     public static final String GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE = "gremlin.hadoop.jarsInDistributedCache";
     public static final String HIDDEN_G = Graph.Hidden.hide("g");
@@ -46,14 +45,8 @@ public final class Constants {
     public static final String GREMLIN_HADOOP_SPARK_JOB_PREFIX = "HadoopGremlin(Spark): ";
     public static final String HADOOP_GREMLIN_LIBS = "HADOOP_GREMLIN_LIBS";
     public static final String DOT_JAR = ".jar";
-    public static final String GREMLIN_HADOOP_DERIVE_MEMORY = "gremlin.hadoop.deriveMemory";
-    public static final String HIDDEN_MEMORY = Graph.Hidden.hide("memory");
-    public static final String HIDDEN_RUNTIME = Graph.Hidden.hide("gremlin.hadoop.runtime");
     public static final String HIDDEN_ITERATION = Graph.Hidden.hide("gremlin.hadoop.iteration");
-    public static final String GREMLIN_HADOOP_MEMORY_KEYS = "gremlin.hadoop.memoryKeys";
     public static final String GREMLIN_HADOOP_MAP_REDUCE_CLASS = "gremlin.hadoop.mapReduceClass";
-    public static final String GREMLIN_HADOOP_HALT = "gremlin.hadoop.halt";
-    public static final String GREMLIN_HADOOP_MAP_MEMORY = "gremlin.hadoop.mapMemory";
 
     public static final String MAPRED_INPUT_DIR = "mapred.input.dir";
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ec9365a2/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
index 1507def..2581388 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
@@ -20,14 +20,10 @@ package org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph;
 
 import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.Rule;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
-import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
-import org.apache.tinkerpop.gremlin.structure.VertexProperty;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -48,17 +44,7 @@ public final class GiraphComputeVertex extends Vertex<ObjectWritable, VertexWrit
     public void compute(final Iterable<ObjectWritable> messages) {
         final GiraphWorkerContext workerContext = (GiraphWorkerContext) this.getWorkerContext();
         final VertexProgram<?> vertexProgram = workerContext.getVertexProgramPool().take();
-        final GiraphMemory memory = workerContext.getMemory();
-        final GiraphMessenger messenger = workerContext.getMessenger(this, messages.iterator());
-        ///////////
-        if (!(Boolean) ((Rule) this.getAggregatedValue(Constants.GREMLIN_HADOOP_HALT)).getObject()) {
-            vertexProgram.execute(ComputerGraph.vertexProgram(this.getValue().get(), vertexProgram), messenger, memory);
-        } else if (workerContext.deriveMemory()) {
-            final MapMemory mapMemory = new MapMemory();
-            memory.asMap().forEach(mapMemory::set);
-            mapMemory.setIteration(memory.getIteration() - 1);
-            this.getValue().get().property(VertexProperty.Cardinality.single, Constants.GREMLIN_HADOOP_MAP_MEMORY, mapMemory); // not a compute key cause no ComputerGraph is used
-        }
+        vertexProgram.execute(ComputerGraph.vertexProgram(this.getValue().get(), vertexProgram), workerContext.getMessenger(this, messages.iterator()), workerContext.getMemory());
         workerContext.getVertexProgramPool().offer(vertexProgram);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ec9365a2/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
index e51b741..e8f026f 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
@@ -39,9 +38,9 @@ import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphC
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.io.GiraphVertexInputFormat;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.io.GiraphVertexOutputFormat;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.MapReduceHelper;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.MemoryMapReduce;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.HadoopHelper;
@@ -54,8 +53,6 @@ import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
 
 import java.io.File;
 import java.io.NotSerializableException;
-import java.util.HashSet;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 import java.util.stream.Stream;
@@ -108,6 +105,7 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
                 //e.printStackTrace();
                 throw new IllegalStateException(e.getMessage(), e);
             }
+
             this.memory.setRuntime(System.currentTimeMillis() - startTime);
             return new DefaultComputerResult(HadoopHelper.getOutputGraph(this.hadoopGraph, this.resultGraph.get(), this.persist.get()), this.memory.asImmutable());
         });
@@ -126,14 +124,6 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
                     if (e.getCause() instanceof NumberFormatException)
                         throw new NotSerializableException("The provided traversal is not serializable and thus, can not be distributed across the cluster");
                 }
-                // calculate main vertex program memory if desired (costs one mapreduce job)
-                if (this.giraphConfiguration.getBoolean(Constants.GREMLIN_HADOOP_DERIVE_MEMORY, false)) {
-                    final Set<String> memoryKeys = new HashSet<>(this.vertexProgram.getMemoryComputeKeys());
-                    memoryKeys.add(Constants.HIDDEN_ITERATION);
-                    this.giraphConfiguration.setStrings(Constants.GREMLIN_HADOOP_MEMORY_KEYS, (String[]) memoryKeys.toArray(new String[memoryKeys.size()]));
-                    this.mapReducers.add(new MemoryMapReduce(memoryKeys));
-                }
-
                 // prepare the giraph vertex-centric computing job
                 final GiraphJob job = new GiraphJob(this.giraphConfiguration, Constants.GREMLIN_HADOOP_GIRAPH_JOB_PREFIX + this.vertexProgram);
                 // handle input paths (if any)
@@ -145,15 +135,24 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
                 }
                 // handle output paths
                 final Path outputPath = new Path(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + Constants.HIDDEN_G);
-                if (this.persist.get().equals(Persist.NOTHING) && this.mapReducers.isEmpty()) // do not write the graph back if it is not needed in MapReduce
-                    job.getInternalJob().setOutputFormatClass(NullOutputFormat.class);
-                else
-                    FileOutputFormat.setOutputPath(job.getInternalJob(), outputPath);
+                FileOutputFormat.setOutputPath(job.getInternalJob(), outputPath);
                 job.getInternalJob().setJarByClass(GiraphGraphComputer.class);
                 this.logger.info(Constants.GREMLIN_HADOOP_GIRAPH_JOB_PREFIX + this.vertexProgram);
                 // execute the job and wait until it completes (if it fails, throw an exception)
                 if (!job.run(true))
                     throw new IllegalStateException("The GiraphGraphComputer job failed -- aborting all subsequent MapReduce jobs");  // how do I get the exception that occurred?
+                // add vertex program memory values to the return memory
+                for (final String key : this.vertexProgram.getMemoryComputeKeys()) {
+                    final Path path = new Path(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + key);
+                    final ObjectWritableIterator iterator = new ObjectWritableIterator(this.giraphConfiguration, path);
+                    if (iterator.hasNext()) {
+                        this.memory.set(key, iterator.next().getValue());
+                    }
+                    FileSystem.get(this.giraphConfiguration).delete(path, true);
+                }
+                final Path path = new Path(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + Constants.HIDDEN_ITERATION);
+                this.memory.setIteration((Integer) new ObjectWritableIterator(this.giraphConfiguration, path).next().getValue());
+                FileSystem.get(this.giraphConfiguration).delete(path, true);
             }
             // do map reduce jobs
             this.giraphConfiguration.setBoolean(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT_HAS_EDGES, this.giraphConfiguration.getBoolean(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, true));
@@ -161,6 +160,7 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
                 this.memory.addMapReduceMemoryKey(mapReduce);
                 MapReduceHelper.executeMapReduceJob(mapReduce, this.memory, this.giraphConfiguration);
             }
+
             // if no persistence, delete the map reduce output
             if (this.persist.get().equals(Persist.NOTHING)) {
                 final Path outputPath = new Path(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + Constants.HIDDEN_G);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ec9365a2/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMemory.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMemory.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMemory.java
index 78e0e45..2025914 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMemory.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMemory.java
@@ -20,21 +20,29 @@ package org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph;
 
 import org.apache.commons.configuration.Configuration;
 import org.apache.giraph.master.MasterCompute;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.Rule;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
 import org.apache.tinkerpop.gremlin.process.computer.util.MemoryHelper;
-import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -76,23 +84,39 @@ public final class GiraphMemory extends MasterCompute implements Memory {
                     MemoryHelper.validateKey(key);
                     this.registerPersistentAggregator(key, MemoryAggregator.class);
                 }
-                this.registerPersistentAggregator(Constants.GREMLIN_HADOOP_HALT, MemoryAggregator.class);
-                this.registerPersistentAggregator(Constants.HIDDEN_RUNTIME, MemoryAggregator.class);
-                this.setAggregatedValue(Constants.GREMLIN_HADOOP_HALT, new Rule(Rule.Operation.SET, Boolean.FALSE));
-                this.set(Constants.HIDDEN_RUNTIME, System.currentTimeMillis());
             } catch (final Exception e) {
                 throw new IllegalStateException(e.getMessage(), e);
             }
             this.vertexProgram.setup(this);
         } else {
-            if (this.get(Constants.GREMLIN_HADOOP_HALT)) {
+            if (this.vertexProgram.terminate(this)) { // terminate
+                // write the memory to HDFS
+                final MapMemory memory = new MapMemory(this);
+                // a hack to get the last iteration memory values to stick
+                this.vertexProgram.terminate(memory);
+                final String outputLocation = this.getConf().get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
+                if (null != outputLocation) {
+                    try {
+                        final Class<? extends OutputFormat> memoryOutputFormat = this.getConf().getClass(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, SequenceFileOutputFormat.class, OutputFormat.class);
+                        if (!memoryOutputFormat.equals(SequenceFileOutputFormat.class))
+                            HadoopGraph.LOGGER.warn(Constants.SEQUENCE_WARNING);
+                        else {
+                            for (final String key : this.keys()) {
+                                final SequenceFile.Writer writer = SequenceFile.createWriter(FileSystem.get(this.getConf()), this.getConf(), new Path(outputLocation + "/" + key), ObjectWritable.class, ObjectWritable.class);
+                                writer.append(new ObjectWritable<>(MapReduce.NullObject.instance()), new ObjectWritable<>(memory.get(key)));
+                                writer.close();
+                            }
+                            final SequenceFile.Writer writer = SequenceFile.createWriter(FileSystem.get(this.getConf()), this.getConf(), new Path(outputLocation + "/" + Constants.HIDDEN_ITERATION), ObjectWritable.class, ObjectWritable.class);
+                            writer.append(new ObjectWritable<>(MapReduce.NullObject.instance()), new ObjectWritable<>(memory.getIteration()));
+                            writer.close();
+                        }
+                    } catch (final Exception e) {
+                        throw new IllegalStateException(e.getMessage(), e);
+                    }
+                }
                 this.haltComputation();
-            } else if (this.vertexProgram.terminate(this)) { // terminate
-                if (!this.getConf().getBoolean(Constants.GREMLIN_HADOOP_DERIVE_MEMORY, false)) // no need for the extra BSP round if memory is not required
-                    this.haltComputation();
-                else
-                    this.setAggregatedValue(Constants.GREMLIN_HADOOP_HALT, new Rule(Rule.Operation.SET, Boolean.TRUE));
             }
+
         }
     }
 
@@ -108,12 +132,18 @@ public final class GiraphMemory extends MasterCompute implements Memory {
 
     @Override
     public long getRuntime() {
-        return System.currentTimeMillis() - this.<Long>get(Constants.HIDDEN_RUNTIME);
+        return System.currentTimeMillis(); // TODO: this should be stored in the configuration
     }
 
     @Override
     public Set<String> keys() {
-        return this.memoryKeys;
+        final Set<String> keys = new HashSet<>();
+        for (final String key : this.memoryKeys) {
+            if (this.exists(key))
+                keys.add(key);
+        }
+        return keys;
+        // return this.memoryKeys.stream().filter(this::exists).collect(Collectors.toSet());
     }
 
     @Override
@@ -195,7 +225,7 @@ public final class GiraphMemory extends MasterCompute implements Memory {
     }
 
     private void checkKeyValue(final String key, final Object value) {
-        if (!key.equals(Constants.HIDDEN_RUNTIME) && !this.memoryKeys.contains(key))
+        if (!this.memoryKeys.contains(key))
             throw GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey(key);
         MemoryHelper.validateValue(value);
     }
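
The two Giraph hunks above now hand the vertex program memory from the workers back to the driver through SequenceFiles: GiraphMemory writes one <NullObject, value> pair per memory key under the job's output location, and GiraphGraphComputer reads each file back with ObjectWritableIterator and deletes it. A minimal consolidated sketch of that read side follows; the standalone wrapper class is illustrative only, while the TinkerPop types mirror the diffs above.

    // Sketch only: read back the per-key memory SequenceFiles written by GiraphMemory,
    // mirroring the pattern added to GiraphGraphComputer in this commit.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
    import org.apache.tinkerpop.gremlin.process.computer.Memory;

    import java.io.IOException;
    import java.util.Set;

    public final class MemoryReadBackSketch {
        public static void load(final Configuration configuration, final String outputLocation,
                                final Set<String> memoryKeys, final Memory memory) throws IOException {
            for (final String key : memoryKeys) {
                final Path path = new Path(outputLocation + "/" + key);
                // each file holds at most one <NullObject, value> pair for its key
                final ObjectWritableIterator iterator = new ObjectWritableIterator(configuration, path);
                if (iterator.hasNext())
                    memory.set(key, iterator.next().getValue());
                FileSystem.get(configuration).delete(path, true); // one-shot hand-off: discard after reading
            }
        }
    }

The hidden iteration counter travels the same way under Constants.HIDDEN_ITERATION, as the GiraphGraphComputer hunk earlier in this commit shows.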

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ec9365a2/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphWorkerContext.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphWorkerContext.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphWorkerContext.java
index cd42fa0..85d2c2d 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphWorkerContext.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphWorkerContext.java
@@ -21,7 +21,6 @@ package org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph;
 import org.apache.commons.configuration.Configuration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.worker.WorkerContext;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
@@ -39,7 +38,6 @@ public final class GiraphWorkerContext extends WorkerContext {
 
     private VertexProgramPool vertexProgramPool;
     private GiraphMemory memory;
-    private boolean deriveMemory;
 
     public GiraphWorkerContext() {
         // Giraph ReflectionUtils requires this to be public at minimum
@@ -51,7 +49,6 @@ public final class GiraphWorkerContext extends WorkerContext {
         final VertexProgram vertexProgram = VertexProgram.createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
         this.vertexProgramPool = new VertexProgramPool(vertexProgram, this.getContext().getConfiguration().getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), 1));
         this.memory = new GiraphMemory(this, vertexProgram);
-        this.deriveMemory = this.getContext().getConfiguration().getBoolean(Constants.GREMLIN_HADOOP_DERIVE_MEMORY, false);
     }
 
     public void postApplication() {
@@ -77,8 +74,4 @@ public final class GiraphWorkerContext extends WorkerContext {
     public GiraphMessenger getMessenger(final GiraphComputeVertex giraphComputeVertex, final Iterator<ObjectWritable> messages) {
         return new GiraphMessenger(giraphComputeVertex, messages);
     }
-
-    public boolean deriveMemory() {
-        return this.deriveMemory;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ec9365a2/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MemoryMapReduce.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MemoryMapReduce.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MemoryMapReduce.java
deleted file mode 100644
index 3afdc19..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MemoryMapReduce.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.util;
-
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.Memory;
-import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper;
-import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
-import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
-import org.apache.commons.configuration.Configuration;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class MemoryMapReduce extends StaticMapReduce<MapReduce.NullObject, MapMemory, MapReduce.NullObject, MapMemory, MapMemory> {
-
-    public Set<String> memoryKeys = new HashSet<>();
-
-    @Override
-    public String getMemoryKey() {
-        return Constants.HIDDEN_MEMORY;
-    }
-
-    private MemoryMapReduce() {
-
-    }
-
-    public MemoryMapReduce(final Set<String> memoryKeys) {
-        this.memoryKeys = memoryKeys;
-    }
-
-    @Override
-    public void storeState(final Configuration configuration) {
-        super.storeState(configuration);
-        configuration.setProperty(Constants.GREMLIN_HADOOP_MEMORY_KEYS, new ArrayList<>(this.memoryKeys.size()));
-    }
-
-    @Override
-    public void loadState(final Graph graph, final Configuration configuration) {
-        this.memoryKeys = new HashSet((List) configuration.getList(Constants.GREMLIN_HADOOP_MEMORY_KEYS));
-    }
-
-    @Override
-    public boolean doStage(final Stage stage) {
-        return true;
-    }
-
-    @Override
-    public void map(final Vertex vertex, final MapEmitter<NullObject, MapMemory> emitter) {
-        final MapMemory mapMemory = vertex.<MapMemory>property(Constants.GREMLIN_HADOOP_MAP_MEMORY).orElse(new MapMemory());
-        emitter.emit(mapMemory);
-    }
-
-    @Override
-    public void combine(final NullObject key, final Iterator<MapMemory> values, final ReduceEmitter<NullObject, MapMemory> emitter) {
-        this.reduce(key, values, emitter);
-    }
-
-    @Override
-    public void reduce(final NullObject key, final Iterator<MapMemory> values, final ReduceEmitter<NullObject, MapMemory> emitter) {
-        emitter.emit(key, values.next());
-    }
-
-    @Override
-    public MapMemory generateFinalResult(final Iterator<KeyValue<NullObject, MapMemory>> keyValues) {
-        return keyValues.next().getValue();
-    }
-
-    @Override
-    public void addResultToMemory(final Memory.Admin memory, final Iterator<KeyValue<NullObject, MapMemory>> keyValues) {
-        final MapMemory temp = keyValues.next().getValue();
-        temp.asMap().forEach(memory::set);
-        memory.setIteration(temp.getIteration());
-        memory.setRuntime(temp.getRuntime());
-    }
-
-    @Override
-    public int hashCode() {
-        return (this.getClass().getCanonicalName() + Constants.HIDDEN_MEMORY).hashCode();
-    }
-
-    @Override
-    public boolean equals(final Object object) {
-        return GraphComputerHelper.areEqual(this, object);
-    }
-
-    @Override
-    public String toString() {
-        return StringFactory.mapReduceString(this, this.memoryKeys.toString());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ec9365a2/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
index b593c8a..0fab71d 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
@@ -111,7 +111,7 @@ public class HadoopGraphProvider extends AbstractGraphProvider {
             put(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, GryoOutputFormat.class.getCanonicalName());
             put(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, SequenceFileOutputFormat.class.getCanonicalName());
             put(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "hadoop-gremlin/target/test-output");
-            put(Constants.GREMLIN_HADOOP_DERIVE_MEMORY, true);
+           // put(Constants.GREMLIN_HADOOP_DERIVE_MEMORY, true);
             put(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
             /// giraph configuration
             put(GiraphConstants.MIN_WORKERS, 1);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ec9365a2/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputRDDTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputRDDTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputRDDTest.java
index d245a95..09df899 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputRDDTest.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputRDDTest.java
@@ -50,7 +50,7 @@ public class InputRDDTest {
         configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, GryoOutputFormat.class.getCanonicalName());
         configuration.setProperty(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, SequenceFileOutputFormat.class.getCanonicalName());
         configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "hadoop-gremlin/target/test-output");
-        configuration.setProperty(Constants.GREMLIN_HADOOP_DERIVE_MEMORY, true);
+      //  configuration.setProperty(Constants.GREMLIN_HADOOP_DERIVE_MEMORY, true);
         configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
         ////////
         Graph graph = GraphFactory.open(configuration);