You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/09/09 22:20:39 UTC

[09/18] incubator-tinkerpop git commit: pulled out giraph-gremlin from hadoop-gremlin. HadoopGremlin builds nicely. Giraph too. Spark is still causing problems.

pulled out giraph-gremlin from hadoop-gremlin. HadoopGremlin builds nicely. Giraph too. Spark is still causing problems.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/ac56b309
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/ac56b309
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/ac56b309

Branch: refs/heads/master
Commit: ac56b3095c8d9ddb685fd1286e7d870db1739618
Parents: b2132b1
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Sep 8 10:49:19 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Sep 8 10:49:19 2015 -0600

----------------------------------------------------------------------
 giraph-gremlin/pom.xml                          | 235 +++++++++++++++++++
 .../giraph/process/computer/EmptyOutEdges.java  |  80 +++++++
 .../process/computer/GiraphComputeVertex.java   |  50 ++++
 .../process/computer/GiraphGraphComputer.java   | 222 ++++++++++++++++++
 .../giraph/process/computer/GiraphMemory.java   | 218 +++++++++++++++++
 .../process/computer/GiraphMessageCombiner.java |  62 +++++
 .../process/computer/GiraphMessenger.java       |  79 +++++++
 .../process/computer/GiraphWorkerContext.java   |  77 ++++++
 .../process/computer/MemoryAggregator.java      |  94 ++++++++
 .../computer/io/GiraphVertexInputFormat.java    |  65 +++++
 .../computer/io/GiraphVertexOutputFormat.java   |  65 +++++
 .../process/computer/io/GiraphVertexReader.java |  67 ++++++
 .../process/computer/io/GiraphVertexWriter.java |  57 +++++
 .../giraph/process/HadoopGraphProvider.java     | 161 +++++++++++++
 ...GiraphGraphComputerProcessIntegrateTest.java |  32 +++
 .../computer/HadoopGiraphGraphProvider.java     |  37 +++
 ...GraphComputerGroovyProcessIntegrateTest.java |  33 +++
 .../GiraphHadoopGremlinPluginIntegrateTest.java |  32 +++
 hadoop-gremlin/pom.xml                          |  50 ----
 .../groovy/plugin/HadoopGremlinPlugin.java      |   3 -
 .../process/computer/giraph/EmptyOutEdges.java  |  81 -------
 .../computer/giraph/GiraphComputeVertex.java    |  50 ----
 .../computer/giraph/GiraphGraphComputer.java    | 222 ------------------
 .../process/computer/giraph/GiraphMemory.java   | 218 -----------------
 .../computer/giraph/GiraphMessageCombiner.java  |  62 -----
 .../computer/giraph/GiraphMessenger.java        |  79 -------
 .../computer/giraph/GiraphWorkerContext.java    |  77 ------
 .../computer/giraph/MemoryAggregator.java       |  94 --------
 .../giraph/io/GiraphVertexInputFormat.java      |  65 -----
 .../giraph/io/GiraphVertexOutputFormat.java     |  65 -----
 .../computer/giraph/io/GiraphVertexReader.java  |  67 ------
 .../computer/giraph/io/GiraphVertexWriter.java  |  57 -----
 .../gremlin/hadoop/structure/HadoopGraph.java   |  48 ++--
 .../gremlin/hadoop/HadoopGraphProvider.java     |  22 +-
 ...GiraphGraphComputerProcessIntegrateTest.java |  32 ---
 .../giraph/HadoopGiraphGraphProvider.java       |  36 ---
 ...GraphComputerGroovyProcessIntegrateTest.java |  33 ---
 .../GiraphHadoopGremlinPluginIntegrateTest.java |  33 ---
 pom.xml                                         |   1 +
 spark-gremlin/pom.xml                           |   6 -
 .../spark/process/HadoopGraphProvider.java      |   1 -
 41 files changed, 1701 insertions(+), 1367 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/giraph-gremlin/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-gremlin/pom.xml b/giraph-gremlin/pom.xml
new file mode 100644
index 0000000..214e9c4
--- /dev/null
+++ b/giraph-gremlin/pom.xml
@@ -0,0 +1,235 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.tinkerpop</groupId>
+        <artifactId>tinkerpop</artifactId>
+        <version>3.1.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>giraph-gremlin</artifactId>
+    <name>Apache TinkerPop :: Giraph Gremlin</name>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.tinkerpop</groupId>
+            <artifactId>gremlin-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.tinkerpop</groupId>
+            <artifactId>gremlin-groovy</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.tinkerpop</groupId>
+            <artifactId>hadoop-gremlin</artifactId>
+            <version>${project.version}</version>
+            <exclusions> <!-- giraph-core is excluded here and declared directly below at a fixed version -->
+                <exclusion>
+                    <groupId>org.apache.giraph</groupId>
+                    <artifactId>giraph-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>servlet-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>javax.servlet-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>netty-all</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>net.java.dev.jets3t</groupId>
+                    <artifactId>jets3t</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.scala-lang</groupId>
+                    <artifactId>scala-library</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <!-- GIRAPH GRAPH COMPUTER -->
+        <dependency>
+            <groupId>org.apache.giraph</groupId>
+            <artifactId>giraph-core</artifactId>
+            <version>1.0.0</version>
+            <exclusions>
+                <!-- self conflicts -->
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <!-- conflicts with gremlin-core -->
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <!-- gremlin-groovy conflicts -->
+                <exclusion>
+                    <groupId>jline</groupId>
+                    <artifactId>jline</artifactId>
+                </exclusion>
+                <!-- gremlin-test conflicts -->
+                <exclusion>
+                    <groupId>commons-io</groupId>
+                    <artifactId>commons-io</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <!-- consistent dependencies -->
+        <dependency>
+            <groupId>commons-httpclient</groupId>
+            <artifactId>commons-httpclient</artifactId>
+            <version>3.1</version>
+        </dependency>
+        <!-- TEST -->
+        <dependency>
+            <groupId>org.apache.tinkerpop</groupId>
+            <artifactId>gremlin-test</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.tinkerpop</groupId>
+            <artifactId>gremlin-groovy-test</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.tinkerpop</groupId>
+            <artifactId>tinkergraph-gremlin</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <repositories>
+        <repository>
+            <id>hyracks-releases</id> <!-- NOTE(review): served over plain http; confirm this repository is still required and reachable -->
+            <url>http://obelix.ics.uci.edu/nexus/content/groups/hyracks-public-releases/</url>
+        </repository>
+    </repositories>
+    <build>
+        <directory>${basedir}/target</directory>
+        <finalName>${project.artifactId}-${project.version}</finalName>
+        <resources>
+            <resource>
+                <directory>${basedir}/src/main/resources</directory>
+            </resource>
+        </resources>
+        <testResources>
+            <testResource>
+                <directory>${basedir}/src/test/resources</directory>
+            </testResource>
+        </testResources>
+        <plugins>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>2.4</version>
+                <executions>
+                    <execution>
+                        <id>build-detached-assemblies</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                        <configuration>
+                            <attach>false</attach> <!-- assemblies are built during package but not attached to the project's artifacts -->
+                            <descriptors>
+                                <descriptor>src/assembly/standalone.xml</descriptor>
+                                <descriptor>src/assembly/hadoop-job.xml</descriptor>
+                            </descriptors>
+                            <finalName>${project.artifactId}-${project.version}</finalName>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.gmavenplus</groupId>
+                <artifactId>gmavenplus-plugin</artifactId>
+                <version>1.2</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>addSources</goal>
+                            <goal>addTestSources</goal>
+                            <goal>generateStubs</goal>
+                            <goal>compile</goal>
+                            <goal>testGenerateStubs</goal>
+                            <goal>testCompile</goal>
+                            <goal>removeStubs</goal>
+                            <goal>removeTestStubs</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <invokeDynamic>true</invokeDynamic>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>2.5</version>
+                <configuration>
+                    <archive>
+                        <manifestEntries>
+                            <Gremlin-Plugin-Dependencies>org.apache.hadoop:hadoop-core:1.2.1
+                            </Gremlin-Plugin-Dependencies>
+                            <!-- deletes the servlet-api jar from the path after install - causes conflicts -->
+                            <Gremlin-Plugin-Paths>servlet-api-2.5-6.1.14.jar=;servlet-api-2.5-20081211.jar=
+                            </Gremlin-Plugin-Paths>
+                            <Gremlin-Lib-Paths>servlet-api-2.5-6.1.14.jar=;servlet-api-2.5-20081211.jar=
+                            </Gremlin-Lib-Paths>
+                        </manifestEntries>
+                    </archive>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <!-- log4j.configuration>log4j-core.properties</log4j.configuration> -->
+                    <!--<argLine>-Xmx2048M</argLine>-->
+                    <excludes> <!-- integration and performance tests are kept out of the unit-test (surefire) run -->
+                        <exclude>**/*IntegrateTest.java</exclude>
+                        <exclude>**/*PerformanceTest.java</exclude>
+                        <!-- this is technically a member of the integration test suite -->
+                        <exclude>**/HadoopGremlinPluginTest.java</exclude> <!-- NOTE(review): looks carried over from hadoop-gremlin; the plugin test added in this commit is GiraphHadoopGremlinPluginIntegrateTest, already matched by *IntegrateTest - confirm -->
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/EmptyOutEdges.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/EmptyOutEdges.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/EmptyOutEdges.java
new file mode 100644
index 0000000..cb649f6
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/EmptyOutEdges.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.giraph.process.computer;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.OutEdges;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+
+/** {@link OutEdges} implementation that never holds an edge: every mutator is a no-op, {@link #size()} is 0 and iteration is empty.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class EmptyOutEdges implements OutEdges<ObjectWritable, NullWritable> {
+
+    private static final EmptyOutEdges INSTANCE = new EmptyOutEdges(); // shared instance handed out by instance()
+
+    public static EmptyOutEdges instance() { // NOTE(review): the implicit public constructor also remains available -- confirm whether that is required
+        return INSTANCE;
+    }
+
+    @Override
+    public void initialize(final Iterable<Edge<ObjectWritable, NullWritable>> edges) { // no-op: supplied edges are discarded
+    }
+
+    @Override
+    public void initialize(final int capacity) { // no-op: nothing is allocated
+    }
+
+    @Override
+    public void initialize() { // no-op
+    }
+
+    @Override
+    public void add(final Edge<ObjectWritable, NullWritable> edge) { // no-op: the edge is silently dropped
+    }
+
+    @Override
+    public void remove(final ObjectWritable targetVertexId) { // no-op: there is never anything to remove
+    }
+
+    @Override
+    public int size() { // always 0
+        return 0;
+    }
+
+    @Override
+    public Iterator<Edge<ObjectWritable, NullWritable>> iterator() { // always an empty iterator
+        return Collections.emptyIterator();
+    }
+
+    @Override
+    public void write(final DataOutput dataOutput) throws IOException { // no-op: writes zero bytes
+    }
+
+    @Override
+    public void readFields(final DataInput dataInput) throws IOException { // no-op: consumes zero bytes, mirroring write()
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphComputeVertex.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphComputeVertex.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphComputeVertex.java
new file mode 100644
index 0000000..7f39c46
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphComputeVertex.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.giraph.process.computer;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
+
+/** Giraph {@link Vertex} whose value is a {@link VertexWritable}; each compute() call runs a pooled {@link VertexProgram} against the wrapped vertex.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GiraphComputeVertex extends Vertex<ObjectWritable, VertexWritable, NullWritable, ObjectWritable> {
+
+    public GiraphComputeVertex() { // NOTE(review): presumably needed so Giraph can instantiate the class reflectively -- confirm
+    }
+
+    public GiraphComputeVertex(final VertexWritable vertexWritable) {
+        final VertexWritable newWritable = new VertexWritable(); // wrap the same vertex in a new writable instead of keeping the caller's instance
+        newWritable.set(vertexWritable.get());
+        this.initialize(new ObjectWritable<>(newWritable.get().id()), newWritable, EmptyOutEdges.instance()); // id = wrapped vertex id; out-edges are always the empty implementation
+
+    }
+
+    @Override
+    public void compute(final Iterable<ObjectWritable> messages) {
+        final GiraphWorkerContext workerContext = (GiraphWorkerContext) this.getWorkerContext();
+        final VertexProgram<?> vertexProgram = workerContext.getVertexProgramPool().take(); // borrow a program instance from the worker's pool
+        vertexProgram.execute(ComputerGraph.vertexProgram(this.getValue().get(), vertexProgram), workerContext.getMessenger(this, messages.iterator()), workerContext.getMemory());
+        workerContext.getVertexProgramPool().offer(vertexProgram); // return it to the pool; NOTE(review): not reached if execute() throws (no try/finally)
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphGraphComputer.java
new file mode 100644
index 0000000..acd5628
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphGraphComputer.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.giraph.process.computer;
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.FileConfiguration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.job.GiraphJob;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tinkerpop.giraph.process.computer.io.GiraphVertexInputFormat;
+import org.apache.tinkerpop.giraph.process.computer.io.GiraphVertexOutputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.MapReduceHelper;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.HadoopHelper;
+import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
+import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
+
+import java.io.File;
+import java.io.NotSerializableException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.stream.Stream;
+
+/** {@link GraphComputer} that runs the vertex program as a Giraph job and then executes the registered MapReduce jobs; it is launched through Hadoop's {@link ToolRunner}.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GiraphGraphComputer extends AbstractHadoopGraphComputer implements GraphComputer, Tool {
+
+    protected GiraphConfiguration giraphConfiguration = new GiraphConfiguration(); // job configuration; also what getConf() returns
+    private MapMemory memory = new MapMemory(); // result memory, returned to the caller as immutable by submit()
+
+    public GiraphGraphComputer(final HadoopGraph hadoopGraph) {
+        super(hadoopGraph);
+        final Configuration configuration = hadoopGraph.configuration();
+        configuration.getKeys().forEachRemaining(key -> this.giraphConfiguration.set(key, configuration.getProperty(key).toString())); // copy every graph property over as a string
+        this.giraphConfiguration.setMasterComputeClass(GiraphMemory.class);
+        this.giraphConfiguration.setVertexClass(GiraphComputeVertex.class);
+        this.giraphConfiguration.setWorkerContextClass(GiraphWorkerContext.class);
+        this.giraphConfiguration.setOutEdgesClass(EmptyOutEdges.class);
+        this.giraphConfiguration.setClass(GiraphConstants.VERTEX_ID_CLASS.getKey(), ObjectWritable.class, ObjectWritable.class);
+        this.giraphConfiguration.setClass(GiraphConstants.VERTEX_VALUE_CLASS.getKey(), VertexWritable.class, VertexWritable.class);
+        this.giraphConfiguration.setBoolean(GiraphConstants.STATIC_GRAPH.getKey(), true);
+        this.giraphConfiguration.setVertexInputFormatClass(GiraphVertexInputFormat.class);
+        this.giraphConfiguration.setVertexOutputFormatClass(GiraphVertexOutputFormat.class);
+    }
+
+    @Override
+    public GraphComputer program(final VertexProgram vertexProgram) {
+        super.program(vertexProgram);
+        this.memory.addVertexProgramMemoryComputeKeys(this.vertexProgram);
+        final BaseConfiguration apacheConfiguration = new BaseConfiguration();
+        vertexProgram.storeState(apacheConfiguration); // the program's state is written out and merged into the job configuration below
+        ConfUtil.mergeApacheIntoHadoopConfiguration(apacheConfiguration, this.giraphConfiguration);
+        this.vertexProgram.getMessageCombiner().ifPresent(combiner -> this.giraphConfiguration.setCombinerClass(GiraphMessageCombiner.class)); // combiner is registered only when the program declares one
+        return this;
+    }
+
+    @Override
+    public Future<ComputerResult> submit() {
+        final long startTime = System.currentTimeMillis(); // runtime is measured from submission, not from job start
+        super.validateStatePriorToExecution();
+        return CompletableFuture.<ComputerResult>supplyAsync(() -> { // no executor supplied, so CompletableFuture's default async executor is used
+            try {
+                final FileSystem fs = FileSystem.get(this.giraphConfiguration);
+                this.loadJars(fs);
+                fs.delete(new Path(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)), true); // recursively remove any existing output location
+                ToolRunner.run(this, new String[]{}); // ends up invoking run(String[]) below
+            } catch (final Exception e) {
+                //e.printStackTrace();
+                throw new IllegalStateException(e.getMessage(), e);
+            }
+
+            this.memory.setRuntime(System.currentTimeMillis() - startTime);
+            return new DefaultComputerResult(HadoopHelper.getOutputGraph(this.hadoopGraph, this.resultGraph, this.persist), this.memory.asImmutable());
+        });
+    }
+
+    @Override
+    public int run(final String[] args) { // args are unused; always returns 0, failures surface as IllegalStateException
+        this.giraphConfiguration.setBoolean(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, this.persist.equals(Persist.EDGES)); // edges are written only when persist is EDGES
+        try {
+            // it is possible to run graph computer without a vertex program (and thus, only map reduce jobs if they exist)
+            if (null != this.vertexProgram) {
+                // a way to verify in Giraph whether the traversal will go over the wire or not
+                try {
+                    VertexProgram.createVertexProgram(this.hadoopGraph, ConfUtil.makeApacheConfiguration(this.giraphConfiguration));
+                } catch (IllegalStateException e) { // NOTE(review): an IllegalStateException with any other cause is silently ignored here
+                    if (e.getCause() instanceof NumberFormatException)
+                        throw new NotSerializableException("The provided traversal is not serializable and thus, can not be distributed across the cluster");
+                }
+                // prepare the giraph vertex-centric computing job
+                final GiraphJob job = new GiraphJob(this.giraphConfiguration, Constants.GREMLIN_HADOOP_GIRAPH_JOB_PREFIX + this.vertexProgram);
+                // handle input paths (if any)
+                if (FileInputFormat.class.isAssignableFrom(this.giraphConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class))) {
+                    final Path inputPath = new Path(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION));
+                    if (!FileSystem.get(this.giraphConfiguration).exists(inputPath))  // TODO: what about when the input is not a file input?
+                        throw new IllegalArgumentException("The provided input path does not exist: " + inputPath);
+                    FileInputFormat.setInputPaths(job.getInternalJob(), inputPath);
+                }
+                // handle output paths
+                final Path outputPath = new Path(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + Constants.HIDDEN_G);
+                FileOutputFormat.setOutputPath(job.getInternalJob(), outputPath);
+                job.getInternalJob().setJarByClass(GiraphGraphComputer.class);
+                this.logger.info(Constants.GREMLIN_HADOOP_GIRAPH_JOB_PREFIX + this.vertexProgram);
+                // execute the job and wait until it completes (if it fails, throw an exception)
+                if (!job.run(true))
+                    throw new IllegalStateException("The GiraphGraphComputer job failed -- aborting all subsequent MapReduce jobs");  // how do I get the exception that occurred?
+                // add vertex program memory values to the return memory
+                for (final String key : this.vertexProgram.getMemoryComputeKeys()) {
+                    final Path path = new Path(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + key);
+                    final ObjectWritableIterator iterator = new ObjectWritableIterator(this.giraphConfiguration, path);
+                    if (iterator.hasNext()) { // only the first entry under the key's path is read; a missing value leaves the key unset
+                        this.memory.set(key, iterator.next().getValue());
+                    }
+                    FileSystem.get(this.giraphConfiguration).delete(path, true); // the per-key file is removed once it has been read
+                }
+                final Path path = new Path(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + Constants.HIDDEN_ITERATION);
+                this.memory.setIteration((Integer) new ObjectWritableIterator(this.giraphConfiguration, path).next().getValue()); // NOTE(review): next() is called without hasNext(), unlike the loop above
+                FileSystem.get(this.giraphConfiguration).delete(path, true);
+            }
+            // do map reduce jobs
+            this.giraphConfiguration.setBoolean(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT_HAS_EDGES, this.giraphConfiguration.getBoolean(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, true)); // input-has-edges mirrors the output-has-edges flag set at the top of run()
+            for (final MapReduce mapReduce : this.mapReducers) {
+                this.memory.addMapReduceMemoryKey(mapReduce);
+                MapReduceHelper.executeMapReduceJob(mapReduce, this.memory, this.giraphConfiguration);
+            }
+
+            // if no persistence, delete the map reduce output
+            if (this.persist.equals(Persist.NOTHING)) {
+                final Path outputPath = new Path(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + Constants.HIDDEN_G);
+                if (FileSystem.get(this.giraphConfiguration).exists(outputPath))      // TODO: what about when the output is not a file output?
+                    FileSystem.get(this.giraphConfiguration).delete(outputPath, true);
+            }
+        } catch (final Exception e) { // every failure, including the ones thrown above, is rewrapped with its cause preserved
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+        return 0;
+    }
+
+    @Override
+    public void setConf(final org.apache.hadoop.conf.Configuration configuration) { // no-op: the supplied configuration is ignored
+        // TODO: is this necessary to implement?
+    }
+
+    @Override
+    public org.apache.hadoop.conf.Configuration getConf() { // always the internal Giraph configuration
+        return this.giraphConfiguration;
+    }
+
+    private void loadJars(final FileSystem fs) { // copies local *.jar files to the file system and adds them to the job classpath
+        final String hadoopGremlinLibsRemote = "hadoop-gremlin-libs"; // directory name created under fs.getHomeDirectory()
+        if (this.giraphConfiguration.getBoolean(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, true)) { // enabled unless explicitly set to false
+            final String hadoopGremlinLocalLibs = System.getenv(Constants.HADOOP_GREMLIN_LIBS);
+            if (null == hadoopGremlinLocalLibs)
+                this.logger.warn(Constants.HADOOP_GREMLIN_LIBS + " is not set -- proceeding regardless");
+            else {
+                final String[] paths = hadoopGremlinLocalLibs.split(":"); // the environment variable is a colon-separated list of paths
+                for (final String path : paths) {
+                    final File file = new File(path);
+                    if (file.exists()) { // NOTE(review): only existence is checked; File.listFiles() returns null when the path is not a directory
+                        Stream.of(file.listFiles()).filter(f -> f.getName().endsWith(Constants.DOT_JAR)).forEach(f -> {
+                            try {
+                                final Path jarFile = new Path(fs.getHomeDirectory() + "/" + hadoopGremlinLibsRemote + "/" + f.getName());
+                                fs.copyFromLocalFile(new Path(f.getPath()), jarFile);
+                                try {
+                                    DistributedCache.addArchiveToClassPath(jarFile, this.giraphConfiguration, fs);
+                                } catch (final Exception e) { // rethrown, then rewrapped as IllegalStateException by the outer catch
+                                    throw new RuntimeException(e.getMessage(), e);
+                                }
+                            } catch (Exception e) {
+                                throw new IllegalStateException(e.getMessage(), e);
+                            }
+                        });
+                    } else {
+                        this.logger.warn(path + " does not reference a valid directory -- proceeding regardless");
+                    }
+                }
+            }
+        }
+    }
+
+    public static void main(final String[] args) throws Exception { // args[0]: location of a properties file describing both the graph and the vertex program
+        final FileConfiguration configuration = new PropertiesConfiguration(args[0]);
+        new GiraphGraphComputer(HadoopGraph.open(configuration)).program(VertexProgram.createVertexProgram(HadoopGraph.open(configuration), configuration)).submit().get(); // note: HadoopGraph.open is called twice with the same configuration
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphMemory.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphMemory.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphMemory.java
new file mode 100644
index 0000000..5a56bd3
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphMemory.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.giraph.process.computer;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.giraph.master.MasterCompute;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.Rule;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
+import org.apache.tinkerpop.gremlin.process.computer.util.MemoryHelper;
+import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GiraphMemory extends MasterCompute implements Memory {
+
+    private VertexProgram<?> vertexProgram;
+    private GiraphWorkerContext worker;
+    private Set<String> memoryKeys;
+    private boolean isMasterCompute = true;
+    private long startTime = System.currentTimeMillis();
+
+    public GiraphMemory() {
+        // Giraph ReflectionUtils requires this to be public at minimum
+    }
+
+    public GiraphMemory(final GiraphWorkerContext worker, final VertexProgram<?> vertexProgram) {
+        this.worker = worker;
+        this.vertexProgram = vertexProgram;
+        this.memoryKeys = new HashSet<>(this.vertexProgram.getMemoryComputeKeys());
+        this.isMasterCompute = false;
+    }
+
+
+    @Override
+    public void initialize() {
+        // do not initialize aggregators here because the getConf() configuration is not available at this point
+        // use compute() initial iteration instead
+    }
+
+    @Override
+    public void compute() {
+        this.isMasterCompute = true;
+        if (0 == this.getSuperstep()) { // setup
+            final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(this.getConf());
+            this.vertexProgram = VertexProgram.createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
+            this.memoryKeys = new HashSet<>(this.vertexProgram.getMemoryComputeKeys());
+            try {
+                for (final String key : this.memoryKeys) {
+                    MemoryHelper.validateKey(key);
+                    this.registerPersistentAggregator(key, MemoryAggregator.class);
+                }
+            } catch (final Exception e) {
+                throw new IllegalStateException(e.getMessage(), e);
+            }
+            this.vertexProgram.setup(this);
+        } else {
+            if (this.vertexProgram.terminate(this)) { // terminate
+                // write the memory to HDFS
+                final MapMemory memory = new MapMemory(this);
+                // a hack to get the last iteration memory values to stick
+                this.vertexProgram.terminate(memory);
+                final String outputLocation = this.getConf().get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
+                if (null != outputLocation) {
+                    try {
+                        for (final String key : this.keys()) {
+                            final SequenceFile.Writer writer = SequenceFile.createWriter(FileSystem.get(this.getConf()), this.getConf(), new Path(outputLocation + "/" + key), ObjectWritable.class, ObjectWritable.class);
+                            writer.append(ObjectWritable.getNullObjectWritable(), new ObjectWritable<>(memory.get(key)));
+                            writer.close();
+                        }
+                        final SequenceFile.Writer writer = SequenceFile.createWriter(FileSystem.get(this.getConf()), this.getConf(), new Path(outputLocation + "/" + Constants.HIDDEN_ITERATION), ObjectWritable.class, ObjectWritable.class);
+                        writer.append(ObjectWritable.getNullObjectWritable(), new ObjectWritable<>(memory.getIteration()));
+                        writer.close();
+                    } catch (final Exception e) {
+                        throw new IllegalStateException(e.getMessage(), e);
+                    }
+                }
+                this.haltComputation();
+            }
+        }
+    }
+
+    @Override
+    public int getIteration() {
+        if (this.isMasterCompute) {
+            final int temp = (int) this.getSuperstep();
+            return temp == 0 ? temp : temp - 1;
+        } else {
+            return (int) this.worker.getSuperstep();
+        }
+    }
+
+    @Override
+    public long getRuntime() {
+        return System.currentTimeMillis() - this.startTime;
+    }
+
+    @Override
+    public Set<String> keys() {
+        return this.memoryKeys.stream().filter(this::exists).collect(Collectors.toSet());
+    }
+
+    @Override
+    public boolean exists(final String key) {
+        final Rule rule = this.isMasterCompute ? this.getAggregatedValue(key) : this.worker.getAggregatedValue(key);
+        return null != rule.getObject();
+    }
+
+    @Override
+    public <R> R get(final String key) throws IllegalArgumentException {
+        //this.checkKey(key);
+        final Rule rule = this.isMasterCompute ? this.getAggregatedValue(key) : this.worker.getAggregatedValue(key);
+        if (null == rule.getObject())
+            throw Memory.Exceptions.memoryDoesNotExist(key);
+        else
+            return rule.getObject();
+    }
+
+    @Override
+    public void set(final String key, Object value) {
+        this.checkKeyValue(key, value);
+        if (this.isMasterCompute)
+            this.setAggregatedValue(key, new Rule(Rule.Operation.SET, value));
+        else
+            this.worker.aggregate(key, new Rule(Rule.Operation.SET, value));
+    }
+
+    @Override
+    public void and(final String key, final boolean bool) {
+        this.checkKeyValue(key, bool);
+        if (this.isMasterCompute) {  // only called on setup() and terminate()
+            Boolean value = this.<Rule>getAggregatedValue(key).<Boolean>getObject();
+            value = null == value ? bool : bool && value;
+            this.setAggregatedValue(key, new Rule(Rule.Operation.AND, value));
+        } else {
+            this.worker.aggregate(key, new Rule(Rule.Operation.AND, bool));
+        }
+    }
+
+    @Override
+    public void or(final String key, final boolean bool) {
+        this.checkKeyValue(key, bool);
+        if (this.isMasterCompute) {   // only called on setup() and terminate()
+            Boolean value = this.<Rule>getAggregatedValue(key).<Boolean>getObject();
+            value = null == value ? bool : bool || value;
+            this.setAggregatedValue(key, new Rule(Rule.Operation.OR, value));
+        } else {
+            this.worker.aggregate(key, new Rule(Rule.Operation.OR, bool));
+        }
+    }
+
+    @Override
+    public void incr(final String key, final long delta) {
+        this.checkKeyValue(key, delta);
+        if (this.isMasterCompute) {   // only called on setup() and terminate()
+            Number value = this.<Rule>getAggregatedValue(key).<Number>getObject();
+            value = null == value ? delta : value.longValue() + delta;
+            this.setAggregatedValue(key, new Rule(Rule.Operation.INCR, value));
+        } else {
+            this.worker.aggregate(key, new Rule(Rule.Operation.INCR, delta));
+        }
+    }
+
+    @Override
+    public void write(final DataOutput output) {
+        // no need to serialize the master compute as it gets its data from aggregators
+        // is this true?
+    }
+
+    @Override
+    public void readFields(final DataInput input) {
+        // no need to serialize the master compute as it gets its data from aggregators
+        // is this true?
+    }
+
+    @Override
+    public String toString() {
+        return StringFactory.memoryString(this);
+    }
+
+    private void checkKeyValue(final String key, final Object value) {
+        if (!this.memoryKeys.contains(key))
+            throw GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey(key);
+        MemoryHelper.validateValue(value);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphMessageCombiner.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphMessageCombiner.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphMessageCombiner.java
new file mode 100644
index 0000000..8da835d
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphMessageCombiner.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.giraph.process.computer;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
+import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+
+/**
+ * Giraph {@link Combiner} that delegates the merging of two messages to the {@link MessageCombiner}
+ * supplied by the configured {@link VertexProgram}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GiraphMessageCombiner extends Combiner<ObjectWritable, ObjectWritable> implements ImmutableClassesGiraphConfigurable {
+
+    private MessageCombiner messageCombiner;
+    private ImmutableClassesGiraphConfiguration configuration;
+
+    /**
+     * Folds {@code messageToCombine} into {@code originalMessage}. An empty original simply adopts the
+     * incoming payload; otherwise both payloads are merged by the vertex program's message combiner.
+     */
+    @Override
+    public void combine(final ObjectWritable vertexIndex, final ObjectWritable originalMessage, final ObjectWritable messageToCombine) {
+        if (originalMessage.isEmpty()) {
+            originalMessage.set(messageToCombine.get());
+        } else {
+            originalMessage.set(this.messageCombiner.combine(originalMessage.get(), messageToCombine.get()));
+        }
+    }
+
+    /**
+     * Returns an empty writable as the starting point of a combination.
+     */
+    @Override
+    public ObjectWritable createInitialMessage() {
+        return ObjectWritable.empty();
+    }
+
+    /**
+     * Stores the Giraph configuration and resolves the message combiner of the vertex program that the
+     * configuration describes.
+     */
+    @Override
+    public void setConf(final ImmutableClassesGiraphConfiguration configuration) {
+        this.configuration = configuration;
+        final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(configuration);
+        final VertexProgram<?> vertexProgram = VertexProgram.createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
+        this.messageCombiner = (MessageCombiner) vertexProgram.getMessageCombiner().get();
+    }
+
+    @Override
+    public ImmutableClassesGiraphConfiguration getConf() {
+        return this.configuration;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphMessenger.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphMessenger.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphMessenger.java
new file mode 100644
index 0000000..ca3e100
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphMessenger.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.giraph.process.computer;
+
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
+import org.apache.tinkerpop.gremlin.process.computer.Messenger;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.StartStep;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+
+import java.util.Iterator;
+
+/**
+ * {@link Messenger} that hands out the messages received by a {@link GiraphComputeVertex} and sends
+ * outgoing messages through that same compute vertex.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GiraphMessenger<M> implements Messenger<M> {
+
+    private final GiraphComputeVertex giraphComputeVertex;
+    private final Iterator<ObjectWritable<M>> messages;
+
+    public GiraphMessenger(final GiraphComputeVertex giraphComputeVertex, final Iterator<ObjectWritable<M>> messages) {
+        this.giraphComputeVertex = giraphComputeVertex;
+        this.messages = messages;
+    }
+
+    /**
+     * Returns the incoming messages with their {@link ObjectWritable} wrappers removed.
+     */
+    @Override
+    public Iterator<M> receiveMessages() {
+        return IteratorUtils.map(this.messages, writable -> writable.get());
+    }
+
+    /**
+     * Sends the message according to the scope. For a global scope the message goes unchanged to every
+     * vertex the scope lists. For a local scope the scope's incident traversal is started at this vertex and,
+     * for each resulting edge, the output of the scope's edge function is sent to the edge's vertex in the
+     * direction opposite to the traversal's last {@link VertexStep}.
+     */
+    @Override
+    public void sendMessage(final MessageScope messageScope, final M message) {
+        if (!(messageScope instanceof MessageScope.Local)) {
+            final MessageScope.Global globalScope = (MessageScope.Global) messageScope;
+            globalScope.vertices().forEach(target ->
+                    this.giraphComputeVertex.sendMessage(new ObjectWritable<>(target.id()), new ObjectWritable<>(message)));
+            return;
+        }
+
+        final MessageScope.Local<M> localScope = (MessageScope.Local) messageScope;
+        final Traversal<Vertex, Edge> traversal = localScope.getIncidentTraversal().get();
+        final Vertex sourceVertex = this.giraphComputeVertex.getValue().get();
+        final Traversal.Admin<Vertex, Edge> incidentTraversal = GiraphMessenger.setVertexStart(traversal, sourceVertex);
+        final Direction direction = GiraphMessenger.getOppositeDirection(incidentTraversal);
+        incidentTraversal.forEachRemaining(edge -> {
+            final Object targetId = edge.vertices(direction).next().id();
+            final M payload = localScope.getEdgeFunction().apply(message, edge);
+            this.giraphComputeVertex.sendMessage(new ObjectWritable<>(targetId), new ObjectWritable<>(payload));
+        });
+    }
+
+    /**
+     * Prepends a {@link StartStep} holding the vertex to the traversal and returns that traversal.
+     */
+    private static <T extends Traversal.Admin<Vertex, Edge>> T setVertexStart(final Traversal<Vertex, Edge> incidentTraversal, final Vertex vertex) {
+        final Traversal.Admin<Vertex, Edge> admin = incidentTraversal.asAdmin();
+        admin.addStep(0, new StartStep<>(admin, vertex));
+        return (T) incidentTraversal;
+    }
+
+    /**
+     * Returns the opposite of the direction of the traversal's last {@link VertexStep}.
+     */
+    private static Direction getOppositeDirection(final Traversal.Admin<Vertex, Edge> incidentTraversal) {
+        final VertexStep lastVertexStep = TraversalHelper.getLastStepOfAssignableClass(VertexStep.class, incidentTraversal).get();
+        final Direction traversed = lastVertexStep.getDirection();
+        return traversed.opposite();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphWorkerContext.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphWorkerContext.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphWorkerContext.java
new file mode 100644
index 0000000..6ff7dc0
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphWorkerContext.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.giraph.process.computer;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.worker.WorkerContext;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.util.ImmutableMemory;
+import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramPool;
+
+import java.util.Iterator;
+
+/**
+ * Per-worker state for a Giraph job: a pool of vertex program instances and a worker-mode
+ * {@link GiraphMemory}, both created in {@link #preApplication()}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GiraphWorkerContext extends WorkerContext {
+
+    private VertexProgramPool vertexProgramPool;
+    private GiraphMemory memory;
+
+    public GiraphWorkerContext() {
+        // Giraph ReflectionUtils requires this to be public at minimum
+    }
+
+    /**
+     * Initializes the Hadoop pools, creates the vertex program from the job configuration and builds the
+     * vertex program pool (sized by the configured number of compute threads, defaulting to 1) and the memory.
+     */
+    public void preApplication() throws InstantiationException, IllegalAccessException {
+        final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(this.getContext().getConfiguration());
+        HadoopPools.initialize(apacheConfiguration);
+        final VertexProgram vertexProgram = VertexProgram.createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
+        final int computeThreads = this.getContext().getConfiguration().getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), 1);
+        this.vertexProgramPool = new VertexProgramPool(vertexProgram, computeThreads);
+        this.memory = new GiraphMemory(this, vertexProgram);
+    }
+
+    public void postApplication() {
+        // intentionally empty
+    }
+
+    /**
+     * Signals the start of a worker iteration to the pool, passing an immutable view of the memory.
+     */
+    public void preSuperstep() {
+        final ImmutableMemory readOnlyMemory = new ImmutableMemory(this.memory);
+        this.vertexProgramPool.workerIterationStart(readOnlyMemory);
+    }
+
+    /**
+     * Signals the end of a worker iteration to the pool, passing an immutable view of the memory.
+     */
+    public void postSuperstep() {
+        final ImmutableMemory readOnlyMemory = new ImmutableMemory(this.memory);
+        this.vertexProgramPool.workerIterationEnd(readOnlyMemory);
+    }
+
+    public VertexProgramPool getVertexProgramPool() {
+        return this.vertexProgramPool;
+    }
+
+    public GiraphMemory getMemory() {
+        return this.memory;
+    }
+
+    /**
+     * Creates a messenger bound to the given compute vertex and its incoming messages.
+     */
+    public GiraphMessenger getMessenger(final GiraphComputeVertex giraphComputeVertex, final Iterator<ObjectWritable> messages) {
+        return new GiraphMessenger(giraphComputeVertex, messages);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/MemoryAggregator.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/MemoryAggregator.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/MemoryAggregator.java
new file mode 100644
index 0000000..6526fab
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/MemoryAggregator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.giraph.process.computer;
+
+import org.apache.giraph.aggregators.Aggregator;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.Rule;
+
+/**
+ * Giraph {@link Aggregator} over {@link Rule} values. It keeps a single current object plus the last
+ * non-NO_OP operation seen by {@link #aggregate(Rule)} and combines incoming rules according to their
+ * operation (SET, INCR, AND, OR, NO_OP).
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class MemoryAggregator implements Aggregator<Rule> {
+
+    // the value aggregated so far; null means "no value"
+    private Object currentObject;
+    // the most recent operation other than NO_OP passed to aggregate(); null until one is seen
+    private Rule.Operation lastOperation = null;
+
+    public MemoryAggregator() {
+        this.currentObject = null;
+    }
+
+    /**
+     * Returns the current value wrapped in a rule: the initial NO_OP rule when there is no value, an INCR
+     * rule when the value is a {@code Long}, otherwise a rule carrying the last recorded operation (NO_OP if
+     * none has been recorded).
+     */
+    @Override
+    public Rule getAggregatedValue() {
+        if (null == this.currentObject)
+            return createInitialValue();
+        else if (this.currentObject instanceof Long)
+            return new Rule(Rule.Operation.INCR, this.currentObject);
+        else
+            return new Rule(null == this.lastOperation ? Rule.Operation.NO_OP : this.lastOperation, this.currentObject);
+    }
+
+    /**
+     * Replaces the current value with the rule's object. The rule's operation is not recorded.
+     */
+    @Override
+    public void setAggregatedValue(final Rule rule) {
+        this.currentObject = rule.getObject();
+    }
+
+    /**
+     * Clears the current value. The last recorded operation is left untouched.
+     */
+    @Override
+    public void reset() {
+        this.currentObject = null;
+    }
+
+    /**
+     * Returns a NO_OP rule holding {@code null}.
+     */
+    @Override
+    public Rule createInitialValue() {
+        return new Rule(Rule.Operation.NO_OP, null);
+    }
+
+    /**
+     * Merges the rule into the current value. If there is no current value, or the operation is SET, the
+     * rule's object becomes the current value. Otherwise INCR adds {@code Long}s, AND/OR combine
+     * {@code Boolean}s, and a NO_OP carrying a {@code Boolean} is combined using the last recorded AND/OR
+     * operation.
+     *
+     * @throws IllegalStateException    if a NO_OP boolean arrives while the last recorded operation is
+     *                                  neither AND nor OR
+     * @throws IllegalArgumentException if the operation is not one of the handled operations
+     */
+    @Override
+    public void aggregate(final Rule ruleWritable) {
+        final Rule.Operation rule = ruleWritable.getOperation();
+        final Object object = ruleWritable.getObject();
+        // remember the operation so that later NO_OP rules can be interpreted
+        if (rule != Rule.Operation.NO_OP)
+            this.lastOperation = rule;
+
+        if (null == this.currentObject || rule.equals(Rule.Operation.SET)) {
+            this.currentObject = object;
+        } else {
+            if (rule.equals(Rule.Operation.INCR)) {
+                this.currentObject = (Long) this.currentObject + (Long) object;
+            } else if (rule.equals(Rule.Operation.AND)) {
+                this.currentObject = (Boolean) this.currentObject && (Boolean) object;
+            } else if (rule.equals(Rule.Operation.OR)) {
+                this.currentObject = (Boolean) this.currentObject || (Boolean) object;
+            } else if (rule.equals(Rule.Operation.NO_OP)) {
+                // a NO_OP rule only changes the value when it carries a Boolean; other NO_OP objects are ignored
+                if (object instanceof Boolean) {
+                    if (null == this.lastOperation) {
+                        // do nothing ... why? (NOTE(review): the reason for ignoring this case is not documented)
+                    } else if (this.lastOperation.equals(Rule.Operation.AND)) {
+                        this.currentObject = (Boolean) this.currentObject && (Boolean) object;
+                    } else if (this.lastOperation.equals(Rule.Operation.OR)) {
+                        this.currentObject = (Boolean) this.currentObject || (Boolean) object;
+                    } else {
+                        throw new IllegalStateException("This state should not have occurred: " + ruleWritable);
+                    }
+                }
+            } else {
+                throw new IllegalArgumentException("The provided rule is unknown: " + ruleWritable);
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/io/GiraphVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/io/GiraphVertexInputFormat.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/io/GiraphVertexInputFormat.java
new file mode 100644
index 0000000..2b3b723
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/io/GiraphVertexInputFormat.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.giraph.process.computer.io;
+
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Giraph {@link VertexInputFormat} that delegates to the Hadoop {@link InputFormat} named by
+ * {@link Constants#GREMLIN_HADOOP_GRAPH_INPUT_FORMAT}. The delegate is instantiated lazily from the job
+ * configuration the first time it is needed.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GiraphVertexInputFormat extends VertexInputFormat {
+
+    private InputFormat<NullWritable, VertexWritable> hadoopGraphInputFormat;
+
+    /**
+     * Returns the splits computed by the delegate input format. {@code minSplitCountHint} is not used.
+     */
+    @Override
+    public List<InputSplit> getSplits(final JobContext context, final int minSplitCountHint) throws IOException, InterruptedException {
+        this.constructor(context.getConfiguration());
+        return this.hadoopGraphInputFormat.getSplits(context);
+    }
+
+    /**
+     * Wraps the delegate's record reader for the split in a {@link GiraphVertexReader}.
+     *
+     * @throws IOException if the delegate fails or the thread is interrupted while creating the reader; in
+     *                     the latter case the thread's interrupt status is restored before throwing
+     */
+    @Override
+    public VertexReader createVertexReader(final InputSplit split, final TaskAttemptContext context) throws IOException {
+        this.constructor(context.getConfiguration());
+        try {
+            return new GiraphVertexReader(this.hadoopGraphInputFormat.createRecordReader(split, context));
+        } catch (final InterruptedException e) {
+            // the signature cannot propagate the interruption, so preserve it on the thread for callers
+            Thread.currentThread().interrupt();
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Instantiates the delegate input format from the configuration if that has not been done yet.
+     */
+    private final void constructor(final Configuration configuration) {
+        if (null == this.hadoopGraphInputFormat) {
+            this.hadoopGraphInputFormat = ReflectionUtils.newInstance(configuration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class, InputFormat.class), configuration);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/io/GiraphVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/io/GiraphVertexOutputFormat.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/io/GiraphVertexOutputFormat.java
new file mode 100644
index 0000000..0135bd5
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/io/GiraphVertexOutputFormat.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.giraph.process.computer.io;
+
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.io.VertexWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.io.IOException;
+
+/**
+ * Giraph {@link VertexOutputFormat} backed by the Hadoop {@link OutputFormat} named by
+ * {@link Constants#GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT}; the delegate is created on first use.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GiraphVertexOutputFormat extends VertexOutputFormat {
+
+    private OutputFormat<NullWritable, VertexWritable> hadoopGraphOutputFormat;
+
+    /**
+     * Returns a {@link GiraphVertexWriter} that wraps the delegate output format.
+     */
+    @Override
+    public VertexWriter createVertexWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
+        final Configuration jobConfiguration = context.getConfiguration();
+        this.ensureOutputFormat(jobConfiguration);
+        return new GiraphVertexWriter(this.hadoopGraphOutputFormat);
+    }
+
+    /**
+     * Forwards the output specification check to the delegate output format.
+     */
+    @Override
+    public void checkOutputSpecs(final JobContext context) throws IOException, InterruptedException {
+        final Configuration jobConfiguration = context.getConfiguration();
+        this.ensureOutputFormat(jobConfiguration);
+        this.hadoopGraphOutputFormat.checkOutputSpecs(context);
+    }
+
+    /**
+     * Returns the output committer of the delegate output format.
+     */
+    @Override
+    public OutputCommitter getOutputCommitter(final TaskAttemptContext context) throws IOException, InterruptedException {
+        final Configuration jobConfiguration = context.getConfiguration();
+        this.ensureOutputFormat(jobConfiguration);
+        return this.hadoopGraphOutputFormat.getOutputCommitter(context);
+    }
+
+    /**
+     * Instantiates the delegate output format from the configuration unless it already exists.
+     */
+    private void ensureOutputFormat(final Configuration configuration) {
+        if (null != this.hadoopGraphOutputFormat)
+            return;
+        final Class<? extends OutputFormat> formatClass = configuration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class, OutputFormat.class);
+        this.hadoopGraphOutputFormat = ReflectionUtils.newInstance(formatClass, configuration);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/io/GiraphVertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/io/GiraphVertexReader.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/io/GiraphVertexReader.java
new file mode 100644
index 0000000..7c8273c
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/io/GiraphVertexReader.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.giraph.process.computer.io;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.tinkerpop.giraph.process.computer.GiraphComputeVertex;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+
+import java.io.IOException;
+
+/**
+ * A Giraph {@link VertexReader} that delegates all reading to a Hadoop {@link RecordReader} and wraps each
+ * {@link VertexWritable} value it produces in a {@link GiraphComputeVertex}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GiraphVertexReader extends VertexReader {
+
+    // assigned once in the constructor and never replaced
+    private final RecordReader<NullWritable, VertexWritable> recordReader;
+
+    /**
+     * @param recordReader the Hadoop reader every call on this object is forwarded to
+     */
+    public GiraphVertexReader(final RecordReader<NullWritable, VertexWritable> recordReader) {
+        this.recordReader = recordReader;
+    }
+
+    /**
+     * Initializes the wrapped record reader with the given split and task context.
+     */
+    @Override
+    public void initialize(final InputSplit inputSplit, final TaskAttemptContext context) throws IOException, InterruptedException {
+        this.recordReader.initialize(inputSplit, context);
+    }
+
+    /**
+     * Advances the wrapped record reader.
+     *
+     * @return whatever {@link RecordReader#nextKeyValue()} returns
+     */
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+        return this.recordReader.nextKeyValue();
+    }
+
+    /**
+     * Wraps the record reader's current value in a new {@link GiraphComputeVertex}; a new wrapper is created on every call.
+     */
+    @Override
+    public Vertex getCurrentVertex() throws IOException, InterruptedException {
+        return new GiraphComputeVertex(this.recordReader.getCurrentValue());
+    }
+
+    /**
+     * Closes the wrapped record reader.
+     */
+    @Override
+    public void close() throws IOException {
+        this.recordReader.close();
+    }
+
+    /**
+     * @return the progress reported by the wrapped record reader
+     */
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+        return this.recordReader.getProgress();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/io/GiraphVertexWriter.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/io/GiraphVertexWriter.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/io/GiraphVertexWriter.java
new file mode 100644
index 0000000..812386d
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/io/GiraphVertexWriter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.giraph.process.computer.io;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.VertexWriter;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.tinkerpop.giraph.process.computer.GiraphComputeVertex;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+
+import java.io.IOException;
+
+/**
+ * A Giraph {@link VertexWriter} that writes each vertex's value through a {@link RecordWriter} obtained from the
+ * wrapped Hadoop {@link OutputFormat}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GiraphVertexWriter extends VertexWriter {
+
+    private final OutputFormat<NullWritable, VertexWritable> outputFormat;
+    // created in initialize(TaskAttemptContext); unset until then
+    private RecordWriter<NullWritable, VertexWritable> recordWriter;
+
+    /**
+     * @param outputFormat the Hadoop output format that will supply the record writer
+     */
+    public GiraphVertexWriter(final OutputFormat<NullWritable, VertexWritable> outputFormat) {
+        this.outputFormat = outputFormat;
+    }
+
+    /**
+     * Obtains the record writer for this task attempt from the wrapped output format.
+     */
+    @Override
+    public void initialize(final TaskAttemptContext context) throws IOException, InterruptedException {
+        this.recordWriter = this.outputFormat.getRecordWriter(context);
+    }
+
+    /**
+     * Writes the value of the given vertex, which must be a {@link GiraphComputeVertex}, under a {@link NullWritable} key.
+     */
+    @Override
+    public void writeVertex(final Vertex vertex) throws IOException, InterruptedException {
+        final GiraphComputeVertex computeVertex = (GiraphComputeVertex) vertex;
+        this.recordWriter.write(NullWritable.get(), computeVertex.getValue());
+    }
+
+    /**
+     * Closes the record writer created by {@link #initialize(TaskAttemptContext)}.
+     */
+    @Override
+    public void close(final TaskAttemptContext context) throws IOException, InterruptedException {
+        this.recordWriter.close(context);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/HadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/HadoopGraphProvider.java b/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/HadoopGraphProvider.java
new file mode 100644
index 0000000..ae0ae10
--- /dev/null
+++ b/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/HadoopGraphProvider.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.giraph.process;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.tinkerpop.gremlin.AbstractGraphProvider;
+import org.apache.tinkerpop.gremlin.LoadGraphWith;
+import org.apache.tinkerpop.gremlin.TestHelper;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopEdge;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopElement;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopProperty;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertex;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertexProperty;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONInputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONResourceAccess;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoResourceAccess;
+import org.apache.tinkerpop.gremlin.structure.io.script.ScriptResourceAccess;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test graph provider that configures a {@link HadoopGraph} with Giraph settings and points it at generated copies
+ * of the sample graph resources. Each call to {@code getBaseConfiguration} picks GraphSON or Gryo input at random.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class HadoopGraphProvider extends AbstractGraphProvider {
+
+    private static final Random RANDOM = new Random();
+    // chosen in getBaseConfiguration(...); loadGraphDataViaHadoopConfig(...) uses it to pick the matching file extension
+    private boolean graphSONInput = false;
+
+    // resource file name -> absolute path of the file generated for it by the static initializer below
+    public static Map<String, String> PATHS = new HashMap<>();
+    private static final Set<Class> IMPLEMENTATION = new HashSet<>();
+
+    static {
+        IMPLEMENTATION.add(HadoopEdge.class);
+        IMPLEMENTATION.add(HadoopElement.class);
+        IMPLEMENTATION.add(HadoopGraph.class);
+        IMPLEMENTATION.add(HadoopProperty.class);
+        IMPLEMENTATION.add(HadoopVertex.class);
+        IMPLEMENTATION.add(HadoopVertexProperty.class);
+    }
+
+    static {
+        try {
+            registerResources(GryoResourceAccess.class,
+                    "tinkerpop-modern.kryo",
+                    "grateful-dead.kryo",
+                    "tinkerpop-classic.kryo",
+                    "tinkerpop-crew.kryo");
+            registerResources(GraphSONResourceAccess.class,
+                    "tinkerpop-modern.json",
+                    "grateful-dead.json",
+                    "tinkerpop-classic.json",
+                    "tinkerpop-crew.json");
+            registerResources(ScriptResourceAccess.class,
+                    "tinkerpop-classic.txt",
+                    "script-input.groovy",
+                    "script-output.groovy",
+                    "grateful-dead.txt",
+                    "script-input-grateful-dead.groovy",
+                    "script-output-grateful-dead.groovy");
+        } catch (Exception e) {
+            // best effort: a failure is reported but does not abort class loading;
+            // resources after the failing one are not registered
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Generates a file for each named resource via {@link TestHelper#generateTempFileFromResource} and records its
+     * absolute path in {@link #PATHS} under the resource name.
+     */
+    private static void registerResources(final Class<?> resourceClass, final String... fileNames) throws Exception {
+        for (final String fileName : fileNames) {
+            PATHS.put(fileName, TestHelper.generateTempFileFromResource(resourceClass, fileName, "").getAbsolutePath());
+        }
+    }
+
+    /**
+     * Builds the graph configuration for a test. As a side effect, randomly selects GraphSON or Gryo as the input
+     * format and remembers the choice for {@link #loadGraphDataViaHadoopConfig}.
+     *
+     * @return a new mutable map of Hadoop-Gremlin and Giraph settings
+     */
+    @Override
+    public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) {
+        this.graphSONInput = RANDOM.nextBoolean();
+        // a plain HashMap rather than double-brace initialization, which would create an anonymous subclass holding a reference to this provider
+        final Map<String, Object> config = new HashMap<>();
+        config.put(Graph.GRAPH, HadoopGraph.class.getName());
+        config.put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, this.graphSONInput ? GraphSONInputFormat.class.getCanonicalName() : GryoInputFormat.class.getCanonicalName());
+        config.put(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, GryoOutputFormat.class.getCanonicalName());
+        // NOTE(review): this path still names the hadoop-gremlin module -- confirm it is intended for giraph-gremlin
+        config.put(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "hadoop-gremlin/target/test-output");
+        config.put(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        /// giraph configuration
+        config.put(GiraphConstants.MIN_WORKERS, 1);
+        config.put(GiraphConstants.MAX_WORKERS, 1);
+        config.put(GiraphConstants.SPLIT_MASTER_WORKER.getKey(), false);
+        config.put(GiraphConstants.ZOOKEEPER_SERVER_PORT.getKey(), 2181);  // you must have a local zookeeper running on this port
+        config.put(GiraphConstants.NETTY_SERVER_USE_EXECUTION_HANDLER.getKey(), false); // this prevents so many integration tests running out of threads
+        config.put(GiraphConstants.NETTY_CLIENT_USE_EXECUTION_HANDLER.getKey(), false); // this prevents so many integration tests running out of threads
+        config.put(GiraphConstants.NUM_INPUT_THREADS.getKey(), 3);
+        config.put(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), 3);
+        config.put(GiraphConstants.MAX_MASTER_SUPERSTEP_WAIT_MSECS.getKey(), TimeUnit.MINUTES.toMillis(60L));
+        config.put("mapred.reduce.tasks", 4);
+        config.put("giraph.vertexOutputFormatThreadSafe", false);
+        config.put("giraph.numOutputThreads", 3);
+        return config;
+    }
+
+    /**
+     * Closes the graph if one was supplied; the configuration is not used.
+     */
+    @Override
+    public void clear(final Graph graph, final Configuration configuration) throws Exception {
+        if (graph != null) {
+            graph.close();
+        }
+    }
+
+    /**
+     * Points the graph at the requested sample data set; does nothing when no {@link LoadGraphWith} is given.
+     */
+    @Override
+    public void loadGraphData(final Graph graph, final LoadGraphWith loadGraphWith, final Class testClass, final String testName) {
+        if (loadGraphWith != null) {
+            this.loadGraphDataViaHadoopConfig(graph, loadGraphWith.value());
+        }
+    }
+
+    @Override
+    public Set<Class> getImplementations() {
+        return IMPLEMENTATION;
+    }
+
+    /**
+     * Sets the input location of the given {@link HadoopGraph} to the generated file for {@code graphData}, using the
+     * {@code json} or {@code kryo} variant according to the input format last chosen by {@code getBaseConfiguration}.
+     *
+     * @throws RuntimeException if {@code graphData} is not one of GRATEFUL, MODERN, CLASSIC or CREW
+     */
+    public void loadGraphDataViaHadoopConfig(final Graph g, final LoadGraphWith.GraphData graphData) {
+        final String baseName;
+        switch (graphData) {
+            case GRATEFUL:
+                baseName = "grateful-dead";
+                break;
+            case MODERN:
+                baseName = "tinkerpop-modern";
+                break;
+            case CLASSIC:
+                baseName = "tinkerpop-classic";
+                break;
+            case CREW:
+                baseName = "tinkerpop-crew";
+                break;
+            default:
+                throw new RuntimeException("Could not load graph with " + graphData);
+        }
+        final String type = this.graphSONInput ? "json" : "kryo";
+        ((HadoopGraph) g).configuration().setInputLocation(PATHS.get(baseName + "." + type));
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputerProcessIntegrateTest.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputerProcessIntegrateTest.java b/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputerProcessIntegrateTest.java
new file mode 100644
index 0000000..857f3b8
--- /dev/null
+++ b/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputerProcessIntegrateTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.giraph.process.computer;
+
+import org.apache.tinkerpop.gremlin.GraphProviderClass;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.process.ProcessComputerSuite;
+import org.junit.runner.RunWith;
+
+/**
+ * Runs the {@link ProcessComputerSuite} against {@link HadoopGraph} using {@link HadoopGiraphGraphProvider}.
+ * The class body is empty; the suite and provider are selected entirely through the annotations.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+@RunWith(ProcessComputerSuite.class)
+@GraphProviderClass(provider = HadoopGiraphGraphProvider.class, graph = HadoopGraph.class)
+public class GiraphGraphComputerProcessIntegrateTest {
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/HadoopGiraphGraphProvider.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/HadoopGiraphGraphProvider.java b/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/HadoopGiraphGraphProvider.java
new file mode 100644
index 0000000..8b124e5
--- /dev/null
+++ b/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/HadoopGiraphGraphProvider.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.giraph.process.computer;
+
+import org.apache.tinkerpop.giraph.process.computer.GiraphGraphComputer;
+import org.apache.tinkerpop.gremlin.GraphProvider;
+import org.apache.tinkerpop.gremlin.giraph.process.HadoopGraphProvider;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+
+/**
+ * A {@link HadoopGraphProvider} whose traversal sources execute on {@link GiraphGraphComputer}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+@GraphProvider.Descriptor(computer = GiraphGraphComputer.class)
+public final class HadoopGiraphGraphProvider extends HadoopGraphProvider {
+
+    /**
+     * Creates a traversal source for {@code graph} whose engine is a {@link ComputerTraversalEngine} backed by
+     * {@link GiraphGraphComputer}.
+     */
+    @Override
+    public GraphTraversalSource traversal(final Graph graph) {
+        return GraphTraversalSource.build()
+                .engine(ComputerTraversalEngine.build().computer(GiraphGraphComputer.class))
+                .create(graph);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/groovy/GiraphGraphComputerGroovyProcessIntegrateTest.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/groovy/GiraphGraphComputerGroovyProcessIntegrateTest.java b/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/groovy/GiraphGraphComputerGroovyProcessIntegrateTest.java
new file mode 100644
index 0000000..9b01185
--- /dev/null
+++ b/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/groovy/GiraphGraphComputerGroovyProcessIntegrateTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.giraph.process.computer.groovy;
+
+import org.apache.tinkerpop.gremlin.GraphProviderClass;
+import org.apache.tinkerpop.gremlin.giraph.process.computer.HadoopGiraphGraphProvider;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.process.GroovyProcessComputerSuite;
+import org.junit.runner.RunWith;
+
+/**
+ * Runs the {@link GroovyProcessComputerSuite} against {@link HadoopGraph} using {@link HadoopGiraphGraphProvider}.
+ * The class body is empty; the suite and provider are selected entirely through the annotations.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+@RunWith(GroovyProcessComputerSuite.class)
+@GraphProviderClass(provider = HadoopGiraphGraphProvider.class, graph = HadoopGraph.class)
+public class GiraphGraphComputerGroovyProcessIntegrateTest {
+}