You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by cl...@apache.org on 2013/11/18 15:52:08 UTC
[2/2] git commit: updated refs/heads/trunk to 9ded6c3
GIRAPH-758
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/9ded6c37
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/9ded6c37
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/9ded6c37
Branch: refs/heads/trunk
Commit: 9ded6c372b3bcbceb28f6186efea7140c1f62dcd
Parents: adc87ae
Author: Claudio Martella <cl...@gmail.com>
Authored: Mon Nov 18 15:51:18 2013 +0100
Committer: Claudio Martella <cl...@gmail.com>
Committed: Mon Nov 18 15:51:18 2013 +0100
----------------------------------------------------------------------
giraph-gora/conf/edge.avsc | 12 +
giraph-gora/conf/gora-cassandra-mapping.xml | 42 ++
giraph-gora/conf/gora-hbase-mapping.xml | 63 +++
giraph-gora/conf/gora.properties | 29 ++
giraph-gora/conf/vertex.json | 18 +
giraph-gora/conf/zoo.cfg | 4 +
giraph-gora/pom.xml | 125 ++++++
giraph-gora/src/main/assembly/compile.xml | 39 ++
.../io/gora/GoraGVertexVertexInputFormat.java | 98 +++++
.../io/gora/GoraGVertexVertexOutputFormat.java | 83 ++++
.../giraph/io/gora/GoraVertexInputFormat.java | 414 +++++++++++++++++++
.../giraph/io/gora/GoraVertexOutputFormat.java | 297 +++++++++++++
.../io/gora/constants/GiraphGoraConstants.java | 80 ++++
.../giraph/io/gora/constants/package-info.java | 21 +
.../giraph/io/gora/generated/GVertex.java | 280 +++++++++++++
.../giraph/io/gora/generated/GVertexResult.java | 280 +++++++++++++
.../giraph/io/gora/generated/package-info.java | 21 +
.../org/apache/giraph/io/gora/package-info.java | 21 +
.../giraph/io/gora/utils/DefaultKeyFactory.java | 45 ++
.../io/gora/utils/ExtraGoraInputFormat.java | 172 ++++++++
.../apache/giraph/io/gora/utils/GoraUtils.java | 194 +++++++++
.../apache/giraph/io/gora/utils/KeyFactory.java | 54 +++
.../giraph/io/gora/utils/package-info.java | 21 +
.../io/gora/GoraTestVertexInputFormat.java | 128 ++++++
.../io/gora/GoraTestVertexOutputFormat.java | 114 +++++
.../io/gora/TestGoraVertexInputFormat.java | 121 ++++++
.../io/gora/TestGoraVertexOutputFormat.java | 74 ++++
27 files changed, 2850 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/conf/edge.avsc
----------------------------------------------------------------------
diff --git a/giraph-gora/conf/edge.avsc b/giraph-gora/conf/edge.avsc
new file mode 100644
index 0000000..c5caeb1
--- /dev/null
+++ b/giraph-gora/conf/edge.avsc
@@ -0,0 +1,12 @@
+{
+ "type": "record",
+ "name": "GEdge",
+ "namespace": "org.apache.giraph.io.gora.generated",
+ "fields" : [
+ {"name": "edgeId", "type": "string"},
+ {"name": "edgeWeight", "type": "float"},
+ {"name": "vertexInId", "type": "string"},
+ {"name": "vertexOutId", "type": "string"},
+ {"name": "label", "type": "string"}
+ ]
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/conf/gora-cassandra-mapping.xml
----------------------------------------------------------------------
diff --git a/giraph-gora/conf/gora-cassandra-mapping.xml b/giraph-gora/conf/gora-cassandra-mapping.xml
new file mode 100644
index 0000000..85079dd
--- /dev/null
+++ b/giraph-gora/conf/gora-cassandra-mapping.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<gora-orm>
+
+ <keyspace name="graphGiraph" host="localhost" cluster="Test Cluster">
+ <family name="vertices"/>
+ </keyspace>
+
+ <class name="org.apache.giraph.io.gora.generated.GVertex" keyClass="java.lang.String" keyspace="graphGiraph">
+ <field name="vertexId" family="vertices" qualifier="vertexId"/>
+ <field name="value" family="vertices" qualifier="value"/>
+ <field name="edges" family="vertices" qualifier="edges"/>
+ </class>
+
+ <keyspace name="graphGiraphResults" host="localhost" cluster="Test Cluster">
+ <family name="vertices"/>
+ </keyspace>
+
+ <class name="org.apache.giraph.io.gora.generated.GVertexResult" keyClass="java.lang.String" keyspace="graphGiraphResults">
+ <field name="vertexId" family="vertices" qualifier="vertexId"/>
+ <field name="value" family="vertices" qualifier="value"/>
+ <field name="edges" family="vertices" qualifier="edges"/>
+ </class>
+
+</gora-orm>
http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/conf/gora-hbase-mapping.xml
----------------------------------------------------------------------
diff --git a/giraph-gora/conf/gora-hbase-mapping.xml b/giraph-gora/conf/gora-hbase-mapping.xml
new file mode 100644
index 0000000..fd40e30
--- /dev/null
+++ b/giraph-gora/conf/gora-hbase-mapping.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<gora-orm>
+
+ <table name="graphGiraph">
+ <family name="vertices"/>
+ </table>
+
+ <class name="org.apache.giraph.io.gora.generated.GVertex" keyClass="java.lang.String" table="graphGiraph">
+ <field name="vertexId" family="vertices" qualifier="vertexId"/>
+ <field name="value" family="vertices" qualifier="value"/>
+ <field name="edges" family="vertices" qualifier="edges"/>
+ </class>
+
+ <table name="graphGiraphResults">
+ <family name="vertices"/>
+ </table>
+
+ <class name="org.apache.giraph.io.gora.generated.GVertexResult" keyClass="java.lang.String" table="graphGiraphResults">
+ <field name="vertexId" family="vertices" qualifier="vertexId"/>
+ <field name="value" family="vertices" qualifier="value"/>
+ <field name="edges" family="vertices" qualifier="edges"/>
+ </class>
+
+ <table name="graphGiraphEdges">
+ <family name="edges"/>
+ </table>
+ <class name="org.apache.giraph.io.gora.generated.GEdge" keyClass="java.lang.String" table="graphGiraphEdges">
+ <field name="edgeId" family="edges" qualifier="edgeId"/>
+ <field name="edgeWeight" family="edges" qualifier="edgeWeight"/>
+ <field name="label" family="edges" qualifier="label"/>
+ <field name="vertexInId" family="edges" qualifier="vertexInId"/>
+ <field name="vertexOutId" family="edges" qualifier="vertexOutId"/>
+ </class>
+
+ <table name="graphGiraphResultEdges">
+ <family name="edges"/>
+ </table>
+ <class name="org.apache.giraph.io.gora.generated.GEdgeResult" keyClass="java.lang.String" table="graphGiraphResultEdges">
+ <field name="edgeId" family="edges" qualifier="edgeId"/>
+ <field name="edgeWeight" family="edges" qualifier="edgeWeight"/>
+ <field name="label" family="edges" qualifier="label"/>
+ <field name="vertexInId" family="edges" qualifier="vertexInId"/>
+ <field name="vertexOutId" family="edges" qualifier="vertexOutId"/>
+ </class>
+</gora-orm>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/conf/gora.properties
----------------------------------------------------------------------
diff --git a/giraph-gora/conf/gora.properties b/giraph-gora/conf/gora.properties
new file mode 100644
index 0000000..d59faad
--- /dev/null
+++ b/giraph-gora/conf/gora.properties
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# FOR HBASE DATASTORE
+gora.datastore.default=org.apache.gora.hbase.store.HBaseStore
+
+# FOR CASSANDRA DATASTORE
+#gora.datastore.default=org.apache.gora.cassandra.store.CassandraStore
+
+# FOR DYNAMO DATASTORE
+#gora.datastore.default=org.apache.gora.dynamodb.store.DynamoDBStore
+#gora.datastore.autocreateschema=true
+
+#preferred.schema.name=person
+#gora.dynamodb.client=sync
+#gora.dynamodb.consistent.reads=true
+#gora.dynamodb.endpoint=http://dynamodb.us-east-1.amazonaws.com/
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/conf/vertex.json
----------------------------------------------------------------------
diff --git a/giraph-gora/conf/vertex.json b/giraph-gora/conf/vertex.json
new file mode 100644
index 0000000..9f435fa
--- /dev/null
+++ b/giraph-gora/conf/vertex.json
@@ -0,0 +1,18 @@
+{
+ "type": "record",
+ "name": "Vertex",
+ "namespace": "org.apache.giraph.gora.generated",
+ "fields" : [
+ {"name": "vertexId", "type": "long"},
+ {"name": "value", "type": "float"},
+ {"name": "edges", "type": {"type":"array", "items": {
+ "name": "Edge",
+ "type": "record",
+ "namespace": "org.apache.giraph.gora.generated",
+ "fields": [
+ {"name": "vertexId", "type": "long"},
+ {"name": "edgeValue", "type": "float"}
+ ]
+ }}}
+ ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/conf/zoo.cfg
----------------------------------------------------------------------
diff --git a/giraph-gora/conf/zoo.cfg b/giraph-gora/conf/zoo.cfg
new file mode 100644
index 0000000..f4cec32
--- /dev/null
+++ b/giraph-gora/conf/zoo.cfg
@@ -0,0 +1,4 @@
+tickTime=20000
+dataDir=/var/zookeeper
+clientPort=2181
+maxClientCnxns=300
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-gora/pom.xml b/giraph-gora/pom.xml
new file mode 100644
index 0000000..4b9ca6f
--- /dev/null
+++ b/giraph-gora/pom.xml
@@ -0,0 +1,125 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.giraph</groupId>
+ <artifactId>giraph-parent</artifactId>
+ <version>1.1.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>giraph-gora</artifactId>
+ <packaging>jar</packaging>
+
+ <name>Apache Giraph Gora I/O</name>
+ <url>http://giraph.apache.org/giraph-gora/</url>
+ <description>Giraph Gora input/output classes</description>
+
+ <properties>
+ <top.dir>${project.basedir}/..</top.dir>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-site-plugin</artifactId>
+ <configuration>
+ <siteDirectory>${project.basedir}/src/site</siteDirectory>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.6</version>
+ <configuration>
+ <systemProperties>
+ <property>
+ <name>prop.jarLocation</name>
+ <value>${top.dir}/giraph-core/target/giraph-${project.version}-${forHadoop}-jar-with-dependencies.jar</value>
+ </property>
+ </systemProperties>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <!-- compile dependencies. sorted lexicographically. -->
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.giraph</groupId>
+ <artifactId>giraph-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.giraph</groupId>
+ <artifactId>giraph-core</artifactId>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.gora</groupId>
+ <artifactId>gora-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>xerces</groupId>
+ <artifactId>xercesImpl</artifactId>
+ <version>2.9.1</version>
+ </dependency>
+ <dependency>
+ <groupId>xalan</groupId>
+ <artifactId>xalan</artifactId>
+ <version>2.7.1</version>
+ </dependency>
+ <!-- test dependencies. sorted lexicographically. -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/src/main/assembly/compile.xml
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/assembly/compile.xml b/giraph-gora/src/main/assembly/compile.xml
new file mode 100644
index 0000000..6acf679
--- /dev/null
+++ b/giraph-gora/src/main/assembly/compile.xml
@@ -0,0 +1,39 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+ <id>jar-with-dependencies</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+
+ <dependencySets>
+ <dependencySet>
+ <useProjectArtifact>true</useProjectArtifact>
+ <outputDirectory>/</outputDirectory>
+ <unpackOptions>
+ <excludes>
+ <exclude>META-INF/LICENSE</exclude>
+ </excludes>
+ </unpackOptions>
+ <unpack>true</unpack>
+ <scope>runtime</scope>
+ </dependencySet>
+ </dependencySets>
+</assembly>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGVertexVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGVertexVertexInputFormat.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGVertexVertexInputFormat.java
new file mode 100644
index 0000000..cb0f005
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGVertexVertexInputFormat.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.gora;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.avro.util.Utf8;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.gora.generated.GVertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Example implementation of a specific reader for a generated data bean.
+ */
+public class GoraGVertexVertexInputFormat
+    extends GoraVertexInputFormat<LongWritable, DoubleWritable,
+    FloatWritable> {
+
+  /**
+   * Default constructor.
+   */
+  public GoraGVertexVertexInputFormat() {
+  }
+
+  /**
+   * Creates specific vertex reader to be used inside Hadoop.
+   * @param split split to be read (not used by this implementation).
+   * @param context task attempt context (not used by this implementation).
+   * @return GoraVertexReader Vertex reader to be used by Hadoop.
+   * @throws IOException declared by the overridden method; not thrown here.
+   */
+  @Override
+  public GoraVertexReader createVertexReader(
+      InputSplit split, TaskAttemptContext context) throws IOException {
+    return new GoraGVertexVertexReader();
+  }
+
+  /**
+   * Gora vertex reader that turns {@link GVertex} beans into Giraph
+   * vertices.
+   */
+  protected class GoraGVertexVertexReader extends GoraVertexReader {
+
+    /**
+     * Transforms a GoraObject into a Vertex object.
+     *
+     * The vertex id is parsed as a long from the bean's string id and the
+     * vertex value is taken from the bean's value. Every entry of the
+     * bean's edges map becomes an edge whose target id is the key parsed
+     * as a long and whose weight is the value parsed as a float. Entries
+     * whose key contains "vertexId" or "value" are skipped.
+     *
+     * @param goraObject Object from Gora to be translated; expected to be
+     *                   a {@link GVertex}.
+     * @return Vertex Result from transforming the gora object.
+     */
+    @Override
+    protected Vertex<LongWritable, DoubleWritable, FloatWritable>
+    transformVertex(Object goraObject) {
+      /* create the actual vertex */
+      Vertex<LongWritable, DoubleWritable, FloatWritable> vertex =
+          getConf().createVertex();
+      GVertex tmpGVertex = (GVertex) goraObject;
+
+      LongWritable vrtxId = new LongWritable(
+          Long.parseLong(tmpGVertex.getVertexId().toString()));
+      DoubleWritable vrtxValue = new DoubleWritable(tmpGVertex.getValue());
+      vertex.initialize(vrtxId, vrtxValue);
+      if (tmpGVertex.getEdges() != null && !tmpGVertex.getEdges().isEmpty()) {
+        Set<Utf8> keyIt = tmpGVertex.getEdges().keySet();
+        for (Utf8 key : keyIt) {
+          String keyVal = key.toString();
+          // NOTE(review): presumably filters out non-edge columns that may
+          // share the edges map in some stores -- confirm against schema.
+          if (!keyVal.contains("vertexId") && !keyVal.contains("value")) {
+            // Only look up / stringify the value for entries that really
+            // become edges; skipped keys never touch their (maybe null)
+            // values.
+            String valVal = tmpGVertex.getEdges().get(key).toString();
+            Edge<LongWritable, FloatWritable> edge = EdgeFactory.create(
+                new LongWritable(Long.parseLong(keyVal)),
+                new FloatWritable(Float.parseFloat(valVal)));
+            vertex.addEdge(edge);
+          }
+        }
+      }
+      return vertex;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGVertexVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGVertexVertexOutputFormat.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGVertexVertexOutputFormat.java
new file mode 100644
index 0000000..893e083
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGVertexVertexOutputFormat.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.gora;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.avro.util.Utf8;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.VertexWriter;
+import org.apache.giraph.io.gora.generated.GVertexResult;
+import org.apache.gora.persistency.Persistent;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Implementation of a specific reader for a generated data bean.
+ */
+public class GoraGVertexVertexOutputFormat
+    extends GoraVertexOutputFormat<LongWritable, DoubleWritable,
+    FloatWritable> {
+
+  /**
+   * Default constructor.
+   */
+  public GoraGVertexVertexOutputFormat() {
+  }
+
+  /**
+   * Creates the vertex writer that stores vertices as
+   * {@link GVertexResult} beans.
+   * @param context task attempt context (not used by this implementation).
+   * @return a new {@link GoraGVertexVertexWriter}.
+   * @throws IOException declared by the overridden method.
+   * @throws InterruptedException declared by the overridden method.
+   */
+  @Override
+  public VertexWriter<LongWritable, DoubleWritable, FloatWritable>
+  createVertexWriter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    return new GoraGVertexVertexWriter();
+  }
+
+  /**
+   * Gora vertex writer.
+   */
+  protected class GoraGVertexVertexWriter extends GoraVertexWriter {
+
+    /**
+     * Builds the {@link GVertexResult} bean for a vertex. The vertex id and
+     * every edge's target id and weight are stored as strings; the vertex
+     * value is stored as a float.
+     * @param vertex vertex to be converted.
+     * @return the bean to be persisted.
+     */
+    @Override
+    protected Persistent getGoraVertex(
+        Vertex<LongWritable, DoubleWritable, FloatWritable> vertex) {
+      GVertexResult tmpGVertex = new GVertexResult();
+      tmpGVertex.setVertexId(new Utf8(vertex.getId().toString()));
+      // Narrow the double value directly instead of round-tripping it
+      // through its decimal string representation.
+      tmpGVertex.setValue((float) vertex.getValue().get());
+      Iterator<Edge<LongWritable, FloatWritable>> it =
+          vertex.getEdges().iterator();
+      while (it.hasNext()) {
+        Edge<LongWritable, FloatWritable> edge = it.next();
+        tmpGVertex.putToEdges(
+            new Utf8(edge.getTargetVertexId().toString()),
+            new Utf8(edge.getValue().toString()));
+      }
+      return tmpGVertex;
+    }
+
+    /**
+     * Returns the key under which the vertex is stored: the string form of
+     * its id.
+     * @param vertex vertex whose key is requested.
+     * @return the vertex id as a String.
+     */
+    @Override
+    protected Object getGoraKey(
+        Vertex<LongWritable, DoubleWritable, FloatWritable> vertex) {
+      return String.valueOf(vertex.getId());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraVertexInputFormat.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraVertexInputFormat.java
new file mode 100644
index 0000000..9a6ad8c
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraVertexInputFormat.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.gora;
+
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_DATASTORE_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_END_KEY;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_KEYS_FACTORY_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_KEY_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_PERSISTENT_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_START_KEY;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.VertexReader;
+import org.apache.giraph.io.gora.utils.KeyFactory;
+import org.apache.giraph.io.gora.utils.ExtraGoraInputFormat;
+import org.apache.giraph.io.gora.utils.GoraUtils;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.util.GoraException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Logger;
+
+/**
+ * Class which wraps the GoraInputFormat. It's designed
+ * as an extension point to VertexInputFormat subclasses who wish
+ * to read from Gora data sources.
+ *
+ * Works with
+ * {@link GoraVertexOutputFormat}
+ *
+ * @param <I> vertex id type
+ * @param <V> vertex value type
+ * @param <E> edge type
+ */
+public abstract class GoraVertexInputFormat<
+ I extends WritableComparable,
+ V extends Writable,
+ E extends Writable>
+ extends VertexInputFormat<I, V, E> {
+
+ /** Start key for querying Gora data store. */
+ private static Object START_KEY;
+
+ /** End key for querying Gora data store. */
+ private static Object END_KEY;
+
+ /** Logger for Gora's vertex input format. */
+ private static final Logger LOG =
+ Logger.getLogger(GoraVertexInputFormat.class);
+
+ /** KeyClass used for getting data. */
+ private static Class<?> KEY_CLASS;
+
+ /** The vertex itself will be used as a value inside Gora. */
+ private static Class<? extends Persistent> PERSISTENT_CLASS;
+
+ /** Data store class to be used as backend. */
+ private static Class<? extends DataStore> DATASTORE_CLASS;
+
+ /** Class used to transform strings into Keys */
+ private static Class<?> KEY_FACTORY_CLASS;
+
+ /** Data store used for querying data. */
+ private static DataStore DATA_STORE;
+
+ /** counter for iinput records */
+ private static int RECORD_COUNTER = 0;
+
+ /** Delegate Gora input format */
+ private static ExtraGoraInputFormat GORA_INPUT_FORMAT =
+ new ExtraGoraInputFormat();
+
+ /** @param conf configuration parameters */
+ public void checkInputSpecs(Configuration conf) {
+ String sDataStoreType =
+ GIRAPH_GORA_DATASTORE_CLASS.get(getConf());
+ String sKeyType =
+ GIRAPH_GORA_KEY_CLASS.get(getConf());
+ String sPersistentType =
+ GIRAPH_GORA_PERSISTENT_CLASS.get(getConf());
+ String sKeyFactoryClass =
+ GIRAPH_GORA_KEYS_FACTORY_CLASS.get(getConf());
+ try {
+ Class<?> keyClass = Class.forName(sKeyType);
+ Class<?> persistentClass = Class.forName(sPersistentType);
+ Class<?> dataStoreClass = Class.forName(sDataStoreType);
+ Class<?> keyFactoryClass = Class.forName(sKeyFactoryClass);
+ setKeyClass(keyClass);
+ setPersistentClass((Class<? extends Persistent>) persistentClass);
+ setDatastoreClass((Class<? extends DataStore>) dataStoreClass);
+ setKeyFactoryClass(keyFactoryClass);
+ setDataStore(createDataStore());
+ GORA_INPUT_FORMAT.setDataStore(getDataStore());
+ } catch (ClassNotFoundException e) {
+ LOG.error("Error while reading Gora Input parameters");
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Create a vertex reader for a given split. Guaranteed to have been
+ * configured with setConf() prior to use. The framework will also call
+ * {@link VertexReader#initialize(InputSplit, TaskAttemptContext)} before
+ * the split is used.
+ *
+ * @param split the split to be read
+ * @param context the information about the task
+ * @return a new record reader
+ * @throws IOException
+ */
+ public abstract GoraVertexReader createVertexReader(InputSplit split,
+ TaskAttemptContext context) throws IOException;
+
+ /**
+ * Gets the splits for a data store.
+ * @param context JobContext
+ * @param minSplitCountHint Hint for a minimum split count
+ * @return List<InputSplit> A list of splits
+ */
+ @Override
+ public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
+ throws IOException, InterruptedException {
+ KeyFactory kFact = null;
+ try {
+ kFact = (KeyFactory) getKeyFactoryClass().newInstance();
+ kFact.setDataStore(getDataStore());
+ } catch (InstantiationException e) {
+ LOG.error("Key factory was not instantiated. Please verify.");
+ LOG.error(e.getMessage());
+ e.printStackTrace();
+ } catch (IllegalAccessException e) {
+ LOG.error("Key factory was not instantiated. Please verify.");
+ LOG.error(e.getMessage());
+ e.printStackTrace();
+ }
+ String sKey = GIRAPH_GORA_START_KEY.get(getConf());
+ String eKey = GIRAPH_GORA_END_KEY.get(getConf());
+ if (sKey == null || sKey.isEmpty()) {
+ LOG.warn("No start key has been defined.");
+ LOG.warn("Querying all the data store.");
+ sKey = null;
+ eKey = null;
+ } else {
+ setStartKey(kFact.buildKey(sKey));
+ setEndKey(kFact.buildKey(eKey));
+ }
+ Query tmpQuery = GoraUtils.getQuery(
+ getDataStore(), getStartKey(), getEndKey());
+ GORA_INPUT_FORMAT.setQuery(tmpQuery);
+ List<InputSplit> splits = GORA_INPUT_FORMAT.getSplits(context);
+ return splits;
+ }
+
+ /**
+ * Gets the data store object initialized.
+ * @return DataStore created
+ */
+ public DataStore createDataStore() {
+ DataStore dsCreated = null;
+ try {
+ dsCreated = GoraUtils.createSpecificDataStore(getDatastoreClass(),
+ getKeyClass(), getPersistentClass());
+ } catch (GoraException e) {
+ LOG.error("Error creating data store.");
+ e.printStackTrace();
+ }
+ return dsCreated;
+ }
+
+ /**
+ * Abstract class to be implemented by the user based on their specific
+ * vertex input. Easiest to ignore the key value separator and only use
+ * key instead.
+ */
+ protected abstract class GoraVertexReader extends VertexReader<I, V, E> {
+ /** Current vertex */
+ private Vertex<I, V, E> vertex;
+ /** Results gotten from Gora data store. */
+ private Result readResults;
+
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ getResults();
+ RECORD_COUNTER = 0;
+ }
+
+ /**
+ * Gets the next vertex from Gora data store.
+ * @return true/false depending on the existence of vertices.
+ * @throws IOException exceptions passed along.
+ * @throws InterruptedException exceptions passed along.
+ */
+ @Override
+ // CHECKSTYLE: stop IllegalCatch
+ public boolean nextVertex() throws IOException, InterruptedException {
+ boolean flg = false;
+ try {
+ flg = this.getReadResults().next();
+ this.vertex = transformVertex(this.getReadResults().get());
+ RECORD_COUNTER++;
+ } catch (Exception e) {
+ LOG.error("Error transforming vertices.");
+ LOG.error(e.getMessage());
+ flg = false;
+ }
+ LOG.debug(RECORD_COUNTER + " were transformed.");
+ return flg;
+ }
+ // CHECKSTYLE: resume IllegalCatch
+
+ /**
+ * Gets the progress of reading results from Gora.
+ * @return the progress of reading results from Gora.
+ */
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ float progress = 0.0f;
+ if (getReadResults() != null) {
+ progress = getReadResults().getProgress();
+ }
+ return progress;
+ }
+
+ /**
+ * Gets current vertex.
+ *
+ * @return The vertex object represented by a Gora object
+ */
+ @Override
+ public Vertex<I, V, E> getCurrentVertex()
+ throws IOException, InterruptedException {
+ return this.vertex;
+ }
+
+ /**
+ * Parser for a single Gora object
+ *
+ * @param goraObject vertex represented as a GoraObject
+ * @return The vertex object represented by a Gora object
+ */
+ protected abstract Vertex<I, V, E> transformVertex(Object goraObject);
+
+ /**
+ * Performs a range query to a Gora data store.
+ */
+ protected void getResults() {
+ setReadResults(GoraUtils.getRequest(getDataStore(),
+ getStartKey(), getEndKey()));
+ }
+
+ /**
+ * Finishes the reading process.
+ * @throws IOException.
+ */
+ @Override
+ public void close() throws IOException {
+ }
+
+ /**
+ * Gets the results read.
+ * @return results read.
+ */
+ Result getReadResults() {
+ return readResults;
+ }
+
+ /**
+ * Sets the results read.
+ * @param readResults results read.
+ */
+ void setReadResults(Result readResults) {
+ this.readResults = readResults;
+ }
+ }
+
+ /**
+ * Gets the persistent Class
+ * @return persistentClass used
+ */
+ static Class<? extends Persistent> getPersistentClass() {
+ return PERSISTENT_CLASS;
+ }
+
+ /**
+ * Sets the persistent Class
+ * @param persistentClassUsed to be set
+ */
+ static void setPersistentClass
+ (Class<? extends Persistent> persistentClassUsed) {
+ PERSISTENT_CLASS = persistentClassUsed;
+ }
+
+ /**
+ * Gets the key class used.
+ * @return the key class used.
+ */
+ static Class<?> getKeyClass() {
+ return KEY_CLASS;
+ }
+
+ /**
+ * Sets the key class used.
+ * @param keyClassUsed key class used.
+ */
+ static void setKeyClass(Class<?> keyClassUsed) {
+ KEY_CLASS = keyClassUsed;
+ }
+
+ /**
+ * @return Class the DATASTORE_CLASS
+ */
+ public static Class<? extends DataStore> getDatastoreClass() {
+ return DATASTORE_CLASS;
+ }
+
+ /**
+ * @param dataStoreClass the dataStore class to set
+ */
+ public static void setDatastoreClass(
+ Class<? extends DataStore> dataStoreClass) {
+ DATASTORE_CLASS = dataStoreClass;
+ }
+
+ /**
+ * Gets the start key for querying.
+ * @return the start key.
+ */
+ public Object getStartKey() {
+ return START_KEY;
+ }
+
+ /**
+ * Gets the start key for querying.
+ * @param startKey start key.
+ */
+ public static void setStartKey(Object startKey) {
+ START_KEY = startKey;
+ }
+
+ /**
+ * Gets the end key for querying.
+ * @return the end key.
+ */
+ static Object getEndKey() {
+ return END_KEY;
+ }
+
+ /**
+ * Sets the end key for querying.
+ * @param pEndKey start key.
+ */
+ static void setEndKey(Object pEndKey) {
+ END_KEY = pEndKey;
+ }
+
+ /**
+ * Gets the key factory class.
+ * @return the kEY_FACTORY_CLASS
+ */
+ static Class<?> getKeyFactoryClass() {
+ return KEY_FACTORY_CLASS;
+ }
+
+ /**
+ * Sets the key factory class.
+ * @param keyFactoryClass the keyFactoryClass to set.
+ */
+ static void setKeyFactoryClass(Class<?> keyFactoryClass) {
+ KEY_FACTORY_CLASS = keyFactoryClass;
+ }
+
+ /**
+ * Gets the data store.
+ * @return DataStore
+ */
+ public static DataStore getDataStore() {
+ return DATA_STORE;
+ }
+
+ /**
+ * Sets the data store
+ * @param dStore the dATA_STORE to set
+ */
+ public static void setDataStore(DataStore dStore) {
+ DATA_STORE = dStore;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraVertexOutputFormat.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraVertexOutputFormat.java
new file mode 100644
index 0000000..5fcc684
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraVertexOutputFormat.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.gora;
+
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_DATASTORE_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_KEY_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_PERSISTENT_CLASS;
+
+import java.io.IOException;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.io.VertexWriter;
+import org.apache.giraph.io.gora.utils.GoraUtils;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.util.GoraException;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+/**
+ *
+ * Class which wraps the GoraOutputFormat. It's designed
+ * as an extension point to VertexOutputFormat subclasses who wish
+ * to write vertices back to an Accumulo table.
+ *
+ * Works with
+ * {@link GoraVertexInputFormat}
+ *
+ *
+ * @param <I> vertex id type
+ * @param <V> vertex value type
+ * @param <E> edge type
+ */
+public abstract class GoraVertexOutputFormat<
+ I extends WritableComparable,
+ V extends Writable,
+ E extends Writable>
+ extends VertexOutputFormat<I, V, E> {
+
+ /** Logger for Gora's vertex input format. */
+ private static final Logger LOG =
+ Logger.getLogger(GoraVertexOutputFormat.class);
+
+ /** KeyClass used for getting data. */
+ private static Class<?> KEY_CLASS;
+
+ /** The vertex itself will be used as a value inside Gora. */
+ private static Class<? extends Persistent> PERSISTENT_CLASS;
+
+ /** Data store class to be used as backend. */
+ private static Class<? extends DataStore> DATASTORE_CLASS;
+
+ /** Data store used for querying data. */
+ private static DataStore DATA_STORE;
+
+ /**
+ * checkOutputSpecs
+ *
+ * @param context information about the job
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Override
+ public void checkOutputSpecs(JobContext context)
+ throws IOException, InterruptedException {
+ }
+
+ /**
+ * Gets the data store object initialized.
+ * @return DataStore created
+ */
+ public DataStore createDataStore() {
+ DataStore dsCreated = null;
+ try {
+ dsCreated = GoraUtils.createSpecificDataStore(getDatastoreClass(),
+ getKeyClass(), getPersistentClass());
+ } catch (GoraException e) {
+ getLogger().error("Error creating data store.");
+ e.printStackTrace();
+ }
+ return dsCreated;
+ }
+
+ /**
+ * getOutputCommitter
+ *
+ * @param context the task context
+ * @return OutputCommitter
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new NullOutputCommitter();
+ }
+
+ /**
+ * Empty output commiter for hadoop.
+ */
+ private static class NullOutputCommitter extends OutputCommitter {
+ @Override
+ public void abortTask(TaskAttemptContext arg0) throws IOException { }
+
+ @Override
+ public void commitTask(TaskAttemptContext arg0) throws IOException { }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException {
+ return false;
+ }
+
+ @Override
+ public void setupJob(JobContext arg0) throws IOException { }
+
+ @Override
+ public void setupTask(TaskAttemptContext arg0) throws IOException { }
+ }
+
+ /**
+ * Abstract class to be implemented by the user based on their specific
+ * vertex/edges output. Easiest to ignore the key value separator and only
+ * use key instead.
+ */
+ protected abstract class GoraVertexWriter
+ extends VertexWriter<I, V, E>
+ implements Watcher {
+ /** lock for management of the barrier */
+ private final Object lock = new Object();
+
+ @Override
+ public void initialize(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ String sDataStoreType =
+ GIRAPH_GORA_OUTPUT_DATASTORE_CLASS.get(getConf());
+ String sKeyType =
+ GIRAPH_GORA_OUTPUT_KEY_CLASS.get(getConf());
+ String sPersistentType =
+ GIRAPH_GORA_OUTPUT_PERSISTENT_CLASS.get(getConf());
+ try {
+ Class<?> keyClass = Class.forName(sKeyType);
+ Class<?> persistentClass = Class.forName(sPersistentType);
+ Class<?> dataStoreClass = Class.forName(sDataStoreType);
+ setKeyClass(keyClass);
+ setPersistentClass((Class<? extends Persistent>) persistentClass);
+ setDatastoreClass((Class<? extends DataStore>) dataStoreClass);
+ setDataStore(createDataStore());
+ if (getDataStore() != null) {
+ getLogger().info("The output data store has been created.");
+ }
+ } catch (ClassNotFoundException e) {
+ getLogger().error("Error while reading Gora Output parameters");
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void close(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ getDataStore().flush();
+ getDataStore().close();
+ }
+
+ @Override
+ public void writeVertex(Vertex<I, V, E> vertex)
+ throws IOException, InterruptedException {
+ Persistent goraVertex = null;
+ Object goraKey = getGoraKey(vertex);
+ goraVertex = getGoraVertex(vertex);
+ getDataStore().put(goraKey, goraVertex);
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ EventType type = event.getType();
+ if (type == EventType.NodeChildrenChanged) {
+ if (getLogger().isDebugEnabled()) {
+ getLogger().debug("signal: number of children changed.");
+ }
+ synchronized (lock) {
+ lock.notify();
+ }
+ }
+ }
+
+ /**
+ * Each vertex needs to be transformed into a Gora object to be sent to
+ * a specific data store.
+ *
+ * @param vertex vertex to be transformed into a Gora object
+ * @return Gora representation of the vertex
+ */
+ protected abstract Persistent getGoraVertex(Vertex<I, V, E> vertex);
+
+ /**
+ * Gets the correct key from a computed vertex.
+ * @param vertex vertex to extract the key from.
+ * @return The key representing such vertex
+ */
+ protected abstract Object getGoraKey(Vertex<I, V, E> vertex);
+
+ }
+
+ /**
+ * Gets the data store.
+ * @return DataStore
+ */
+ public static DataStore getDataStore() {
+ return DATA_STORE;
+ }
+
+ /**
+ * Sets the data store
+ * @param dStore the dATA_STORE to set
+ */
+ public static void setDataStore(DataStore dStore) {
+ DATA_STORE = dStore;
+ }
+
+ /**
+ * Gets the persistent Class
+ * @return persistentClass used
+ */
+ static Class<? extends Persistent> getPersistentClass() {
+ return PERSISTENT_CLASS;
+ }
+
+ /**
+ * Sets the persistent Class
+ * @param persistentClassUsed to be set
+ */
+ static void setPersistentClass
+ (Class<? extends Persistent> persistentClassUsed) {
+ PERSISTENT_CLASS = persistentClassUsed;
+ }
+
+ /**
+ * Gets the key class used.
+ * @return the key class used.
+ */
+ static Class<?> getKeyClass() {
+ return KEY_CLASS;
+ }
+
+ /**
+ * Sets the key class used.
+ * @param keyClassUsed key class used.
+ */
+ static void setKeyClass(Class<?> keyClassUsed) {
+ KEY_CLASS = keyClassUsed;
+ }
+
+ /**
+ * @return Class the DATASTORE_CLASS
+ */
+ public static Class<? extends DataStore> getDatastoreClass() {
+ return DATASTORE_CLASS;
+ }
+
+ /**
+ * @param dataStoreClass the dataStore class to set
+ */
+ public static void setDatastoreClass(
+ Class<? extends DataStore> dataStoreClass) {
+ DATASTORE_CLASS = dataStoreClass;
+ }
+
+ /**
+ * Returns a logger.
+ * @return the log for the output format.
+ */
+ public static Logger getLogger() {
+ return LOG;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/src/main/java/org/apache/giraph/io/gora/constants/GiraphGoraConstants.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/constants/GiraphGoraConstants.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/constants/GiraphGoraConstants.java
new file mode 100644
index 0000000..187bf59
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/constants/GiraphGoraConstants.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.gora.constants;
+
+import org.apache.giraph.conf.StrConfOption;
+
+/**
+ * Constants used all over Giraph for configuration specific for Gora.
+ */
+// CHECKSTYLE: stop InterfaceIsTypeCheck
+public interface GiraphGoraConstants {
+  /** Gora data store class which provides data access. */
+  StrConfOption GIRAPH_GORA_DATASTORE_CLASS =
+      new StrConfOption("giraph.gora.datastore.class", null,
+          "Gora DataStore class to access to data from. " +
+          "- required");
+
+  /** Gora key class to query the data store. */
+  StrConfOption GIRAPH_GORA_KEY_CLASS =
+      new StrConfOption("giraph.gora.key.class", null,
+          "Gora Key class to query the datastore. " +
+          "- required");
+
+  /** Gora persistent class to query the data store. */
+  StrConfOption GIRAPH_GORA_PERSISTENT_CLASS =
+      new StrConfOption("giraph.gora.persistent.class", null,
+          "Gora Persistent class to read objects from Gora. " +
+          "- required");
+
+  /** Gora start key to query the data store (optional, no default). */
+  StrConfOption GIRAPH_GORA_START_KEY =
+      new StrConfOption("giraph.gora.start.key", null,
+          "Gora start key to query the datastore. ");
+
+  /** Gora end key to query the data store (optional, no default). */
+  StrConfOption GIRAPH_GORA_END_KEY =
+      new StrConfOption("giraph.gora.end.key", null,
+          "Gora end key to query the datastore. ");
+
+  /** Key factory class used to convert strings into Gora keys. */
+  StrConfOption GIRAPH_GORA_KEYS_FACTORY_CLASS =
+      new StrConfOption("giraph.gora.keys.factory.class", null,
+          "Keys factory to convert strings into desired keys " +
+          "- required");
+
+  // OUTPUT
+  /** Gora data store class used to write output data. */
+  StrConfOption GIRAPH_GORA_OUTPUT_DATASTORE_CLASS =
+      new StrConfOption("giraph.gora.output.datastore.class", null,
+          "Gora DataStore class to write data to. " +
+          "- required");
+
+  /** Gora key class used to write to the data store. */
+  StrConfOption GIRAPH_GORA_OUTPUT_KEY_CLASS =
+      new StrConfOption("giraph.gora.output.key.class", null,
+          "Gora Key class to write to datastore. " +
+          "- required");
+
+  /** Gora persistent class used to write to the data store. */
+  StrConfOption GIRAPH_GORA_OUTPUT_PERSISTENT_CLASS =
+      new StrConfOption("giraph.gora.output.persistent.class", null,
+          "Gora Persistent class to write to Gora. " +
+          "- required");
+}
+// CHECKSTYLE: resume InterfaceIsTypeCheck
http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/src/main/java/org/apache/giraph/io/gora/constants/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/constants/package-info.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/constants/package-info.java
new file mode 100644
index 0000000..3a8b96f
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/constants/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Constants for Gora Input/Output formats
+ */
+package org.apache.giraph.io.gora.constants;
http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GVertex.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GVertex.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GVertex.java
new file mode 100644
index 0000000..efd7b95
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GVertex.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.gora.generated;
+
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.persistency.StateManager;
+import org.apache.gora.persistency.StatefulHashMap;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.persistency.impl.StateManagerImpl;
+
+/**
+ * Example class for defining a Giraph-Vertex as a Gora persistent bean:
+ * a string vertex id, a float value and a map of string edge entries.
+ */
+@SuppressWarnings("all")
+public class GVertex extends PersistentBase {
+  /**
+   * Avro schema used for the class.
+   */
+  public static final Schema OBJ_SCHEMA = Schema.parse(
+      "{\"type\":\"record\",\"name\":\"Vertex\"," +
+      "\"namespace\":\"org.apache.giraph.gora.generated\"," +
+      "\"fields\":[{\"name\":\"vertexId\",\"type\":\"string\"}," +
+      "{\"name\":\"value\",\"type\":\"float\"},{\"name\":\"edges\"," +
+      "\"type\":{\"type\":\"map\",\"values\":\"string\"}}]}");
+
+  /**
+   * Field enum
+   */
+  public static enum Field {
+    /**
+     * VertexId
+     */
+    VERTEX_ID(0, "vertexId"),
+
+    /**
+     * Field value
+     */
+    VALUE(1, "value"),
+
+    /**
+     * Edges
+     */
+    EDGES(2, "edges");
+
+    /**
+     * Field index
+     */
+    private int index;
+
+    /**
+     * Field name
+     */
+    private String name;
+
+    /**
+     * Field constructor
+     * @param index of attribute
+     * @param name of attribute
+     */
+    Field(int index, String name) {
+      this.index = index;
+      this.name = name;
+    }
+
+    /**
+     * Gets index
+     * @return int of attribute.
+     */
+    public int getIndex() {
+      return index;
+    }
+
+    /**
+     * Gets name
+     * @return String of name.
+     */
+    public String getName() {
+      return name;
+    }
+
+    /**
+     * Gets name
+     * @return String of name.
+     */
+    public String toString() {
+      return name;
+    }
+  };
+
+  /**
+   * Array containing all fields, in schema index order.
+   */
+  private static final String[] ALL_FIELDS = {
+    "vertexId", "value", "edges"
+  };
+
+  static {
+    PersistentBase.registerFields(GVertex.class, ALL_FIELDS);
+  }
+
+  /**
+   * Vertex Id
+   */
+  private Utf8 vertexId;
+
+  /**
+   * Value
+   */
+  private float value;
+
+  /**
+   * Edges
+   */
+  private Map<Utf8, Utf8> edges;
+
+  /**
+   * Default constructor
+   */
+  public GVertex() {
+    this(new StateManagerImpl());
+  }
+
+  /**
+   * Constructor
+   * @param stateManager from which the object will be created.
+   */
+  public GVertex(StateManager stateManager) {
+    super(stateManager);
+    edges = new StatefulHashMap<Utf8, Utf8>();
+  }
+
+  /**
+   * Creates a new instance
+   * @param stateManager from which the object will be created.
+   * @return GVertex created
+   */
+  public GVertex newInstance(StateManager stateManager) {
+    return new GVertex(stateManager);
+  }
+
+  /**
+   * Gets the object schema
+   * @return Schema of the object.
+   */
+  public Schema getSchema() {
+    return OBJ_SCHEMA;
+  }
+
+  /**
+   * Gets field
+   * @param fieldIndex index of field to be used.
+   * @return Object from an index.
+   */
+  public Object get(int fieldIndex) {
+    switch (fieldIndex) {
+    case 0:
+      return vertexId;
+    case 1:
+      return value;
+    case 2:
+      return edges;
+    default:
+      throw new AvroRuntimeException("Bad index");
+    }
+  }
+
+  /**
+   * Puts a value into a field and marks it dirty, unless the new value is
+   * equal to the current one.
+   * @param fieldIndex index of field used.
+   * @param fieldValue value of field used.
+   */
+  @SuppressWarnings(value = "unchecked")
+  public void put(int fieldIndex, Object fieldValue) {
+    if (isFieldEqual(fieldIndex, fieldValue)) {
+      return;
+    }
+    getStateManager().setDirty(this, fieldIndex);
+    switch (fieldIndex) {
+    case 0:
+      vertexId = (Utf8) fieldValue; break;
+    case 1:
+      value = (Float) fieldValue; break;
+    case 2:
+      edges = (Map<Utf8, Utf8>) fieldValue; break;
+    default:
+      throw new AvroRuntimeException("Bad index");
+    }
+  }
+
+  /**
+   * Gets vertexId
+   * @return Utf8 vertexId
+   */
+  public Utf8 getVertexId() {
+    return (Utf8) get(0);
+  }
+
+  /**
+   * Sets vertexId
+   * @param value vertexId
+   */
+  public void setVertexId(Utf8 value) {
+    put(0, value);
+  }
+
+  /**
+   * Gets value
+   * @return float value of the vertex.
+   */
+  public float getValue() {
+    return (Float) get(1);
+  }
+
+  /**
+   * Sets value
+   * @param value float value of the vertex.
+   */
+  public void setValue(float value) {
+    put(1, value);
+  }
+
+  /**
+   * Get edges.
+   * @return Map of edges.
+   */
+  public Map<Utf8, Utf8> getEdges() {
+    return (Map<Utf8, Utf8>) get(2);
+  }
+
+  /**
+   * Gets value from edge.
+   * @param key Edge key.
+   * @return Utf8 containing the value of edge, or null if there is none.
+   */
+  public Utf8 getFromEdges(Utf8 key) {
+    if (edges == null) { return null; }
+    return edges.get(key);
+  }
+
+  /**
+   * Puts a new edge. If the edge map has been set to null (for example
+   * through {@code put(2, null)}) a new empty map is created first.
+   * @param key of new edge.
+   * @param value of new edge.
+   */
+  public void putToEdges(Utf8 key, Utf8 value) {
+    getStateManager().setDirty(this, 2);
+    if (edges == null) {
+      edges = new StatefulHashMap<Utf8, Utf8>();
+    }
+    edges.put(key, value);
+  }
+
+  /**
+   * Remove from edges
+   * @param key of edge to be deleted.
+   * @return Utf8 containing value of deleted key, or null if none.
+   */
+  public Utf8 removeFromEdges(Utf8 key) {
+    if (edges == null) { return null; }
+    getStateManager().setDirty(this, 2);
+    return edges.remove(key);
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GVertexResult.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GVertexResult.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GVertexResult.java
new file mode 100644
index 0000000..2c1952d
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GVertexResult.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.gora.generated;
+
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.persistency.StateManager;
+import org.apache.gora.persistency.StatefulHashMap;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.persistency.impl.StateManagerImpl;
+
+/**
+ * Example Gora persistent class for storing a Giraph vertex result:
+ * a vertex id, a float value and a map of edges.
+ */
+@SuppressWarnings("all")
+public class GVertexResult extends PersistentBase {
+ /**
+ * Avro schema used for the class.
+ * NOTE(review): the record name ("Vertex") and namespace
+ * ("org.apache.giraph.gora.generated") differ from this class's name and
+ * package; left unchanged because altering the schema changes runtime
+ * behavior for the data stores — confirm intent.
+ */
+ public static final Schema OBJ_SCHEMA = Schema.parse(
+ "{\"type\":\"record\",\"name\":\"Vertex\"," +
+ "\"namespace\":\"org.apache.giraph.gora.generated\"," +
+ "\"fields\":[{\"name\":\"vertexId\",\"type\":\"string\"}," +
+ "{\"name\":\"value\",\"type\":\"float\"},{\"name\":\"edges\"," +
+ "\"type\":{\"type\":\"map\",\"values\":\"string\"}}]}");
+
+ /**
+ * Persistent fields of this class, with their schema index and name.
+ */
+ public static enum Field {
+ /**
+ * Vertex id.
+ */
+ VERTEX_ID(0, "vertexId"),
+
+ /**
+ * Vertex value.
+ */
+ VALUE(1, "value"),
+
+ /**
+ * Edges.
+ */
+ EDGES(2, "edges");
+
+ /**
+ * Field index
+ */
+ private final int index;
+
+ /**
+ * Field name
+ */
+ private final String name;
+
+ /**
+ * Field constructor
+ * @param index of attribute
+ * @param name of attribute
+ */
+ Field(int index, String name) {
+ this.index = index;
+ this.name = name;
+ }
+
+ /**
+ * Gets index
+ * @return int index of attribute.
+ */
+ public int getIndex() {
+ return index;
+ }
+
+ /**
+ * Gets name
+ * @return String name of attribute.
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Returns the field name.
+ * @return String name of attribute.
+ */
+ @Override
+ public String toString() {
+ return name;
+ }
+ }
+
+ /**
+ * Names of all persistent fields, in schema index order.
+ */
+ private static final String[] ALL_FIELDS = {
+ "vertexId", "value", "edges"
+ };
+
+ static {
+ PersistentBase.registerFields(GVertexResult.class, ALL_FIELDS);
+ }
+
+ /**
+ * Vertex Id
+ */
+ private Utf8 vertexId;
+
+ /**
+ * Value
+ */
+ private float value;
+
+ /**
+ * Edges
+ */
+ private Map<Utf8, Utf8> edges;
+
+ /**
+ * Default constructor; uses a new StateManagerImpl.
+ */
+ public GVertexResult() {
+ this(new StateManagerImpl());
+ }
+
+ /**
+ * Constructor
+ * @param stateManager from which the object will be created.
+ */
+ public GVertexResult(StateManager stateManager) {
+ super(stateManager);
+ edges = new StatefulHashMap<Utf8, Utf8>();
+ }
+
+ /**
+ * Creates a new instance
+ * @param stateManager from which the object will be created.
+ * @return GVertexResult created
+ */
+ public GVertexResult newInstance(StateManager stateManager) {
+ return new GVertexResult(stateManager);
+ }
+
+ /**
+ * Gets the object schema
+ * @return Schema of the object.
+ */
+ public Schema getSchema() {
+ return OBJ_SCHEMA;
+ }
+
+ /**
+ * Gets field
+ * @param fieldIndex index of field to be used.
+ * @return Object from an index.
+ * @throws AvroRuntimeException if the index is not 0, 1 or 2.
+ */
+ public Object get(int fieldIndex) {
+ switch (fieldIndex) {
+ case 0:
+ return vertexId;
+ case 1:
+ return value;
+ case 2:
+ return edges;
+ default:
+ throw new AvroRuntimeException("Bad index");
+ }
+ }
+
+ /**
+ * Puts a value into a field and marks the field dirty.
+ * Does nothing when isFieldEqual reports the value is unchanged.
+ * @param fieldIndex index of field used.
+ * @param fieldValue value of field used.
+ * @throws AvroRuntimeException if the index is not 0, 1 or 2.
+ */
+ @SuppressWarnings(value = "unchecked")
+ public void put(int fieldIndex, Object fieldValue) {
+ if (isFieldEqual(fieldIndex, fieldValue)) {
+ return;
+ }
+ getStateManager().setDirty(this, fieldIndex);
+ switch (fieldIndex) {
+ case 0:
+ vertexId = (Utf8) fieldValue; break;
+ case 1:
+ value = (Float) fieldValue; break;
+ case 2:
+ edges = (Map<Utf8, Utf8>) fieldValue; break;
+ default:
+ throw new AvroRuntimeException("Bad index");
+ }
+ }
+
+ /**
+ * Gets vertexId
+ * @return Utf8 vertexId
+ */
+ public Utf8 getVertexId() {
+ return (Utf8) get(0);
+ }
+
+ /**
+ * Sets vertexId
+ * @param value vertexId
+ */
+ public void setVertexId(Utf8 value) {
+ put(0, value);
+ }
+
+ /**
+ * Gets value
+ * @return float value of the vertex.
+ */
+ public float getValue() {
+ return (Float) get(1);
+ }
+
+ /**
+ * Sets value
+ * @param value new value of the vertex.
+ */
+ public void setValue(float value) {
+ put(1, value);
+ }
+
+ /**
+ * Get edges.
+ * @return Map of edges.
+ */
+ public Map<Utf8, Utf8> getEdges() {
+ return (Map<Utf8, Utf8>) get(2);
+ }
+
+ /**
+ * Gets value from edge.
+ * @param key Edge key.
+ * @return Utf8 containing the value of edge, or null if there is no map.
+ */
+ public Utf8 getFromEdges(Utf8 key) {
+ if (edges == null) { return null; }
+ return edges.get(key);
+ }
+
+ /**
+ * Puts a new edge and marks the edges field dirty.
+ * Re-creates the edge map if it was cleared (e.g. via put(2, null)) so the
+ * call cannot fail with a NullPointerException.
+ * @param key of new edge.
+ * @param value of new edge.
+ */
+ public void putToEdges(Utf8 key, Utf8 value) {
+ if (edges == null) {
+ edges = new StatefulHashMap<Utf8, Utf8>();
+ }
+ getStateManager().setDirty(this, 2);
+ edges.put(key, value);
+ }
+
+ /**
+ * Remove from edges
+ * @param key of edge to be deleted.
+ * @return Utf8 containing value of deleted key, or null if there is no map.
+ */
+ public Utf8 removeFromEdges(Utf8 key) {
+ if (edges == null) { return null; }
+ getStateManager().setDirty(this, 2);
+ return edges.remove(key);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/package-info.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/package-info.java
new file mode 100644
index 0000000..6c218d1
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Example Gora persistent classes (GVertex, GVertexResult) for Giraph.
+ */
+package org.apache.giraph.io.gora.generated;
http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/src/main/java/org/apache/giraph/io/gora/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/package-info.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/package-info.java
new file mode 100644
index 0000000..3a9b488
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Gora Input/Output for Giraph
+ */
+package org.apache.giraph.io.gora;
http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/src/main/java/org/apache/giraph/io/gora/utils/DefaultKeyFactory.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/utils/DefaultKeyFactory.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/utils/DefaultKeyFactory.java
new file mode 100644
index 0000000..8d814f5
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/utils/DefaultKeyFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.gora.utils;
+
+/**
+ * Example class for defining a way to construct Gora keys.
+ * Uses strings as keys inside Gora: the key string is returned unchanged.
+ */
+public class DefaultKeyFactory extends KeyFactory {
+
+ /**
+ * Builds a key from a string parameter.
+ * @param keyString the key object as a string.
+ * @return the key object, which is {@code keyString} itself.
+ * @throws IllegalStateException if no data store has been set.
+ */
+ @Override
+ public Object buildKey(String keyString) {
+ if (getDataStore() == null) {
+ throw new IllegalStateException(
+ "DataStore must be defined before using a key Builder.");
+ }
+ // String keys need no transformation. Factories for other key types
+ // would build the key here (e.g. starting from getDataStore().newKey()).
+ return keyString;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/src/main/java/org/apache/giraph/io/gora/utils/ExtraGoraInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/utils/ExtraGoraInputFormat.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/utils/ExtraGoraInputFormat.java
new file mode 100644
index 0000000..e11f910
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/utils/ExtraGoraInputFormat.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.gora.utils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.gora.mapreduce.GoraInputSplit;
+import org.apache.gora.mapreduce.GoraMapReduceUtils;
+import org.apache.gora.mapreduce.GoraRecordReader;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.PartitionQuery;
+import org.apache.gora.query.Query;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.util.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * InputFormat to fetch the input from Gora data stores. The data store and
+ * the query must be set on the instance via setDataStore(DataStore) and
+ * setQuery(Query) before getSplits(JobContext) is called.
+ *
+ * Hadoop jobs can also be configured through the static
+ *<code>setInput()</code> method, which stores the query in the job
+ * configuration. Note that getSplits only uses the instance fields; it does
+ * not read the query back from the configuration.
+ * @param <K> KeyClass.
+ * @param <T> PersistentClass.
+ */
+public class ExtraGoraInputFormat<K, T extends PersistentBase>
+ extends InputFormat<K, T> {
+
+ /**
+ * Configuration key under which the query is stored.
+ */
+ public static final String QUERY_KEY = "gora.inputformat.query";
+
+ /**
+ * Data store to be used.
+ */
+ private DataStore<K, T> dataStore;
+
+ /**
+ * Query to be performed.
+ */
+ private Query<K, T> query;
+
+ /**
+ * Creates a record reader over the partition query carried by the split.
+ * @param split InputSplit to be used; expected to be a GoraInputSplit.
+ * @param context TaskAttemptContext to be used.
+ * @return RecordReader record reader used inside Hadoop job.
+ * @throws IOException declared by the InputFormat contract.
+ * @throws InterruptedException declared by the InputFormat contract.
+ */
+ @Override
+ @SuppressWarnings("unchecked")
+ public RecordReader<K, T> createRecordReader(InputSplit split,
+ TaskAttemptContext context) throws IOException, InterruptedException {
+ PartitionQuery<K, T> partitionQuery = (PartitionQuery<K, T>)
+ ((GoraInputSplit) split).getQuery();
+ return new GoraRecordReader<K, T>(partitionQuery, context);
+ }
+
+ /**
+ * Builds one GoraInputSplit per partition of the query.
+ * @param context JobContext whose configuration is stored in each split.
+ * @return list of input splits.
+ * @throws IOException propagated from the data store or split creation.
+ * @throws InterruptedException declared by the InputFormat contract.
+ * @throws IllegalStateException if the data store or the query is not set.
+ */
+ @Override
+ public List<InputSplit> getSplits(JobContext context) throws IOException,
+ InterruptedException {
+ DataStore<K, T> store = getDataStore();
+ Query<K, T> baseQuery = getQuery();
+ // Fail with a clear message instead of an opaque NullPointerException.
+ if (store == null || baseQuery == null) {
+ throw new IllegalStateException(
+ "Data store and query must be set before computing splits");
+ }
+ List<PartitionQuery<K, T>> queries = store.getPartitions(baseQuery);
+ List<InputSplit> splits = new ArrayList<InputSplit>(queries.size());
+
+ for (PartitionQuery<K, T> partQuery : queries) {
+ splits.add(new GoraInputSplit(context.getConfiguration(), partQuery));
+ }
+
+ return splits;
+ }
+
+ /**
+ * @return the dataStore
+ */
+ public DataStore<K, T> getDataStore() {
+ return dataStore;
+ }
+
+ /**
+ * @param dataStore the dataStore to set
+ */
+ public void setDataStore(DataStore<K, T> dataStore) {
+ this.dataStore = dataStore;
+ }
+
+ /**
+ * @return the query
+ */
+ public Query<K, T> getQuery() {
+ return query;
+ }
+
+ /**
+ * @param query the query to set
+ */
+ public void setQuery(Query<K, T> query) {
+ this.query = query;
+ }
+
+ /**
+ * Stores the query inside the configuration object under QUERY_KEY.
+ * @param conf Configuration used.
+ * @param query Query to be executed.
+ * @param <K> Key class
+ * @param <T> Persistent class
+ * @throws IOException if storing the query in the configuration fails.
+ */
+ public static <K, T extends Persistent> void setQuery(Configuration conf,
+ Query<K, T> query) throws IOException {
+ IOUtils.storeToConf(query, conf, QUERY_KEY);
+ }
+
+ /**
+ * Loads the query stored under QUERY_KEY from the conf object passed.
+ * @param conf Configuration object.
+ * @return Query<K, T> passed inside the configuration object
+ * @throws IOException if loading the query from the configuration fails.
+ */
+ public Query<K, T> getQuery(Configuration conf) throws IOException {
+ return IOUtils.loadFromConf(conf, QUERY_KEY);
+ }
+
+ /**
+ * Sets the input parameters for the job: IO serializations, this class
+ * as the input format, and the query stored in the job configuration.
+ * @param job the job to set the properties for
+ * @param query the query to get the inputs from
+ * @param dataStore the datastore as the input; NOTE(review): currently
+ * unused by this method
+ * @param reuseObjects whether to reuse objects in serialization
+ * @param <K> Key class
+ * @param <V> Persistent class
+ * @throws IOException if storing the query in the configuration fails.
+ */
+ public static <K, V extends Persistent> void setInput(Job job,
+ Query<K, V> query, DataStore<K, V> dataStore, boolean reuseObjects)
+ throws IOException {
+ Configuration conf = job.getConfiguration();
+ GoraMapReduceUtils.setIOSerializations(conf, reuseObjects);
+ job.setInputFormatClass(ExtraGoraInputFormat.class);
+ ExtraGoraInputFormat.setQuery(conf, query);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/src/main/java/org/apache/giraph/io/gora/utils/GoraUtils.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/utils/GoraUtils.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/utils/GoraUtils.java
new file mode 100644
index 0000000..c3fc268
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/utils/GoraUtils.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.gora.utils;
+
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.store.DataStoreFactory;
+import org.apache.gora.util.GoraException;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Class used to handle the creation and querying of data stores through Gora.
+ * The data store class and the configuration used to create data stores are
+ * held in static fields shared by all callers in the JVM.
+ */
+public final class GoraUtils {
+
+ /**
+ * Data store class used by createDataStore(Class, Class); it is set by
+ * createSpecificDataStore(Class, Class, Class).
+ */
+ private static Class<? extends DataStore> DATASTORECLASS;
+
+ /**
+ * Attribute handling configuration for data stores.
+ */
+ private static Configuration CONF = new Configuration();
+
+ /**
+ * The default constructor is private so that the class is not
+ * instantiated.
+ */
+ private GoraUtils() { /* private constructor */ }
+
+ /**
+ * Creates a generic data store using the data store class most recently
+ * passed to createSpecificDataStore(Class, Class, Class).
+ * @param <K> key class
+ * @param <T> value class
+ * @param keyClass key class used
+ * @param persistentClass persistent class used
+ * @return created data store
+ * @throws GoraException if the data store cannot be created
+ */
+ public static <K, T extends Persistent> DataStore<K, T>
+ createDataStore(Class<K> keyClass, Class<T> persistentClass)
+ throws GoraException {
+ return buildDataStore(DATASTORECLASS, keyClass, persistentClass);
+ }
+
+ /**
+ * Creates a data store of the specified class and remembers that class
+ * for later calls to createDataStore(Class, Class).
+ * @param <K> key class
+ * @param <T> value class
+ * @param dataStoreClass Defines the type of data store used.
+ * @param keyClass Handles the key class to be used.
+ * @param persistentClass Handles the persistent class to be used.
+ * @return DataStore created using parameters passed.
+ * @throws GoraException if an error occurs.
+ */
+ public static <K, T extends Persistent> DataStore<K, T>
+ createSpecificDataStore(Class<? extends DataStore> dataStoreClass,
+ Class<K> keyClass, Class<T> persistentClass) throws GoraException {
+ DATASTORECLASS = dataStoreClass;
+ // Use the argument rather than re-reading the shared static, so a
+ // concurrent caller cannot swap the class between these two steps.
+ return buildDataStore(dataStoreClass, keyClass, persistentClass);
+ }
+
+ /**
+ * Creates a data store of the given class with the shared configuration.
+ * @param <K> key class
+ * @param <T> value class
+ * @param dataStoreClass data store implementation to instantiate.
+ * @param keyClass key class used.
+ * @param persistentClass persistent class used.
+ * @return created data store.
+ * @throws GoraException if the data store cannot be created.
+ */
+ @SuppressWarnings("unchecked")
+ private static <K, T extends Persistent> DataStore<K, T>
+ buildDataStore(Class<? extends DataStore> dataStoreClass,
+ Class<K> keyClass, Class<T> persistentClass) throws GoraException {
+ // NOTE(review): the returned Properties are discarded; call kept as in
+ // the original implementation — confirm whether it is needed.
+ DataStoreFactory.createProps();
+ DataStore<K, T> dataStore =
+ DataStoreFactory.createDataStore((Class<? extends DataStore<K, T>>)
+ dataStoreClass,
+ keyClass, persistentClass,
+ getConf());
+ return dataStore;
+ }
+
+ /**
+ * Performs a range query to Gora datastores
+ * @param <K> key class
+ * @param <T> value class
+ * @param pDataStore data store being used.
+ * @param pStartKey start key for the range query.
+ * @param pEndKey end key for the range query.
+ * @return Result containing all results for the query.
+ */
+ public static <K, T extends Persistent> Result<K, T>
+ getRequest(DataStore<K, T> pDataStore, K pStartKey, K pEndKey) {
+ Query<K, T> query = getQuery(pDataStore, pStartKey, pEndKey);
+ return getRequest(pDataStore, query);
+ }
+
+ /**
+ * Performs a query to Gora datastores
+ * @param pDataStore data store being used.
+ * @param query query executed over data stores.
+ * @param <K> key class
+ * @param <T> value class
+ * @return Result containing all results for the query.
+ */
+ public static <K, T extends Persistent> Result<K, T>
+ getRequest(DataStore<K, T> pDataStore, Query<K, T> query) {
+ return pDataStore.execute(query);
+ }
+
+ /**
+ * Performs a range query with no end key to Gora datastores
+ * @param <K> key class
+ * @param <T> value class
+ * @param pDataStore data store being used.
+ * @param pStartKey start key for the range query.
+ * @return Result containing all results for the query.
+ */
+ public static <K, T extends Persistent> Result<K, T>
+ getRequest(DataStore<K, T> pDataStore, K pStartKey) {
+ return getRequest(pDataStore, pStartKey, null);
+ }
+
+ /**
+ * Gets a query object to be used as a range query.
+ * @param pDataStore data store used.
+ * @param pStartKey range start key.
+ * @param pEndKey range end key.
+ * @param <K> key class
+ * @param <T> value class
+ * @return range query object.
+ */
+ public static <K, T extends Persistent> Query<K, T>
+ getQuery(DataStore<K, T> pDataStore, K pStartKey, K pEndKey) {
+ Query<K, T> query = pDataStore.newQuery();
+ query.setStartKey(pStartKey);
+ query.setEndKey(pEndKey);
+ return query;
+ }
+
+ /**
+ * Gets a query object with a start key and no end key.
+ * @param pDataStore data store used.
+ * @param pStartKey range start key.
+ * @param <K> key class
+ * @param <T> value class
+ * @return query object.
+ */
+ public static <K, T extends Persistent> Query<K, T>
+ getQuery(DataStore<K, T> pDataStore, K pStartKey) {
+ return getQuery(pDataStore, pStartKey, null);
+ }
+
+ /**
+ * Gets a query object with neither start nor end key.
+ * @param pDataStore data store used.
+ * @param <K> key class
+ * @param <T> value class
+ * @return query object.
+ */
+ public static <K, T extends Persistent> Query<K, T>
+ getQuery(DataStore<K, T> pDataStore) {
+ return getQuery(pDataStore, null, null);
+ }
+
+ /**
+ * Gets the configuration object.
+ * @return the configuration object.
+ */
+ public static Configuration getConf() {
+ return CONF;
+ }
+
+ /**
+ * Sets the configuration object.
+ * @param conf to be set as the configuration object.
+ */
+ public static void setConf(Configuration conf) {
+ CONF = conf;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/src/main/java/org/apache/giraph/io/gora/utils/KeyFactory.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/utils/KeyFactory.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/utils/KeyFactory.java
new file mode 100644
index 0000000..c8dd4a7
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/utils/KeyFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.gora.utils;
+
+import org.apache.gora.store.DataStore;
+
+/**
+ * Base class for factories that turn key strings into Gora keys.
+ * Holds a data store that concrete factories may use while building keys.
+ */
+public abstract class KeyFactory {
+
+ /**
+ * Data store available to subclasses when building keys.
+ */
+ private DataStore store;
+
+ /**
+ * Gets the data store used in this factory.
+ * @return the data store, or null if none has been set
+ */
+ public DataStore getDataStore() {
+ return this.store;
+ }
+
+ /**
+ * Sets the data store used in this factory.
+ * @param dataStore the data store to use
+ */
+ public void setDataStore(DataStore dataStore) {
+ store = dataStore;
+ }
+
+ /**
+ * Converts a key string into the key object used by the data store.
+ * @param keyString the key as a string.
+ * @return the key object.
+ */
+ public abstract Object buildKey(String keyString);
+}