You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/09/08 18:48:55 UTC
[1/3] incubator-tinkerpop git commit: pulled out giraph-gremlin from
hadoop-gremlin. HadoopGremlin builds nicely. Giraph too. Spark is still
causing problems.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/hadoop_split b2132b1fa -> ac56b3095
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/spark-gremlin/pom.xml
----------------------------------------------------------------------
diff --git a/spark-gremlin/pom.xml b/spark-gremlin/pom.xml
index 645c97f..bf33ae5 100644
--- a/spark-gremlin/pom.xml
+++ b/spark-gremlin/pom.xml
@@ -132,12 +132,6 @@
<scope>test</scope>
</dependency>
</dependencies>
- <repositories>
- <repository>
- <id>hyracks-releases</id>
- <url>http://obelix.ics.uci.edu/nexus/content/groups/hyracks-public-releases/</url>
- </repository>
- </repositories>
<build>
<directory>${basedir}/target</directory>
<finalName>${project.artifactId}-${project.version}</finalName>
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/HadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/HadoopGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/HadoopGraphProvider.java
index 333636e..ebda863 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/HadoopGraphProvider.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/HadoopGraphProvider.java
@@ -45,7 +45,6 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
[3/3] incubator-tinkerpop git commit: pulled out giraph-gremlin from
hadoop-gremlin. HadoopGremlin builds nicely. Giraph too. Spark is still
causing problems.
Posted by ok...@apache.org.
pulled giraph-gremlin out of hadoop-gremlin. hadoop-gremlin builds cleanly, and so does giraph-gremlin; spark-gremlin is still causing problems.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/ac56b309
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/ac56b309
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/ac56b309
Branch: refs/heads/hadoop_split
Commit: ac56b3095c8d9ddb685fd1286e7d870db1739618
Parents: b2132b1
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Sep 8 10:49:19 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Sep 8 10:49:19 2015 -0600
----------------------------------------------------------------------
giraph-gremlin/pom.xml | 235 +++++++++++++++++++
.../giraph/process/computer/EmptyOutEdges.java | 80 +++++++
.../process/computer/GiraphComputeVertex.java | 50 ++++
.../process/computer/GiraphGraphComputer.java | 222 ++++++++++++++++++
.../giraph/process/computer/GiraphMemory.java | 218 +++++++++++++++++
.../process/computer/GiraphMessageCombiner.java | 62 +++++
.../process/computer/GiraphMessenger.java | 79 +++++++
.../process/computer/GiraphWorkerContext.java | 77 ++++++
.../process/computer/MemoryAggregator.java | 94 ++++++++
.../computer/io/GiraphVertexInputFormat.java | 65 +++++
.../computer/io/GiraphVertexOutputFormat.java | 65 +++++
.../process/computer/io/GiraphVertexReader.java | 67 ++++++
.../process/computer/io/GiraphVertexWriter.java | 57 +++++
.../giraph/process/HadoopGraphProvider.java | 161 +++++++++++++
...GiraphGraphComputerProcessIntegrateTest.java | 32 +++
.../computer/HadoopGiraphGraphProvider.java | 37 +++
...GraphComputerGroovyProcessIntegrateTest.java | 33 +++
.../GiraphHadoopGremlinPluginIntegrateTest.java | 32 +++
hadoop-gremlin/pom.xml | 50 ----
.../groovy/plugin/HadoopGremlinPlugin.java | 3 -
.../process/computer/giraph/EmptyOutEdges.java | 81 -------
.../computer/giraph/GiraphComputeVertex.java | 50 ----
.../computer/giraph/GiraphGraphComputer.java | 222 ------------------
.../process/computer/giraph/GiraphMemory.java | 218 -----------------
.../computer/giraph/GiraphMessageCombiner.java | 62 -----
.../computer/giraph/GiraphMessenger.java | 79 -------
.../computer/giraph/GiraphWorkerContext.java | 77 ------
.../computer/giraph/MemoryAggregator.java | 94 --------
.../giraph/io/GiraphVertexInputFormat.java | 65 -----
.../giraph/io/GiraphVertexOutputFormat.java | 65 -----
.../computer/giraph/io/GiraphVertexReader.java | 67 ------
.../computer/giraph/io/GiraphVertexWriter.java | 57 -----
.../gremlin/hadoop/structure/HadoopGraph.java | 48 ++--
.../gremlin/hadoop/HadoopGraphProvider.java | 22 +-
...GiraphGraphComputerProcessIntegrateTest.java | 32 ---
.../giraph/HadoopGiraphGraphProvider.java | 36 ---
...GraphComputerGroovyProcessIntegrateTest.java | 33 ---
.../GiraphHadoopGremlinPluginIntegrateTest.java | 33 ---
pom.xml | 1 +
spark-gremlin/pom.xml | 6 -
.../spark/process/HadoopGraphProvider.java | 1 -
41 files changed, 1701 insertions(+), 1367 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/giraph-gremlin/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-gremlin/pom.xml b/giraph-gremlin/pom.xml
new file mode 100644
index 0000000..214e9c4
--- /dev/null
+++ b/giraph-gremlin/pom.xml
@@ -0,0 +1,235 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.tinkerpop</groupId>
+ <artifactId>tinkerpop</artifactId>
+ <version>3.1.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>giraph-gremlin</artifactId>
+ <name>Apache TinkerPop :: Giraph Gremlin</name>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.tinkerpop</groupId>
+ <artifactId>gremlin-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tinkerpop</groupId>
+ <artifactId>gremlin-groovy</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tinkerpop</groupId>
+ <artifactId>hadoop-gremlin</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.giraph</groupId>
+ <artifactId>giraph-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>javax.servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>net.java.dev.jets3t</groupId>
+ <artifactId>jets3t</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <!-- GIRAPH GRAPH COMPUTER -->
+ <dependency>
+ <groupId>org.apache.giraph</groupId>
+ <artifactId>giraph-core</artifactId>
+ <version>1.0.0</version>
+ <exclusions>
+ <!-- self conflicts -->
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <!-- conflicts with gremlin-core -->
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <!-- gremlin-groovy conflicts -->
+ <exclusion>
+ <groupId>jline</groupId>
+ <artifactId>jline</artifactId>
+ </exclusion>
+ <!-- gremlin-test conflicts -->
+ <exclusion>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <!-- consistent dependencies -->
+ <dependency>
+ <groupId>commons-httpclient</groupId>
+ <artifactId>commons-httpclient</artifactId>
+ <version>3.1</version>
+ </dependency>
+ <!-- TEST -->
+ <dependency>
+ <groupId>org.apache.tinkerpop</groupId>
+ <artifactId>gremlin-test</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tinkerpop</groupId>
+ <artifactId>gremlin-groovy-test</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tinkerpop</groupId>
+ <artifactId>tinkergraph-gremlin</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <repositories>
+ <repository>
+ <id>hyracks-releases</id>
+ <url>http://obelix.ics.uci.edu/nexus/content/groups/hyracks-public-releases/</url>
+ </repository>
+ </repositories>
+ <build>
+ <directory>${basedir}/target</directory>
+ <finalName>${project.artifactId}-${project.version}</finalName>
+ <resources>
+ <resource>
+ <directory>${basedir}/src/main/resources</directory>
+ </resource>
+ </resources>
+ <testResources>
+ <testResource>
+ <directory>${basedir}/src/test/resources</directory>
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.4</version>
+ <executions>
+ <execution>
+ <id>build-detached-assemblies</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <attach>false</attach>
+ <descriptors>
+ <descriptor>src/assembly/standalone.xml</descriptor>
+ <descriptor>src/assembly/hadoop-job.xml</descriptor>
+ </descriptors>
+ <finalName>${project.artifactId}-${project.version}</finalName>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.gmavenplus</groupId>
+ <artifactId>gmavenplus-plugin</artifactId>
+ <version>1.2</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>addSources</goal>
+ <goal>addTestSources</goal>
+ <goal>generateStubs</goal>
+ <goal>compile</goal>
+ <goal>testGenerateStubs</goal>
+ <goal>testCompile</goal>
+ <goal>removeStubs</goal>
+ <goal>removeTestStubs</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <invokeDynamic>true</invokeDynamic>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.5</version>
+ <configuration>
+ <archive>
+ <manifestEntries>
+ <Gremlin-Plugin-Dependencies>org.apache.hadoop:hadoop-core:1.2.1
+ </Gremlin-Plugin-Dependencies>
+ <!-- deletes the servlet-api jar from the path after install - causes conflicts -->
+ <Gremlin-Plugin-Paths>servlet-api-2.5-6.1.14.jar=;servlet-api-2.5-20081211.jar=
+ </Gremlin-Plugin-Paths>
+ <Gremlin-Lib-Paths>servlet-api-2.5-6.1.14.jar=;servlet-api-2.5-20081211.jar=
+ </Gremlin-Lib-Paths>
+ </manifestEntries>
+ </archive>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <!--<log4j.configuration>log4j-core.properties</log4j.configuration>-->
+ <!--<argLine>-Xmx2048M</argLine>-->
+ <excludes>
+ <exclude>**/*IntegrateTest.java</exclude>
+ <exclude>**/*PerformanceTest.java</exclude>
+ <!-- this is technically a member of the integration test suite -->
+ <exclude>**/HadoopGremlinPluginTest.java</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/EmptyOutEdges.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/EmptyOutEdges.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/EmptyOutEdges.java
new file mode 100644
index 0000000..cb649f6
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/EmptyOutEdges.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.giraph.process.computer;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.OutEdges;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+
+/**
+ * An {@link OutEdges} implementation that never holds any edges: every initializer and mutator is a no-op,
+ * {@link #size()} is always {@code 0}, iteration is always empty, and nothing is written or read during
+ * serialization. Because it carries no state, a single shared instance is exposed via {@link #instance()}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class EmptyOutEdges implements OutEdges<ObjectWritable, NullWritable> {
+
+    private static final EmptyOutEdges INSTANCE = new EmptyOutEdges();
+
+    /**
+     * Returns the shared, stateless {@code EmptyOutEdges}.
+     *
+     * @return the singleton instance
+     */
+    public static EmptyOutEdges instance() {
+        return INSTANCE;
+    }
+
+    // ---- initialization: nothing to allocate, nothing to copy ----
+
+    @Override
+    public void initialize(final Iterable<Edge<ObjectWritable, NullWritable>> initialEdges) {
+        // intentionally empty: the supplied edges are discarded
+    }
+
+    @Override
+    public void initialize(final int expectedCapacity) {
+        // intentionally empty: no storage is ever allocated
+    }
+
+    @Override
+    public void initialize() {
+        // intentionally empty
+    }
+
+    // ---- mutation: silently ignored ----
+
+    @Override
+    public void add(final Edge<ObjectWritable, NullWritable> newEdge) {
+        // intentionally empty: the edge is dropped
+    }
+
+    @Override
+    public void remove(final ObjectWritable targetId) {
+        // intentionally empty: there is never anything to remove
+    }
+
+    // ---- queries: always empty ----
+
+    @Override
+    public int size() {
+        return 0;
+    }
+
+    @Override
+    public Iterator<Edge<ObjectWritable, NullWritable>> iterator() {
+        return Collections.<Edge<ObjectWritable, NullWritable>>emptyIterator();
+    }
+
+    // ---- serialization: zero bytes out, zero bytes in ----
+
+    @Override
+    public void write(final DataOutput out) throws IOException {
+        // intentionally empty: there is no state to serialize
+    }
+
+    @Override
+    public void readFields(final DataInput in) throws IOException {
+        // intentionally empty: nothing was serialized
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphComputeVertex.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphComputeVertex.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphComputeVertex.java
new file mode 100644
index 0000000..7f39c46
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphComputeVertex.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.giraph.process.computer;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
+
+/**
+ * Giraph vertex whose value is a {@link VertexWritable} wrapping a TinkerPop vertex. It carries no Giraph
+ * out-edges ({@link EmptyOutEdges}); each {@link #compute(Iterable)} call delegates to a {@link VertexProgram}
+ * borrowed from the worker context's pool.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GiraphComputeVertex extends Vertex<ObjectWritable, VertexWritable, NullWritable, ObjectWritable> {
+
+    /**
+     * No-arg constructor. NOTE(review): presumably needed so Giraph can instantiate this class reflectively.
+     */
+    public GiraphComputeVertex() {
+    }
+
+    /**
+     * Creates a vertex whose id is the wrapped TinkerPop vertex's id and whose value is a fresh
+     * {@link VertexWritable} holding that same vertex.
+     *
+     * @param vertexWritable the writable holding the TinkerPop vertex to wrap
+     */
+    public GiraphComputeVertex(final VertexWritable vertexWritable) {
+        final VertexWritable newWritable = new VertexWritable();
+        newWritable.set(vertexWritable.get());
+        this.initialize(new ObjectWritable<>(newWritable.get().id()), newWritable, EmptyOutEdges.instance());
+    }
+
+    /**
+     * Runs the vertex program against this vertex for the current superstep.
+     * <p>
+     * The {@link VertexProgram} is borrowed from the worker's pool and is always handed back in a
+     * {@code finally} block, so an exception thrown by {@code execute} cannot leak the instance out of the pool.
+     *
+     * @param messages the messages delivered to this vertex
+     */
+    @Override
+    public void compute(final Iterable<ObjectWritable> messages) {
+        final GiraphWorkerContext workerContext = (GiraphWorkerContext) this.getWorkerContext();
+        final VertexProgram<?> vertexProgram = workerContext.getVertexProgramPool().take();
+        try {
+            vertexProgram.execute(ComputerGraph.vertexProgram(this.getValue().get(), vertexProgram),
+                    workerContext.getMessenger(this, messages.iterator()), workerContext.getMemory());
+        } finally {
+            // return the program even when execute() fails; otherwise the borrowed instance never re-enters the pool
+            workerContext.getVertexProgramPool().offer(vertexProgram);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphGraphComputer.java
new file mode 100644
index 0000000..acd5628
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphGraphComputer.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.giraph.process.computer;
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.FileConfiguration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.job.GiraphJob;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tinkerpop.giraph.process.computer.io.GiraphVertexInputFormat;
+import org.apache.tinkerpop.giraph.process.computer.io.GiraphVertexOutputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.MapReduceHelper;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.HadoopHelper;
+import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
+import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
+
+import java.io.File;
+import java.io.NotSerializableException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.stream.Stream;
+
+/**
+ * A {@link GraphComputer} that runs a {@link VertexProgram} as a Giraph job over a {@link HadoopGraph}, reads the
+ * vertex program's memory back from the output location, and then executes any registered {@link MapReduce} jobs.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GiraphGraphComputer extends AbstractHadoopGraphComputer implements GraphComputer, Tool {
+
+    protected GiraphConfiguration giraphConfiguration = new GiraphConfiguration();
+    private MapMemory memory = new MapMemory();
+
+    /**
+     * Copies every key of the graph's configuration into a {@link GiraphConfiguration} and wires in the
+     * Giraph-specific master-compute, vertex, worker-context, out-edge, id/value and I/O format classes.
+     *
+     * @param hadoopGraph the graph whose configuration drives the Giraph job
+     */
+    public GiraphGraphComputer(final HadoopGraph hadoopGraph) {
+        super(hadoopGraph);
+        final Configuration configuration = hadoopGraph.configuration();
+        configuration.getKeys().forEachRemaining(key -> this.giraphConfiguration.set(key, configuration.getProperty(key).toString()));
+        this.giraphConfiguration.setMasterComputeClass(GiraphMemory.class);
+        this.giraphConfiguration.setVertexClass(GiraphComputeVertex.class);
+        this.giraphConfiguration.setWorkerContextClass(GiraphWorkerContext.class);
+        this.giraphConfiguration.setOutEdgesClass(EmptyOutEdges.class);
+        this.giraphConfiguration.setClass(GiraphConstants.VERTEX_ID_CLASS.getKey(), ObjectWritable.class, ObjectWritable.class);
+        this.giraphConfiguration.setClass(GiraphConstants.VERTEX_VALUE_CLASS.getKey(), VertexWritable.class, VertexWritable.class);
+        this.giraphConfiguration.setBoolean(GiraphConstants.STATIC_GRAPH.getKey(), true);
+        this.giraphConfiguration.setVertexInputFormatClass(GiraphVertexInputFormat.class);
+        this.giraphConfiguration.setVertexOutputFormatClass(GiraphVertexOutputFormat.class);
+    }
+
+    /**
+     * Registers the vertex program, records its memory compute keys, stores its state into the Giraph
+     * configuration, and enables {@link GiraphMessageCombiner} when the program declares a message combiner.
+     */
+    @Override
+    public GraphComputer program(final VertexProgram vertexProgram) {
+        super.program(vertexProgram);
+        this.memory.addVertexProgramMemoryComputeKeys(this.vertexProgram);
+        final BaseConfiguration apacheConfiguration = new BaseConfiguration();
+        vertexProgram.storeState(apacheConfiguration);
+        ConfUtil.mergeApacheIntoHadoopConfiguration(apacheConfiguration, this.giraphConfiguration);
+        this.vertexProgram.getMessageCombiner().ifPresent(combiner -> this.giraphConfiguration.setCombinerClass(GiraphMessageCombiner.class));
+        return this;
+    }
+
+    /**
+     * Asynchronously ships jars (if configured), clears the output location, and runs this computer as a Hadoop
+     * {@link Tool}. The returned future completes with the output graph and an immutable view of the memory.
+     */
+    @Override
+    public Future<ComputerResult> submit() {
+        final long startTime = System.currentTimeMillis();
+        super.validateStatePriorToExecution();
+        return CompletableFuture.<ComputerResult>supplyAsync(() -> {
+            try {
+                final FileSystem fs = FileSystem.get(this.giraphConfiguration);
+                this.loadJars(fs);
+                fs.delete(new Path(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)), true);
+                ToolRunner.run(this, new String[]{});
+            } catch (final Exception e) {
+                throw new IllegalStateException(e.getMessage(), e);
+            }
+
+            this.memory.setRuntime(System.currentTimeMillis() - startTime);
+            return new DefaultComputerResult(HadoopHelper.getOutputGraph(this.hadoopGraph, this.resultGraph, this.persist), this.memory.asImmutable());
+        });
+    }
+
+    /**
+     * Executes the Giraph job (when a vertex program is set), pulls the memory values and iteration count back
+     * from the output location, runs the map reduce jobs, and removes the graph output when nothing is persisted.
+     *
+     * @param args ignored
+     * @return always {@code 0}; failures are thrown as {@link IllegalStateException}
+     */
+    @Override
+    public int run(final String[] args) {
+        this.giraphConfiguration.setBoolean(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, this.persist.equals(Persist.EDGES));
+        try {
+            // it is possible to run graph computer without a vertex program (and thus, only map reduce jobs if they exist)
+            if (null != this.vertexProgram) {
+                // a way to verify in Giraph whether the traversal will go over the wire or not
+                try {
+                    VertexProgram.createVertexProgram(this.hadoopGraph, ConfUtil.makeApacheConfiguration(this.giraphConfiguration));
+                } catch (final IllegalStateException e) {
+                    if (e.getCause() instanceof NumberFormatException) {
+                        final NotSerializableException notSerializable = new NotSerializableException("The provided traversal is not serializable and thus, can not be distributed across the cluster");
+                        notSerializable.initCause(e); // keep the underlying failure for diagnosis
+                        throw notSerializable;
+                    }
+                    // NOTE(review): any other IllegalStateException is silently ignored here because this block only
+                    // probes serializability -- confirm that is intended
+                }
+                // prepare the giraph vertex-centric computing job
+                final GiraphJob job = new GiraphJob(this.giraphConfiguration, Constants.GREMLIN_HADOOP_GIRAPH_JOB_PREFIX + this.vertexProgram);
+                // handle input paths (if any)
+                if (FileInputFormat.class.isAssignableFrom(this.giraphConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class))) {
+                    final Path inputPath = new Path(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION));
+                    if (!FileSystem.get(this.giraphConfiguration).exists(inputPath)) // TODO: what about when the input is not a file input?
+                        throw new IllegalArgumentException("The provided input path does not exist: " + inputPath);
+                    FileInputFormat.setInputPaths(job.getInternalJob(), inputPath);
+                }
+                // handle output paths
+                final Path outputPath = new Path(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + Constants.HIDDEN_G);
+                FileOutputFormat.setOutputPath(job.getInternalJob(), outputPath);
+                job.getInternalJob().setJarByClass(GiraphGraphComputer.class);
+                this.logger.info(Constants.GREMLIN_HADOOP_GIRAPH_JOB_PREFIX + this.vertexProgram);
+                // execute the job and wait until it completes (if it fails, throw an exception)
+                if (!job.run(true))
+                    throw new IllegalStateException("The GiraphGraphComputer job failed -- aborting all subsequent MapReduce jobs"); // TODO: how do we get the exception that occurred?
+                // add vertex program memory values to the return memory
+                for (final String key : this.vertexProgram.getMemoryComputeKeys()) {
+                    final Path path = new Path(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + key);
+                    final ObjectWritableIterator iterator = new ObjectWritableIterator(this.giraphConfiguration, path);
+                    if (iterator.hasNext()) {
+                        this.memory.set(key, iterator.next().getValue());
+                    }
+                    FileSystem.get(this.giraphConfiguration).delete(path, true);
+                }
+                final Path path = new Path(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + Constants.HIDDEN_ITERATION);
+                this.memory.setIteration((Integer) new ObjectWritableIterator(this.giraphConfiguration, path).next().getValue());
+                FileSystem.get(this.giraphConfiguration).delete(path, true);
+            }
+            // do map reduce jobs
+            this.giraphConfiguration.setBoolean(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT_HAS_EDGES, this.giraphConfiguration.getBoolean(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, true));
+            for (final MapReduce mapReduce : this.mapReducers) {
+                this.memory.addMapReduceMemoryKey(mapReduce);
+                MapReduceHelper.executeMapReduceJob(mapReduce, this.memory, this.giraphConfiguration);
+            }
+
+            // if no persistence, delete the graph output (~g) now that the map reduce jobs have run
+            if (this.persist.equals(Persist.NOTHING)) {
+                final Path outputPath = new Path(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + Constants.HIDDEN_G);
+                if (FileSystem.get(this.giraphConfiguration).exists(outputPath)) // TODO: what about when the output is not a file output?
+                    FileSystem.get(this.giraphConfiguration).delete(outputPath, true);
+            }
+        } catch (final Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+        return 0;
+    }
+
+    /**
+     * Ignores the supplied configuration; {@link #getConf()} always returns the internal Giraph configuration.
+     */
+    @Override
+    public void setConf(final org.apache.hadoop.conf.Configuration configuration) {
+        // TODO: is this necessary to implement?
+    }
+
+    @Override
+    public org.apache.hadoop.conf.Configuration getConf() {
+        return this.giraphConfiguration;
+    }
+
+    /**
+     * Copies every {@code .jar} found in the colon-separated directories named by the
+     * {@link Constants#HADOOP_GREMLIN_LIBS} environment variable into {@code hadoop-gremlin-libs} under the
+     * file system's home directory and adds each one to the distributed-cache classpath. Entries that are not
+     * readable directories are logged and skipped.
+     *
+     * @param fs the file system the jars are copied to
+     * @throws IllegalStateException if copying or registering a jar fails
+     */
+    private void loadJars(final FileSystem fs) {
+        final String hadoopGremlinLibsRemote = "hadoop-gremlin-libs";
+        if (this.giraphConfiguration.getBoolean(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, true)) {
+            final String hadoopGremlinLocalLibs = System.getenv(Constants.HADOOP_GREMLIN_LIBS);
+            if (null == hadoopGremlinLocalLibs)
+                this.logger.warn(Constants.HADOOP_GREMLIN_LIBS + " is not set -- proceeding regardless");
+            else {
+                final String[] paths = hadoopGremlinLocalLibs.split(":");
+                for (final String path : paths) {
+                    final File file = new File(path);
+                    // listFiles() returns null for a regular file or an unreadable directory, so guard both cases
+                    final File[] files = file.isDirectory() ? file.listFiles() : null;
+                    if (null != files) {
+                        Stream.of(files).filter(f -> f.getName().endsWith(Constants.DOT_JAR)).forEach(f -> {
+                            try {
+                                final Path jarFile = new Path(fs.getHomeDirectory() + "/" + hadoopGremlinLibsRemote + "/" + f.getName());
+                                fs.copyFromLocalFile(new Path(f.getPath()), jarFile);
+                                DistributedCache.addArchiveToClassPath(jarFile, this.giraphConfiguration, fs);
+                            } catch (final Exception e) {
+                                throw new IllegalStateException(e.getMessage(), e);
+                            }
+                        });
+                    } else {
+                        this.logger.warn(path + " does not reference a valid directory -- proceeding regardless");
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Command-line entry point: {@code args[0]} names a properties file from which both the graph and the
+     * vertex program are created; the computation is submitted and awaited.
+     */
+    public static void main(final String[] args) throws Exception {
+        if (args.length < 1)
+            throw new IllegalArgumentException("Usage: GiraphGraphComputer <properties-file>");
+        final FileConfiguration configuration = new PropertiesConfiguration(args[0]);
+        final HadoopGraph graph = HadoopGraph.open(configuration);
+        new GiraphGraphComputer(graph).program(VertexProgram.createVertexProgram(graph, configuration)).submit().get();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphMemory.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphMemory.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphMemory.java
new file mode 100644
index 0000000..5a56bd3
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphMemory.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.giraph.process.computer;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.giraph.master.MasterCompute;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.Rule;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
+import org.apache.tinkerpop.gremlin.process.computer.util.MemoryHelper;
+import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GiraphMemory extends MasterCompute implements Memory {
+
+ private VertexProgram<?> vertexProgram;
+ private GiraphWorkerContext worker;
+ private Set<String> memoryKeys;
+ private boolean isMasterCompute = true;
+ private long startTime = System.currentTimeMillis();
+
+ public GiraphMemory() {
+ // Giraph ReflectionUtils requires this to be public at minimum
+ }
+
+ public GiraphMemory(final GiraphWorkerContext worker, final VertexProgram<?> vertexProgram) {
+ this.worker = worker;
+ this.vertexProgram = vertexProgram;
+ this.memoryKeys = new HashSet<>(this.vertexProgram.getMemoryComputeKeys());
+ this.isMasterCompute = false;
+ }
+
+
+ @Override
+ public void initialize() {
+ // do not initialize aggregators here because the getConf() configuration is not available at this point
+ // use compute() initial iteration instead
+ }
+
+ @Override
+ public void compute() {
+ this.isMasterCompute = true;
+ if (0 == this.getSuperstep()) { // setup
+ final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(this.getConf());
+ this.vertexProgram = VertexProgram.createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
+ this.memoryKeys = new HashSet<>(this.vertexProgram.getMemoryComputeKeys());
+ try {
+ for (final String key : this.memoryKeys) {
+ MemoryHelper.validateKey(key);
+ this.registerPersistentAggregator(key, MemoryAggregator.class);
+ }
+ } catch (final Exception e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ this.vertexProgram.setup(this);
+ } else {
+ if (this.vertexProgram.terminate(this)) { // terminate
+ // write the memory to HDFS
+ final MapMemory memory = new MapMemory(this);
+ // a hack to get the last iteration memory values to stick
+ this.vertexProgram.terminate(memory);
+ final String outputLocation = this.getConf().get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
+ if (null != outputLocation) {
+ try {
+ for (final String key : this.keys()) {
+ final SequenceFile.Writer writer = SequenceFile.createWriter(FileSystem.get(this.getConf()), this.getConf(), new Path(outputLocation + "/" + key), ObjectWritable.class, ObjectWritable.class);
+ writer.append(ObjectWritable.getNullObjectWritable(), new ObjectWritable<>(memory.get(key)));
+ writer.close();
+ }
+ final SequenceFile.Writer writer = SequenceFile.createWriter(FileSystem.get(this.getConf()), this.getConf(), new Path(outputLocation + "/" + Constants.HIDDEN_ITERATION), ObjectWritable.class, ObjectWritable.class);
+ writer.append(ObjectWritable.getNullObjectWritable(), new ObjectWritable<>(memory.getIteration()));
+ writer.close();
+ } catch (final Exception e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+ this.haltComputation();
+ }
+ }
+ }
+
+ @Override
+ public int getIteration() {
+ if (this.isMasterCompute) {
+ final int temp = (int) this.getSuperstep();
+ return temp == 0 ? temp : temp - 1;
+ } else {
+ return (int) this.worker.getSuperstep();
+ }
+ }
+
+ @Override
+ public long getRuntime() {
+ return System.currentTimeMillis() - this.startTime;
+ }
+
+ @Override
+ public Set<String> keys() {
+ return this.memoryKeys.stream().filter(this::exists).collect(Collectors.toSet());
+ }
+
+ @Override
+ public boolean exists(final String key) {
+ final Rule rule = this.isMasterCompute ? this.getAggregatedValue(key) : this.worker.getAggregatedValue(key);
+ return null != rule.getObject();
+ }
+
+ @Override
+ public <R> R get(final String key) throws IllegalArgumentException {
+ //this.checkKey(key);
+ final Rule rule = this.isMasterCompute ? this.getAggregatedValue(key) : this.worker.getAggregatedValue(key);
+ if (null == rule.getObject())
+ throw Memory.Exceptions.memoryDoesNotExist(key);
+ else
+ return rule.getObject();
+ }
+
+ @Override
+ public void set(final String key, Object value) {
+ this.checkKeyValue(key, value);
+ if (this.isMasterCompute)
+ this.setAggregatedValue(key, new Rule(Rule.Operation.SET, value));
+ else
+ this.worker.aggregate(key, new Rule(Rule.Operation.SET, value));
+ }
+
+ @Override
+ public void and(final String key, final boolean bool) {
+ this.checkKeyValue(key, bool);
+ if (this.isMasterCompute) { // only called on setup() and terminate()
+ Boolean value = this.<Rule>getAggregatedValue(key).<Boolean>getObject();
+ value = null == value ? bool : bool && value;
+ this.setAggregatedValue(key, new Rule(Rule.Operation.AND, value));
+ } else {
+ this.worker.aggregate(key, new Rule(Rule.Operation.AND, bool));
+ }
+ }
+
+ @Override
+ public void or(final String key, final boolean bool) {
+ this.checkKeyValue(key, bool);
+ if (this.isMasterCompute) { // only called on setup() and terminate()
+ Boolean value = this.<Rule>getAggregatedValue(key).<Boolean>getObject();
+ value = null == value ? bool : bool || value;
+ this.setAggregatedValue(key, new Rule(Rule.Operation.OR, value));
+ } else {
+ this.worker.aggregate(key, new Rule(Rule.Operation.OR, bool));
+ }
+ }
+
+ @Override
+ public void incr(final String key, final long delta) {
+ this.checkKeyValue(key, delta);
+ if (this.isMasterCompute) { // only called on setup() and terminate()
+ Number value = this.<Rule>getAggregatedValue(key).<Number>getObject();
+ value = null == value ? delta : value.longValue() + delta;
+ this.setAggregatedValue(key, new Rule(Rule.Operation.INCR, value));
+ } else {
+ this.worker.aggregate(key, new Rule(Rule.Operation.INCR, delta));
+ }
+ }
+
+ @Override
+ public void write(final DataOutput output) {
+ // no need to serialize the master compute as it gets its data from aggregators
+ // is this true?
+ }
+
+ @Override
+ public void readFields(final DataInput input) {
+ // no need to serialize the master compute as it gets its data from aggregators
+ // is this true?
+ }
+
+ @Override
+ public String toString() {
+ return StringFactory.memoryString(this);
+ }
+
+ private void checkKeyValue(final String key, final Object value) {
+ if (!this.memoryKeys.contains(key))
+ throw GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey(key);
+ MemoryHelper.validateValue(value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphMessageCombiner.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphMessageCombiner.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphMessageCombiner.java
new file mode 100644
index 0000000..8da835d
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphMessageCombiner.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.giraph.process.computer;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
+import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+
+/**
+ * Giraph {@link Combiner} that delegates to the {@link MessageCombiner} of the TinkerPop vertex program
+ * described by the Giraph job configuration.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GiraphMessageCombiner extends Combiner<ObjectWritable, ObjectWritable> implements ImmutableClassesGiraphConfigurable {
+
+    private MessageCombiner messageCombiner;
+    private ImmutableClassesGiraphConfiguration configuration;
+
+    /**
+     * Folds {@code messageToCombine} into {@code originalMessage} in place. An empty original message (see
+     * {@link #createInitialMessage()}) simply takes the incoming message's value.
+     */
+    @Override
+    public void combine(final ObjectWritable vertexIndex, final ObjectWritable originalMessage, final ObjectWritable messageToCombine) {
+        originalMessage.set(originalMessage.isEmpty() ?
+                messageToCombine.get() :
+                this.messageCombiner.combine(originalMessage.get(), messageToCombine.get()));
+    }
+
+    /**
+     * @return an empty writable, which {@link #combine} treats as "no message yet"
+     */
+    @Override
+    public ObjectWritable createInitialMessage() {
+        return ObjectWritable.empty();
+    }
+
+    /**
+     * Stores the configuration and instantiates the vertex program it describes in order to obtain its
+     * message combiner.
+     *
+     * @throws IllegalStateException if the vertex program does not define a message combiner
+     */
+    @Override
+    public void setConf(final ImmutableClassesGiraphConfiguration configuration) {
+        this.configuration = configuration;
+        final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(configuration);
+        final VertexProgram<?> vertexProgram = VertexProgram.createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
+        // fail with an explicit message rather than a bare NoSuchElementException from Optional.get()
+        this.messageCombiner = (MessageCombiner) vertexProgram.getMessageCombiner()
+                .orElseThrow(() -> new IllegalStateException("The vertex program does not provide a message combiner: " + vertexProgram));
+    }
+
+    @Override
+    public ImmutableClassesGiraphConfiguration getConf() {
+        return this.configuration;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphMessenger.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphMessenger.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphMessenger.java
new file mode 100644
index 0000000..ca3e100
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphMessenger.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.giraph.process.computer;
+
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
+import org.apache.tinkerpop.gremlin.process.computer.Messenger;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.StartStep;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+
+import java.util.Iterator;
+
+/**
+ * {@link Messenger} backed by a Giraph compute vertex. Received messages are unwrapped from their
+ * {@link ObjectWritable}s. Outgoing messages are wrapped and sent by target vertex id through the compute vertex.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GiraphMessenger<M> implements Messenger<M> {
+
+    private final GiraphComputeVertex giraphComputeVertex;
+    private final Iterator<ObjectWritable<M>> messages;
+
+    public GiraphMessenger(final GiraphComputeVertex giraphComputeVertex, final Iterator<ObjectWritable<M>> messages) {
+        this.giraphComputeVertex = giraphComputeVertex;
+        this.messages = messages;
+    }
+
+    /**
+     * @return the incoming messages with their writable wrappers removed
+     */
+    @Override
+    public Iterator<M> receiveMessages() {
+        return IteratorUtils.map(this.messages, writable -> writable.get());
+    }
+
+    /**
+     * Sends {@code message} according to the scope. For a local scope, the scope's incident traversal is rooted
+     * at this vertex. The edge function's output for each edge goes to the vertex on the opposite side of the
+     * traversal's last vertex step. For any other scope, the message is cast to a global scope and delivered
+     * unchanged to each of its vertices.
+     */
+    @Override
+    public void sendMessage(final MessageScope messageScope, final M message) {
+        if (!(messageScope instanceof MessageScope.Local)) {
+            final MessageScope.Global globalMessageScope = (MessageScope.Global) messageScope;
+            globalMessageScope.vertices().forEach(target -> this.sendTo(target.id(), message));
+            return;
+        }
+        final MessageScope.Local<M> localMessageScope = (MessageScope.Local) messageScope;
+        final Traversal.Admin<Vertex, Edge> incidentTraversal = GiraphMessenger.rootAt(localMessageScope.getIncidentTraversal().get(), this.giraphComputeVertex.getValue().get());
+        final Direction direction = GiraphMessenger.oppositeOfLastVertexStep(incidentTraversal);
+        incidentTraversal.forEachRemaining(edge -> {
+            final Object targetId = edge.vertices(direction).next().id();
+            this.sendTo(targetId, localMessageScope.getEdgeFunction().apply(message, edge));
+        });
+    }
+
+    // wraps id and payload and hands them to Giraph via the compute vertex
+    private void sendTo(final Object targetId, final Object payload) {
+        this.giraphComputeVertex.sendMessage(new ObjectWritable<>(targetId), new ObjectWritable<>(payload));
+    }
+
+    // mutates the given traversal by inserting a StartStep for the vertex at position 0
+    private static <T extends Traversal.Admin<Vertex, Edge>> T rootAt(final Traversal<Vertex, Edge> incidentTraversal, final Vertex vertex) {
+        final Traversal.Admin<Vertex, Edge> admin = incidentTraversal.asAdmin();
+        admin.addStep(0, new StartStep<>(incidentTraversal.asAdmin(), vertex));
+        return (T) incidentTraversal;
+    }
+
+    // the message target lies opposite to the direction of the traversal's last VertexStep
+    private static Direction oppositeOfLastVertexStep(final Traversal.Admin<Vertex, Edge> incidentTraversal) {
+        return TraversalHelper.getLastStepOfAssignableClass(VertexStep.class, incidentTraversal).get().getDirection().opposite();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphWorkerContext.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphWorkerContext.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphWorkerContext.java
new file mode 100644
index 0000000..6ff7dc0
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/GiraphWorkerContext.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.giraph.process.computer;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.worker.WorkerContext;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.util.ImmutableMemory;
+import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramPool;
+
+import java.util.Iterator;
+
+/**
+ * Per-worker Giraph context. Before the application starts it builds the TinkerPop vertex program, a pool of
+ * vertex program instances and a worker-side {@link GiraphMemory}, and it exposes them through getters.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GiraphWorkerContext extends WorkerContext {
+
+    private VertexProgramPool vertexProgramPool;
+    private GiraphMemory memory;
+
+    public GiraphWorkerContext() {
+        // Giraph ReflectionUtils requires this to be public at minimum
+    }
+
+    /**
+     * Initializes HadoopPools from the job configuration and creates the vertex program. The vertex program pool
+     * is sized by {@link GiraphConstants#NUM_COMPUTE_THREADS} (default 1). A worker-side memory view is also built.
+     */
+    @Override
+    public void preApplication() throws InstantiationException, IllegalAccessException {
+        final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(this.getContext().getConfiguration());
+        HadoopPools.initialize(apacheConfiguration);
+        final VertexProgram vertexProgram = VertexProgram.createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
+        final int poolSize = this.getContext().getConfiguration().getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), 1);
+        this.vertexProgramPool = new VertexProgramPool(vertexProgram, poolSize);
+        this.memory = new GiraphMemory(this, vertexProgram);
+    }
+
+    @Override
+    public void postApplication() {
+        // no per-application teardown is performed
+    }
+
+    /**
+     * Notifies the pooled vertex programs that a worker iteration is starting, passing a read-only memory view.
+     */
+    @Override
+    public void preSuperstep() {
+        final ImmutableMemory readOnlyMemory = new ImmutableMemory(this.memory);
+        this.vertexProgramPool.workerIterationStart(readOnlyMemory);
+    }
+
+    /**
+     * Notifies the pooled vertex programs that a worker iteration has ended, passing a read-only memory view.
+     */
+    @Override
+    public void postSuperstep() {
+        final ImmutableMemory readOnlyMemory = new ImmutableMemory(this.memory);
+        this.vertexProgramPool.workerIterationEnd(readOnlyMemory);
+    }
+
+    public VertexProgramPool getVertexProgramPool() {
+        return this.vertexProgramPool;
+    }
+
+    public GiraphMemory getMemory() {
+        return this.memory;
+    }
+
+    /**
+     * @return a new messenger bound to the given compute vertex and its incoming messages
+     */
+    public GiraphMessenger getMessenger(final GiraphComputeVertex giraphComputeVertex, final Iterator<ObjectWritable> messages) {
+        return new GiraphMessenger(giraphComputeVertex, messages);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/MemoryAggregator.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/MemoryAggregator.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/MemoryAggregator.java
new file mode 100644
index 0000000..6526fab
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/MemoryAggregator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.giraph.process.computer;
+
+import org.apache.giraph.aggregators.Aggregator;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.Rule;
+
+/**
+ * Giraph aggregator that merges TinkerPop memory updates expressed as {@link Rule}s into one current value.
+ * Each incoming rule is folded in according to its operation: SET replaces, INCR adds Longs, AND/OR combine
+ * Booleans, and NO_OP re-applies the last seen AND/OR to a Boolean payload (other NO_OP payloads are ignored).
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class MemoryAggregator implements Aggregator<Rule> {
+
+ // the aggregated value so far; null means nothing has been aggregated yet
+ private Object currentObject;
+ // most recent non-NO_OP operation passed to aggregate(); not cleared by reset() nor set by setAggregatedValue()
+ private Rule.Operation lastOperation = null;
+
+ public MemoryAggregator() {
+ this.currentObject = null;
+ }
+
+ /**
+ * Wraps the current value in a rule: NO_OP with a null object when empty, INCR for Long values,
+ * otherwise the last seen operation (or NO_OP if none has been seen yet).
+ */
+ @Override
+ public Rule getAggregatedValue() {
+ if (null == this.currentObject)
+ return createInitialValue();
+ // NOTE(review): tagging Longs as INCR presumably lets them be summed when re-aggregated -- confirm
+ else if (this.currentObject instanceof Long)
+ return new Rule(Rule.Operation.INCR, this.currentObject);
+ else
+ return new Rule(null == this.lastOperation ? Rule.Operation.NO_OP : this.lastOperation, this.currentObject);
+ }
+
+ /**
+ * Replaces the current value with the rule's object; the rule's operation is not recorded.
+ */
+ @Override
+ public void setAggregatedValue(final Rule rule) {
+ this.currentObject = rule.getObject();
+ }
+
+ /**
+ * Clears the current value; lastOperation is left unchanged.
+ */
+ @Override
+ public void reset() {
+ this.currentObject = null;
+ }
+
+ /**
+ * @return a NO_OP rule with a null object, used to represent "no value"
+ */
+ @Override
+ public Rule createInitialValue() {
+ return new Rule(Rule.Operation.NO_OP, null);
+ }
+
+ /**
+ * Folds one rule into the current value. The first non-null contribution, or any SET, replaces the value.
+ * INCR assumes non-null Long operands, and AND/OR assume non-null Boolean operands (the casts/unboxing
+ * would otherwise fail).
+ *
+ * @throws IllegalStateException if a NO_OP Boolean arrives after a non-AND/OR operation was recorded
+ * @throws IllegalArgumentException if the rule's operation is not handled
+ */
+ @Override
+ public void aggregate(final Rule ruleWritable) {
+ final Rule.Operation rule = ruleWritable.getOperation();
+ final Object object = ruleWritable.getObject();
+ // remember the last "real" operation so later NO_OP booleans can be merged the same way
+ if (rule != Rule.Operation.NO_OP)
+ this.lastOperation = rule;
+
+ if (null == this.currentObject || rule.equals(Rule.Operation.SET)) {
+ this.currentObject = object;
+ } else {
+ if (rule.equals(Rule.Operation.INCR)) {
+ this.currentObject = (Long) this.currentObject + (Long) object;
+ } else if (rule.equals(Rule.Operation.AND)) {
+ this.currentObject = (Boolean) this.currentObject && (Boolean) object;
+ } else if (rule.equals(Rule.Operation.OR)) {
+ this.currentObject = (Boolean) this.currentObject || (Boolean) object;
+ } else if (rule.equals(Rule.Operation.NO_OP)) {
+ if (object instanceof Boolean) { // only happens when NO_OP booleans are being propagated will this occur
+ if (null == this.lastOperation) {
+ // no AND/OR seen yet, so there is nothing to re-apply and the current value is kept
+ // NOTE(review): the original author questioned this ("why?") -- confirm it is intended
+ } else if (this.lastOperation.equals(Rule.Operation.AND)) {
+ this.currentObject = (Boolean) this.currentObject && (Boolean) object;
+ } else if (this.lastOperation.equals(Rule.Operation.OR)) {
+ this.currentObject = (Boolean) this.currentObject || (Boolean) object;
+ } else {
+ throw new IllegalStateException("This state should not have occurred: " + ruleWritable);
+ }
+ }
+ // NO_OP with a non-Boolean payload leaves the current value untouched
+ } else {
+ throw new IllegalArgumentException("The provided rule is unknown: " + ruleWritable);
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/io/GiraphVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/io/GiraphVertexInputFormat.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/io/GiraphVertexInputFormat.java
new file mode 100644
index 0000000..2b3b723
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/io/GiraphVertexInputFormat.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.giraph.process.computer.io;
+
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Giraph {@link VertexInputFormat} that delegates split computation and record reading to the Hadoop
+ * {@link InputFormat} configured under {@link Constants#GREMLIN_HADOOP_GRAPH_INPUT_FORMAT}. Each record
+ * reader is wrapped in a {@link GiraphVertexReader}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GiraphVertexInputFormat extends VertexInputFormat {
+
+    // created lazily from the job configuration on first use
+    private InputFormat<NullWritable, VertexWritable> hadoopGraphInputFormat;
+
+    /**
+     * @return the splits computed by the delegate input format ({@code minSplitCountHint} is not used)
+     */
+    @Override
+    public List<InputSplit> getSplits(final JobContext context, final int minSplitCountHint) throws IOException, InterruptedException {
+        this.initializeDelegate(context.getConfiguration());
+        return this.hadoopGraphInputFormat.getSplits(context);
+    }
+
+    /**
+     * @throws IOException if the delegate fails, or wrapping an interruption (the interrupt flag is restored)
+     */
+    @Override
+    public VertexReader createVertexReader(final InputSplit split, final TaskAttemptContext context) throws IOException {
+        this.initializeDelegate(context.getConfiguration());
+        try {
+            return new GiraphVertexReader(this.hadoopGraphInputFormat.createRecordReader(split, context));
+        } catch (final InterruptedException e) {
+            // restore the interrupt status so code further up the stack can still observe the interruption
+            Thread.currentThread().interrupt();
+            throw new IOException(e);
+        }
+    }
+
+    // instantiates the configured InputFormat once; later calls keep the existing instance
+    private void initializeDelegate(final Configuration configuration) {
+        if (null == this.hadoopGraphInputFormat) {
+            this.hadoopGraphInputFormat = ReflectionUtils.newInstance(configuration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class, InputFormat.class), configuration);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/io/GiraphVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/io/GiraphVertexOutputFormat.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/io/GiraphVertexOutputFormat.java
new file mode 100644
index 0000000..0135bd5
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/io/GiraphVertexOutputFormat.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.giraph.process.computer.io;
+
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.io.VertexWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.io.IOException;
+
+/**
+ * Giraph {@link VertexOutputFormat} that forwards every operation to the Hadoop {@link OutputFormat}
+ * configured under {@link Constants#GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT}. The delegate is instantiated
+ * lazily from the job configuration.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GiraphVertexOutputFormat extends VertexOutputFormat {
+
+    private OutputFormat<NullWritable, VertexWritable> hadoopGraphOutputFormat;
+
+    @Override
+    public VertexWriter createVertexWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
+        final OutputFormat<NullWritable, VertexWritable> delegate = this.delegate(context.getConfiguration());
+        return new GiraphVertexWriter(delegate);
+    }
+
+    @Override
+    public void checkOutputSpecs(final JobContext context) throws IOException, InterruptedException {
+        this.delegate(context.getConfiguration()).checkOutputSpecs(context);
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter(final TaskAttemptContext context) throws IOException, InterruptedException {
+        return this.delegate(context.getConfiguration()).getOutputCommitter(context);
+    }
+
+    /**
+     * Returns the delegate output format. It is created via ReflectionUtils on the first call; later calls reuse
+     * the cached instance and ignore {@code configuration}.
+     */
+    private OutputFormat<NullWritable, VertexWritable> delegate(final Configuration configuration) {
+        if (null == this.hadoopGraphOutputFormat) {
+            final Class<? extends OutputFormat> formatClass = configuration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class, OutputFormat.class);
+            this.hadoopGraphOutputFormat = ReflectionUtils.newInstance(formatClass, configuration);
+        }
+        return this.hadoopGraphOutputFormat;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/io/GiraphVertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/io/GiraphVertexReader.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/io/GiraphVertexReader.java
new file mode 100644
index 0000000..7c8273c
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/io/GiraphVertexReader.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.giraph.process.computer.io;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.tinkerpop.giraph.process.computer.GiraphComputeVertex;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+
+import java.io.IOException;
+
+/**
+ * A Giraph {@link VertexReader} backed by a Hadoop {@link RecordReader} of {@link VertexWritable}s. Every call
+ * delegates to that reader, and each current record is wrapped in a new {@link GiraphComputeVertex}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GiraphVertexReader extends VertexReader {
+
+    /** The delegate Hadoop reader; assigned once at construction. */
+    private final RecordReader<NullWritable, VertexWritable> recordReader;
+
+    /**
+     * @param recordReader the Hadoop reader that supplies the vertices
+     */
+    public GiraphVertexReader(final RecordReader<NullWritable, VertexWritable> recordReader) {
+        this.recordReader = recordReader;
+    }
+
+    @Override
+    public void initialize(final InputSplit inputSplit, final TaskAttemptContext context) throws IOException, InterruptedException {
+        this.recordReader.initialize(inputSplit, context);
+    }
+
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+        return this.recordReader.nextKeyValue();
+    }
+
+    /**
+     * @return a new {@link GiraphComputeVertex} built from the reader's current {@link VertexWritable}
+     */
+    @Override
+    public Vertex getCurrentVertex() throws IOException, InterruptedException {
+        return new GiraphComputeVertex(this.recordReader.getCurrentValue());
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.recordReader.close();
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+        return this.recordReader.getProgress();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/io/GiraphVertexWriter.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/io/GiraphVertexWriter.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/io/GiraphVertexWriter.java
new file mode 100644
index 0000000..812386d
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/giraph/process/computer/io/GiraphVertexWriter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.giraph.process.computer.io;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.VertexWriter;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.tinkerpop.giraph.process.computer.GiraphComputeVertex;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+
+import java.io.IOException;
+
+/**
+ * A Giraph {@link VertexWriter} that writes each vertex's {@link VertexWritable} value, keyed by
+ * {@link NullWritable}, to a Hadoop {@link RecordWriter} obtained from the wrapped {@link OutputFormat}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GiraphVertexWriter extends VertexWriter {
+
+    /** Source of the record writer; consulted in {@link #initialize(TaskAttemptContext)}. */
+    private final OutputFormat<NullWritable, VertexWritable> delegateFormat;
+
+    /** Obtained in {@link #initialize(TaskAttemptContext)}; {@code null} before that call. */
+    private RecordWriter<NullWritable, VertexWritable> delegateWriter;
+
+    public GiraphVertexWriter(final OutputFormat<NullWritable, VertexWritable> outputFormat) {
+        delegateFormat = outputFormat;
+    }
+
+    @Override
+    public void initialize(final TaskAttemptContext context) throws IOException, InterruptedException {
+        delegateWriter = delegateFormat.getRecordWriter(context);
+    }
+
+    /**
+     * Writes the vertex's value; the vertex is expected to be a {@link GiraphComputeVertex}.
+     */
+    @Override
+    public void writeVertex(final Vertex vertex) throws IOException, InterruptedException {
+        final GiraphComputeVertex computeVertex = (GiraphComputeVertex) vertex;
+        delegateWriter.write(NullWritable.get(), computeVertex.getValue());
+    }
+
+    @Override
+    public void close(final TaskAttemptContext context) throws IOException, InterruptedException {
+        delegateWriter.close(context);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/HadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/HadoopGraphProvider.java b/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/HadoopGraphProvider.java
new file mode 100644
index 0000000..ae0ae10
--- /dev/null
+++ b/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/HadoopGraphProvider.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.giraph.process;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.tinkerpop.gremlin.AbstractGraphProvider;
+import org.apache.tinkerpop.gremlin.LoadGraphWith;
+import org.apache.tinkerpop.gremlin.TestHelper;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopEdge;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopElement;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopProperty;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertex;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertexProperty;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONInputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONResourceAccess;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoResourceAccess;
+import org.apache.tinkerpop.gremlin.structure.io.script.ScriptResourceAccess;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class HadoopGraphProvider extends AbstractGraphProvider {
+
+ private static final Random RANDOM = new Random();
+ private boolean graphSONInput = false;
+
+ public static Map<String, String> PATHS = new HashMap<>();
+ private static final Set<Class> IMPLEMENTATION = new HashSet<Class>() {{
+ add(HadoopEdge.class);
+ add(HadoopElement.class);
+ add(HadoopGraph.class);
+ add(HadoopProperty.class);
+ add(HadoopVertex.class);
+ add(HadoopVertexProperty.class);
+ }};
+
+ static {
+ try {
+ final List<String> kryoResources = Arrays.asList(
+ "tinkerpop-modern.kryo",
+ "grateful-dead.kryo",
+ "tinkerpop-classic.kryo",
+ "tinkerpop-crew.kryo");
+ for (final String fileName : kryoResources) {
+ PATHS.put(fileName, TestHelper.generateTempFileFromResource(GryoResourceAccess.class, fileName, "").getAbsolutePath());
+ }
+
+ final List<String> graphsonResources = Arrays.asList(
+ "tinkerpop-modern.json",
+ "grateful-dead.json",
+ "tinkerpop-classic.json",
+ "tinkerpop-crew.json");
+ for (final String fileName : graphsonResources) {
+ PATHS.put(fileName, TestHelper.generateTempFileFromResource(GraphSONResourceAccess.class, fileName, "").getAbsolutePath());
+ }
+
+ final List<String> scriptResources = Arrays.asList(
+ "tinkerpop-classic.txt",
+ "script-input.groovy",
+ "script-output.groovy",
+ "grateful-dead.txt",
+ "script-input-grateful-dead.groovy",
+ "script-output-grateful-dead.groovy");
+ for (final String fileName : scriptResources) {
+ PATHS.put(fileName, TestHelper.generateTempFileFromResource(ScriptResourceAccess.class, fileName, "").getAbsolutePath());
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) {
+ this.graphSONInput = RANDOM.nextBoolean();
+ return new HashMap<String, Object>() {{
+ put(Graph.GRAPH, HadoopGraph.class.getName());
+ put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, graphSONInput ? GraphSONInputFormat.class.getCanonicalName() : GryoInputFormat.class.getCanonicalName());
+ put(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, GryoOutputFormat.class.getCanonicalName());
+ put(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "hadoop-gremlin/target/test-output");
+ put(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+ /// giraph configuration
+ put(GiraphConstants.MIN_WORKERS, 1);
+ put(GiraphConstants.MAX_WORKERS, 1);
+ put(GiraphConstants.SPLIT_MASTER_WORKER.getKey(), false);
+ put(GiraphConstants.ZOOKEEPER_SERVER_PORT.getKey(), 2181); // you must have a local zookeeper running on this port
+ put(GiraphConstants.NETTY_SERVER_USE_EXECUTION_HANDLER.getKey(), false); // this prevents so many integration tests running out of threads
+ put(GiraphConstants.NETTY_CLIENT_USE_EXECUTION_HANDLER.getKey(), false); // this prevents so many integration tests running out of threads
+ put(GiraphConstants.NUM_INPUT_THREADS.getKey(), 3);
+ put(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), 3);
+ put(GiraphConstants.MAX_MASTER_SUPERSTEP_WAIT_MSECS.getKey(), TimeUnit.MINUTES.toMillis(60L));
+ put("mapred.reduce.tasks", 4);
+ put("giraph.vertexOutputFormatThreadSafe", false);
+ put("giraph.numOutputThreads", 3);
+ }};
+ }
+
+ @Override
+ public void clear(final Graph graph, final Configuration configuration) throws Exception {
+ if (graph != null)
+ graph.close();
+ }
+
+ @Override
+ public void loadGraphData(final Graph graph, final LoadGraphWith loadGraphWith, final Class testClass, final String testName) {
+ if (loadGraphWith != null) this.loadGraphDataViaHadoopConfig(graph, loadGraphWith.value());
+ }
+
+ @Override
+ public Set<Class> getImplementations() {
+ return IMPLEMENTATION;
+ }
+
+ public void loadGraphDataViaHadoopConfig(final Graph g, final LoadGraphWith.GraphData graphData) {
+ final String type = this.graphSONInput ? "json" : "kryo";
+
+ if (graphData.equals(LoadGraphWith.GraphData.GRATEFUL)) {
+ ((HadoopGraph) g).configuration().setInputLocation(PATHS.get("grateful-dead." + type));
+ } else if (graphData.equals(LoadGraphWith.GraphData.MODERN)) {
+ ((HadoopGraph) g).configuration().setInputLocation(PATHS.get("tinkerpop-modern." + type));
+ } else if (graphData.equals(LoadGraphWith.GraphData.CLASSIC)) {
+ ((HadoopGraph) g).configuration().setInputLocation(PATHS.get("tinkerpop-classic." + type));
+ } else if (graphData.equals(LoadGraphWith.GraphData.CREW)) {
+ ((HadoopGraph) g).configuration().setInputLocation(PATHS.get("tinkerpop-crew." + type));
+ } else {
+ throw new RuntimeException("Could not load graph with " + graphData);
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputerProcessIntegrateTest.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputerProcessIntegrateTest.java b/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputerProcessIntegrateTest.java
new file mode 100644
index 0000000..857f3b8
--- /dev/null
+++ b/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputerProcessIntegrateTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.giraph.process.computer;
+
+import org.apache.tinkerpop.gremlin.GraphProviderClass;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.process.ProcessComputerSuite;
+import org.junit.runner.RunWith;
+
+/**
+ * Runs the {@link ProcessComputerSuite} against {@link HadoopGraph}, with {@link HadoopGiraphGraphProvider} as the
+ * graph provider. The provider's Giraph settings point at a ZooKeeper server on local port 2181, so one must be
+ * running for these integration tests.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+@RunWith(ProcessComputerSuite.class)
+@GraphProviderClass(provider = HadoopGiraphGraphProvider.class, graph = HadoopGraph.class)
+public class GiraphGraphComputerProcessIntegrateTest {
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/HadoopGiraphGraphProvider.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/HadoopGiraphGraphProvider.java b/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/HadoopGiraphGraphProvider.java
new file mode 100644
index 0000000..8b124e5
--- /dev/null
+++ b/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/HadoopGiraphGraphProvider.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.giraph.process.computer;
+
+import org.apache.tinkerpop.giraph.process.computer.GiraphGraphComputer;
+import org.apache.tinkerpop.gremlin.GraphProvider;
+import org.apache.tinkerpop.gremlin.giraph.process.HadoopGraphProvider;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+
+/**
+ * A {@link HadoopGraphProvider} whose traversals run on {@link GiraphGraphComputer} through the
+ * {@link ComputerTraversalEngine}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+@GraphProvider.Descriptor(computer = GiraphGraphComputer.class)
+public final class HadoopGiraphGraphProvider extends HadoopGraphProvider {
+
+    /**
+     * Creates a traversal source for {@code graph} that executes on {@link GiraphGraphComputer}.
+     *
+     * @param graph the graph under test
+     * @return a computer-backed traversal source
+     */
+    @Override
+    public GraphTraversalSource traversal(final Graph graph) {
+        return GraphTraversalSource.build()
+                .engine(ComputerTraversalEngine.build().computer(GiraphGraphComputer.class))
+                .create(graph);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/groovy/GiraphGraphComputerGroovyProcessIntegrateTest.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/groovy/GiraphGraphComputerGroovyProcessIntegrateTest.java b/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/groovy/GiraphGraphComputerGroovyProcessIntegrateTest.java
new file mode 100644
index 0000000..9b01185
--- /dev/null
+++ b/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/groovy/GiraphGraphComputerGroovyProcessIntegrateTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.giraph.process.computer.groovy;
+
+import org.apache.tinkerpop.gremlin.GraphProviderClass;
+import org.apache.tinkerpop.gremlin.giraph.process.computer.HadoopGiraphGraphProvider;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.process.GroovyProcessComputerSuite;
+import org.junit.runner.RunWith;
+
+/**
+ * Runs the {@link GroovyProcessComputerSuite} against {@link HadoopGraph}, with {@link HadoopGiraphGraphProvider} as
+ * the graph provider. The provider's Giraph settings point at a ZooKeeper server on local port 2181, so one must be
+ * running for these integration tests.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+@RunWith(GroovyProcessComputerSuite.class)
+@GraphProviderClass(provider = HadoopGiraphGraphProvider.class, graph = HadoopGraph.class)
+public class GiraphGraphComputerGroovyProcessIntegrateTest {
+}
[2/3] incubator-tinkerpop git commit: pulled out giraph-gremlin from
hadoop-gremlin. HadoopGremlin builds nicely. Giraph too. Spark is still
causing problems.
Posted by ok...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/groovy/GiraphHadoopGremlinPluginIntegrateTest.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/groovy/GiraphHadoopGremlinPluginIntegrateTest.java b/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/groovy/GiraphHadoopGremlinPluginIntegrateTest.java
new file mode 100644
index 0000000..fd87d2e
--- /dev/null
+++ b/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/groovy/GiraphHadoopGremlinPluginIntegrateTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.giraph.process.computer.groovy;
+
+import org.apache.tinkerpop.gremlin.GraphProviderClass;
+import org.apache.tinkerpop.gremlin.giraph.process.computer.HadoopGiraphGraphProvider;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.junit.runner.RunWith;
+
+/**
+ * Placeholder for running a Hadoop Gremlin plugin suite against {@link HadoopGraph} with
+ * {@link HadoopGiraphGraphProvider}.
+ * <p/>
+ * NOTE(review): the {@code @RunWith(HadoopPluginSuite.class)} line below is commented out. As a result this class
+ * declares no tests and the {@code RunWith} import is unused. Presumably the runner is disabled until that suite is
+ * available to this module after the split; confirm, then either restore the runner or remove the class.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+//@RunWith(HadoopPluginSuite.class)
+@GraphProviderClass(provider = HadoopGiraphGraphProvider.class, graph = HadoopGraph.class)
+public class GiraphHadoopGremlinPluginIntegrateTest {
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/hadoop-gremlin/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/pom.xml b/hadoop-gremlin/pom.xml
index 674dce0..9161046 100644
--- a/hadoop-gremlin/pom.xml
+++ b/hadoop-gremlin/pom.xml
@@ -48,56 +48,12 @@ limitations under the License.
</exclusions>
<!--<scope>provided</scope>-->
</dependency>
- <!-- GIRAPH GRAPH COMPUTER -->
- <dependency>
- <groupId>org.apache.giraph</groupId>
- <artifactId>giraph-core</artifactId>
- <version>1.0.0</version>
- <exclusions>
- <!-- self conflicts -->
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <!-- conflicts with gremlin-core -->
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <!-- gremlin-groovy conflicts -->
- <exclusion>
- <groupId>jline</groupId>
- <artifactId>jline</artifactId>
- </exclusion>
- <!-- gremlin-test conflicts -->
- <exclusion>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <!-- consistent dependencies -->
- <dependency>
- <groupId>commons-httpclient</groupId>
- <artifactId>commons-httpclient</artifactId>
- <version>3.1</version>
- </dependency>
<!-- TEST -->
<dependency>
<groupId>org.apache.tinkerpop</groupId>
<artifactId>gremlin-test</artifactId>
<version>${project.version}</version>
<scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.apache.tinkerpop</groupId>
@@ -112,12 +68,6 @@ limitations under the License.
<scope>test</scope>
</dependency>
</dependencies>
- <repositories>
- <repository>
- <id>hyracks-releases</id>
- <url>http://obelix.ics.uci.edu/nexus/content/groups/hyracks-public-releases/</url>
- </repository>
- </repositories>
<build>
<directory>${basedir}/target</directory>
<finalName>${project.artifactId}-${project.version}</finalName>
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java
index 18c4b32..ec03569 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java
@@ -28,7 +28,6 @@ import org.apache.tinkerpop.gremlin.groovy.plugin.PluginAcceptor;
import org.apache.tinkerpop.gremlin.groovy.plugin.PluginInitializationException;
import org.apache.tinkerpop.gremlin.groovy.plugin.RemoteAcceptor;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.GiraphGraphComputer;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.mapreduce.MapReduceGraphComputer;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
@@ -69,7 +68,6 @@ public final class HadoopGremlinPlugin extends AbstractGremlinPlugin {
add(IMPORT_SPACE + ScriptInputFormat.class.getPackage().getName() + DOT_STAR);
add(IMPORT_SPACE + HDFSTools.class.getPackage().getName() + DOT_STAR);
////
- add(IMPORT_SPACE + GiraphGraphComputer.class.getPackage().getName() + DOT_STAR);
add(IMPORT_SPACE + MapReduceGraphComputer.class.getPackage().getName() + DOT_STAR);
}};
@@ -89,7 +87,6 @@ public final class HadoopGremlinPlugin extends AbstractGremlinPlugin {
pluginAcceptor.eval(String.format("Logger.getLogger(%s).setLevel(Level.INFO)", JobClient.class.getName()));
pluginAcceptor.eval(String.format("Logger.getLogger(%s).setLevel(Level.INFO)", Job.class.getName()));
///
- pluginAcceptor.eval(String.format("Logger.getLogger(%s).setLevel(Level.INFO)", GiraphGraphComputer.class.getName()));
pluginAcceptor.eval(String.format("Logger.getLogger(%s).setLevel(Level.INFO)", MapReduceGraphComputer.class.getName()));
///
pluginAcceptor.eval(String.format("Logger.getLogger(%s).setLevel(Level.INFO)", HadoopGraph.class.getName()));
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/EmptyOutEdges.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/EmptyOutEdges.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/EmptyOutEdges.java
deleted file mode 100644
index 607f085..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/EmptyOutEdges.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph;
-
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.edge.OutEdges;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-
-/**
- * A Giraph {@link OutEdges} implementation that never holds any edges:
- * - the {@code initialize}, {@code add} and {@code remove} methods do nothing;
- * - {@link #size()} always returns 0 and {@link #iterator()} is always empty;
- * - {@link #write} and {@link #readFields} neither write nor read any bytes.
- * {@link #instance()} returns a single shared instance.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class EmptyOutEdges implements OutEdges<ObjectWritable, NullWritable> {
-
- private static final EmptyOutEdges INSTANCE = new EmptyOutEdges();
-
- public static EmptyOutEdges instance() {
- return INSTANCE;
- }
-
- @Override
- public void initialize(final Iterable<Edge<ObjectWritable, NullWritable>> edges) {
- }
-
- @Override
- public void initialize(final int capacity) {
- }
-
- @Override
- public void initialize() {
- }
-
- @Override
- public void add(final Edge<ObjectWritable, NullWritable> edge) {
- }
-
- @Override
- public void remove(final ObjectWritable targetVertexId) {
- }
-
- @Override
- public int size() {
- return 0;
- }
-
- @Override
- public Iterator<Edge<ObjectWritable, NullWritable>> iterator() {
- return Collections.emptyIterator();
- }
-
- @Override
- public void write(final DataOutput dataOutput) throws IOException {
- }
-
- @Override
- public void readFields(final DataInput dataInput) throws IOException {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
deleted file mode 100644
index 2581388..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph;
-
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
-
-/**
- * A Giraph {@link Vertex} whose value is a {@link VertexWritable}.
- * <p/>
- * The {@code VertexWritable} constructor does the following:
- * - builds a new {@link VertexWritable} set from the given one's vertex;
- * - uses an {@link ObjectWritable} of that vertex's id as the Giraph id;
- * - initializes the vertex with {@link EmptyOutEdges}, so it carries no Giraph edges.
- * <p/>
- * {@link #compute} does the following:
- * - takes a {@link VertexProgram} from the {@link GiraphWorkerContext}'s vertex-program pool;
- * - executes it on the vertex returned by {@code ComputerGraph.vertexProgram(...)} for the wrapped vertex, using
- *   the worker context's messenger (fed with the incoming messages) and memory;
- * - offers the program back to the pool.
- * <p/>
- * NOTE(review): the offer is not in a {@code finally}, so a program whose {@code execute} throws is not returned
- * to the pool.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GiraphComputeVertex extends Vertex<ObjectWritable, VertexWritable, NullWritable, ObjectWritable> {
-
- public GiraphComputeVertex() {
- }
-
- public GiraphComputeVertex(final VertexWritable vertexWritable) {
- final VertexWritable newWritable = new VertexWritable();
- newWritable.set(vertexWritable.get());
- this.initialize(new ObjectWritable<>(newWritable.get().id()), newWritable, EmptyOutEdges.instance());
-
- }
-
- @Override
- public void compute(final Iterable<ObjectWritable> messages) {
- final GiraphWorkerContext workerContext = (GiraphWorkerContext) this.getWorkerContext();
- final VertexProgram<?> vertexProgram = workerContext.getVertexProgramPool().take();
- vertexProgram.execute(ComputerGraph.vertexProgram(this.getValue().get(), vertexProgram), workerContext.getMessenger(this, messages.iterator()), workerContext.getMemory());
- workerContext.getVertexProgramPool().offer(vertexProgram);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
deleted file mode 100644
index 6995331..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph;
-
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.FileConfiguration;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.giraph.conf.GiraphConfiguration;
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.job.GiraphJob;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.io.GiraphVertexInputFormat;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.io.GiraphVertexOutputFormat;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.MapReduceHelper;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.HadoopHelper;
-import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
-import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
-import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
-
-import java.io.File;
-import java.io.NotSerializableException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
-import java.util.stream.Stream;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GiraphGraphComputer extends AbstractHadoopGraphComputer implements GraphComputer, Tool {
-
- protected GiraphConfiguration giraphConfiguration = new GiraphConfiguration();
- private MapMemory memory = new MapMemory();
-
- public GiraphGraphComputer(final HadoopGraph hadoopGraph) {
- super(hadoopGraph);
- final Configuration configuration = hadoopGraph.configuration();
- configuration.getKeys().forEachRemaining(key -> this.giraphConfiguration.set(key, configuration.getProperty(key).toString()));
- this.giraphConfiguration.setMasterComputeClass(GiraphMemory.class);
- this.giraphConfiguration.setVertexClass(GiraphComputeVertex.class);
- this.giraphConfiguration.setWorkerContextClass(GiraphWorkerContext.class);
- this.giraphConfiguration.setOutEdgesClass(EmptyOutEdges.class);
- this.giraphConfiguration.setClass(GiraphConstants.VERTEX_ID_CLASS.getKey(), ObjectWritable.class, ObjectWritable.class);
- this.giraphConfiguration.setClass(GiraphConstants.VERTEX_VALUE_CLASS.getKey(), VertexWritable.class, VertexWritable.class);
- this.giraphConfiguration.setBoolean(GiraphConstants.STATIC_GRAPH.getKey(), true);
- this.giraphConfiguration.setVertexInputFormatClass(GiraphVertexInputFormat.class);
- this.giraphConfiguration.setVertexOutputFormatClass(GiraphVertexOutputFormat.class);
- }
-
- @Override
- public GraphComputer program(final VertexProgram vertexProgram) {
- super.program(vertexProgram);
- this.memory.addVertexProgramMemoryComputeKeys(this.vertexProgram);
- final BaseConfiguration apacheConfiguration = new BaseConfiguration();
- vertexProgram.storeState(apacheConfiguration);
- ConfUtil.mergeApacheIntoHadoopConfiguration(apacheConfiguration, this.giraphConfiguration);
- this.vertexProgram.getMessageCombiner().ifPresent(combiner -> this.giraphConfiguration.setCombinerClass(GiraphMessageCombiner.class));
- return this;
- }
-
- @Override
- public Future<ComputerResult> submit() {
- final long startTime = System.currentTimeMillis();
- super.validateStatePriorToExecution();
- return CompletableFuture.<ComputerResult>supplyAsync(() -> {
- try {
- final FileSystem fs = FileSystem.get(this.giraphConfiguration);
- this.loadJars(fs);
- fs.delete(new Path(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)), true);
- ToolRunner.run(this, new String[]{});
- } catch (final Exception e) {
- //e.printStackTrace();
- throw new IllegalStateException(e.getMessage(), e);
- }
-
- this.memory.setRuntime(System.currentTimeMillis() - startTime);
- return new DefaultComputerResult(HadoopHelper.getOutputGraph(this.hadoopGraph, this.resultGraph, this.persist), this.memory.asImmutable());
- });
- }
-
- @Override
- public int run(final String[] args) {
- this.giraphConfiguration.setBoolean(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, this.persist.equals(Persist.EDGES));
- try {
- // it is possible to run graph computer without a vertex program (and thus, only map reduce jobs if they exist)
- if (null != this.vertexProgram) {
- // a way to verify in Giraph whether the traversal will go over the wire or not
- try {
- VertexProgram.createVertexProgram(this.hadoopGraph, ConfUtil.makeApacheConfiguration(this.giraphConfiguration));
- } catch (IllegalStateException e) {
- if (e.getCause() instanceof NumberFormatException)
- throw new NotSerializableException("The provided traversal is not serializable and thus, can not be distributed across the cluster");
- }
- // prepare the giraph vertex-centric computing job
- final GiraphJob job = new GiraphJob(this.giraphConfiguration, Constants.GREMLIN_HADOOP_GIRAPH_JOB_PREFIX + this.vertexProgram);
- // handle input paths (if any)
- if (FileInputFormat.class.isAssignableFrom(this.giraphConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class))) {
- final Path inputPath = new Path(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION));
- if (!FileSystem.get(this.giraphConfiguration).exists(inputPath)) // TODO: what about when the input is not a file input?
- throw new IllegalArgumentException("The provided input path does not exist: " + inputPath);
- FileInputFormat.setInputPaths(job.getInternalJob(), inputPath);
- }
- // handle output paths
- final Path outputPath = new Path(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + Constants.HIDDEN_G);
- FileOutputFormat.setOutputPath(job.getInternalJob(), outputPath);
- job.getInternalJob().setJarByClass(GiraphGraphComputer.class);
- this.logger.info(Constants.GREMLIN_HADOOP_GIRAPH_JOB_PREFIX + this.vertexProgram);
- // execute the job and wait until it completes (if it fails, throw an exception)
- if (!job.run(true))
- throw new IllegalStateException("The GiraphGraphComputer job failed -- aborting all subsequent MapReduce jobs"); // how do I get the exception that occured?
- // add vertex program memory values to the return memory
- for (final String key : this.vertexProgram.getMemoryComputeKeys()) {
- final Path path = new Path(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + key);
- final ObjectWritableIterator iterator = new ObjectWritableIterator(this.giraphConfiguration, path);
- if (iterator.hasNext()) {
- this.memory.set(key, iterator.next().getValue());
- }
- FileSystem.get(this.giraphConfiguration).delete(path, true);
- }
- final Path path = new Path(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + Constants.HIDDEN_ITERATION);
- this.memory.setIteration((Integer) new ObjectWritableIterator(this.giraphConfiguration, path).next().getValue());
- FileSystem.get(this.giraphConfiguration).delete(path, true);
- }
- // do map reduce jobs
- this.giraphConfiguration.setBoolean(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT_HAS_EDGES, this.giraphConfiguration.getBoolean(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, true));
- for (final MapReduce mapReduce : this.mapReducers) {
- this.memory.addMapReduceMemoryKey(mapReduce);
- MapReduceHelper.executeMapReduceJob(mapReduce, this.memory, this.giraphConfiguration);
- }
-
- // if no persistence, delete the map reduce output
- if (this.persist.equals(Persist.NOTHING)) {
- final Path outputPath = new Path(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + Constants.HIDDEN_G);
- if (FileSystem.get(this.giraphConfiguration).exists(outputPath)) // TODO: what about when the output is not a file output?
- FileSystem.get(this.giraphConfiguration).delete(outputPath, true);
- }
- } catch (final Exception e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- return 0;
- }
-
- @Override
- public void setConf(final org.apache.hadoop.conf.Configuration configuration) {
- // TODO: is this necessary to implement?
- }
-
- @Override
- public org.apache.hadoop.conf.Configuration getConf() {
- return this.giraphConfiguration;
- }
-
- private void loadJars(final FileSystem fs) {
- final String hadoopGremlinLibsRemote = "hadoop-gremlin-libs";
- if (this.giraphConfiguration.getBoolean(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, true)) {
- final String hadoopGremlinLocalLibs = System.getenv(Constants.HADOOP_GREMLIN_LIBS);
- if (null == hadoopGremlinLocalLibs)
- this.logger.warn(Constants.HADOOP_GREMLIN_LIBS + " is not set -- proceeding regardless");
- else {
- final String[] paths = hadoopGremlinLocalLibs.split(":");
- for (final String path : paths) {
- final File file = new File(path);
- if (file.exists()) {
- Stream.of(file.listFiles()).filter(f -> f.getName().endsWith(Constants.DOT_JAR)).forEach(f -> {
- try {
- final Path jarFile = new Path(fs.getHomeDirectory() + "/" + hadoopGremlinLibsRemote + "/" + f.getName());
- fs.copyFromLocalFile(new Path(f.getPath()), jarFile);
- try {
- DistributedCache.addArchiveToClassPath(jarFile, this.giraphConfiguration, fs);
- } catch (final Exception e) {
- throw new RuntimeException(e.getMessage(), e);
- }
- } catch (Exception e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- });
- } else {
- this.logger.warn(path + " does not reference a valid directory -- proceeding regardless");
- }
- }
- }
- }
- }
-
- public static void main(final String[] args) throws Exception {
- final FileConfiguration configuration = new PropertiesConfiguration(args[0]);
- new GiraphGraphComputer(HadoopGraph.open(configuration)).program(VertexProgram.createVertexProgram(HadoopGraph.open(configuration), configuration)).submit().get();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMemory.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMemory.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMemory.java
deleted file mode 100644
index c557e42..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMemory.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.giraph.master.MasterCompute;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.Rule;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
-import org.apache.tinkerpop.gremlin.process.computer.Memory;
-import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
-import org.apache.tinkerpop.gremlin.process.computer.util.MemoryHelper;
-import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GiraphMemory extends MasterCompute implements Memory {
-
- private VertexProgram<?> vertexProgram;
- private GiraphWorkerContext worker;
- private Set<String> memoryKeys;
- private boolean isMasterCompute = true;
- private long startTime = System.currentTimeMillis();
-
- public GiraphMemory() {
- // Giraph ReflectionUtils requires this to be public at minimum
- }
-
- public GiraphMemory(final GiraphWorkerContext worker, final VertexProgram<?> vertexProgram) {
- this.worker = worker;
- this.vertexProgram = vertexProgram;
- this.memoryKeys = new HashSet<>(this.vertexProgram.getMemoryComputeKeys());
- this.isMasterCompute = false;
- }
-
-
- @Override
- public void initialize() {
- // do not initialize aggregators here because the getConf() configuration is not available at this point
- // use compute() initial iteration instead
- }
-
- @Override
- public void compute() {
- this.isMasterCompute = true;
- if (0 == this.getSuperstep()) { // setup
- final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(this.getConf());
- this.vertexProgram = VertexProgram.createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
- this.memoryKeys = new HashSet<>(this.vertexProgram.getMemoryComputeKeys());
- try {
- for (final String key : this.memoryKeys) {
- MemoryHelper.validateKey(key);
- this.registerPersistentAggregator(key, MemoryAggregator.class);
- }
- } catch (final Exception e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- this.vertexProgram.setup(this);
- } else {
- if (this.vertexProgram.terminate(this)) { // terminate
- // write the memory to HDFS
- final MapMemory memory = new MapMemory(this);
- // a hack to get the last iteration memory values to stick
- this.vertexProgram.terminate(memory);
- final String outputLocation = this.getConf().get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
- if (null != outputLocation) {
- try {
- for (final String key : this.keys()) {
- final SequenceFile.Writer writer = SequenceFile.createWriter(FileSystem.get(this.getConf()), this.getConf(), new Path(outputLocation + "/" + key), ObjectWritable.class, ObjectWritable.class);
- writer.append(ObjectWritable.getNullObjectWritable(), new ObjectWritable<>(memory.get(key)));
- writer.close();
- }
- final SequenceFile.Writer writer = SequenceFile.createWriter(FileSystem.get(this.getConf()), this.getConf(), new Path(outputLocation + "/" + Constants.HIDDEN_ITERATION), ObjectWritable.class, ObjectWritable.class);
- writer.append(ObjectWritable.getNullObjectWritable(), new ObjectWritable<>(memory.getIteration()));
- writer.close();
- } catch (final Exception e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- }
- this.haltComputation();
- }
- }
- }
-
- @Override
- public int getIteration() {
- if (this.isMasterCompute) {
- final int temp = (int) this.getSuperstep();
- return temp == 0 ? temp : temp - 1;
- } else {
- return (int) this.worker.getSuperstep();
- }
- }
-
- @Override
- public long getRuntime() {
- return System.currentTimeMillis() - this.startTime;
- }
-
- @Override
- public Set<String> keys() {
- return this.memoryKeys.stream().filter(this::exists).collect(Collectors.toSet());
- }
-
- @Override
- public boolean exists(final String key) {
- final Rule rule = this.isMasterCompute ? this.getAggregatedValue(key) : this.worker.getAggregatedValue(key);
- return null != rule.getObject();
- }
-
- @Override
- public <R> R get(final String key) throws IllegalArgumentException {
- //this.checkKey(key);
- final Rule rule = this.isMasterCompute ? this.getAggregatedValue(key) : this.worker.getAggregatedValue(key);
- if (null == rule.getObject())
- throw Memory.Exceptions.memoryDoesNotExist(key);
- else
- return rule.getObject();
- }
-
- @Override
- public void set(final String key, Object value) {
- this.checkKeyValue(key, value);
- if (this.isMasterCompute)
- this.setAggregatedValue(key, new Rule(Rule.Operation.SET, value));
- else
- this.worker.aggregate(key, new Rule(Rule.Operation.SET, value));
- }
-
- @Override
- public void and(final String key, final boolean bool) {
- this.checkKeyValue(key, bool);
- if (this.isMasterCompute) { // only called on setup() and terminate()
- Boolean value = this.<Rule>getAggregatedValue(key).<Boolean>getObject();
- value = null == value ? bool : bool && value;
- this.setAggregatedValue(key, new Rule(Rule.Operation.AND, value));
- } else {
- this.worker.aggregate(key, new Rule(Rule.Operation.AND, bool));
- }
- }
-
- @Override
- public void or(final String key, final boolean bool) {
- this.checkKeyValue(key, bool);
- if (this.isMasterCompute) { // only called on setup() and terminate()
- Boolean value = this.<Rule>getAggregatedValue(key).<Boolean>getObject();
- value = null == value ? bool : bool || value;
- this.setAggregatedValue(key, new Rule(Rule.Operation.OR, value));
- } else {
- this.worker.aggregate(key, new Rule(Rule.Operation.OR, bool));
- }
- }
-
- @Override
- public void incr(final String key, final long delta) {
- this.checkKeyValue(key, delta);
- if (this.isMasterCompute) { // only called on setup() and terminate()
- Number value = this.<Rule>getAggregatedValue(key).<Number>getObject();
- value = null == value ? delta : value.longValue() + delta;
- this.setAggregatedValue(key, new Rule(Rule.Operation.INCR, value));
- } else {
- this.worker.aggregate(key, new Rule(Rule.Operation.INCR, delta));
- }
- }
-
- @Override
- public void write(final DataOutput output) {
- // no need to serialize the master compute as it gets its data from aggregators
- // is this true?
- }
-
- @Override
- public void readFields(final DataInput input) {
- // no need to serialize the master compute as it gets its data from aggregators
- // is this true?
- }
-
- @Override
- public String toString() {
- return StringFactory.memoryString(this);
- }
-
- private void checkKeyValue(final String key, final Object value) {
- if (!this.memoryKeys.contains(key))
- throw GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey(key);
- MemoryHelper.validateValue(value);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMessageCombiner.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMessageCombiner.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMessageCombiner.java
deleted file mode 100644
index 1494d51..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMessageCombiner.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.giraph.combiner.Combiner;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
-import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GiraphMessageCombiner extends Combiner<ObjectWritable, ObjectWritable> implements ImmutableClassesGiraphConfigurable {
-
- private MessageCombiner messageCombiner;
- private ImmutableClassesGiraphConfiguration configuration;
-
- @Override
- public void combine(final ObjectWritable vertexIndex, final ObjectWritable originalMessage, final ObjectWritable messageToCombine) {
- originalMessage.set(originalMessage.isEmpty() ?
- messageToCombine.get() :
- this.messageCombiner.combine(originalMessage.get(), messageToCombine.get()));
- }
-
- @Override
- public ObjectWritable createInitialMessage() {
- return ObjectWritable.empty();
- }
-
- @Override
- public void setConf(final ImmutableClassesGiraphConfiguration configuration) {
- this.configuration = configuration;
- final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(configuration);
- this.messageCombiner = (MessageCombiner) VertexProgram.createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration).getMessageCombiner().get();
- }
-
- @Override
- public ImmutableClassesGiraphConfiguration getConf() {
- return this.configuration;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMessenger.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMessenger.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMessenger.java
deleted file mode 100644
index 83b2943..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMessenger.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph;
-
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
-import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
-import org.apache.tinkerpop.gremlin.process.computer.Messenger;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.StartStep;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
-import org.apache.tinkerpop.gremlin.structure.Direction;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-
-import java.util.Iterator;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GiraphMessenger<M> implements Messenger<M> {
-
- private GiraphComputeVertex giraphComputeVertex;
- private Iterator<ObjectWritable<M>> messages;
-
- public GiraphMessenger(final GiraphComputeVertex giraphComputeVertex, final Iterator<ObjectWritable<M>> messages) {
- this.giraphComputeVertex = giraphComputeVertex;
- this.messages = messages;
- }
-
- @Override
- public Iterator<M> receiveMessages() {
- return IteratorUtils.map(this.messages, ObjectWritable::get);
- }
-
- @Override
- public void sendMessage(final MessageScope messageScope, final M message) {
- if (messageScope instanceof MessageScope.Local) {
- final MessageScope.Local<M> localMessageScope = (MessageScope.Local) messageScope;
- final Traversal.Admin<Vertex, Edge> incidentTraversal = GiraphMessenger.setVertexStart(localMessageScope.getIncidentTraversal().get(), this.giraphComputeVertex.getValue().get());
- final Direction direction = GiraphMessenger.getOppositeDirection(incidentTraversal);
- incidentTraversal.forEachRemaining(edge ->
- this.giraphComputeVertex.sendMessage(
- new ObjectWritable<>(edge.vertices(direction).next().id()),
- new ObjectWritable<>(localMessageScope.getEdgeFunction().apply(message, edge))));
- } else {
- final MessageScope.Global globalMessageScope = (MessageScope.Global) messageScope;
- globalMessageScope.vertices().forEach(vertex ->
- this.giraphComputeVertex.sendMessage(new ObjectWritable<>(vertex.id()), new ObjectWritable<>(message)));
- }
- }
-
- private static <T extends Traversal.Admin<Vertex, Edge>> T setVertexStart(final Traversal<Vertex, Edge> incidentTraversal, final Vertex vertex) {
- incidentTraversal.asAdmin().addStep(0, new StartStep<>(incidentTraversal.asAdmin(), vertex));
- return (T) incidentTraversal;
- }
-
- private static Direction getOppositeDirection(final Traversal.Admin<Vertex, Edge> incidentTraversal) {
- final VertexStep step = TraversalHelper.getLastStepOfAssignableClass(VertexStep.class, incidentTraversal).get();
- return step.getDirection().opposite();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphWorkerContext.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphWorkerContext.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphWorkerContext.java
deleted file mode 100644
index 85d2c2d..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphWorkerContext.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.worker.WorkerContext;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.ImmutableMemory;
-import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramPool;
-
-import java.util.Iterator;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GiraphWorkerContext extends WorkerContext {
-
- private VertexProgramPool vertexProgramPool;
- private GiraphMemory memory;
-
- public GiraphWorkerContext() {
- // Giraph ReflectionUtils requires this to be public at minimum
- }
-
- public void preApplication() throws InstantiationException, IllegalAccessException {
- final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(this.getContext().getConfiguration());
- HadoopPools.initialize(apacheConfiguration);
- final VertexProgram vertexProgram = VertexProgram.createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
- this.vertexProgramPool = new VertexProgramPool(vertexProgram, this.getContext().getConfiguration().getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), 1));
- this.memory = new GiraphMemory(this, vertexProgram);
- }
-
- public void postApplication() {
-
- }
-
- public void preSuperstep() {
- this.vertexProgramPool.workerIterationStart(new ImmutableMemory(this.memory));
- }
-
- public void postSuperstep() {
- this.vertexProgramPool.workerIterationEnd(new ImmutableMemory(this.memory));
- }
-
- public VertexProgramPool getVertexProgramPool() {
- return this.vertexProgramPool;
- }
-
- public GiraphMemory getMemory() {
- return this.memory;
- }
-
- public GiraphMessenger getMessenger(final GiraphComputeVertex giraphComputeVertex, final Iterator<ObjectWritable> messages) {
- return new GiraphMessenger(giraphComputeVertex, messages);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/MemoryAggregator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/MemoryAggregator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/MemoryAggregator.java
deleted file mode 100644
index 8f11fb3..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/MemoryAggregator.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph;
-
-import org.apache.giraph.aggregators.Aggregator;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.Rule;
-
-/**
- * Giraph {@link Aggregator} that folds {@link Rule}s into a single memory value. How an incoming rule is
- * combined depends on its operation: SET replaces the value, INCR adds {@code Long}s, AND/OR combine
- * {@code Boolean}s, and NO_OP only contributes {@code Boolean} values, combined with the most recently seen
- * non-NO_OP operation. The first rule aggregated after construction or {@link #reset()} always replaces the value.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class MemoryAggregator implements Aggregator<Rule> {
-
- // running aggregated value; null means nothing has been aggregated yet
- private Object currentObject;
- // most recent non-NO_OP operation passed to aggregate(); used to combine propagated NO_OP booleans
- private Rule.Operation lastOperation = null;
-
- public MemoryAggregator() {
- this.currentObject = null;
- }
-
- /**
- * Returns the current value as a {@link Rule}: the initial NO_OP rule when nothing has been aggregated,
- * an INCR rule when the value is a {@code Long}, and otherwise a rule carrying the last non-NO_OP
- * operation seen (or NO_OP if there was none).
- */
- @Override
- public Rule getAggregatedValue() {
- if (null == this.currentObject)
- return createInitialValue();
- else if (this.currentObject instanceof Long)
- return new Rule(Rule.Operation.INCR, this.currentObject);
- else
- return new Rule(null == this.lastOperation ? Rule.Operation.NO_OP : this.lastOperation, this.currentObject);
- }
-
- /**
- * Replaces the current value with the rule's object. The rule's operation is not recorded.
- */
- @Override
- public void setAggregatedValue(final Rule rule) {
- this.currentObject = rule.getObject();
- }
-
- /**
- * Clears the current value.
- * NOTE(review): {@code lastOperation} is not cleared, so it carries over into later aggregation — confirm this is intended.
- */
- @Override
- public void reset() {
- this.currentObject = null;
- }
-
- /**
- * @return a NO_OP rule with a {@code null} object
- */
- @Override
- public Rule createInitialValue() {
- return new Rule(Rule.Operation.NO_OP, null);
- }
-
- /**
- * Folds one rule into the current value.
- * When there is no current value, or the rule is SET, the rule's object replaces it.
- * Otherwise INCR adds {@code Long}s and AND/OR combine {@code Boolean}s. A NO_OP {@code Boolean} is combined
- * using {@code lastOperation}. A NO_OP with a non-Boolean object leaves the value unchanged.
- * Mismatched value types surface as {@link ClassCastException} from the casts below.
- * NOTE(review): a {@code null} operation would reach {@code rule.equals(...)} and throw NullPointerException once a value exists.
- *
- * @throws IllegalStateException if a NO_OP boolean arrives after a non-AND/OR operation
- * @throws IllegalArgumentException if the operation is not one handled below
- */
- @Override
- public void aggregate(final Rule ruleWritable) {
- final Rule.Operation rule = ruleWritable.getOperation();
- final Object object = ruleWritable.getObject();
- if (rule != Rule.Operation.NO_OP)
- this.lastOperation = rule;
-
- if (null == this.currentObject || rule.equals(Rule.Operation.SET)) {
- this.currentObject = object;
- } else {
- if (rule.equals(Rule.Operation.INCR)) {
- this.currentObject = (Long) this.currentObject + (Long) object;
- } else if (rule.equals(Rule.Operation.AND)) {
- this.currentObject = (Boolean) this.currentObject && (Boolean) object;
- } else if (rule.equals(Rule.Operation.OR)) {
- this.currentObject = (Boolean) this.currentObject || (Boolean) object;
- } else if (rule.equals(Rule.Operation.NO_OP)) {
- if (object instanceof Boolean) { // reached only when NO_OP booleans are being propagated
- if (null == this.lastOperation) {
- // no AND/OR has been seen yet, so there is nothing to combine with: the boolean is dropped and currentObject is kept
- // NOTE(review): the original author questioned this branch ("why?") — confirm dropping is correct
- } else if (this.lastOperation.equals(Rule.Operation.AND)) {
- this.currentObject = (Boolean) this.currentObject && (Boolean) object;
- } else if (this.lastOperation.equals(Rule.Operation.OR)) {
- this.currentObject = (Boolean) this.currentObject || (Boolean) object;
- } else {
- throw new IllegalStateException("This state should not have occurred: " + ruleWritable);
- }
- }
- } else {
- throw new IllegalArgumentException("The provided rule is unknown: " + ruleWritable);
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexInputFormat.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexInputFormat.java
deleted file mode 100644
index 7dcaae7..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexInputFormat.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.io;
-
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.giraph.io.VertexInputFormat;
-import org.apache.giraph.io.VertexReader;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Giraph {@link VertexInputFormat} that delegates to the Hadoop {@link InputFormat} of
- * {@code NullWritable}/{@link VertexWritable} pairs named by {@link Constants#GREMLIN_HADOOP_GRAPH_INPUT_FORMAT}.
- * The delegate is created lazily from the job configuration on first use.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GiraphVertexInputFormat extends VertexInputFormat {
-
- // delegate Hadoop input format; null until constructor(...) first runs
- private InputFormat<NullWritable, VertexWritable> hadoopGraphInputFormat;
-
- /**
- * Returns the delegate's splits; {@code minSplitCountHint} is not used.
- */
- @Override
- public List<InputSplit> getSplits(final JobContext context, final int minSplitCountHint) throws IOException, InterruptedException {
- this.constructor(context.getConfiguration());
- return this.hadoopGraphInputFormat.getSplits(context);
- }
-
- /**
- * Wraps the delegate's record reader in a {@link GiraphVertexReader}.
- * An {@link InterruptedException} from the delegate is rethrown as an {@link IOException}.
- * NOTE(review): the thread's interrupt status is not restored before rethrowing.
- */
- @Override
- public VertexReader createVertexReader(final InputSplit split, final TaskAttemptContext context) throws IOException {
- this.constructor(context.getConfiguration());
- try {
- return new GiraphVertexReader(this.hadoopGraphInputFormat.createRecordReader(split, context));
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- }
-
- /**
- * Instantiates the delegate via {@link ReflectionUtils} on first call.
- * Later calls reuse that instance regardless of the configuration passed.
- * The check-then-set is unsynchronized.
- * NOTE(review): when the key is unset the fallback class is {@code InputFormat.class} itself, which presumably cannot be instantiated — confirm.
- */
- private final void constructor(final Configuration configuration) {
- if (null == this.hadoopGraphInputFormat) {
- this.hadoopGraphInputFormat = ReflectionUtils.newInstance(configuration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class, InputFormat.class), configuration);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexOutputFormat.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexOutputFormat.java
deleted file mode 100644
index b2ad139..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexOutputFormat.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.io;
-
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.giraph.io.VertexOutputFormat;
-import org.apache.giraph.io.VertexWriter;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import java.io.IOException;
-
-/**
- * Giraph {@link VertexOutputFormat} that delegates to the Hadoop {@link OutputFormat} of
- * {@code NullWritable}/{@link VertexWritable} pairs named by {@link Constants#GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT}.
- * The delegate is created lazily from the job configuration on first use.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GiraphVertexOutputFormat extends VertexOutputFormat {
-
- // delegate Hadoop output format; null until constructor(...) first runs
- private OutputFormat<NullWritable, VertexWritable> hadoopGraphOutputFormat;
-
- /**
- * Returns a {@link GiraphVertexWriter} over the delegate output format; its record writer is obtained later in the writer's {@code initialize}.
- */
- @Override
- public VertexWriter createVertexWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
- this.constructor(context.getConfiguration());
- return new GiraphVertexWriter(this.hadoopGraphOutputFormat);
- }
-
- /**
- * Delegates output-spec validation to the configured Hadoop output format.
- */
- @Override
- public void checkOutputSpecs(final JobContext context) throws IOException, InterruptedException {
- this.constructor(context.getConfiguration());
- this.hadoopGraphOutputFormat.checkOutputSpecs(context);
- }
-
- /**
- * Returns the delegate output format's committer.
- */
- @Override
- public OutputCommitter getOutputCommitter(final TaskAttemptContext context) throws IOException, InterruptedException {
- this.constructor(context.getConfiguration());
- return this.hadoopGraphOutputFormat.getOutputCommitter(context);
- }
-
- /**
- * Instantiates the delegate via {@link ReflectionUtils} on first call.
- * Later calls reuse that instance regardless of the configuration passed.
- * The check-then-set is unsynchronized.
- * NOTE(review): when the key is unset the fallback class is {@code OutputFormat.class} itself, which presumably cannot be instantiated — confirm.
- */
- private final void constructor(final Configuration configuration) {
- if (null == this.hadoopGraphOutputFormat) {
- this.hadoopGraphOutputFormat = ReflectionUtils.newInstance(configuration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class, OutputFormat.class), configuration);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexReader.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexReader.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexReader.java
deleted file mode 100644
index c78c7c8..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexReader.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.io;
-
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.io.VertexReader;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.GiraphComputeVertex;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-
-import java.io.IOException;
-
-/**
- * Adapts a Hadoop {@link RecordReader} of {@code NullWritable}/{@link VertexWritable} pairs to Giraph's
- * {@link VertexReader}. Every lifecycle call is forwarded to the wrapped reader, and each record's value is
- * exposed as a new {@link GiraphComputeVertex}.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GiraphVertexReader extends VertexReader {
-
- private RecordReader<NullWritable, VertexWritable> recordReader;
-
- public GiraphVertexReader(final RecordReader<NullWritable, VertexWritable> recordReader) {
- this.recordReader = recordReader;
- }
-
- @Override
- public void initialize(final InputSplit inputSplit, final TaskAttemptContext context) throws IOException, InterruptedException {
- this.recordReader.initialize(inputSplit, context);
- }
-
- /**
- * Advances the wrapped reader; returns {@code false} when it has no more records.
- */
- @Override
- public boolean nextVertex() throws IOException, InterruptedException {
- return this.recordReader.nextKeyValue();
- }
-
- /**
- * Wraps the current record's {@link VertexWritable} in a new {@link GiraphComputeVertex} on every call.
- */
- @Override
- public Vertex getCurrentVertex() throws IOException, InterruptedException {
- return new GiraphComputeVertex(this.recordReader.getCurrentValue());
- }
-
- @Override
- public void close() throws IOException {
- this.recordReader.close();
- }
-
- @Override
- public float getProgress() throws IOException, InterruptedException {
- return this.recordReader.getProgress();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexWriter.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexWriter.java
deleted file mode 100644
index c66c801..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/io/GiraphVertexWriter.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.io;
-
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.io.VertexWriter;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.GiraphComputeVertex;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-
-import java.io.IOException;
-
-/**
- * Giraph {@link VertexWriter} that writes each vertex's {@link VertexWritable} value, keyed by
- * {@link NullWritable}, to a {@link RecordWriter} obtained from the supplied Hadoop {@link OutputFormat}.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GiraphVertexWriter extends VertexWriter {
- private final OutputFormat<NullWritable, VertexWritable> outputFormat;
- // assigned in initialize(); close() and writeVertex() dereference it, so initialize() must run first
- private RecordWriter<NullWritable, VertexWritable> recordWriter;
-
- public GiraphVertexWriter(final OutputFormat<NullWritable, VertexWritable> outputFormat) {
- this.outputFormat = outputFormat;
- }
-
- @Override
- public void initialize(final TaskAttemptContext context) throws IOException, InterruptedException {
- this.recordWriter = this.outputFormat.getRecordWriter(context);
- }
-
- @Override
- public void close(final TaskAttemptContext context) throws IOException, InterruptedException {
- this.recordWriter.close(context);
- }
-
- /**
- * Writes the vertex's value with a {@link NullWritable} key.
- * Vertices that are not {@link GiraphComputeVertex} instances fail with {@link ClassCastException}.
- */
- @Override
- public void writeVertex(final Vertex vertex) throws IOException, InterruptedException {
- this.recordWriter.write(NullWritable.get(), ((GiraphComputeVertex) vertex).getValue());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
index 3c3c9b7..ae4f910 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
@@ -23,7 +23,6 @@ import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.GiraphGraphComputer;
import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HadoopEdgeIterator;
import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HadoopVertexIterator;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
@@ -66,18 +65,18 @@ import java.util.stream.Stream;
@Graph.OptOut(
test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.MatchTest$Traversals",
method = "g_V_matchXa_knows_b__c_knows_bX",
- reason = "Giraph does a hard kill on failure and stops threads which stops test cases. Exception handling semantics are correct though.",
- computers = {GiraphGraphComputer.class})
+ reason = "Giraph does a hard kill on failure and stops threads which stops test cases. Exception handling semantics are correct though.")
+ //computers = {GiraphGraphComputer.class})
@Graph.OptOut(
test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.MatchTest$Traversals",
method = "g_V_matchXa_created_b__c_created_bX_selectXa_b_cX_byXnameX",
- reason = "Giraph does a hard kill on failure and stops threads which stops test cases. Exception handling semantics are correct though.",
- computers = {GiraphGraphComputer.class})
+ reason = "Giraph does a hard kill on failure and stops threads which stops test cases. Exception handling semantics are correct though.")
+ //computers = {GiraphGraphComputer.class})
@Graph.OptOut(
test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.MatchTest$Traversals",
method = "g_V_out_asXcX_matchXb_knows_a__c_created_eX_selectXcX",
- reason = "Giraph does a hard kill on failure and stops threads which stops test cases. Exception handling semantics are correct though.",
- computers = {GiraphGraphComputer.class})
+ reason = "Giraph does a hard kill on failure and stops threads which stops test cases. Exception handling semantics are correct though.")
+ // computers = {GiraphGraphComputer.class})
@Graph.OptOut(
test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.MatchTest$Traversals",
method = "g_V_matchXa_hasXname_GarciaX__a_0writtenBy_b__a_0sungBy_bX",
@@ -85,18 +84,18 @@ import java.util.stream.Stream;
@Graph.OptOut(
test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.GroovyMatchTest$Traversals",
method = "g_V_matchXa_knows_b__c_knows_bX",
- reason = "Giraph does a hard kill on failure and stops threads which stops test cases. Exception handling semantics are correct though.",
- computers = {GiraphGraphComputer.class})
+ reason = "Giraph does a hard kill on failure and stops threads which stops test cases. Exception handling semantics are correct though.")
+ //computers = {GiraphGraphComputer.class})
@Graph.OptOut(
test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.GroovyMatchTest$Traversals",
method = "g_V_matchXa_created_b__c_created_bX_selectXa_b_cX_byXnameX",
- reason = "Giraph does a hard kill on failure and stops threads which stops test cases. Exception handling semantics are correct though.",
- computers = {GiraphGraphComputer.class})
+ reason = "Giraph does a hard kill on failure and stops threads which stops test cases. Exception handling semantics are correct though.")
+ //computers = {GiraphGraphComputer.class})
@Graph.OptOut(
test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.GroovyMatchTest$Traversals",
method = "g_V_out_asXcX_matchXb_knows_a__c_created_eX_selectXcX",
- reason = "Giraph does a hard kill on failure and stops threads which stops test cases. Exception handling semantics are correct though.",
- computers = {GiraphGraphComputer.class})
+ reason = "Giraph does a hard kill on failure and stops threads which stops test cases. Exception handling semantics are correct though.")
+ //computers = {GiraphGraphComputer.class})
@Graph.OptOut(
test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.GroovyMatchTest$Traversals",
method = "g_V_matchXa_0sungBy_b__a_0sungBy_c__b_writtenBy_d__c_writtenBy_e__d_hasXname_George_HarisonX__e_hasXname_Bob_MarleyXX",
@@ -140,18 +139,18 @@ import java.util.stream.Stream;
@Graph.OptOut(
test = "org.apache.tinkerpop.gremlin.process.computer.GraphComputerTest",
method = "shouldNotAllowNullMemoryKeys",
- reason = "Giraph does a hard kill on failure and stops threads which stops test cases. Exception handling semantics are correct though.",
- computers = {GiraphGraphComputer.class})
+ reason = "Giraph does a hard kill on failure and stops threads which stops test cases. Exception handling semantics are correct though.")
+ //computers = {GiraphGraphComputer.class})
@Graph.OptOut(
test = "org.apache.tinkerpop.gremlin.process.computer.GraphComputerTest",
method = "shouldNotAllowSettingUndeclaredMemoryKeys",
- reason = "Giraph does a hard kill on failure and stops threads which stops test cases. Exception handling semantics are correct though.",
- computers = {GiraphGraphComputer.class})
+ reason = "Giraph does a hard kill on failure and stops threads which stops test cases. Exception handling semantics are correct though.")
+ //computers = {GiraphGraphComputer.class})
@Graph.OptOut(
test = "org.apache.tinkerpop.gremlin.process.computer.GraphComputerTest",
method = "shouldHaveConsistentMemoryVertexPropertiesAndExceptions",
- reason = "Giraph does a hard kill on failure and stops threads which stops test cases. Exception handling semantics are correct though.",
- computers = {GiraphGraphComputer.class})
+ reason = "Giraph does a hard kill on failure and stops threads which stops test cases. Exception handling semantics are correct though.")
+ //computers = {GiraphGraphComputer.class})
@Graph.OptOut(
test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.ProfileTest$Traversals",
method = "g_V_out_out_profile_grateful",
@@ -189,15 +188,15 @@ public final class HadoopGraph implements Graph {
@Override
public <C extends GraphComputer> C compute(final Class<C> graphComputerClass) {
- if (graphComputerClass.equals(GiraphGraphComputer.class))
- return (C) new GiraphGraphComputer(this);
- else {
+ //if (graphComputerClass.equals(GiraphGraphComputer.class))
+ // return (C) new GiraphGraphComputer(this);
+ //else {
try {
return graphComputerClass.getConstructor(HadoopGraph.class).newInstance(this);
} catch (final Exception e) {
throw new IllegalArgumentException(e.getMessage(), e);
}
- }
+ //}
//else if (graphComputerClass.equals(SparkGraphComputer.class))
// return (C) new SparkGraphComputer(this);
//else
@@ -206,7 +205,8 @@ public final class HadoopGraph implements Graph {
@Override
public GraphComputer compute() {
- return this.compute(GiraphGraphComputer.class);
+ // Giraph moved out of hadoop-gremlin, so there is no default GraphComputer; fail fast instead of returning null (matches compute(Class)'s IllegalArgumentException)
+ throw new IllegalArgumentException("There is no default GraphComputer for HadoopGraph. Use HadoopGraph.compute(Class) to specify the GraphComputer to use.");
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
index 27b84f0..ba0e75e 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
@@ -19,8 +19,6 @@
package org.apache.tinkerpop.gremlin.hadoop;
import org.apache.commons.configuration.Configuration;
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.tinkerpop.gremlin.AbstractGraphProvider;
import org.apache.tinkerpop.gremlin.LoadGraphWith;
import org.apache.tinkerpop.gremlin.TestHelper;
@@ -112,16 +110,16 @@ public class HadoopGraphProvider extends AbstractGraphProvider {
put(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "hadoop-gremlin/target/test-output");
put(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
/// giraph configuration
- put(GiraphConstants.MIN_WORKERS, 1);
- put(GiraphConstants.MAX_WORKERS, 1);
- put(GiraphConstants.SPLIT_MASTER_WORKER.getKey(), false);
- put(GiraphConstants.ZOOKEEPER_SERVER_PORT.getKey(), 2181); // you must have a local zookeeper running on this port
- put(GiraphConstants.NETTY_SERVER_USE_EXECUTION_HANDLER.getKey(), false); // this prevents so many integration tests running out of threads
- put(GiraphConstants.NETTY_CLIENT_USE_EXECUTION_HANDLER.getKey(), false); // this prevents so many integration tests running out of threads
- put(GiraphConstants.NUM_INPUT_THREADS.getKey(), 3);
- put(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), 3);
- put(GiraphConstants.MAX_MASTER_SUPERSTEP_WAIT_MSECS.getKey(), TimeUnit.MINUTES.toMillis(60L));
- put("mapred.reduce.tasks", 4);
+ //put(GiraphConstants.MIN_WORKERS, 1);
+ //put(GiraphConstants.MAX_WORKERS, 1);
+ //put(GiraphConstants.SPLIT_MASTER_WORKER.getKey(), false);
+ //put(GiraphConstants.ZOOKEEPER_SERVER_PORT.getKey(), 2181); // you must have a local zookeeper running on this port
+ //put(GiraphConstants.NETTY_SERVER_USE_EXECUTION_HANDLER.getKey(), false); // this prevents so many integration tests running out of threads
+ //put(GiraphConstants.NETTY_CLIENT_USE_EXECUTION_HANDLER.getKey(), false); // this prevents so many integration tests running out of threads
+ //put(GiraphConstants.NUM_INPUT_THREADS.getKey(), 3);
+ //put(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), 3);
+ //put(GiraphConstants.MAX_MASTER_SUPERSTEP_WAIT_MSECS.getKey(), TimeUnit.MINUTES.toMillis(60L));
+ //put("mapred.reduce.tasks", 4);
//put("giraph.vertexOutputFormatThreadSafe", false);
//put("giraph.numOutputThreads", 3);
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputerProcessIntegrateTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputerProcessIntegrateTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputerProcessIntegrateTest.java
deleted file mode 100644
index 9b62137..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputerProcessIntegrateTest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph;
-
-import org.apache.tinkerpop.gremlin.GraphProviderClass;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.process.ProcessComputerSuite;
-import org.junit.runner.RunWith;
-
-/**
- * Runs the {@link ProcessComputerSuite} against {@link HadoopGraph}, using {@link HadoopGiraphGraphProvider}
- * to supply the graph and the Giraph graph computer.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-@RunWith(ProcessComputerSuite.class)
-@GraphProviderClass(provider = HadoopGiraphGraphProvider.class, graph = HadoopGraph.class)
-public class GiraphGraphComputerProcessIntegrateTest {
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/HadoopGiraphGraphProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/HadoopGiraphGraphProvider.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/HadoopGiraphGraphProvider.java
deleted file mode 100644
index ae3e484..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/HadoopGiraphGraphProvider.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph;
-
-import org.apache.tinkerpop.gremlin.GraphProvider;
-import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-
-/**
- * {@link HadoopGraphProvider} variant whose traversal sources run on {@link GiraphGraphComputer}.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-@GraphProvider.Descriptor(computer = GiraphGraphComputer.class)
-public final class HadoopGiraphGraphProvider extends HadoopGraphProvider {
-
- /**
- * Builds a {@link GraphTraversalSource} over the given graph using a computer traversal engine bound to {@link GiraphGraphComputer}.
- * NOTE(review): presumably overrides {@code GraphProvider#traversal(Graph)} but lacks {@code @Override} — confirm.
- */
- public GraphTraversalSource traversal(final Graph graph) {
- return GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(GiraphGraphComputer.class)).create(graph);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/groovy/GiraphGraphComputerGroovyProcessIntegrateTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/groovy/GiraphGraphComputerGroovyProcessIntegrateTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/groovy/GiraphGraphComputerGroovyProcessIntegrateTest.java
deleted file mode 100644
index d6aae67..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/groovy/GiraphGraphComputerGroovyProcessIntegrateTest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.groovy;
-
-import org.apache.tinkerpop.gremlin.GraphProviderClass;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.HadoopGiraphGraphProvider;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.process.GroovyProcessComputerSuite;
-import org.junit.runner.RunWith;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-@RunWith(GroovyProcessComputerSuite.class)
-@GraphProviderClass(provider = HadoopGiraphGraphProvider.class, graph = HadoopGraph.class)
-public class GiraphGraphComputerGroovyProcessIntegrateTest {
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/groovy/GiraphHadoopGremlinPluginIntegrateTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/groovy/GiraphHadoopGremlinPluginIntegrateTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/groovy/GiraphHadoopGremlinPluginIntegrateTest.java
deleted file mode 100644
index 8463b31..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/groovy/GiraphHadoopGremlinPluginIntegrateTest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.groovy;
-
-import org.apache.tinkerpop.gremlin.GraphProviderClass;
-import org.apache.tinkerpop.gremlin.hadoop.groovy.plugin.HadoopPluginSuite;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.HadoopGiraphGraphProvider;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.junit.runner.RunWith;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-@RunWith(HadoopPluginSuite.class)
-@GraphProviderClass(provider = HadoopGiraphGraphProvider.class, graph = HadoopGraph.class)
-public class GiraphHadoopGremlinPluginIntegrateTest {
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac56b309/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7a4411c..d822651 100644
--- a/pom.xml
+++ b/pom.xml
@@ -117,6 +117,7 @@ limitations under the License.
<module>gremlin-console</module>
<module>gremlin-server</module>
<module>spark-gremlin</module>
+ <module>giraph-gremlin</module>
</modules>
<scm>
<connection>scm:git:git@git-wip-us.apache.org:repos/asf/incubator-tinkerpop.git</connection>