You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by cl...@apache.org on 2013/11/18 15:57:25 UTC
git commit: updated refs/heads/trunk to e987492
Updated Branches:
refs/heads/trunk 9ded6c372 -> e987492ee
GIRAPH-759
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/e987492e
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/e987492e
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/e987492e
Branch: refs/heads/trunk
Commit: e987492ee3c84e5b7d69af2ecbec2b07f6c4b6ae
Parents: 9ded6c3
Author: Claudio Martella <cl...@gmail.com>
Authored: Mon Nov 18 15:56:42 2013 +0100
Committer: Claudio Martella <cl...@gmail.com>
Committed: Mon Nov 18 15:56:42 2013 +0100
----------------------------------------------------------------------
CHANGELOG | 2 +
.../giraph/io/gora/GoraEdgeInputFormat.java | 409 +++++++++++++++++++
.../io/gora/GoraGEdgeEdgeInputFormat.java | 93 +++++
.../apache/giraph/io/gora/generated/GEdge.java | 314 ++++++++++++++
.../giraph/io/gora/GoraTestEdgeInputFormat.java | 132 ++++++
.../giraph/io/gora/TestGoraEdgeInputFormat.java | 122 ++++++
6 files changed, 1072 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/e987492e/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 5471120..c639ba1 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 1.1.0 - unreleased
+ GIRAPH-759: Create EdgeInputFormat from Apache Gora (renato2099 via claudio)
+
GIRAPH-758: Create VertexOutputFormat to Apache Gora (renato2099 via claudio)
GIRAPH-757: Create VertexInputFormat from Apache Gora (renato2099 via claudio)
http://git-wip-us.apache.org/repos/asf/giraph/blob/e987492e/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraEdgeInputFormat.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraEdgeInputFormat.java
new file mode 100644
index 0000000..d0dcc32
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraEdgeInputFormat.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.gora;
+
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_DATASTORE_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_END_KEY;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_KEYS_FACTORY_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_KEY_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_PERSISTENT_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_START_KEY;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.EdgeReader;
+import org.apache.giraph.io.gora.utils.ExtraGoraInputFormat;
+import org.apache.giraph.io.gora.utils.GoraUtils;
+import org.apache.giraph.io.gora.utils.KeyFactory;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.util.GoraException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Logger;
+
+/**
+ * Class which wraps the GoraInputFormat. It's designed
+ * as an extension point to EdgeInputFormat subclasses who wish
+ * to read from Gora data sources.
+ *
+ * Works with
+ * {@link GoraVertexOutputFormat}
+ *
+ * @param <I> vertex id type
+ * @param <E> edge type
+ */
+public abstract class GoraEdgeInputFormat
+ <I extends WritableComparable, E extends Writable>
+ extends EdgeInputFormat<I, E> {
+
+ /** Start key for querying Gora data store. */
+ private static Object START_KEY;
+
+ /** End key for querying Gora data store. */
+ private static Object END_KEY;
+
+ /** Logger for Gora's edge input format. */
+ private static final Logger LOG =
+ Logger.getLogger(GoraEdgeInputFormat.class);
+
+ /** KeyClass used for getting data. */
+ private static Class<?> KEY_CLASS;
+
+ /** The vertex itself will be used as a value inside Gora. */
+ private static Class<? extends Persistent> PERSISTENT_CLASS;
+
+ /** Data store class to be used as backend. */
+ private static Class<? extends DataStore> DATASTORE_CLASS;
+
+ /** Class used to transform strings into Keys */
+ private static Class<?> KEY_FACTORY_CLASS;
+
+ /** Data store used for querying data. */
+ private static DataStore DATA_STORE;
+
+ /** Counter for input records. */
+ private static int RECORD_COUNTER = 0;
+
+ /** Delegate Gora input format */
+ private static ExtraGoraInputFormat GORA_INPUT_FORMAT =
+ new ExtraGoraInputFormat();
+
+ /**
+ * Loads the key, persistent, data store and key factory classes named in
+ * the configuration, stores them in the static fields of this class,
+ * creates the data store and hands it to the delegate Gora input format.
+ *
+ * @param conf configuration parameters; note that the values are read
+ * through {@link #getConf()} rather than from this argument
+ * @throws IllegalStateException if any of the configured classes cannot
+ * be found
+ */
+ public void checkInputSpecs(Configuration conf) {
+ String sDataStoreType =
+ GIRAPH_GORA_DATASTORE_CLASS.get(getConf());
+ String sKeyType =
+ GIRAPH_GORA_KEY_CLASS.get(getConf());
+ String sPersistentType =
+ GIRAPH_GORA_PERSISTENT_CLASS.get(getConf());
+ String sKeyFactoryClass =
+ GIRAPH_GORA_KEYS_FACTORY_CLASS.get(getConf());
+ try {
+ Class<?> keyClass = Class.forName(sKeyType);
+ Class<?> persistentClass = Class.forName(sPersistentType);
+ Class<?> dataStoreClass = Class.forName(sDataStoreType);
+ Class<?> keyFactoryClass = Class.forName(sKeyFactoryClass);
+ setKeyClass(keyClass);
+ setPersistentClass((Class<? extends Persistent>) persistentClass);
+ setDatastoreClass((Class<? extends DataStore>) dataStoreClass);
+ setKeyFactoryClass(keyFactoryClass);
+ setDataStore(createDataStore());
+ GORA_INPUT_FORMAT.setDataStore(getDataStore());
+ } catch (ClassNotFoundException e) {
+ // Fail here instead of leaving the static state half-initialized, which
+ // would otherwise only show up later as a NullPointerException.
+ LOG.error("Error while reading Gora Input parameters", e);
+ throw new IllegalStateException(
+ "Error while reading Gora Input parameters", e);
+ }
+ }
+
+ /**
+ * Gets the splits for a data store.
+ * @param context JobContext
+ * @param minSplitCountHint Hint for a minimum split count
+ * @return List<InputSplit> A list of splits
+ */
+ @Override
+ public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
+ throws IOException, InterruptedException {
+ KeyFactory kFact = null;
+ try {
+ kFact = (KeyFactory) getKeyFactoryClass().newInstance();
+ } catch (InstantiationException e) {
+ LOG.error("Key factory was not instantiated. Please verify.");
+ LOG.error(e.getMessage());
+ e.printStackTrace();
+ } catch (IllegalAccessException e) {
+ LOG.error("Key factory was not instantiated. Please verify.");
+ LOG.error(e.getMessage());
+ e.printStackTrace();
+ }
+ String sKey = GIRAPH_GORA_START_KEY.get(getConf());
+ String eKey = GIRAPH_GORA_END_KEY.get(getConf());
+ if (sKey == null || sKey.isEmpty()) {
+ LOG.warn("No start key has been defined.");
+ LOG.warn("Querying all the data store.");
+ sKey = null;
+ eKey = null;
+ }
+ kFact.setDataStore(getDataStore());
+ setStartKey(kFact.buildKey(sKey));
+ setEndKey(kFact.buildKey(eKey));
+ Query tmpQuery = GoraUtils.getQuery(
+ getDataStore(), getStartKey(), getEndKey());
+ GORA_INPUT_FORMAT.setQuery(tmpQuery);
+ List<InputSplit> splits = GORA_INPUT_FORMAT.getSplits(context);
+ return splits;
+ }
+
+ @Override
+ public abstract GoraEdgeReader createEdgeReader(InputSplit split,
+ TaskAttemptContext context) throws IOException;
+
+ /**
+ * Abstract class to be implemented by the user based on their specific
+ * vertex input. Easiest to ignore the key value separator and only use
+ * key instead.
+ */
+ protected abstract class GoraEdgeReader extends EdgeReader<I, E> {
+ /** Current edge obtained from Gora. */
+ private Edge<I, E> edge;
+ /** Results gotten from Gora data store. */
+ private Result readResults;
+
+ /**
+ * Prepares the reader: runs the range query through getResults() and
+ * resets the shared record counter to zero. Neither argument is used by
+ * this implementation.
+ * @param inputSplit split assigned to this reader (unused here).
+ * @param context task attempt context (unused here).
+ * @throws IOException exceptions passed along.
+ * @throws InterruptedException exceptions passed along.
+ */
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ getResults();
+ RECORD_COUNTER = 0;
+ }
+
+ /**
+ * Gets the next edge from Gora data store.
+ * The current record is fetched and converted with transformEdge() only
+ * when the underlying result reports that another record is available.
+ * @return true if an edge was read and can be obtained through
+ * getCurrentEdge(); false when there are no more records or when reading
+ * or transforming the record failed.
+ * @throws IOException exceptions passed along.
+ * @throws InterruptedException exceptions passed along.
+ */
+ @Override
+ // CHECKSTYLE: stop IllegalCatch
+ public boolean nextEdge() throws IOException, InterruptedException {
+ boolean flg = false;
+ try {
+ flg = this.getReadResults().next();
+ if (flg) {
+ // Only touch the record when there is one; previously the end of the
+ // results was detected through the exception handler below.
+ this.edge = transformEdge(this.getReadResults().get());
+ RECORD_COUNTER++;
+ }
+ } catch (Exception e) {
+ // Still best-effort (reading stops), but the failure is now visible
+ // at error level together with its cause.
+ LOG.error("Error transforming edges.", e);
+ flg = false;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(RECORD_COUNTER + " were transformed.");
+ }
+ return flg;
+ }
+ // CHECKSTYLE: resume IllegalCatch
+
+ /**
+ * Gets the progress of reading results from Gora.
+ * @return the progress of reading results from Gora.
+ */
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ float progress = 0.0f;
+ if (getReadResults() != null) {
+ progress = getReadResults().getProgress();
+ }
+ return progress;
+ }
+
+ /**
+ * Gets current edge.
+ *
+ * @return The edge object represented by a Gora object
+ */
+ @Override
+ public Edge<I, E> getCurrentEdge()
+ throws IOException, InterruptedException {
+ return this.edge;
+ }
+
+ /**
+ * Parser for a single Gora object
+ *
+ * @param goraObject vertex represented as a GoraObject
+ * @return The edge object represented by a Gora object
+ */
+ protected abstract Edge<I, E> transformEdge(Object goraObject);
+
+ /**
+ * Performs a range query to a Gora data store.
+ */
+ protected void getResults() {
+ setReadResults(GoraUtils.getRequest(getDataStore(),
+ getStartKey(), getEndKey()));
+ }
+
+ /**
+ * Finishes the reading process.
+ * @throws IOException.
+ */
+ @Override
+ public void close() throws IOException {
+ }
+
+ /**
+ * Gets the results read.
+ * @return results read.
+ */
+ Result getReadResults() {
+ return readResults;
+ }
+
+ /**
+ * Sets the results read.
+ * @param readResults results read.
+ */
+ void setReadResults(Result readResults) {
+ this.readResults = readResults;
+ }
+ }
+
+ /**
+ * Creates the data store from the configured data store, key and
+ * persistent classes.
+ * @return DataStore created, or null if Gora failed to create it (the
+ * failure is logged with its cause).
+ */
+ public DataStore createDataStore() {
+ DataStore dsCreated = null;
+ try {
+ dsCreated = GoraUtils.createSpecificDataStore(getDatastoreClass(),
+ getKeyClass(), getPersistentClass());
+ } catch (GoraException e) {
+ // Route the stack trace through the logger instead of stderr.
+ LOG.error("Error creating data store.", e);
+ }
+ return dsCreated;
+ }
+
+ /**
+ * Gets the persistent Class
+ * @return persistentClass used
+ */
+ static Class<? extends Persistent> getPersistentClass() {
+ return PERSISTENT_CLASS;
+ }
+
+ /**
+ * Sets the persistent Class
+ * @param persistentClassUsed to be set
+ */
+ static void setPersistentClass
+ (Class<? extends Persistent> persistentClassUsed) {
+ PERSISTENT_CLASS = persistentClassUsed;
+ }
+
+ /**
+ * Gets the key class used.
+ * @return the key class used.
+ */
+ static Class<?> getKeyClass() {
+ return KEY_CLASS;
+ }
+
+ /**
+ * Sets the key class used.
+ * @param keyClassUsed key class used.
+ */
+ static void setKeyClass(Class<?> keyClassUsed) {
+ KEY_CLASS = keyClassUsed;
+ }
+
+ /**
+ * @return Class the DATASTORE_CLASS
+ */
+ public static Class<? extends DataStore> getDatastoreClass() {
+ return DATASTORE_CLASS;
+ }
+
+ /**
+ * @param dataStoreClass the dataStore class to set
+ */
+ public static void setDatastoreClass(
+ Class<? extends DataStore> dataStoreClass) {
+ DATASTORE_CLASS = dataStoreClass;
+ }
+
+ /**
+ * Gets the start key for querying.
+ * @return the start key.
+ */
+ public Object getStartKey() {
+ return START_KEY;
+ }
+
+ /**
+ * Sets the start key for querying.
+ * @param startKey start key.
+ */
+ public static void setStartKey(Object startKey) {
+ START_KEY = startKey;
+ }
+
+ /**
+ * Gets the end key for querying.
+ * @return the end key.
+ */
+ static Object getEndKey() {
+ return END_KEY;
+ }
+
+ /**
+ * Sets the end key for querying.
+ * @param pEndKey end key.
+ */
+ static void setEndKey(Object pEndKey) {
+ END_KEY = pEndKey;
+ }
+
+ /**
+ * Gets the key factory class.
+ * @return the kEY_FACTORY_CLASS
+ */
+ static Class<?> getKeyFactoryClass() {
+ return KEY_FACTORY_CLASS;
+ }
+
+ /**
+ * Sets the key factory class.
+ * @param keyFactoryClass the keyFactoryClass to set.
+ */
+ static void setKeyFactoryClass(Class<?> keyFactoryClass) {
+ KEY_FACTORY_CLASS = keyFactoryClass;
+ }
+
+ /**
+ * Gets the data store.
+ * @return DataStore
+ */
+ public static DataStore getDataStore() {
+ return DATA_STORE;
+ }
+
+ /**
+ * Sets the data store
+ * @param dStore the dATA_STORE to set
+ */
+ public static void setDataStore(DataStore dStore) {
+ DATA_STORE = dStore;
+ }
+
+ /**
+ * Returns a logger.
+ * @return the log for the output format.
+ */
+ public static Logger getLogger() {
+ return LOG;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/e987492e/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGEdgeEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGEdgeEdgeInputFormat.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGEdgeEdgeInputFormat.java
new file mode 100644
index 0000000..e738f36
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGEdgeEdgeInputFormat.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.gora;
+
+import java.io.IOException;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.io.gora.generated.GEdge;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Example implementation of a specific reader for a generated data bean.
+ */
+public class GoraGEdgeEdgeInputFormat
+ extends GoraEdgeInputFormat<LongWritable, FloatWritable> {
+
+ /**
+ * Default constructor
+ */
+ public GoraGEdgeEdgeInputFormat() {
+ }
+
+ /**
+ * Creates the specific edge reader to be used inside Hadoop.
+ * Neither argument is used by this implementation.
+ * @param split split to be read.
+ * @param context task attempt context to be used.
+ * @return GoraEdgeReader Edge reader to be used by Hadoop.
+ */
+ @Override
+ public GoraEdgeReader createEdgeReader(
+ InputSplit split, TaskAttemptContext context) throws IOException {
+ return new GoraGEdgeEdgeReader();
+ }
+
+ /**
+ * Gora edge reader
+ */
+ protected class GoraGEdgeEdgeReader extends GoraEdgeReader {
+
+ /** source vertex of the edge */
+ private LongWritable sourceId;
+
+ /**
+ * Transforms a GoraObject into an Edge object.
+ * The Gora edge's vertexInId is remembered as the source id returned by
+ * getCurrentSourceId(); vertexOutId becomes the target of the edge.
+ * @param goraObject Object from Gora to be translated; must be a GEdge.
+ * @return Edge Result from transforming the gora object.
+ */
+ @Override
+ protected Edge<LongWritable, FloatWritable> transformEdge
+ (Object goraObject) {
+ GEdge goraEdge = (GEdge) goraObject;
+ long dest = Long.parseLong(goraEdge.getVertexOutId().toString());
+ long source = Long.parseLong(goraEdge.getVertexInId().toString());
+ this.sourceId = new LongWritable(source);
+ // Keep the weight as a float: casting it to long first silently dropped
+ // the fractional part of the edge weight.
+ float weight = goraEdge.getEdgeWeight();
+ return EdgeFactory.create(new LongWritable(dest),
+ new FloatWritable(weight));
+ }
+
+ /**
+ * Gets the currentSourceId for the edge.
+ * @return LongWritable currentSourceId for the edge.
+ */
+ @Override
+ public LongWritable getCurrentSourceId() throws IOException,
+ InterruptedException {
+ return this.sourceId;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/e987492e/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GEdge.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GEdge.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GEdge.java
new file mode 100644
index 0000000..f6ac3f7
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GEdge.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.gora.generated;
+
+import org.apache.avro.Schema;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.persistency.StateManager;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.persistency.impl.StateManagerImpl;
+
+/**
+ * Example class for defining a Giraph-Edge.
+ */
+@SuppressWarnings("all")
+public class GEdge extends PersistentBase {
+ /**
+ * Schema used for the class.
+ */
+ public static final Schema OBJ_SCHEMA = Schema.parse("{\"type\":\"record\"," +
+ "\"name\":\"GEdge\",\"namespace\":\"org.apache.giraph.gora.generated\"," +
+ "\"fields\":[{\"name\":\"edgeId\",\"type\":\"string\"}," +
+ "{\"name\":\"edgeWeight\",\"type\":\"float\"}," +
+ "{\"name\":\"vertexInId\",\"type\":\"string\"}," +
+ "{\"name\":\"vertexOutId\",\"type\":\"string\"}," +
+ "{\"name\":\"label\",\"type\":\"string\"}]}");
+
+ /**
+ * Field enum
+ */
+ public static enum Field {
+ /**
+ * Edge id.
+ */
+ EDGE_ID(0, "edgeId"),
+
+ /**
+ * Edge weight.
+ */
+ EDGE_WEIGHT(1, "edgeWeight"),
+
+ /**
+ * Edge vertex source id.
+ */
+ VERTEX_IN_ID(2, "vertexInId"),
+
+ /**
+ * Edge vertex end id.
+ */
+ VERTEX_OUT_ID(3, "vertexOutId"),
+
+ /**
+ * Edge label.
+ */
+ LABEL(4, "label");
+
+ /**
+ * Field index
+ */
+ private int index;
+
+ /**
+ * Field name
+ */
+ private String name;
+
+ /**
+ * Field constructor
+ * @param index of attribute
+ * @param name of attribute
+ */
+ Field(int index, String name) {
+ this.index = index;
+ this.name = name;
+ }
+
+ /**
+ * Gets index
+ * @return int of attribute.
+ */
+ public int getIndex() {
+ return index;
+ }
+
+ /**
+ * Gets name
+ * @return String of name.
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Gets name
+ * @return String of name.
+ */
+ public String toString() {
+ return name;
+ }
+ };
+
+ /**
+ * Array containing all fields.
+ */
+ private static final String[] ALL_FIELDS = {
+ "edgeId", "edgeWeight", "vertexInId", "vertexOutId", "label"
+ };
+
+ static {
+ PersistentBase.registerFields(GEdge.class, ALL_FIELDS);
+ }
+
+ /**
+ * edgeId
+ */
+ private Utf8 edgeId;
+
+ /**
+ * edgeWeight
+ */
+ private float edgeWeight;
+
+ /**
+ * vertexInId
+ */
+ private Utf8 vertexInId;
+
+ /**
+ * vertexOutId
+ */
+ private Utf8 vertexOutId;
+
+ /**
+ * label
+ */
+ private Utf8 label;
+
+ /**
+ * Default constructor.
+ */
+ public GEdge() {
+ this(new StateManagerImpl());
+ }
+
+ /**
+ * Constructor
+ * @param stateManager from which the object will be created.
+ */
+ public GEdge(StateManager stateManager) {
+ super(stateManager);
+ }
+
+ /**
+ * Creates a new instance
+ * @param stateManager from which the object will be created.
+ * @return GEdge created
+ */
+ public GEdge newInstance(StateManager stateManager) {
+ return new GEdge(stateManager);
+ }
+
+ /**
+ * Gets the object schema
+ * @return Schema of the object.
+ */
+ public Schema getSchema() {
+ return OBJ_SCHEMA;
+ }
+
+ /**
+ * Gets field
+ * @param fieldIndex index field.
+ * @return Object from an index.
+ */
+ public Object get(int fieldIndex) {
+ switch (fieldIndex) {
+ case 0:
+ return edgeId;
+ case 1:
+ return edgeWeight;
+ case 2:
+ return vertexInId;
+ case 3:
+ return vertexOutId;
+ case 4:
+ return label;
+ default:
+ throw new AvroRuntimeException("Bad index");
+ }
+ }
+
+ /**
+ * Puts a value into a field.
+ * Nothing happens when isFieldEqual() reports that the field already
+ * holds the given value; otherwise the field is marked dirty in the state
+ * manager and then assigned.
+ * @param fieldIndex index of field used: 0 edgeId, 1 edgeWeight,
+ * 2 vertexInId, 3 vertexOutId, 4 label.
+ * @param fieldValue value of field used; cast to Float for index 1 and to
+ * Utf8 for every other index.
+ * @throws AvroRuntimeException if fieldIndex is not one of 0 to 4.
+ */
+ @SuppressWarnings(value = "unchecked")
+ public void put(int fieldIndex, Object fieldValue) {
+ // Unchanged value: skip the dirty marking and the assignment.
+ if (isFieldEqual(fieldIndex, fieldValue)) {
+ return;
+ }
+ // NOTE: the dirty flag is set before the index is validated by the
+ // switch below.
+ getStateManager().setDirty(this, fieldIndex);
+ switch (fieldIndex) {
+ case 0:
+ edgeId = (Utf8) fieldValue; break;
+ case 1:
+ edgeWeight = (Float) fieldValue; break;
+ case 2:
+ vertexInId = (Utf8) fieldValue; break;
+ case 3:
+ vertexOutId = (Utf8) fieldValue; break;
+ case 4:
+ label = (Utf8) fieldValue; break;
+ default:
+ throw new AvroRuntimeException("Bad index");
+ }
+ }
+
+ /**
+ * Gets edgeId
+ * @return Utf8 edgeId
+ */
+ public Utf8 getEdgeId() {
+ return (Utf8) get(0);
+ }
+
+ /**
+ * Sets edgeId
+ * @param value edgeId
+ */
+ public void setEdgeId(Utf8 value) {
+ put(0, value);
+ }
+
+ /**
+ * Gets edgeWeight
+ * @return float edgeWeight
+ */
+ public float getEdgeWeight() {
+ return (Float) get(1);
+ }
+
+ /**
+ * Sets edgeWeight
+ * @param value edgeWeight
+ */
+ public void setEdgeWeight(float value) {
+ put(1, value);
+ }
+
+ /**
+ * Gets edgeVertexInId
+ * @return Utf8 edgeVertexInId
+ */
+ public Utf8 getVertexInId() {
+ return (Utf8) get(2);
+ }
+
+ /**
+ * Sets edgeVertexInId
+ * @param value edgeVertexInId
+ */
+ public void setVertexInId(Utf8 value) {
+ put(2, value);
+ }
+
+ /**
+ * Gets edgeVertexOutId
+ * @return Utf8 edgeVertexOutId
+ */
+ public Utf8 getVertexOutId() {
+ return (Utf8) get(3);
+ }
+
+ /**
+ * Sets edgeVertexOutId
+ * @param value edgeVertexOutId
+ */
+ public void setVertexOutId(Utf8 value) {
+ put(3, value);
+ }
+
+ /**
+ * Gets edgeLabel
+ * @return Utf8 edgeLabel
+ */
+ public Utf8 getLabel() {
+ return (Utf8) get(4);
+ }
+
+ /**
+ * Sets edgeLabel
+ * @param value edgeLabel
+ */
+ public void setLabel(Utf8 value) {
+ put(4, value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/e987492e/giraph-gora/src/test/java/org/apache/giraph/io/gora/GoraTestEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/test/java/org/apache/giraph/io/gora/GoraTestEdgeInputFormat.java b/giraph-gora/src/test/java/org/apache/giraph/io/gora/GoraTestEdgeInputFormat.java
new file mode 100644
index 0000000..ba71ce4
--- /dev/null
+++ b/giraph-gora/src/test/java/org/apache/giraph/io/gora/GoraTestEdgeInputFormat.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.gora;
+
+import java.io.IOException;
+
+import org.apache.avro.util.Utf8;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.io.gora.GoraEdgeInputFormat;
+import org.apache.giraph.io.gora.generated.GEdge;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Implementation of a specific reader for a generated data bean.
+ */
+public class GoraTestEdgeInputFormat
+ extends GoraEdgeInputFormat<LongWritable, FloatWritable> {
+
+ /**
+ * Default constructor
+ */
+ public GoraTestEdgeInputFormat() {
+ }
+
+ /**
+ * Creates specific vertex reader to be used inside Hadoop.
+ * @param split split to be read.
+ * @param context JobContext to be used.
+ * @return GoraEdgeReader Edge reader to be used by Hadoop.
+ */
+ @Override
+ public GoraEdgeReader createEdgeReader(
+ InputSplit split, TaskAttemptContext context) throws IOException {
+ putArtificialData();
+ return new GoraGEdgeEdgeReader();
+ }
+
+ /**
+ * Writes data into the data store in order to test it out.
+ */
+ @SuppressWarnings("unchecked")
+ private static void putArtificialData() {
+ getDataStore().put("11-22",
+ createEdge("11-22", "11", "22", "11-22", (float)(11+22)));
+ getDataStore().put("22-11",
+ createEdge("22-11", "22", "11", "22-11", (float)(22+11)));
+ getDataStore().put("11-33",
+ createEdge("11-33", "11", "33", "11-33", (float)(11+33)));
+ getDataStore().put("33-11",
+ createEdge("33-11", "33", "11", "33-11", (float)(33+11)));
+ getDataStore().flush();
+ }
+
+ /**
+ * Creates an edge using an id and a set of edges.
+ * @param id Vertex id.
+ * @param vertexInId Vertex source Id.
+ * @param vertexOutId Vertex destination Id.
+ * @param edgeLabel Edge label.
+ * @param edgeWeight Edge weight.
+ * @return GEdge created.
+ */
+ private static GEdge createEdge(String id, String vertexInId,
+ String vertexOutId, String edgeLabel, float edgeWeight) {
+ GEdge newEdge = new GEdge();
+ newEdge.setEdgeId(new Utf8(id));
+ newEdge.setVertexInId(new Utf8(vertexInId));
+ newEdge.setVertexOutId(new Utf8(vertexOutId));
+ newEdge.setLabel(new Utf8(edgeLabel));
+ newEdge.setEdgeWeight(edgeWeight);
+ return newEdge;
+ }
+
+ /**
+ * Gora edge reader
+ */
+ protected class GoraGEdgeEdgeReader extends GoraEdgeReader {
+
+ /** source vertex of the edge */
+ private LongWritable sourceId;
+
+ /**
+ * Transforms a GoraObject into an Edge object.
+ * The Gora edge's vertexInId is remembered as the source id returned by
+ * getCurrentSourceId(); vertexOutId becomes the target of the edge.
+ * @param goraObject Object from Gora to be translated; must be a GEdge.
+ * @return Edge Result from transforming the gora object.
+ */
+ @Override
+ protected Edge<LongWritable, FloatWritable> transformEdge
+ (Object goraObject) {
+ GEdge goraEdge = (GEdge) goraObject;
+ long dest = Long.parseLong(goraEdge.getVertexOutId().toString());
+ long source = Long.parseLong(goraEdge.getVertexInId().toString());
+ this.sourceId = new LongWritable(source);
+ // Keep the weight as a float: casting it to long first silently dropped
+ // the fractional part of the edge weight.
+ float weight = goraEdge.getEdgeWeight();
+ return EdgeFactory.create(new LongWritable(dest),
+ new FloatWritable(weight));
+ }
+
+ /**
+ * Gets the currentSourceId for the edge.
+ * @return LongWritable currentSourceId for the edge.
+ */
+ @Override
+ public LongWritable getCurrentSourceId() throws IOException,
+ InterruptedException {
+ return this.sourceId;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/e987492e/giraph-gora/src/test/java/org/apache/giraph/io/gora/TestGoraEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/test/java/org/apache/giraph/io/gora/TestGoraEdgeInputFormat.java b/giraph-gora/src/test/java/org/apache/giraph/io/gora/TestGoraEdgeInputFormat.java
new file mode 100644
index 0000000..a01fbd3
--- /dev/null
+++ b/giraph-gora/src/test/java/org/apache/giraph/io/gora/TestGoraEdgeInputFormat.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.gora;
+
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_DATASTORE_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_END_KEY;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_KEYS_FACTORY_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_KEY_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_PERSISTENT_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_START_KEY;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Test;
+import org.junit.Assert;
+
+/**
+ * Test class for the Gora edge input format.
+ */
+public class TestGoraEdgeInputFormat {
+
+ @Test
+ public void getEmptyDb() throws Exception {
+ Iterable<String> results;
+ Iterator<String> result;
+ GiraphConfiguration conf = new GiraphConfiguration();
+ GIRAPH_GORA_DATASTORE_CLASS.
+ set(conf, "org.apache.gora.memory.store.MemStore");
+ GIRAPH_GORA_KEYS_FACTORY_CLASS.
+ set(conf,"org.apache.giraph.io.gora.utils.DefaultKeyFactory");
+ GIRAPH_GORA_KEY_CLASS.set(conf,"java.lang.String");
+ GIRAPH_GORA_PERSISTENT_CLASS.
+ set(conf,"org.apache.giraph.io.gora.generated.GEdge");
+ GIRAPH_GORA_START_KEY.set(conf,"1");
+ GIRAPH_GORA_END_KEY.set(conf,"3");
+ conf.set("io.serializations",
+ "org.apache.hadoop.io.serializer.WritableSerialization," +
+ "org.apache.hadoop.io.serializer.JavaSerialization");
+ conf.setComputationClass(EmptyComputation.class);
+ conf.setEdgeInputFormatClass(GoraGEdgeEdgeInputFormat.class);
+ results = InternalVertexRunner.run(conf, new String[0], new String[0]);
+ Assert.assertNotNull(results);
+ result = results.iterator();
+ Assert.assertFalse(result.hasNext());
+ }
+
+ /**
+ * Runs a job over the artificial edges written by GoraTestEdgeInputFormat
+ * and checks that one output line is produced for each of the three
+ * vertices.
+ */
+ @Test
+ public void getTestDb() throws Exception {
+ Iterable<String> results;
+ GiraphConfiguration conf = new GiraphConfiguration();
+ GIRAPH_GORA_DATASTORE_CLASS.
+ set(conf, "org.apache.gora.memory.store.MemStore");
+ GIRAPH_GORA_KEYS_FACTORY_CLASS.
+ set(conf,"org.apache.giraph.io.gora.utils.DefaultKeyFactory");
+ GIRAPH_GORA_KEY_CLASS.set(conf,"java.lang.String");
+ GIRAPH_GORA_PERSISTENT_CLASS.
+ set(conf,"org.apache.giraph.io.gora.generated.GEdge");
+ GIRAPH_GORA_START_KEY.set(conf,"1");
+ GIRAPH_GORA_END_KEY.set(conf,"4");
+ conf.set("io.serializations",
+ "org.apache.hadoop.io.serializer.WritableSerialization," +
+ "org.apache.hadoop.io.serializer.JavaSerialization");
+ conf.setComputationClass(EmptyComputation.class);
+ conf.setEdgeInputFormatClass(GoraTestEdgeInputFormat.class);
+ conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+ results = InternalVertexRunner.run(conf, new String[0], new String[0]);
+ Assert.assertNotNull(results);
+ // Copy the lines instead of casting the Iterable to ArrayList.
+ Collection<String> lines = new ArrayList<String>();
+ for (String line : results) {
+ lines.add(line);
+ }
+ Assert.assertEquals(3, lines.size());
+ // These checks used to sit behind a "size() == 2" condition and so never
+ // ran. The output order is deliberately not asserted.
+ Assert.assertTrue(lines.contains("33\t0.0"));
+ Assert.assertTrue(lines.contains("22\t0.0"));
+ Assert.assertTrue(lines.contains("11\t0.0"));
+ }
+
+ /*
+ Test computation that performs no work: it only checks that the vertex
+ is not null and then votes to halt.
+ */
+ public static class EmptyComputation
+ extends BasicComputation<LongWritable, DoubleWritable,
+ FloatWritable, LongWritable> {
+
+ @Override
+ public void compute(
+ Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
+ Iterable<LongWritable> messages) throws IOException {
+ Assert.assertNotNull(vertex);
+ vertex.voteToHalt();
+ }
+ }
+}