You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/03/28 01:27:07 UTC

git commit: GIRAPH-588: More flexible Hive input (majakabiljo)

Updated Branches:
  refs/heads/trunk 01c527e22 -> 95e122676


GIRAPH-588: More flexible Hive input (majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/95e12267
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/95e12267
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/95e12267

Branch: refs/heads/trunk
Commit: 95e1226766fb7df16e699cd54cf8ba0d043dc69d
Parents: 01c527e
Author: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Authored: Wed Mar 27 17:09:02 2013 -0700
Committer: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Committed: Wed Mar 27 17:26:24 2013 -0700

----------------------------------------------------------------------
 CHANGELOG                                          |    2 +
 .../giraph/io/iterables/EdgeReaderWrapper.java     |   83 ++++++++++++
 .../apache/giraph/io/iterables/EdgeWithSource.java |   96 ++++++++++++++
 .../apache/giraph/io/iterables/GiraphReader.java   |   59 +++++++++
 .../io/iterables/IteratorToReaderWrapper.java      |   68 ++++++++++
 .../giraph/io/iterables/VertexReaderWrapper.java   |   82 ++++++++++++
 .../apache/giraph/io/iterables/package-info.java   |   21 +++
 .../org/apache/giraph/hive/HiveGiraphRunner.java   |   76 +++---------
 .../DefaultConfigurableAndTableSchemaAware.java    |   52 ++++++++
 .../giraph/hive/input/RecordReaderWrapper.java     |   61 +++++++++
 .../giraph/hive/input/edge/AbstractHiveToEdge.java |   13 +-
 .../hive/input/edge/HiveEdgeInputFormat.java       |    3 +-
 .../giraph/hive/input/edge/HiveEdgeReader.java     |   61 +++------
 .../apache/giraph/hive/input/edge/HiveToEdge.java  |   35 ++---
 .../giraph/hive/input/edge/SimpleHiveToEdge.java   |   89 +++++++++++++
 .../hive/input/vertex/AbstractHiveToVertex.java    |   41 ++++++
 .../input/vertex/AbstractHiveToVertexEdges.java    |   49 -------
 .../input/vertex/AbstractHiveToVertexValue.java    |   49 -------
 .../giraph/hive/input/vertex/HiveToVertex.java     |   50 +++++++
 .../hive/input/vertex/HiveToVertexEdges.java       |   68 ----------
 .../hive/input/vertex/HiveToVertexValue.java       |   49 -------
 .../hive/input/vertex/HiveVertexInputFormat.java   |    5 +-
 .../giraph/hive/input/vertex/HiveVertexReader.java |  102 ++++----------
 .../hive/input/vertex/SimpleHiveToVertex.java      |   93 +++++++++++++
 .../input/vertex/SimpleNoEdgesHiveToVertex.java    |   44 ++++++
 .../giraph/hive/output/AbstractVertexToHive.java   |   20 +---
 26 files changed, 934 insertions(+), 437 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 051c9a7..ff20214 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-588: More flexible Hive input (majakabiljo)
+
   GIRAPH-587: Refactor configuration options (nitay)
 
   GIRAPH-581: More flexible Hive output (majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-core/src/main/java/org/apache/giraph/io/iterables/EdgeReaderWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/iterables/EdgeReaderWrapper.java b/giraph-core/src/main/java/org/apache/giraph/io/iterables/EdgeReaderWrapper.java
new file mode 100644
index 0000000..4af221a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/iterables/EdgeReaderWrapper.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.iterables;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.io.EdgeReader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Adapts a {@link GiraphReader} of edges to the {@link EdgeReader} interface
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge data
+ */
+public class EdgeReaderWrapper<I extends WritableComparable,
+    E extends Writable> implements EdgeReader<I, E> {
+  /** Wrapped reader; initialize, close and getProgress delegate to it */
+  private GiraphReader<EdgeWithSource<I, E>> edgeReader;
+  /** Cursor over {@link #edgeReader}; holds the edge from the last read */
+  private IteratorToReaderWrapper<EdgeWithSource<I, E>> iterator;
+
+  /**
+   * Wrap the given reader; the reader is not initialized by this constructor
+   *
+   * @param edgeReader GiraphReader for edges to wrap
+   */
+  public EdgeReaderWrapper(GiraphReader<EdgeWithSource<I, E>> edgeReader) {
+    this.edgeReader = edgeReader;
+    iterator = new IteratorToReaderWrapper<EdgeWithSource<I, E>>(edgeReader);
+  }
+
+  @Override
+  public boolean nextEdge() throws IOException, InterruptedException {
+    return iterator.nextObject();
+  }
+
+  @Override
+  public I getCurrentSourceId() throws IOException, InterruptedException {
+    return iterator.getCurrentObject().getSourceVertexId();
+  }
+
+  @Override
+  public Edge<I, E> getCurrentEdge() throws IOException, InterruptedException {
+    return iterator.getCurrentObject().getEdge();
+  }
+
+  @Override
+  public void initialize(InputSplit inputSplit,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    edgeReader.initialize(inputSplit, context);
+  }
+
+  @Override
+  public void close() throws IOException {
+    edgeReader.close();
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return edgeReader.getProgress();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-core/src/main/java/org/apache/giraph/io/iterables/EdgeWithSource.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/iterables/EdgeWithSource.java b/giraph-core/src/main/java/org/apache/giraph/io/iterables/EdgeWithSource.java
new file mode 100644
index 0000000..6f72de3
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/iterables/EdgeWithSource.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.iterables;
+
+import org.apache.giraph.edge.ReusableEdge;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Mutable holder pairing a reusable edge with the id of its source vertex
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge data
+ */
+public class EdgeWithSource<I extends WritableComparable,
+    E extends Writable> {
+  /** Id of the vertex the edge starts from */
+  private I sourceVertexId;
+  /** Edge holding the target vertex id and the edge value */
+  private ReusableEdge<I, E> edge;
+
+  /**
+   * Create an empty holder; source id and edge stay null until they are set
+   */
+  public EdgeWithSource() {
+  }
+
+  /**
+   * Create a holder for the given source id and edge
+   *
+   * @param sourceVertexId Source id
+   * @param edge Edge
+   */
+  public EdgeWithSource(I sourceVertexId, ReusableEdge<I, E> edge) {
+    this.sourceVertexId = sourceVertexId;
+    this.edge = edge;
+  }
+
+  public I getSourceVertexId() {
+    return sourceVertexId;
+  }
+
+  public void setSourceVertexId(I sourceVertexId) {
+    this.sourceVertexId = sourceVertexId;
+  }
+
+  public ReusableEdge<I, E> getEdge() {
+    return edge;
+  }
+
+  public void setEdge(ReusableEdge<I, E> edge) {
+    this.edge = edge;
+  }
+
+  public I getTargetVertexId() {
+    return edge.getTargetVertexId();
+  }
+
+  /**
+   * Set target vertex id on the wrapped edge, which must already be set
+   *
+   * @param targetVertexId Target vertex id
+   */
+  public void setTargetVertexId(I targetVertexId) {
+    edge.setTargetVertexId(targetVertexId);
+  }
+
+  public E getEdgeValue() {
+    return edge.getValue();
+  }
+
+  /**
+   * Set the value on the wrapped edge, which must already be set
+   *
+   * @param value Edge value
+   */
+  public void setEdgeValue(E value) {
+    edge.setValue(value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-core/src/main/java/org/apache/giraph/io/iterables/GiraphReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/iterables/GiraphReader.java b/giraph-core/src/main/java/org/apache/giraph/io/iterables/GiraphReader.java
new file mode 100644
index 0000000..736ee9f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/iterables/GiraphReader.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.iterables;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Reader for some kind of data, consumed through {@link Iterator} methods.
+ *
+ * @param <T> Type of data which we are reading (can be vertex, edges, etc)
+ */
+public interface GiraphReader<T> extends Iterator<T> {
+  /**
+   * Use the input split and context to setup reading.
+   * Guaranteed to be called prior to any other function.
+   *
+   * @param inputSplit Input split to be used for reading.
+   * @param context Context from the task.
+   */
+  void initialize(InputSplit inputSplit, TaskAttemptContext context) throws
+      IOException, InterruptedException;
+
+  /**
+   * Close this {@link GiraphReader} to future operations.
+   *
+   * @throws IOException
+   */
+  void close() throws IOException;
+
+  /**
+   * Report how much of the input this {@link GiraphReader} has consumed
+   * so far.
+   *
+   * @return Progress from <code>0.0</code> to <code>1.0</code>.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  float getProgress() throws IOException, InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-core/src/main/java/org/apache/giraph/io/iterables/IteratorToReaderWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/iterables/IteratorToReaderWrapper.java b/giraph-core/src/main/java/org/apache/giraph/io/iterables/IteratorToReaderWrapper.java
new file mode 100644
index 0000000..409bb99
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/iterables/IteratorToReaderWrapper.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.iterables;
+
+import java.util.Iterator;
+
+/**
+ * Wraps {@link Iterator} into object which provides iteration like in
+ * {@link org.apache.giraph.io.VertexReader} or
+ * {@link org.apache.giraph.io.EdgeReader}
+ *
+ * @param <T> Type of the objects being iterated
+ */
+public class IteratorToReaderWrapper<T> {
+  /** Wrapped iterator */
+  private Iterator<T> iterator;
+  /** Object from the last successful nextObject() call, null otherwise */
+  private T currentObject = null;
+
+  /**
+   * Wrap the given iterator; nothing is read until nextObject() is called
+   *
+   * @param iterator Iterator to wrap
+   */
+  public IteratorToReaderWrapper(Iterator<T> iterator) {
+    this.iterator = iterator;
+  }
+
+  /**
+   * Advance to the next object, clearing the current one when exhausted
+   *
+   * @return False iff there are no more objects
+   */
+  public boolean nextObject() {
+    boolean hasNext = iterator.hasNext();
+    if (hasNext) {
+      currentObject = iterator.next();
+    } else {
+      currentObject = null;
+    }
+    return hasNext;
+  }
+
+  /**
+   * Get the current object
+   *
+   * @return Current object, null before the first read and after the end
+   */
+  public T getCurrentObject() {
+    return currentObject;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-core/src/main/java/org/apache/giraph/io/iterables/VertexReaderWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/iterables/VertexReaderWrapper.java b/giraph-core/src/main/java/org/apache/giraph/io/iterables/VertexReaderWrapper.java
new file mode 100644
index 0000000..7493942
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/iterables/VertexReaderWrapper.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.iterables;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Adapts a {@link GiraphReader} of vertices to the {@link VertexReader}
+ * interface
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public class VertexReaderWrapper<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> implements
+    VertexReader<I, V, E, M> {
+  /** Wrapped vertex reader; initialize, close, getProgress delegate to it */
+  private GiraphReader<Vertex<I, V, E, M>> vertexReader;
+  /** Cursor over {@link #vertexReader}; holds vertex from the last read */
+  private IteratorToReaderWrapper<Vertex<I, V, E, M>> iterator;
+
+  /**
+   * Wrap the given reader; the reader is not initialized by this constructor
+   *
+   * @param vertexReader GiraphReader for vertices to wrap
+   */
+  public VertexReaderWrapper(GiraphReader<Vertex<I, V, E, M>> vertexReader) {
+    this.vertexReader = vertexReader;
+    iterator = new IteratorToReaderWrapper<Vertex<I, V, E, M>>(vertexReader);
+  }
+
+  @Override
+  public boolean nextVertex() throws IOException, InterruptedException {
+    return iterator.nextObject();
+  }
+
+  @Override
+  public Vertex<I, V, E, M> getCurrentVertex() throws IOException,
+      InterruptedException {
+    return iterator.getCurrentObject();
+  }
+
+  @Override
+  public void initialize(InputSplit inputSplit,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    vertexReader.initialize(inputSplit, context);
+  }
+
+  @Override
+  public void close() throws IOException {
+    vertexReader.close();
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return vertexReader.getProgress();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-core/src/main/java/org/apache/giraph/io/iterables/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/iterables/package-info.java b/giraph-core/src/main/java/org/apache/giraph/io/iterables/package-info.java
new file mode 100644
index 0000000..187cc4e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/iterables/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Iterable wrappers for IO readers/writers
+ */
+package org.apache.giraph.io.iterables;

http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
index efc08d3..0039dd6 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
@@ -30,8 +30,7 @@ import org.apache.giraph.hive.common.HiveProfiles;
 import org.apache.giraph.hive.input.edge.HiveEdgeInputFormat;
 import org.apache.giraph.hive.input.edge.HiveEdgeReader;
 import org.apache.giraph.hive.input.edge.HiveToEdge;
-import org.apache.giraph.hive.input.vertex.HiveToVertexValue;
-import org.apache.giraph.hive.input.vertex.HiveToVertexEdges;
+import org.apache.giraph.hive.input.vertex.HiveToVertex;
 import org.apache.giraph.hive.input.vertex.HiveVertexInputFormat;
 import org.apache.giraph.hive.input.vertex.HiveVertexReader;
 import org.apache.giraph.hive.output.HiveVertexOutputFormat;
@@ -78,9 +77,7 @@ public class HiveGiraphRunner implements Tool {
   private Class<? extends Vertex> vertexClass;
 
   /** Vertex creator from hive records. */
-  private Class<? extends HiveToVertexValue> hiveToVertexClass;
-  /** Vertex edges creator from hive records. */
-  private Class<? extends HiveToVertexEdges> hiveToVertexEdgesClass;
+  private Class<? extends HiveToVertex> hiveToVertexClass;
   /** hive vertex input information */
   private  final HiveInputDescription hiveVertexInputDescription;
 
@@ -127,56 +124,31 @@ public class HiveGiraphRunner implements Tool {
     return hiveEdgeInputDescription;
   }
 
-  public Class<? extends HiveToVertexValue> getHiveToVertexValueClass() {
+  public Class<? extends HiveToVertex> getHiveToVertexClass() {
     return hiveToVertexClass;
   }
 
   /**
-   * Set HiveVertexCreator used with HiveVertexInputFormat
+   * Set HiveToVertex used with HiveVertexInputFormat
    *
-   * @param hiveToVertexClass HiveVertexCreator
+   * @param hiveToVertexClass HiveToVertex
    */
-  public void setHiveToVertexValueClass(
-      Class<? extends HiveToVertexValue> hiveToVertexClass) {
+  public void setHiveToVertexClass(
+      Class<? extends HiveToVertex> hiveToVertexClass) {
     this.hiveToVertexClass = hiveToVertexClass;
     conf.setClass(HiveVertexReader.HIVE_TO_VERTEX_KEY, hiveToVertexClass,
-        HiveToVertexValue.class);
+        HiveToVertex.class);
   }
 
   /**
    * Whether to use vertex input.
    *
-   * @return true if vertex input enabled (HiveVertexCreator is set).
+   * @return true if vertex input enabled (HiveToVertex is set).
    */
   public boolean hasVertexValueInput() {
     return hiveToVertexClass != null;
   }
 
-  public Class<? extends HiveToVertexEdges> getHiveToVertexEdgesClass() {
-    return hiveToVertexEdgesClass;
-  }
-
-  /**
-   * Whether we have a class for reading per-vertex edges from Hive.
-   *
-   * @return true if user set class for reading vertex edges.
-   */
-  public boolean hasHiveToVertexEdgesClass() {
-    return hiveToVertexEdgesClass != null;
-  }
-
-  /**
-   * Set class for reading per-vertex edges from hive.
-   *
-   * @param klass Class to use
-   */
-  public void setHiveToVertexEdgesClass(
-      Class<? extends HiveToVertexEdges> klass) {
-    this.hiveToVertexEdgesClass = klass;
-    conf.setClass(HiveVertexReader.HIVE_TO_VERTEX_EDGES_KEY, klass,
-        HiveToVertexEdges.class);
-  }
-
   public Class<? extends HiveToEdge> getHiveToEdgeClass() {
     return hiveToEdgeClass;
   }
@@ -184,16 +156,16 @@ public class HiveGiraphRunner implements Tool {
   /**
    * Whether to use edge input.
    *
-   * @return true if edge input enabled (HiveEdgeCreator is set).
+   * @return true if edge input enabled (HiveToEdge is set).
    */
   public boolean hasEdgeInput() {
     return hiveToEdgeClass != null;
   }
 
   /**
-   * Set HiveEdgeCreator used with HiveEdgeInputFormat
+   * Set HiveToEdge used with HiveEdgeInputFormat
    *
-   * @param hiveToEdgeClass HiveEdgeCreator
+   * @param hiveToEdgeClass HiveToEdge
    */
   public void setHiveToEdgeClass(Class<? extends HiveToEdge> hiveToEdgeClass) {
     this.hiveToEdgeClass = hiveToEdgeClass;
@@ -364,15 +336,8 @@ public class HiveGiraphRunner implements Tool {
 
     String hiveToVertexClassStr = cmdln.getOptionValue("hiveToVertexClass");
     if (hiveToVertexClassStr != null) {
-      setHiveToVertexValueClass(findClass(hiveToVertexClassStr,
-          HiveToVertexValue.class));
-    }
-
-    String hiveToVertexEdgesClassStr =
-        cmdln.getOptionValue("hiveToVertexEdgesClass");
-    if (hiveToVertexEdgesClassStr != null) {
-      setHiveToVertexEdgesClass(findClass(hiveToVertexEdgesClassStr,
-          HiveToVertexEdges.class));
+      setHiveToVertexClass(findClass(hiveToVertexClassStr,
+          HiveToVertex.class));
     }
 
     String hiveToEdgeClassStr = cmdln.getOptionValue("hiveToEdgeClass");
@@ -392,7 +357,7 @@ public class HiveGiraphRunner implements Tool {
     if (hiveToVertexClass == null && hiveToEdgeClass == null) {
       throw new IllegalArgumentException(
           "Need at least one of Giraph " +
-          HiveToVertexValue.class.getSimpleName() +
+          HiveToVertex.class.getSimpleName() +
           " class name (-hiveToVertexClass) and " +
           HiveToEdge.class.getSimpleName() +
           " class name (-hiveToEdgeClass)");
@@ -522,14 +487,9 @@ public class HiveGiraphRunner implements Tool {
     // Vertex input settings
     if (hiveToVertexClass == null) {
       options.addOption(null, "hiveToVertexClass", true,
-          "Giraph " + HiveToVertexValue.class.getSimpleName() +
+          "Giraph " + HiveToVertex.class.getSimpleName() +
           " class to use");
     }
-    if (hiveToVertexEdgesClass == null) {
-      options.addOption(null, "hiveToVertexEdgesClass", true,
-          "Giraph " + HiveToVertexEdges.class.getSimpleName() +
-              " class to use");
-    }
     options.addOption("vi", "vertexInputTable", true,
         "Vertex input table name");
     options.addOption("VI", "vertexInputFilter", true,
@@ -654,10 +614,6 @@ public class HiveGiraphRunner implements Tool {
       LOG.info(LOG_PREFIX + "-hiveToVertexClass=" +
           hiveToVertexClass.getCanonicalName());
     }
-    if (hiveToVertexEdgesClass != null) {
-      LOG.info(LOG_PREFIX + "-hiveToVertexEdgesClass=" +
-          hiveToVertexEdgesClass.getCanonicalName());
-    }
     if (classes.getVertexInputFormatClass() != null) {
       LOG.info(LOG_PREFIX + "-vertexInputFormatClass=" +
           classes.getVertexInputFormatClass().getCanonicalName());

http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/common/DefaultConfigurableAndTableSchemaAware.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/DefaultConfigurableAndTableSchemaAware.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/DefaultConfigurableAndTableSchemaAware.java
new file mode 100644
index 0000000..c8b201f
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/common/DefaultConfigurableAndTableSchemaAware.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.common;
+
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.facebook.giraph.hive.HiveTableSchema;
+import com.facebook.giraph.hive.HiveTableSchemaAware;
+
+/**
+ * Default implementation of {@link HiveTableSchemaAware} and
+ * {@link org.apache.giraph.conf.ImmutableClassesGiraphConfigurable}
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ * @param <M> Message Value
+ */
+public class DefaultConfigurableAndTableSchemaAware<
+    I extends WritableComparable, V extends Writable, E extends Writable,
+    M extends Writable>
+    extends DefaultImmutableClassesGiraphConfigurable<I, V, E, M>
+    implements HiveTableSchemaAware {
+  /** Schema last passed to setTableSchema(); null until it is called */
+  private HiveTableSchema tableSchema;
+
+  @Override public void setTableSchema(HiveTableSchema tableSchema) {
+    this.tableSchema = tableSchema;
+  }
+
+  @Override public HiveTableSchema getTableSchema() {
+    return tableSchema;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/input/RecordReaderWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/RecordReaderWrapper.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/RecordReaderWrapper.java
new file mode 100644
index 0000000..7b64a40
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/RecordReaderWrapper.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.RecordReader;
+
+import com.google.common.collect.AbstractIterator;
+
+import java.io.IOException;
+
+/**
+ * Wraps {@link RecordReader} into {@link java.util.Iterator}; read failures
+ * surface as {@link IllegalStateException} carrying the original cause
+ * @param <T> Data of record reader
+ */
+public class RecordReaderWrapper<T> extends AbstractIterator<T> {
+  /** Wrapped {@link RecordReader} */
+  private final RecordReader<WritableComparable, T> recordReader;
+
+  /**
+   * Wrap the given reader; keys are ignored, only values are iterated
+   *
+   * @param recordReader {@link RecordReader} to wrap
+   */
+  public RecordReaderWrapper(RecordReader<WritableComparable, T> recordReader) {
+    this.recordReader = recordReader;
+  }
+
+  @Override
+  protected T computeNext() {
+    try {
+      if (!recordReader.nextKeyValue()) {
+        return endOfData();
+      }
+      return recordReader.getCurrentValue();
+    } catch (IOException e) {
+      throw new IllegalStateException("computeNext: IOException occurred", e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // restore interrupt status
+      throw new IllegalStateException(
+          "computeNext: InterruptedException occurred", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/AbstractHiveToEdge.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/AbstractHiveToEdge.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/AbstractHiveToEdge.java
index f29fea7..c16e808 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/AbstractHiveToEdge.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/AbstractHiveToEdge.java
@@ -17,12 +17,10 @@
  */
 package org.apache.giraph.hive.input.edge;
 
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.hive.common.DefaultConfigurableAndTableSchemaAware;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import com.facebook.giraph.hive.HiveTableSchemaAware;
-
 /**
  * Base class for HiveToEdge implementations
  *
@@ -33,5 +31,10 @@ import com.facebook.giraph.hive.HiveTableSchemaAware;
  */
 public abstract class AbstractHiveToEdge<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
-    extends DefaultImmutableClassesGiraphConfigurable<I, V, E, M>
-    implements HiveTableSchemaAware, HiveToEdge<I, E> { }
+    extends DefaultConfigurableAndTableSchemaAware<I, V, E, M>
+    implements HiveToEdge<I, E> {
+  @Override
+  public final void remove() {
+    throw new UnsupportedOperationException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
index 18b40c2..68edbfc 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
@@ -19,6 +19,7 @@
 package org.apache.giraph.hive.input.edge;
 
 import org.apache.giraph.hive.common.HiveProfiles;
+import org.apache.giraph.io.iterables.EdgeReaderWrapper;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeReader;
 import org.apache.hadoop.conf.Configuration;
@@ -77,6 +78,6 @@ public class HiveEdgeInputFormat<I extends WritableComparable,
     }
 
     reader.setHiveRecordReader(baseReader);
-    return reader;
+    return new EdgeReaderWrapper<I, E>(reader);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
index 6fb183a..275f8f7 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
@@ -19,9 +19,9 @@
 package org.apache.giraph.hive.input.edge;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.edge.ReusableEdge;
-import org.apache.giraph.io.EdgeReader;
+import org.apache.giraph.io.iterables.EdgeWithSource;
+import org.apache.giraph.io.iterables.GiraphReader;
+import org.apache.giraph.hive.input.RecordReaderWrapper;
 import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 import com.facebook.giraph.hive.HiveRecord;
 import com.facebook.giraph.hive.HiveTableSchema;
+import com.facebook.giraph.hive.HiveTableSchemaAware;
 import com.facebook.giraph.hive.HiveTableSchemas;
 
 import java.io.IOException;
@@ -42,11 +43,9 @@ import java.io.IOException;
  * @param <E> Edge Value
  */
 public class HiveEdgeReader<I extends WritableComparable, E extends Writable>
-    implements EdgeReader<I, E> {
+    implements GiraphReader<EdgeWithSource<I, E>>, HiveTableSchemaAware {
   /** Configuration key for edge creator class */
   public static final String HIVE_TO_EDGE_KEY = "giraph.hive.to.edge.class";
-  /** Configuration key for whether to reuse edge */
-  public static final String REUSE_EDGE_KEY = "giraph.hive.reuse.edge";
 
   /** Underlying Hive RecordReader used */
   private RecordReader<WritableComparable, HiveRecord> hiveRecordReader;
@@ -58,11 +57,6 @@ public class HiveEdgeReader<I extends WritableComparable, E extends Writable>
 
   /** User class to create edges from a HiveRecord */
   private HiveToEdge<I, E> hiveToEdge;
-  /**
-   * If we are reusing edges this will be the single edge to read into.
-   * Otherwise if it's null we will create a new edge each time.
-   */
-  private ReusableEdge<I, E> edgeToReuse = null;
 
   /**
    * Get underlying Hive record reader used.
@@ -83,20 +77,12 @@ public class HiveEdgeReader<I extends WritableComparable, E extends Writable>
     this.hiveRecordReader = hiveRecordReader;
   }
 
-  /**
-   * Get Hive table schema for table being read from.
-   *
-   * @return Hive table schema for table
-   */
+  @Override
   public HiveTableSchema getTableSchema() {
     return tableSchema;
   }
 
-  /**
-   * Set Hive schema for table being read from.
-   *
-   * @param tableSchema Hive table schema
-   */
+  @Override
   public void setTableSchema(HiveTableSchema tableSchema) {
     this.tableSchema = tableSchema;
   }
@@ -116,13 +102,12 @@ public class HiveEdgeReader<I extends WritableComparable, E extends Writable>
     hiveRecordReader.initialize(inputSplit, context);
     conf = new ImmutableClassesGiraphConfiguration(context.getConfiguration());
     instantiateHiveToEdgeFromConf();
-    if (conf.getBoolean(REUSE_EDGE_KEY, false)) {
-      edgeToReuse = conf.createReusableEdge();
-    }
+    hiveToEdge.initializeRecords(
+        new RecordReaderWrapper<HiveRecord>(hiveRecordReader));
   }
 
   /**
-   * Retrieve the user's HiveEdgeCreator from the Configuration.
+   * Retrieve the user's {@link HiveToEdge} from the Configuration.
    *
    * @throws IOException if anything goes wrong reading from Configuration
    */
@@ -137,11 +122,6 @@ public class HiveEdgeReader<I extends WritableComparable, E extends Writable>
   }
 
   @Override
-  public boolean nextEdge() throws IOException, InterruptedException {
-    return hiveRecordReader.nextKeyValue();
-  }
-
-  @Override
   public void close() throws IOException {
     hiveRecordReader.close();
   }
@@ -152,20 +132,17 @@ public class HiveEdgeReader<I extends WritableComparable, E extends Writable>
   }
 
   @Override
-  public I getCurrentSourceId() throws IOException, InterruptedException {
-    return hiveToEdge.getSourceVertexId(hiveRecordReader.getCurrentValue());
+  public boolean hasNext() {
+    return hiveToEdge.hasNext();
   }
 
   @Override
-  public Edge<I, E> getCurrentEdge() throws IOException,
-      InterruptedException {
-    HiveRecord record = hiveRecordReader.getCurrentValue();
-    ReusableEdge<I, E> edge = edgeToReuse;
-    if (edge == null) {
-      edge = conf.createReusableEdge();
-    }
-    edge.setValue(hiveToEdge.getEdgeValue(record));
-    edge.setTargetVertexId(hiveToEdge.getTargetVertexId(record));
-    return edge;
+  public EdgeWithSource<I, E> next() {
+    return hiveToEdge.next();
+  }
+
+  @Override
+  public void remove() {
+    hiveToEdge.remove();
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java
index 2205b82..8b22a8f 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java
@@ -18,39 +18,30 @@
 
 package org.apache.giraph.hive.input.edge;
 
+import org.apache.giraph.io.iterables.EdgeWithSource;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import com.facebook.giraph.hive.HiveReadableRecord;
+import com.facebook.giraph.hive.HiveRecord;
+
+import java.util.Iterator;
 
 /**
  * An interface used to create edges from Hive records.
  *
+ * It gets initialized with HiveRecord iterator, and it needs to provide an
+ * iterator over edges, so it's possible to skip some rows from the input,
+ * combine several rows together, etc.
+ *
  * @param <I> Vertex ID
  * @param <E> Edge Value
  */
-public interface HiveToEdge<I extends WritableComparable, E extends Writable> {
-  /**
-   * Read source vertex ID from Hive record
-   *
-   * @param hiveRecord HiveRecord to read from
-   * @return source vertex ID
-   */
-  I getSourceVertexId(HiveReadableRecord hiveRecord);
-
-  /**
-   * Read target vertex ID from Hive record
-   *
-   * @param hiveRecord HiveRecord to read from
-   * @return target vertex ID
-   */
-  I getTargetVertexId(HiveReadableRecord hiveRecord);
-
+public interface HiveToEdge<I extends WritableComparable,
+    E extends Writable> extends Iterator<EdgeWithSource<I, E>> {
   /**
-   * Read edge value from the Hive record.
+   * Set the records which contain edge input data
    *
-   * @param hiveRecord HiveRecord to read from
-   * @return Edge value
+   * @param records Hive records
    */
-  E getEdgeValue(HiveReadableRecord hiveRecord);
+  void initializeRecords(Iterator<HiveRecord> records);
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/SimpleHiveToEdge.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/SimpleHiveToEdge.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/SimpleHiveToEdge.java
new file mode 100644
index 0000000..0b76683
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/SimpleHiveToEdge.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.edge;
+
+import org.apache.giraph.io.iterables.EdgeWithSource;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.facebook.giraph.hive.HiveReadableRecord;
+import com.facebook.giraph.hive.HiveRecord;
+
+import java.util.Iterator;
+
+/**
+ * Simple implementation of {@link HiveToEdge} for input where every row holds
+ * one edge. {@link #next()} hands back the same reused object on each call.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public abstract class SimpleHiveToEdge<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends AbstractHiveToEdge<I, V, E, M> {
+  /** Iterator over input records */
+  private Iterator<HiveRecord> records;
+  /** Single {@link EdgeWithSource} instance refilled by every next() call */
+  private final EdgeWithSource<I, E> reusableEdge = new EdgeWithSource<I, E>();
+
+  /**
+   * Read source vertex ID from Hive record
+   *
+   * @param hiveRecord HiveRecord to read from
+   * @return source vertex ID
+   */
+  public abstract I getSourceVertexId(HiveReadableRecord hiveRecord);
+
+  /**
+   * Read target vertex ID from Hive record
+   *
+   * @param hiveRecord HiveRecord to read from
+   * @return target vertex ID
+   */
+  public abstract I getTargetVertexId(HiveReadableRecord hiveRecord);
+
+  /**
+   * Read edge value from the Hive record.
+   *
+   * @param hiveRecord HiveRecord to read from
+   * @return Edge value
+   */
+  public abstract E getEdgeValue(HiveReadableRecord hiveRecord);
+
+  @Override
+  public final void initializeRecords(Iterator<HiveRecord> records) {
+    this.records = records;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return records.hasNext();
+  }
+
+  @Override
+  public EdgeWithSource<I, E> next() {
+    HiveRecord record = records.next();
+    reusableEdge.setSourceVertexId(getSourceVertexId(record));
+    reusableEdge.setTargetVertexId(getTargetVertexId(record));
+    reusableEdge.setEdgeValue(getEdgeValue(record));
+    return reusableEdge;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertex.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertex.java
new file mode 100644
index 0000000..a138846
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertex.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.vertex;
+
+import org.apache.giraph.hive.common.DefaultConfigurableAndTableSchemaAware;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Base class for {@link HiveToVertex} implementations; remove() always throws.
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ * @param <M> Message Value
+ */
+public abstract class AbstractHiveToVertex<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends DefaultConfigurableAndTableSchemaAware<I, V, E, M>
+    implements HiveToVertex<I, V, E, M> {
+  @Override
+  public final void remove() {
+    throw new UnsupportedOperationException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexEdges.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexEdges.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexEdges.java
deleted file mode 100644
index d0668f6..0000000
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexEdges.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.hive.input.vertex;
-
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import com.facebook.giraph.hive.HiveTableSchema;
-import com.facebook.giraph.hive.HiveTableSchemaAware;
-
-/**
- * Base class for HiveToVertexEdges implementations
- *
- * @param <I> Vertex ID
- * @param <V> Vertex Value
- * @param <E> Edge Value
- * @param <M> Message Value
- */
-public abstract class AbstractHiveToVertexEdges<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends DefaultImmutableClassesGiraphConfigurable<I, V, E, M>
-    implements HiveTableSchemaAware, HiveToVertexEdges<I, E> {
-  /** Schema stored here */
-  private HiveTableSchema tableSchema;
-
-  @Override public void setTableSchema(HiveTableSchema tableSchema) {
-    this.tableSchema = tableSchema;
-  }
-
-  @Override public HiveTableSchema getTableSchema() {
-    return tableSchema;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexValue.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexValue.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexValue.java
deleted file mode 100644
index 9ab316f..0000000
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexValue.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.hive.input.vertex;
-
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import com.facebook.giraph.hive.HiveTableSchema;
-import com.facebook.giraph.hive.HiveTableSchemaAware;
-
-/**
- * Base class for HiveToVertex implementations
- *
- * @param <I> Vertex ID
- * @param <V> Vertex Value
- * @param <E> Edge Value
- * @param <M> Message Value
- */
-public abstract class AbstractHiveToVertexValue<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends DefaultImmutableClassesGiraphConfigurable<I, V, E, M>
-    implements HiveTableSchemaAware, HiveToVertexValue<I, V> {
-  /** Schema stored here */
-  private HiveTableSchema tableSchema;
-
-  @Override public void setTableSchema(HiveTableSchema tableSchema) {
-    this.tableSchema = tableSchema;
-  }
-
-  @Override public HiveTableSchema getTableSchema() {
-    return tableSchema;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java
new file mode 100644
index 0000000..1179961
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.vertex;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.facebook.giraph.hive.HiveRecord;
+
+import java.util.Iterator;
+
+/**
+ * An interface used to create vertices from Hive records.
+ *
+ * It gets initialized with an iterator over HiveRecords and has to provide an
+ * iterator over vertices, so it's possible to skip some rows from the input,
+ * combine several rows together, etc.
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ * @param <M> Message Value
+ */
+public interface HiveToVertex<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> extends
+    Iterator<Vertex<I, V, E, M>> {
+  /**
+   * Hand over the Hive records from which the vertices are to be built.
+   *
+   * @param records Iterator over the Hive records holding vertex input data
+   */
+  void initializeRecords(Iterator<HiveRecord> records);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexEdges.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexEdges.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexEdges.java
deleted file mode 100644
index 8076a8a..0000000
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexEdges.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.hive.input.vertex;
-
-import org.apache.giraph.edge.Edge;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import com.facebook.giraph.hive.HiveReadableRecord;
-
-import java.util.Collections;
-
-/**
- * Interface for creating edges for a vertex from a Hive record.
- * Used with HiveToVertex if you want to also read edges per vertex, as opposed
- * to using {@link org.apache.giraph.hive.input.edge.HiveEdgeInputFormat}
- *
- * @param <I> Vertex ID
- * @param <E> extends Writable
- */
-public interface HiveToVertexEdges<I extends WritableComparable,
-    E extends Writable> {
-  /**
-   * Read Vertex's edges from the HiveRecord given.
-   *
-   * @param record HiveRecord to read from.
-   * @return iterable of edges
-   */
-  Iterable<Edge<I, E>> getEdges(HiveReadableRecord record);
-
-  /**
-   * Default implementation that returns an empty list of edges.
-   */
-  public class Empty implements HiveToVertexEdges {
-    /** Singleton */
-    private static final Empty INSTANCE = new Empty();
-
-    /** Don't construct, allow inheritance */
-    protected Empty() { }
-
-    /**
-     * Get singleton instance
-     * @return Empty
-     */
-    public static Empty get() { return INSTANCE; }
-
-    @Override
-    public Iterable getEdges(HiveReadableRecord record) {
-      return Collections.emptyList();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexValue.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexValue.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexValue.java
deleted file mode 100644
index 593eb9a..0000000
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexValue.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.hive.input.vertex;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import com.facebook.giraph.hive.HiveReadableRecord;
-
-/**
- * Interface for creating vertices from a Hive record.
- *
- * @param <I> Vertex ID
- * @param <V> Vertex Value
- */
-public interface HiveToVertexValue<I extends WritableComparable,
-    V extends Writable> {
-  /**
-   * Read the Vertex's ID from the HiveRecord given.
-   *
-   * @param record HiveRecord to read from.
-   * @return Vertex ID
-   */
-  I getVertexId(HiveReadableRecord record);
-
-  /**
-   * Read the Vertex's Value from the HiveRecord given.
-   *
-   * @param record HiveRecord to read from.
-   * @return Vertex Value
-   */
-  V getVertexValue(HiveReadableRecord record);
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
index 25c7a26..4f70750 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
@@ -21,6 +21,7 @@ package org.apache.giraph.hive.input.vertex;
 import org.apache.giraph.hive.common.HiveProfiles;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexReader;
+import org.apache.giraph.io.iterables.VertexReaderWrapper;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -67,7 +68,7 @@ public class HiveVertexInputFormat<I extends WritableComparable,
       TaskAttemptContext context) throws IOException {
     Configuration conf = context.getConfiguration();
 
-    HiveVertexReader reader = new HiveVertexReader();
+    HiveVertexReader<I, V, E, M> reader = new HiveVertexReader<I, V, E, M>();
     reader.setTableSchema(hiveInputFormat.getTableSchema(conf));
 
     HiveApiRecordReader baseReader;
@@ -78,6 +79,6 @@ public class HiveVertexInputFormat<I extends WritableComparable,
     }
 
     reader.setHiveRecordReader(baseReader);
-    return reader;
+    return new VertexReaderWrapper<I, V, E, M>(reader);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
index 541176f..442c796 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
@@ -19,9 +19,9 @@
 package org.apache.giraph.hive.input.vertex;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.edge.Edge;
 import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.io.VertexReader;
+import org.apache.giraph.hive.input.RecordReaderWrapper;
+import org.apache.giraph.io.iterables.GiraphReader;
 import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -46,16 +46,10 @@ import java.io.IOException;
  */
 public class HiveVertexReader<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
-    implements VertexReader<I, V, E, M>, HiveTableSchemaAware {
-  /** Configuration key for {@link HiveToVertexValue} class */
+    implements GiraphReader<Vertex<I, V, E, M>>, HiveTableSchemaAware {
+  /** Configuration key for {@link HiveToVertex} class */
   public static final String HIVE_TO_VERTEX_KEY =
-      "giraph.hive.to.vertex.value.class";
-  /** Configuration key for {@link HiveToVertexEdges} class */
-  public static final String HIVE_TO_VERTEX_EDGES_KEY =
-      "giraph.hive.to.vertex.edges.class";
-  /** Configuration key for whether to reuse vertex */
-  public static final String REUSE_VERTEX_KEY = "giraph.hive.reuse.vertex";
-
+      "giraph.hive.to.vertex.class";
   /** Underlying Hive RecordReader used */
   private HiveApiRecordReader hiveRecordReader;
   /** Schema for table in Hive */
@@ -64,16 +58,11 @@ public class HiveVertexReader<I extends WritableComparable,
   /** Configuration */
   private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
 
-  /** User class to create vertices from a HiveRecord */
-  private HiveToVertexValue<I, V> hiveToVertexValue;
-  /** User class to create vertex edges from HiveRecord - optional */
-  private HiveToVertexEdges<I, E> hiveToVertexEdges;
-
   /**
-   * If we are reusing vertices this will be the single vertex to read into.
-   * Otherwise if it's null we will create a new vertex each time.
+   * {@link HiveToVertex} chosen by user,
+   * or {@link SimpleHiveToVertex} if none specified
    */
-  private Vertex<I, V, E, M> vertexToReuse = null;
+  private HiveToVertex<I, V, E, M> hiveToVertex;
 
   /**
    * Get underlying Hive record reader used.
@@ -113,74 +102,41 @@ public class HiveVertexReader<I extends WritableComparable,
   }
 
   @Override
-  public void initialize(InputSplit inputSplit, TaskAttemptContext context)
-    throws IOException, InterruptedException {
+  public void initialize(InputSplit inputSplit,
+      TaskAttemptContext context) throws IOException, InterruptedException {
     hiveRecordReader.initialize(inputSplit, context);
-    conf = new ImmutableClassesGiraphConfiguration(context.getConfiguration());
-    instantiateHiveToVertexValueFromConf();
-    instantiateHiveToVertexEdgesFromConf();
-    if (conf.getBoolean(REUSE_VERTEX_KEY, false)) {
-      vertexToReuse = conf.createVertex();
-    }
+    conf = new ImmutableClassesGiraphConfiguration<I, V, E,
+        M>(context.getConfiguration());
+    Class<? extends HiveToVertex> klass = conf.getClass(HIVE_TO_VERTEX_KEY,
+        SimpleHiveToVertex.class, HiveToVertex.class);
+    hiveToVertex = ReflectionUtils.newInstance(klass, conf);
+    HiveTableSchemas.configure(hiveToVertex, tableSchema);
+    hiveToVertex.initializeRecords(
+        new RecordReaderWrapper<HiveRecord>(hiveRecordReader));
   }
 
-  /**
-   * Retrieve the user's HiveToVertex from our configuration.
-   *
-   * @throws IOException if anything goes wrong reading from Configuration.
-   */
-  private void instantiateHiveToVertexValueFromConf() throws IOException {
-    Class<? extends HiveToVertexValue> klass = conf.getClass(HIVE_TO_VERTEX_KEY,
-        null, HiveToVertexValue.class);
-    if (klass == null) {
-      throw new IOException(HIVE_TO_VERTEX_KEY + " not set in conf");
-    }
-    hiveToVertexValue = ReflectionUtils.newInstance(klass, conf);
-    HiveTableSchemas.configure(hiveToVertexValue, tableSchema);
-  }
-
-  /**
-   * Retrieve the user's HiveToVertexEdges from our configuration. This class
-   * is optional. If not specified will just use HiveToVertexEdges.Empty.
-   */
-  private void instantiateHiveToVertexEdgesFromConf() {
-    Class<? extends HiveToVertexEdges> klass = conf.getClass(
-        HIVE_TO_VERTEX_EDGES_KEY, null, HiveToVertexEdges.class);
-    if (klass == null) {
-      hiveToVertexEdges = HiveToVertexEdges.Empty.get();
-    } else {
-      hiveToVertexEdges = ReflectionUtils.newInstance(klass, conf);
-    }
-    HiveTableSchemas.configure(hiveToVertexEdges, tableSchema);
+  @Override
+  public void close() throws IOException {
+    hiveRecordReader.close();
   }
 
   @Override
-  public boolean nextVertex() throws IOException, InterruptedException {
-    return hiveRecordReader.nextKeyValue();
+  public float getProgress() throws IOException, InterruptedException {
+    return hiveRecordReader.getProgress();
   }
 
   @Override
-  public void close() throws IOException {
-    hiveRecordReader.close();
+  public boolean hasNext() {
+    return hiveToVertex.hasNext();
   }
 
   @Override
-  public float getProgress() throws IOException, InterruptedException {
-    return hiveRecordReader.getProgress();
+  public Vertex<I, V, E, M> next() {
+    return hiveToVertex.next();
   }
 
   @Override
-  public final Vertex<I, V, E, M> getCurrentVertex()
-    throws IOException, InterruptedException {
-    HiveRecord hiveRecord = hiveRecordReader.getCurrentValue();
-    Vertex<I, V, E, M> vertex = vertexToReuse;
-    if (vertex == null) {
-      vertex = conf.createVertex();
-    }
-    I id = hiveToVertexValue.getVertexId(hiveRecord);
-    V value = hiveToVertexValue.getVertexValue(hiveRecord);
-    Iterable<Edge<I, E>> edges = hiveToVertexEdges.getEdges(hiveRecord);
-    vertex.initialize(id, value, edges);
-    return vertex;
+  public void remove() {
+    hiveToVertex.remove();
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleHiveToVertex.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleHiveToVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleHiveToVertex.java
new file mode 100644
index 0000000..f42536d
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleHiveToVertex.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.vertex;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.facebook.giraph.hive.HiveReadableRecord;
+import com.facebook.giraph.hive.HiveRecord;
+
+import java.util.Iterator;
+
+/**
+ * Simple implementation of {@link HiveToVertex} for when each vertex is
+ * stored in a single row of the input.
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ * @param <M> Message Value
+ */
+public abstract class SimpleHiveToVertex<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends AbstractHiveToVertex<I, V, E, M> {
+  /** Iterator over the Hive records being read */
+  private Iterator<HiveRecord> records;
+
+  /** Reusable vertex object */
+  private Vertex<I, V, E, M> reusableVertex;
+
+  /**
+   * Read the Vertex's ID from the HiveRecord given.
+   *
+   * @param record HiveRecord to read from.
+   * @return Vertex ID
+   */
+  public abstract I getVertexId(HiveReadableRecord record);
+
+  /**
+   * Read the Vertex's Value from the HiveRecord given.
+   *
+   * @param record HiveRecord to read from.
+   * @return Vertex Value
+   */
+  public abstract V getVertexValue(HiveReadableRecord record);
+
+  /**
+   * Read Vertex's edges from the HiveRecord given.
+   *
+   * @param record HiveRecord to read from.
+   * @return iterable of edges
+   */
+  public abstract Iterable<Edge<I, E>> getEdges(HiveReadableRecord record);
+
+  @Override
+  public void initializeRecords(Iterator<HiveRecord> records) {
+    this.records = records;
+    reusableVertex = getConf().createVertex();
+  }
+
+  @Override
+  public boolean hasNext() {
+    return records.hasNext();
+  }
+
+  @Override
+  public Vertex<I, V, E, M> next() {
+    HiveRecord record = records.next();
+    I id = getVertexId(record);
+    V value = getVertexValue(record);
+    Iterable<Edge<I, E>> edges = getEdges(record);
+    reusableVertex.initialize(id, value, edges);
+    return reusableVertex;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleNoEdgesHiveToVertex.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleNoEdgesHiveToVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleNoEdgesHiveToVertex.java
new file mode 100644
index 0000000..195a69e
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleNoEdgesHiveToVertex.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.vertex;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.facebook.giraph.hive.HiveReadableRecord;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Simple implementation of {@link HiveToVertex} for when each vertex is
+ * stored in a single row of the input and the vertex input has no edges.
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ * @param <M> Message Value
+ */
+public abstract class SimpleNoEdgesHiveToVertex<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> extends
+    SimpleHiveToVertex<I, V, E, M> {
+  @Override
+  public Iterable<Edge<I, E>> getEdges(HiveReadableRecord record) {
+    return ImmutableList.of();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/output/AbstractVertexToHive.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/AbstractVertexToHive.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/AbstractVertexToHive.java
index fe0771d..a688bf5 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/AbstractVertexToHive.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/AbstractVertexToHive.java
@@ -17,13 +17,10 @@
  */
 package org.apache.giraph.hive.output;
 
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.hive.common.DefaultConfigurableAndTableSchemaAware;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import com.facebook.giraph.hive.HiveTableSchema;
-import com.facebook.giraph.hive.HiveTableSchemaAware;
-
 /**
  * Base class for VertexToHive implementations
  *
@@ -34,16 +31,5 @@ import com.facebook.giraph.hive.HiveTableSchemaAware;
  */
 public abstract class AbstractVertexToHive<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
-    extends DefaultImmutableClassesGiraphConfigurable<I, V, E, M>
-    implements HiveTableSchemaAware, VertexToHive<I, V, E> {
-  /** Schema stored here */
-  private HiveTableSchema tableSchema;
-
-  @Override public void setTableSchema(HiveTableSchema tableSchema) {
-    this.tableSchema = tableSchema;
-  }
-
-  @Override public HiveTableSchema getTableSchema() {
-    return tableSchema;
-  }
-}
+    extends DefaultConfigurableAndTableSchemaAware<I, V, E, M>
+    implements VertexToHive<I, V, E> { }