You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by cl...@apache.org on 2013/11/18 15:57:25 UTC

git commit: updated refs/heads/trunk to e987492

Updated Branches:
  refs/heads/trunk 9ded6c372 -> e987492ee


GIRAPH-759


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/e987492e
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/e987492e
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/e987492e

Branch: refs/heads/trunk
Commit: e987492ee3c84e5b7d69af2ecbec2b07f6c4b6ae
Parents: 9ded6c3
Author: Claudio Martella <cl...@gmail.com>
Authored: Mon Nov 18 15:56:42 2013 +0100
Committer: Claudio Martella <cl...@gmail.com>
Committed: Mon Nov 18 15:56:42 2013 +0100

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 .../giraph/io/gora/GoraEdgeInputFormat.java     | 409 +++++++++++++++++++
 .../io/gora/GoraGEdgeEdgeInputFormat.java       |  93 +++++
 .../apache/giraph/io/gora/generated/GEdge.java  | 314 ++++++++++++++
 .../giraph/io/gora/GoraTestEdgeInputFormat.java | 132 ++++++
 .../giraph/io/gora/TestGoraEdgeInputFormat.java | 122 ++++++
 6 files changed, 1072 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/e987492e/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 5471120..c639ba1 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-759: Create EdgeInputFormat from Apache Gora (renato2099 via claudio)
+
   GIRAPH-758: Create VertexOutputFormat to Apache Gora (renato2099 via claudio)
   
   GIRAPH-757: Create VertexInputFormat from Apache Gora (renato2099 via claudio)

http://git-wip-us.apache.org/repos/asf/giraph/blob/e987492e/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraEdgeInputFormat.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraEdgeInputFormat.java
new file mode 100644
index 0000000..d0dcc32
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraEdgeInputFormat.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.gora;
+
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_DATASTORE_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_END_KEY;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_KEYS_FACTORY_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_KEY_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_PERSISTENT_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_START_KEY;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.EdgeReader;
+import org.apache.giraph.io.gora.utils.ExtraGoraInputFormat;
+import org.apache.giraph.io.gora.utils.GoraUtils;
+import org.apache.giraph.io.gora.utils.KeyFactory;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.util.GoraException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Logger;
+
+/**
+ *  Class which wraps the GoraInputFormat. It's designed
+ *  as an extension point to EdgeInputFormat subclasses who wish
+ *  to read from Gora data sources.
+ *
+ *  Works with
+ *  {@link GoraVertexOutputFormat}
+ *
+ * @param <I> vertex id type
+ * @param <E>  edge type
+ */
+public abstract class GoraEdgeInputFormat
+  <I extends WritableComparable, E extends Writable>
+  extends EdgeInputFormat<I, E> {
+
+  /** Start key for querying Gora data store. */
+  private static Object START_KEY;
+
+  /** End key for querying Gora data store. */
+  private static Object END_KEY;
+
+  /** Logger for Gora's edge input format. */
+  private static final Logger LOG =
+          Logger.getLogger(GoraEdgeInputFormat.class);
+
+  /** KeyClass used for getting data. */
+  private static Class<?> KEY_CLASS;
+
+  /** The vertex itself will be used as a value inside Gora. */
+  private static Class<? extends Persistent> PERSISTENT_CLASS;
+
+  /** Data store class to be used as backend. */
+  private static Class<? extends DataStore> DATASTORE_CLASS;
+
+  /** Class used to transform strings into Keys */
+  private static Class<?> KEY_FACTORY_CLASS;
+
+  /** Data store used for querying data. */
+  private static DataStore DATA_STORE;
+
+  /** Counter for input records. */
+  private static int RECORD_COUNTER = 0;
+
+  /** Delegate Gora input format */
+  private static ExtraGoraInputFormat GORA_INPUT_FORMAT =
+         new ExtraGoraInputFormat();
+
+  /**
+   * Resolves the Gora-related classes named in the configuration and uses
+   * them to initialize the shared data store and the delegate input format.
+   *
+   * The class names are read from {@code getConf()}; the {@code conf}
+   * argument itself is not consulted.
+   *
+   * @param conf configuration parameters
+   * @throws IllegalArgumentException if any of the configured classes
+   *         cannot be found
+   */
+  public void checkInputSpecs(Configuration conf) {
+    String sDataStoreType =
+        GIRAPH_GORA_DATASTORE_CLASS.get(getConf());
+    String sKeyType =
+        GIRAPH_GORA_KEY_CLASS.get(getConf());
+    String sPersistentType =
+        GIRAPH_GORA_PERSISTENT_CLASS.get(getConf());
+    String sKeyFactoryClass =
+        GIRAPH_GORA_KEYS_FACTORY_CLASS.get(getConf());
+    try {
+      Class<?> keyClass = Class.forName(sKeyType);
+      Class<?> persistentClass = Class.forName(sPersistentType);
+      Class<?> dataStoreClass = Class.forName(sDataStoreType);
+      Class<?> keyFactoryClass = Class.forName(sKeyFactoryClass);
+      setKeyClass(keyClass);
+      setPersistentClass((Class<? extends Persistent>) persistentClass);
+      setDatastoreClass((Class<? extends DataStore>) dataStoreClass);
+      setKeyFactoryClass(keyFactoryClass);
+      setDataStore(createDataStore());
+      GORA_INPUT_FORMAT.setDataStore(getDataStore());
+    } catch (ClassNotFoundException e) {
+      // Fail here, where the cause is known, instead of leaving the static
+      // fields unset and hitting a NullPointerException later on.
+      LOG.error("Error while reading Gora Input parameters", e);
+      throw new IllegalArgumentException(
+          "Error while reading Gora Input parameters", e);
+    }
+  }
+
+  /**
+   * Gets the splits for a data store.
+   *
+   * Builds the start and end keys from the configuration using the
+   * configured key factory, stores them through the static setters, and
+   * delegates the split computation to the Gora input format. When no
+   * start key is configured, both keys are built from {@code null}.
+   *
+   * @param context JobContext
+   * @param minSplitCountHint Hint for a minimum split count (not used by
+   *        this implementation)
+   * @return List<InputSplit> A list of splits
+   * @throws IOException if the key factory class is not set or cannot be
+   *         instantiated; also passed along from the delegate
+   * @throws InterruptedException exceptions passed along.
+   */
+  @Override
+  public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
+    throws IOException, InterruptedException {
+    Class<?> keyFactoryClass = getKeyFactoryClass();
+    if (keyFactoryClass == null) {
+      throw new IOException("Key factory class has not been set.");
+    }
+    KeyFactory kFact;
+    try {
+      kFact = (KeyFactory) keyFactoryClass.newInstance();
+    } catch (InstantiationException e) {
+      // Without a key factory no key can be built; report the real cause
+      // rather than continuing with a null factory.
+      LOG.error("Key factory was not instantiated. Please verify.", e);
+      throw new IOException(
+          "Key factory was not instantiated. Please verify.", e);
+    } catch (IllegalAccessException e) {
+      LOG.error("Key factory was not instantiated. Please verify.", e);
+      throw new IOException(
+          "Key factory was not instantiated. Please verify.", e);
+    }
+    String sKey = GIRAPH_GORA_START_KEY.get(getConf());
+    String eKey = GIRAPH_GORA_END_KEY.get(getConf());
+    if (sKey == null || sKey.isEmpty()) {
+      LOG.warn("No start key has been defined.");
+      LOG.warn("Querying all the data store.");
+      sKey = null;
+      eKey = null;
+    }
+    kFact.setDataStore(getDataStore());
+    setStartKey(kFact.buildKey(sKey));
+    setEndKey(kFact.buildKey(eKey));
+    Query tmpQuery = GoraUtils.getQuery(
+        getDataStore(), getStartKey(), getEndKey());
+    GORA_INPUT_FORMAT.setQuery(tmpQuery);
+    return GORA_INPUT_FORMAT.getSplits(context);
+  }
+
+  @Override
+  public abstract GoraEdgeReader createEdgeReader(InputSplit split,
+      TaskAttemptContext context) throws IOException;
+
+  /**
+   * Abstract class to be implemented by the user based on their specific
+   * edge input. Easiest to ignore the key value separator and only use
+   * key instead.
+   */
+  protected abstract class GoraEdgeReader extends EdgeReader<I, E> {
+    /** current edge obtained from Gora */
+    private Edge<I, E> edge;
+    /** Results gotten from Gora data store. */
+    private Result readResults;
+
+    @Override
+    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      getResults();
+      RECORD_COUNTER = 0;
+    }
+
+    /**
+     * Gets the next edge from Gora data store.
+     *
+     * Advances the result cursor and, only when a record is available,
+     * transforms it into the current edge and counts it. Any failure while
+     * reading or transforming is logged and reported as "no more edges".
+     *
+     * @return true/false depending on the existence of edges.
+     * @throws IOException exceptions passed along.
+     * @throws InterruptedException exceptions passed along.
+     */
+    @Override
+    // CHECKSTYLE: stop IllegalCatch
+    public boolean nextEdge() throws IOException, InterruptedException {
+      boolean flg = false;
+      try {
+        // Only transform a record when next() reports one is available.
+        if (this.getReadResults().next()) {
+          this.edge = transformEdge(this.getReadResults().get());
+          RECORD_COUNTER++;
+          flg = true;
+        }
+      } catch (Exception e) {
+        // Keep the stack trace: a failure here silently ends the input.
+        LOG.error("Error transforming edges.", e);
+        flg = false;
+      }
+      LOG.debug(RECORD_COUNTER + " were transformed.");
+      return flg;
+    }
+    // CHECKSTYLE: resume IllegalCatch
+
+    /**
+     * Gets the progress of reading results from Gora.
+     * @return the progress of reading results from Gora.
+     */
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      float progress = 0.0f;
+      if (getReadResults() != null) {
+        progress = getReadResults().getProgress();
+      }
+      return progress;
+    }
+
+    /**
+     * Gets current edge.
+     *
+     * @return  The edge object represented by a Gora object
+     */
+    @Override
+    public Edge<I, E> getCurrentEdge()
+      throws IOException, InterruptedException {
+      return this.edge;
+    }
+
+    /**
+     * Parser for a single Gora object
+     *
+     * @param   goraObject edge represented as a GoraObject
+     * @return  The edge object represented by a Gora object
+     */
+    protected abstract Edge<I, E> transformEdge(Object goraObject);
+
+    /**
+     * Performs a range query to a Gora data store.
+     */
+    protected void getResults() {
+      setReadResults(GoraUtils.getRequest(getDataStore(),
+          getStartKey(), getEndKey()));
+    }
+
+    /**
+     * Finishes the reading process.
+     * @throws IOException.
+     */
+    @Override
+    public void close() throws IOException {
+    }
+
+    /**
+     * Gets the results read.
+     * @return results read.
+     */
+    Result getReadResults() {
+      return readResults;
+    }
+
+    /**
+     * Sets the results read.
+     * @param readResults results read.
+     */
+    void setReadResults(Result readResults) {
+      this.readResults = readResults;
+    }
+  }
+
+  /**
+   * Creates the data store from the configured data store, key and
+   * persistent classes.
+   * @return DataStore created, or null if Gora failed to create it
+   */
+  public DataStore createDataStore() {
+    DataStore dsCreated = null;
+    try {
+      dsCreated = GoraUtils.createSpecificDataStore(getDatastoreClass(),
+          getKeyClass(), getPersistentClass());
+    } catch (GoraException e) {
+      // Route the stack trace through the logger instead of stderr.
+      LOG.error("Error creating data store.", e);
+    }
+    return dsCreated;
+  }
+
+  /**
+   * Gets the persistent Class
+   * @return persistentClass used
+   */
+  static Class<? extends Persistent> getPersistentClass() {
+    return PERSISTENT_CLASS;
+  }
+
+  /**
+   * Sets the persistent Class
+   * @param persistentClassUsed to be set
+   */
+  static void setPersistentClass
+  (Class<? extends Persistent> persistentClassUsed) {
+    PERSISTENT_CLASS = persistentClassUsed;
+  }
+
+  /**
+   * Gets the key class used.
+   * @return the key class used.
+   */
+  static Class<?> getKeyClass() {
+    return KEY_CLASS;
+  }
+
+  /**
+   * Sets the key class used.
+   * @param keyClassUsed key class used.
+   */
+  static void setKeyClass(Class<?> keyClassUsed) {
+    KEY_CLASS = keyClassUsed;
+  }
+
+  /**
+   * @return Class the DATASTORE_CLASS
+   */
+  public static Class<? extends DataStore> getDatastoreClass() {
+    return DATASTORE_CLASS;
+  }
+
+  /**
+   * @param dataStoreClass the dataStore class to set
+   */
+  public static void setDatastoreClass(
+      Class<? extends DataStore> dataStoreClass) {
+    DATASTORE_CLASS = dataStoreClass;
+  }
+
+  /**
+   * Gets the start key for querying.
+   * @return the start key.
+   */
+  public Object getStartKey() {
+    return START_KEY;
+  }
+
+  /**
+   * Sets the start key for querying.
+   * @param startKey start key.
+   */
+  public static void setStartKey(Object startKey) {
+    START_KEY = startKey;
+  }
+
+  /**
+   * Gets the end key for querying.
+   * @return the end key.
+   */
+  static Object getEndKey() {
+    return END_KEY;
+  }
+
+  /**
+   * Sets the end key for querying.
+   * @param pEndKey end key.
+   */
+  static void setEndKey(Object pEndKey) {
+    END_KEY = pEndKey;
+  }
+
+  /**
+   * Gets the key factory class.
+   * @return the kEY_FACTORY_CLASS
+   */
+  static Class<?> getKeyFactoryClass() {
+    return KEY_FACTORY_CLASS;
+  }
+
+  /**
+   * Sets the key factory class.
+   * @param keyFactoryClass the keyFactoryClass to set.
+   */
+  static void setKeyFactoryClass(Class<?> keyFactoryClass) {
+    KEY_FACTORY_CLASS = keyFactoryClass;
+  }
+
+  /**
+   * Gets the data store.
+   * @return DataStore
+   */
+  public static DataStore getDataStore() {
+    return DATA_STORE;
+  }
+
+  /**
+   * Sets the data store
+   * @param dStore the dATA_STORE to set
+   */
+  public static void setDataStore(DataStore dStore) {
+    DATA_STORE = dStore;
+  }
+
+  /**
+   * Returns a logger.
+   * @return the log for the output format.
+   */
+  public static Logger getLogger() {
+    return LOG;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/e987492e/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGEdgeEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGEdgeEdgeInputFormat.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGEdgeEdgeInputFormat.java
new file mode 100644
index 0000000..e738f36
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGEdgeEdgeInputFormat.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.gora;
+
+import java.io.IOException;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.io.gora.generated.GEdge;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Example implementation of a specific reader for a generated data bean.
+ */
+public class GoraGEdgeEdgeInputFormat
+  extends GoraEdgeInputFormat<LongWritable, FloatWritable> {
+
+  /**
+   * Default constructor
+   */
+  public GoraGEdgeEdgeInputFormat() {
+  }
+
+  /**
+   * Creates specific edge reader to be used inside Hadoop.
+   * @param split split to be read.
+   * @param context JobContext to be used.
+   * @return GoraEdgeReader Edge reader to be used by Hadoop.
+   */
+  @Override
+  public GoraEdgeReader createEdgeReader(
+      InputSplit split, TaskAttemptContext context) throws IOException {
+    return new GoraGEdgeEdgeReader();
+  }
+
+  /**
+   * Gora edge reader
+   */
+  protected class GoraGEdgeEdgeReader extends GoraEdgeReader {
+
+    /** source vertex of the edge */
+    private LongWritable sourceId;
+
+    /**
+     * Transforms a GoraObject into an Edge object.
+     * Also records the edge's source vertex id, which is what
+     * getCurrentSourceId() returns afterwards.
+     * @param goraObject Object from Gora to be translated.
+     * @return Edge Result from transforming the gora object.
+     */
+    @Override
+    protected Edge<LongWritable, FloatWritable> transformEdge
+    (Object goraObject) {
+      GEdge goraEdge = (GEdge) goraObject;
+      long dest = Long.parseLong(goraEdge.getVertexOutId().toString());
+      this.sourceId = new LongWritable(
+          Long.parseLong(goraEdge.getVertexInId().toString()));
+      // Keep the weight as a float: casting it to long would drop the
+      // fractional part before it is wrapped in a FloatWritable.
+      float weight = goraEdge.getEdgeWeight();
+      return EdgeFactory.create(new LongWritable(dest),
+          new FloatWritable(weight));
+    }
+
+    /**
+     * Gets the currentSourceId for the edge.
+     * @return LongWritable currentSourceId for the edge.
+     */
+    @Override
+    public LongWritable getCurrentSourceId() throws IOException,
+        InterruptedException {
+      return this.sourceId;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/e987492e/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GEdge.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GEdge.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GEdge.java
new file mode 100644
index 0000000..f6ac3f7
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GEdge.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.gora.generated;
+
+import org.apache.avro.Schema;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.persistency.StateManager;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.persistency.impl.StateManagerImpl;
+
+/**
+ * Example class for defining a Giraph-Edge.
+ */
+@SuppressWarnings("all")
+public class GEdge extends PersistentBase {
+  /**
+   * Schema used for the class.
+   */
+  public static final Schema OBJ_SCHEMA = Schema.parse("{\"type\":\"record\"," +
+    "\"name\":\"GEdge\",\"namespace\":\"org.apache.giraph.gora.generated\"," +
+    "\"fields\":[{\"name\":\"edgeId\",\"type\":\"string\"}," +
+    "{\"name\":\"edgeWeight\",\"type\":\"float\"}," +
+    "{\"name\":\"vertexInId\",\"type\":\"string\"}," +
+    "{\"name\":\"vertexOutId\",\"type\":\"string\"}," +
+    "{\"name\":\"label\",\"type\":\"string\"}]}");
+
+  /**
+   * Field enum
+   */
+  public static enum Field {
+    /**
+     * Edge id.
+     */
+    EDGE_ID(0, "edgeId"),
+
+    /**
+     * Edge weight.
+     */
+    EDGE_WEIGHT(1, "edgeWeight"),
+
+    /**
+     * Edge vertex source id.
+     */
+    VERTEX_IN_ID(2, "vertexInId"),
+
+    /**
+     * Edge vertex end id.
+     */
+    VERTEX_OUT_ID(3, "vertexOutId"),
+
+    /**
+     * Edge label.
+     */
+    LABEL(4, "label");
+
+    /**
+     * Field index
+     */
+    private int index;
+
+    /**
+     * Field name
+     */
+    private String name;
+
+    /**
+     * Field constructor
+     * @param index of attribute
+     * @param name of attribute
+     */
+    Field(int index, String name) {
+      this.index = index;
+      this.name = name;
+    }
+
+    /**
+     * Gets index
+     * @return int of attribute.
+     */
+    public int getIndex() {
+      return index;
+    }
+
+    /**
+     * Gets name
+     * @return String of name.
+     */
+    public String getName() {
+      return name;
+    }
+
+    /**
+     * Gets name
+     * @return String of name.
+     */
+    public String toString() {
+      return name;
+    }
+  };
+
+  /**
+   * Array containing all fields.
+   */
+  private static final String[] ALL_FIELDS = {
+    "edgeId", "edgeWeight", "vertexInId", "vertexOutId", "label"
+  };
+
+  static {
+    PersistentBase.registerFields(GEdge.class, ALL_FIELDS);
+  }
+
+  /**
+   * edgeId
+   */
+  private Utf8 edgeId;
+
+  /**
+   * edgeWeight
+   */
+  private float edgeWeight;
+
+  /**
+   * vertexInId
+   */
+  private Utf8 vertexInId;
+
+  /**
+   * vertexOutId
+   */
+  private Utf8 vertexOutId;
+
+  /**
+   * label
+   */
+  private Utf8 label;
+
+  /**
+   * Default constructor.
+   */
+  public GEdge() {
+    this(new StateManagerImpl());
+  }
+
+  /**
+   * Constructor
+   * @param stateManager from which the object will be created.
+   */
+  public GEdge(StateManager stateManager) {
+    super(stateManager);
+  }
+
+  /**
+   * Creates a new instance
+   * @param stateManager from which the object will be created.
+   * @return GEdge created
+   */
+  public GEdge newInstance(StateManager stateManager) {
+    return new GEdge(stateManager);
+  }
+
+  /**
+   * Gets the object schema
+   * @return Schema of the object.
+   */
+  public Schema getSchema() {
+    return OBJ_SCHEMA;
+  }
+
+  /**
+   * Gets field
+   * @param fieldIndex index field.
+   * @return Object from an index.
+   */
+  public Object get(int fieldIndex) {
+    switch (fieldIndex) {
+    case 0:
+      return edgeId;
+    case 1:
+      return edgeWeight;
+    case 2:
+      return vertexInId;
+    case 3:
+      return vertexOutId;
+    case 4:
+      return label;
+    default:
+      throw new AvroRuntimeException("Bad index");
+    }
+  }
+
+  /**
+   * Puts a value into a field.
+   * @param fieldIndex index of field used.
+   * @param fieldValue value of field used.
+   */
+  @SuppressWarnings(value = "unchecked")
+  public void put(int fieldIndex, Object fieldValue) {
+    if (isFieldEqual(fieldIndex, fieldValue)) {
+      return;
+    }
+    getStateManager().setDirty(this, fieldIndex);
+    switch (fieldIndex) {
+    case 0:
+      edgeId = (Utf8) fieldValue; break;
+    case 1:
+      edgeWeight = (Float) fieldValue; break;
+    case 2:
+      vertexInId = (Utf8) fieldValue; break;
+    case 3:
+      vertexOutId = (Utf8) fieldValue; break;
+    case 4:
+      label = (Utf8) fieldValue; break;
+    default:
+      throw new AvroRuntimeException("Bad index");
+    }
+  }
+
+  /**
+   * Gets edgeId
+   * @return Utf8 edgeId
+   */
+  public Utf8 getEdgeId() {
+    return (Utf8) get(0);
+  }
+
+  /**
+   * Sets edgeId
+   * @param value edgeId
+   */
+  public void setEdgeId(Utf8 value) {
+    put(0, value);
+  }
+
+  /**
+   * Gets edgeWeight
+   * @return float edgeWeight
+   */
+  public float getEdgeWeight() {
+    return (Float) get(1);
+  }
+
+  /**
+   * Sets edgeWeight
+   * @param value edgeWeight
+   */
+  public void setEdgeWeight(float value) {
+    put(1, value);
+  }
+
+  /**
+   * Gets edgeVertexInId
+   * @return Utf8 edgeVertexInId
+   */
+  public Utf8 getVertexInId() {
+    return (Utf8) get(2);
+  }
+
+  /**
+   * Sets edgeVertexInId
+   * @param value edgeVertexInId
+   */
+  public void setVertexInId(Utf8 value) {
+    put(2, value);
+  }
+
+  /**
+   * Gets edgeVertexOutId
+   * @return Utf8 edgeVertexOutId
+   */
+  public Utf8 getVertexOutId() {
+    return (Utf8) get(3);
+  }
+
+  /**
+   * Sets edgeVertexOutId
+   * @param value edgeVertexOutId
+   */
+  public void setVertexOutId(Utf8 value) {
+    put(3, value);
+  }
+
+  /**
+   * Gets edgeLabel
+   * @return Utf8 edgeLabel
+   */
+  public Utf8 getLabel() {
+    return (Utf8) get(4);
+  }
+
+  /**
+   * Sets edgeLabel
+   * @param value edgeLabel
+   */
+  public void setLabel(Utf8 value) {
+    put(4, value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/e987492e/giraph-gora/src/test/java/org/apache/giraph/io/gora/GoraTestEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/test/java/org/apache/giraph/io/gora/GoraTestEdgeInputFormat.java b/giraph-gora/src/test/java/org/apache/giraph/io/gora/GoraTestEdgeInputFormat.java
new file mode 100644
index 0000000..ba71ce4
--- /dev/null
+++ b/giraph-gora/src/test/java/org/apache/giraph/io/gora/GoraTestEdgeInputFormat.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.gora;
+
+import java.io.IOException;
+
+import org.apache.avro.util.Utf8;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.io.gora.GoraEdgeInputFormat;
+import org.apache.giraph.io.gora.generated.GEdge;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Implementation of a specific reader for a generated data bean.
+ */
+public class GoraTestEdgeInputFormat
+  extends GoraEdgeInputFormat<LongWritable, FloatWritable> {
+
+  /**
+   * Default constructor
+   */
+  public GoraTestEdgeInputFormat() {
+  }
+
+  /**
+   * Creates specific edge reader to be used inside Hadoop.
+   * @param split split to be read.
+   * @param context JobContext to be used.
+   * @return GoraEdgeReader Edge reader to be used by Hadoop.
+   */
+  @Override
+  public GoraEdgeReader createEdgeReader(
+      InputSplit split, TaskAttemptContext context) throws IOException {
+    putArtificialData();
+    return new GoraGEdgeEdgeReader();
+  }
+
+  /**
+   * Writes data into the data store in order to test it out.
+   */
+  @SuppressWarnings("unchecked")
+  private static void putArtificialData() {
+    getDataStore().put("11-22",
+        createEdge("11-22", "11", "22", "11-22", (float)(11+22)));
+    getDataStore().put("22-11",
+        createEdge("22-11", "22", "11", "22-11", (float)(22+11)));
+    getDataStore().put("11-33",
+        createEdge("11-33", "11", "33", "11-33", (float)(11+33)));
+    getDataStore().put("33-11",
+        createEdge("33-11", "33", "11", "33-11", (float)(33+11)));
+    getDataStore().flush();
+  }
+
+  /**
+   * Creates an edge from its id, endpoint vertex ids, label and weight.
+   * @param id Edge id.
+   * @param vertexInId Vertex source Id.
+   * @param vertexOutId Vertex destination Id.
+   * @param edgeLabel Edge label.
+   * @param edgeWeight Edge weight.
+   * @return GEdge created.
+   */
+  private static GEdge createEdge(String id, String vertexInId,
+      String vertexOutId, String edgeLabel, float edgeWeight) {
+    GEdge newEdge = new GEdge();
+    newEdge.setEdgeId(new Utf8(id));
+    newEdge.setVertexInId(new Utf8(vertexInId));
+    newEdge.setVertexOutId(new Utf8(vertexOutId));
+    newEdge.setLabel(new Utf8(edgeLabel));
+    newEdge.setEdgeWeight(edgeWeight);
+    return newEdge;
+  }
+
+  /**
+   * Gora edge reader
+   */
+  protected class GoraGEdgeEdgeReader extends GoraEdgeReader {
+
+    /** source vertex of the edge */
+    private LongWritable sourceId;
+
+    /**
+     * Transforms a GoraObject into an Edge object.
+     * Also records the edge's source vertex id, which is what
+     * getCurrentSourceId() returns afterwards.
+     * @param goraObject Object from Gora to be translated.
+     * @return Edge Result from transforming the gora object.
+     */
+    @Override
+    protected Edge<LongWritable, FloatWritable> transformEdge
+    (Object goraObject) {
+      GEdge goraEdge = (GEdge) goraObject;
+      long dest = Long.parseLong(goraEdge.getVertexOutId().toString());
+      this.sourceId = new LongWritable(
+          Long.parseLong(goraEdge.getVertexInId().toString()));
+      // Keep the weight as a float: casting it to long would drop the
+      // fractional part before it is wrapped in a FloatWritable.
+      float weight = goraEdge.getEdgeWeight();
+      return EdgeFactory.create(new LongWritable(dest),
+          new FloatWritable(weight));
+    }
+
+    /**
+     * Gets the currentSourceId for the edge.
+     * @return LongWritable currentSourceId for the edge.
+     */
+    @Override
+    public LongWritable getCurrentSourceId() throws IOException,
+        InterruptedException {
+      return this.sourceId;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/e987492e/giraph-gora/src/test/java/org/apache/giraph/io/gora/TestGoraEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/test/java/org/apache/giraph/io/gora/TestGoraEdgeInputFormat.java b/giraph-gora/src/test/java/org/apache/giraph/io/gora/TestGoraEdgeInputFormat.java
new file mode 100644
index 0000000..a01fbd3
--- /dev/null
+++ b/giraph-gora/src/test/java/org/apache/giraph/io/gora/TestGoraEdgeInputFormat.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.gora;
+
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_DATASTORE_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_END_KEY;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_KEYS_FACTORY_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_KEY_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_PERSISTENT_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_START_KEY;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Test;
+import org.junit.Assert;
+
+/**
+ * Test class for the Gora edge input format.
+ */
+public class TestGoraEdgeInputFormat {
+
+  /**
+   * Builds the Giraph configuration shared by the tests in this class:
+   * an in-memory Gora store keyed by String holding GEdge records, a key
+   * range starting at "1", and {@link EmptyComputation} as computation.
+   * The edge input format is left for each test to set.
+   * @param endKey Value to use for the Gora end key.
+   * @return Configuration with the common Gora settings applied.
+   */
+  private static GiraphConfiguration createConf(String endKey) {
+    GiraphConfiguration conf = new GiraphConfiguration();
+    GIRAPH_GORA_DATASTORE_CLASS.
+    set(conf, "org.apache.gora.memory.store.MemStore");
+    GIRAPH_GORA_KEYS_FACTORY_CLASS.
+    set(conf, "org.apache.giraph.io.gora.utils.DefaultKeyFactory");
+    GIRAPH_GORA_KEY_CLASS.set(conf, "java.lang.String");
+    GIRAPH_GORA_PERSISTENT_CLASS.
+    set(conf, "org.apache.giraph.io.gora.generated.GEdge");
+    GIRAPH_GORA_START_KEY.set(conf, "1");
+    GIRAPH_GORA_END_KEY.set(conf, endKey);
+    conf.set("io.serializations",
+        "org.apache.hadoop.io.serializer.WritableSerialization," +
+        "org.apache.hadoop.io.serializer.JavaSerialization");
+    conf.setComputationClass(EmptyComputation.class);
+    return conf;
+  }
+
+  /**
+   * Runs a job over GoraGEdgeEdgeInputFormat without configuring a vertex
+   * output format and checks that no result lines are returned.
+   */
+  @Test
+  public void getEmptyDb() throws Exception {
+    GiraphConfiguration conf = createConf("3");
+    conf.setEdgeInputFormatClass(GoraGEdgeEdgeInputFormat.class);
+    Iterable<String> results =
+        InternalVertexRunner.run(conf, new String[0], new String[0]);
+    Assert.assertNotNull(results);
+    Iterator<String> result = results.iterator();
+    Assert.assertFalse(result.hasNext());
+  }
+
+  /**
+   * Runs a job over GoraTestEdgeInputFormat and checks that exactly the
+   * three expected "id&lt;TAB&gt;value" lines are written.
+   */
+  @Test
+  public void getTestDb() throws Exception {
+    GiraphConfiguration conf = createConf("4");
+    conf.setEdgeInputFormatClass(GoraTestEdgeInputFormat.class);
+    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+    Iterable<String> results =
+        InternalVertexRunner.run(conf, new String[0], new String[0]);
+    Assert.assertNotNull(results);
+    Collection<String> lines = new ArrayList<String>();
+    for (String line : results) {
+      lines.add(line);
+    }
+    // These checks used to sit behind a "size() == 2" guard that could
+    // never hold after asserting a size of 3, so they were never run.
+    // Three lines plus three distinct expected values pins the content
+    // without relying on the order in which vertices are written.
+    Assert.assertEquals(3, lines.size());
+    Assert.assertTrue(lines.contains("33\t0.0"));
+    Assert.assertTrue(lines.contains("22\t0.0"));
+    Assert.assertTrue(lines.contains("11\t0.0"));
+  }
+
+  /**
+   * Computation used by the tests: it only checks that the vertex handed
+   * to it is not null and then votes to halt. No messages are sent.
+   */
+  public static class EmptyComputation
+    extends BasicComputation<LongWritable, DoubleWritable,
+    FloatWritable, LongWritable> {
+
+    @Override
+    public void compute(
+        Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
+        Iterable<LongWritable> messages) throws IOException {
+      Assert.assertNotNull(vertex);
+      vertex.voteToHalt();
+    }
+  }
+}