You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/05/04 00:57:21 UTC

git commit: updated refs/heads/trunk to 4b0f52c

Updated Branches:
  refs/heads/trunk c5a87d161 -> 4b0f52cc8


GIRAPH-651: giraph-hive tests


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/4b0f52cc
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/4b0f52cc
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/4b0f52cc

Branch: refs/heads/trunk
Commit: 4b0f52cc8e8841b64ac970733c1ef5dcd0ad4c18
Parents: c5a87d1
Author: Nitay Joffe <ni...@apache.org>
Authored: Fri May 3 15:55:45 2013 -0700
Committer: Nitay Joffe <ni...@apache.org>
Committed: Fri May 3 15:56:42 2013 -0700

----------------------------------------------------------------------
 .gitignore                                         |    2 +
 CHANGELOG                                          |    2 +
 .../apache/giraph/utils/InternalVertexRunner.java  |    7 +-
 giraph-hcatalog/pom.xml                            |    4 -
 giraph-hive/pom.xml                                |    9 +
 .../org/apache/giraph/hive/HiveGiraphRunner.java   |   32 +--
 .../giraph/hive/common/GiraphHiveConstants.java    |   50 +----
 .../giraph/hive/common/HiveInputOptions.java       |  179 +++++++++++++++
 .../org/apache/giraph/hive/common/HiveParsing.java |  159 +++++++++++++
 .../org/apache/giraph/hive/common/HiveUtils.java   |   16 +-
 .../hive/input/edge/HiveEdgeInputFormat.java       |   13 +-
 .../giraph/hive/input/edge/HiveEdgeReader.java     |    7 +-
 .../input/edge/examples/HiveIntDoubleEdge.java     |   46 ++++
 .../hive/input/edge/examples/HiveIntNullEdge.java  |   46 ++++
 .../hive/input/edge/examples/package-info.java     |   21 ++
 .../hive/input/vertex/HiveVertexInputFormat.java   |   14 +-
 .../giraph/hive/input/vertex/HiveVertexReader.java |    4 +-
 .../vertex/examples/HiveIntDoubleDoubleVertex.java |   48 ++++
 .../vertex/examples/HiveIntNullNullVertex.java     |   47 ++++
 .../hive/input/vertex/examples/package-info.java   |   21 ++
 .../giraph/hive/output/HiveVertexOutputFormat.java |    4 +-
 .../output/examples/HiveOutputIntIntVertex.java    |   38 +++
 .../giraph/hive/output/examples/package-info.java  |   22 ++
 .../test/java/org/apache/giraph/hive/Helpers.java  |   53 +++++
 .../giraph/hive/input/HiveEdgeInputTest.java       |  149 ++++++++++++
 .../giraph/hive/input/HiveVertexInputTest.java     |  152 ++++++++++++
 .../org/apache/giraph/hive/input/package-info.java |   22 ++
 .../apache/giraph/hive/output/HiveOutputTest.java  |  151 ++++++++++++
 .../apache/giraph/hive/output/package-info.java    |   22 ++
 .../giraph/hive/vertexes/VertexCountEdges.java     |   34 +++
 .../giraph/hive/vertexes/VertexSumEdges.java       |   39 +++
 .../apache/giraph/hive/vertexes/package-info.java  |   22 ++
 pom.xml                                            |   17 +-
 33 files changed, 1336 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/4b0f52cc/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 0b7eb2e..8c911c1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -28,3 +28,5 @@ failed-profile.txt
 
 # Vim auto-save files:
 .*.swp
+
+/giraph-hive/derby.log

http://git-wip-us.apache.org/repos/asf/giraph/blob/4b0f52cc/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 4d2c90c..7ede165 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.0.1 - unreleased
+  GIRAPH-651: giraph-hive tests (nitay)
+
   GIRAPH-639: Add support for multiple Vertex/Edge inputs (majakabiljo)
 
   GIRAPH-653: Hadoop_non_secure broken (majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/4b0f52cc/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
index 029cb5d..be2d2a9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
@@ -71,7 +71,6 @@ public class InternalVertexRunner {
    * writing to a temporary folder on local disk. Will start its own zookeeper
    * instance.
    *
-   *
    * @param conf GiraphClasses specifying which types to use
    * @param vertexInputData linewise vertex input data
    * @return linewise output data
@@ -181,9 +180,9 @@ public class InternalVertexRunner {
         zookeeper.end();
       }
 
-      if (conf.hasVertexOutputFormat()) {
-        return Files.readLines(new File(outputDir, "part-m-00000"),
-            Charsets.UTF_8);
+      File outFile = new File(outputDir, "part-m-00000");
+      if (conf.hasVertexOutputFormat() && outFile.canRead()) {
+        return Files.readLines(outFile, Charsets.UTF_8);
       } else {
         return ImmutableList.of();
       }

http://git-wip-us.apache.org/repos/asf/giraph/blob/4b0f52cc/giraph-hcatalog/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-hcatalog/pom.xml b/giraph-hcatalog/pom.xml
index 03b86b5..d9ad9ee 100644
--- a/giraph-hcatalog/pom.xml
+++ b/giraph-hcatalog/pom.xml
@@ -154,10 +154,6 @@ under the License.
     </dependency>
     <dependency>
       <groupId>org.apache.hive</groupId>
-      <artifactId>hive-common</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hive</groupId>
       <artifactId>hive-exec</artifactId>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/giraph/blob/4b0f52cc/giraph-hive/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-hive/pom.xml b/giraph-hive/pom.xml
index c2d46e9..9197f9a 100644
--- a/giraph-hive/pom.xml
+++ b/giraph-hive/pom.xml
@@ -106,6 +106,10 @@ under the License.
     </dependency>
     <dependency>
       <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
       <artifactId>hive-metastore</artifactId>
     </dependency>
 
@@ -116,5 +120,10 @@ under the License.
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.facebook.hiveio</groupId>
+      <artifactId>hive-io-exp-testing</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/giraph/blob/4b0f52cc/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
index 98be881..da9ee2f 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
@@ -55,16 +55,8 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 
-import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT_DATABASE;
-import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT_PARTITION;
-import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT_PROFILE_ID;
-import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT_TABLE;
-import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_TO_EDGE_CLASS;
-import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_TO_VERTEX_CLASS;
-import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT_DATABASE;
-import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT_PARTITION;
-import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT_PROFILE_ID;
-import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT_TABLE;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT;
 import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_DATABASE;
 import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_PARTITION;
 import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_PROFILE_ID;
@@ -143,14 +135,14 @@ public class HiveGiraphRunner implements Tool {
     VertexInputFormatDescription description =
         new VertexInputFormatDescription(HiveVertexInputFormat.class);
     description.addParameter(
-        HIVE_TO_VERTEX_CLASS.getKey(), hiveToVertexClass.getName());
-    description.addParameter(HIVE_VERTEX_INPUT_PROFILE_ID.getKey(),
+        HIVE_VERTEX_INPUT.getClassOpt().getKey(), hiveToVertexClass.getName());
+    description.addParameter(HIVE_VERTEX_INPUT.getProfileIdOpt().getKey(),
         "vertex_input_profile_" + vertexInputDescriptions.size());
     description.addParameter(
-        HIVE_VERTEX_INPUT_TABLE.getKey(), tableName);
+        HIVE_VERTEX_INPUT.getTableOpt().getKey(), tableName);
     if (partitionFilter != null && !partitionFilter.isEmpty()) {
       description.addParameter(
-          HIVE_VERTEX_INPUT_PARTITION.getKey(), partitionFilter);
+          HIVE_VERTEX_INPUT.getPartitionOpt().getKey(), partitionFilter);
     }
     addAdditionalOptions(description, additionalOptions);
     vertexInputDescriptions.add(description);
@@ -182,14 +174,14 @@ public class HiveGiraphRunner implements Tool {
     EdgeInputFormatDescription description =
         new EdgeInputFormatDescription(HiveEdgeInputFormat.class);
     description.addParameter(
-        HIVE_TO_EDGE_CLASS.getKey(), hiveToEdgeClass.getName());
-    description.addParameter(HIVE_EDGE_INPUT_PROFILE_ID.getKey(),
+        HIVE_EDGE_INPUT.getClassOpt().getKey(), hiveToEdgeClass.getName());
+    description.addParameter(HIVE_EDGE_INPUT.getProfileIdOpt().getKey(),
         "edge_input_profile_" + edgeInputDescriptions.size());
     description.addParameter(
-        HIVE_EDGE_INPUT_TABLE.getKey(), tableName);
+        HIVE_EDGE_INPUT.getTableOpt().getKey(), tableName);
     if (partitionFilter != null && !partitionFilter.isEmpty()) {
       description.addParameter(
-          HIVE_EDGE_INPUT_PARTITION.getKey(), partitionFilter);
+          HIVE_EDGE_INPUT.getPartitionOpt().getKey(), partitionFilter);
     }
     addAdditionalOptions(description, additionalOptions);
     edgeInputDescriptions.add(description);
@@ -450,12 +442,12 @@ public class HiveGiraphRunner implements Tool {
     String dbName = cmdln.getOptionValue("dbName", "default");
 
     if (hasVertexInput()) {
-      HIVE_VERTEX_INPUT_DATABASE.set(conf, dbName);
+      HIVE_VERTEX_INPUT.getDatabaseOpt().set(conf, dbName);
       prepareHiveVertexInputs();
     }
 
     if (hasEdgeInput()) {
-      HIVE_EDGE_INPUT_DATABASE.set(conf, dbName);
+      HIVE_EDGE_INPUT.getDatabaseOpt().set(conf, dbName);
       prepareHiveEdgeInputs();
     }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/4b0f52cc/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java
index 49e1bb5..d23046b 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java
@@ -19,7 +19,6 @@
 package org.apache.giraph.hive.common;
 
 import org.apache.giraph.conf.ClassConfOption;
-import org.apache.giraph.conf.IntConfOption;
 import org.apache.giraph.conf.StrConfOption;
 import org.apache.giraph.hive.input.edge.HiveToEdge;
 import org.apache.giraph.hive.input.vertex.HiveToVertex;
@@ -29,45 +28,12 @@ import org.apache.giraph.hive.output.VertexToHive;
  * Constants for giraph-hive
  */
 public class GiraphHiveConstants {
-  /** Class for converting hive records to edges */
-  public static final ClassConfOption<HiveToVertex> HIVE_TO_VERTEX_CLASS =
-      ClassConfOption.create("giraph.hive.to.vertex.class", null,
-          HiveToVertex.class);
-  /** Vertex input profile id */
-  public static final StrConfOption HIVE_VERTEX_INPUT_PROFILE_ID =
-      new StrConfOption("giraph.hive.input.vertex.profileId", "");
-  /** Number of vertex splits */
-  public static final IntConfOption HIVE_VERTEX_SPLITS =
-      new IntConfOption("giraph.hive.input.vertex.splits", 0);
-  /** Vertex input database name */
-  public static final StrConfOption HIVE_VERTEX_INPUT_DATABASE =
-      new StrConfOption("giraph.hive.input.vertex.database", "");
-  /** Vertex input table name */
-  public static final StrConfOption HIVE_VERTEX_INPUT_TABLE =
-      new StrConfOption("giraph.hive.input.vertex.table", "");
-  /** Vertex input partition filter */
-  public static final StrConfOption HIVE_VERTEX_INPUT_PARTITION =
-      new StrConfOption("giraph.hive.input.vertex.partition", "");
-
-  /** Class for converting hive records to edges */
-  public static final ClassConfOption<HiveToEdge> HIVE_TO_EDGE_CLASS =
-      ClassConfOption.create("giraph.hive.to.edge.class", null,
-          HiveToEdge.class);
-  /** Edge input profile id */
-  public static final StrConfOption HIVE_EDGE_INPUT_PROFILE_ID =
-      new StrConfOption("giraph.hive.input.edge.profileId", "");
-  /** Number of edge splits */
-  public static final IntConfOption HIVE_EDGE_SPLITS =
-      new IntConfOption("giraph.hive.input.edge.splits", 0);
-  /** Edge input database name */
-  public static final StrConfOption HIVE_EDGE_INPUT_DATABASE =
-      new StrConfOption("giraph.hive.input.edge.database", "");
-  /** Edge input table name */
-  public static final StrConfOption HIVE_EDGE_INPUT_TABLE =
-      new StrConfOption("giraph.hive.input.edge.table", "");
-  /** Edge input partition filter */
-  public static final StrConfOption HIVE_EDGE_INPUT_PARTITION =
-      new StrConfOption("giraph.hive.input.edge.partition", "");
+  /** Options for configuring vertex input */
+  public static final HiveInputOptions<HiveToVertex> HIVE_VERTEX_INPUT =
+      new HiveInputOptions<HiveToVertex>("vertex", HiveToVertex.class);
+  /** Options for configuring edge input */
+  public static final HiveInputOptions<HiveToEdge> HIVE_EDGE_INPUT =
+        new HiveInputOptions<HiveToEdge>("edge", HiveToEdge.class);
 
   /** Class for converting vertices to Hive records */
   public static final ClassConfOption<VertexToHive> VERTEX_TO_HIVE_CLASS =
@@ -75,10 +41,10 @@ public class GiraphHiveConstants {
           VertexToHive.class);
   /** Vertex output profile id */
   public static final StrConfOption HIVE_VERTEX_OUTPUT_PROFILE_ID =
-      new StrConfOption("giraph.hive.output.vertex.profileId", "");
+      new StrConfOption("giraph.hive.output.vertex.profileId", "vertex_output");
   /** Vertex output database name */
   public static final StrConfOption HIVE_VERTEX_OUTPUT_DATABASE =
-      new StrConfOption("giraph.hive.output.vertex.database", "");
+      new StrConfOption("giraph.hive.output.vertex.database", "default");
   /** Vertex output table name */
   public static final StrConfOption HIVE_VERTEX_OUTPUT_TABLE =
       new StrConfOption("giraph.hive.output.vertex.table", "");

http://git-wip-us.apache.org/repos/asf/giraph/blob/4b0f52cc/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveInputOptions.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveInputOptions.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveInputOptions.java
new file mode 100644
index 0000000..6a89614
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveInputOptions.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.hive.common;
+
+import org.apache.giraph.conf.ClassConfOption;
+import org.apache.giraph.conf.IntConfOption;
+import org.apache.giraph.conf.StrConfOption;
+import org.apache.hadoop.conf.Configuration;
+
+import com.facebook.hiveio.input.HiveInputDescription;
+
+/**
+ * Holder for Hive Input Configuration options. Used for vertex and edge input.
+ * @param <C> {@link org.apache.giraph.hive.input.edge.HiveToEdge} or
+ *            {@link org.apache.giraph.hive.input.vertex.HiveToVertex}
+ */
+public class HiveInputOptions<C> {
+  /** Class for converting hive records */
+  private final ClassConfOption<C> classOpt;
+  /** Input profile id */
+  private final StrConfOption profileIdOpt;
+  /** Number of splits */
+  private final IntConfOption splitsOpt;
+  /** Input database name */
+  private final StrConfOption databaseOpt;
+  /** Input table name */
+  private final StrConfOption tableOpt;
+  /** Input partition filter */
+  private final StrConfOption partitionOpt;
+  /** Hive Metastore host to use. If blank will infer from HiveConf */
+  private final StrConfOption hostOpt;
+  /** Hive Metastore port to use. */
+  private final IntConfOption portOpt;
+
+  /**
+   * Constructor
+   * @param name "vertex" or "edge"
+   * @param hiveToTypeClass HiveToVertex or HiveToEdge
+   */
+  public HiveInputOptions(String name, Class<C> hiveToTypeClass) {
+    classOpt = ClassConfOption.<C>create(key(name, "class"),
+        null, hiveToTypeClass);
+    profileIdOpt = new StrConfOption(key(name, "profileId"),
+        name + "_input_profile");
+    partitionOpt = new StrConfOption(key(name, "partition"), "");
+    splitsOpt = new IntConfOption(key(name, "splits"), 0);
+    databaseOpt = new StrConfOption(key(name, "database"), "default");
+    tableOpt = new StrConfOption(key(name, "table"), "");
+    hostOpt = new StrConfOption(key(name, "metastore.host"), "");
+    portOpt = new IntConfOption(key(name, "metastore.port"), 9083);
+  }
+
+  /**
+   * Create Configuration key from name and suffix
+   * @param name the name
+   * @param suffix the suffix
+   * @return key
+   */
+  private static String key(String name, String suffix) {
+    return "giraph.hive.input." + name + "." + suffix;
+  }
+
+  /**
+   * Get profile ID from Configuration
+   * @param conf Configuration
+   * @return profile ID
+   */
+  public String getProfileID(Configuration conf) {
+    return profileIdOpt.get(conf);
+  }
+
+  /**
+   * Set HiveToX class to use
+   * @param conf Configuration
+   * @param hiveToTypeClass class to use
+   */
+  public void setClass(Configuration conf, Class<? extends C> hiveToTypeClass) {
+    classOpt.set(conf, hiveToTypeClass);
+  }
+
+  /**
+   * Set Database to use
+   * @param conf Configuration
+   * @param dbName database
+   */
+  public void setDatabase(Configuration conf, String dbName) {
+    databaseOpt.set(conf, dbName);
+  }
+
+  /**
+   * Set Table to use
+   * @param conf Configuration
+   * @param tableName table
+   */
+  public void setTable(Configuration conf, String tableName) {
+    tableOpt.set(conf, tableName);
+  }
+
+  /**
+   * Set partition filter to use
+   * @param conf Configuration
+   * @param partitionFilter partition filter
+   */
+  public void setPartition(Configuration conf, String partitionFilter) {
+    partitionOpt.set(conf, partitionFilter);
+  }
+
+  /**
+   * Get HiveToX class set in Configuration
+   * @param conf Configuration
+   * @return HiveToX
+   */
+  public Class<? extends C> getClass(Configuration conf) {
+    return classOpt.get(conf);
+  }
+
+  public StrConfOption getDatabaseOpt() {
+    return databaseOpt;
+  }
+
+  public StrConfOption getHostOpt() {
+    return hostOpt;
+  }
+
+  public ClassConfOption<C> getClassOpt() {
+    return classOpt;
+  }
+
+  public StrConfOption getPartitionOpt() {
+    return partitionOpt;
+  }
+
+  public IntConfOption getPortOpt() {
+    return portOpt;
+  }
+
+  public StrConfOption getProfileIdOpt() {
+    return profileIdOpt;
+  }
+
+  public IntConfOption getSplitsOpt() {
+    return splitsOpt;
+  }
+
+  public StrConfOption getTableOpt() {
+    return tableOpt;
+  }
+
+  /**
+   * Create a HiveInputDescription from the options in the Configuration
+   * @param conf Configuration
+   * @return HiveInputDescription
+   */
+  public HiveInputDescription makeInputDescription(Configuration conf) {
+    HiveInputDescription inputDescription = new HiveInputDescription();
+    inputDescription.setDbName(databaseOpt.get(conf));
+    inputDescription.setTableName(tableOpt.get(conf));
+    inputDescription.setPartitionFilter(partitionOpt.get(conf));
+    inputDescription.setNumSplits(splitsOpt.get(conf));
+    inputDescription.getMetastoreDesc().setHost(hostOpt.get(conf));
+    inputDescription.getMetastoreDesc().setPort(portOpt.get(conf));
+    return inputDescription;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4b0f52cc/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveParsing.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveParsing.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveParsing.java
new file mode 100644
index 0000000..bd28396
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveParsing.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.hive.common;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import com.facebook.hiveio.record.HiveReadableRecord;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Helpers for parsing with Hive.
+ */
+public class HiveParsing {
+  /** Don't construct */
+  private HiveParsing() { }
+
+  /**
+   * Parse a byte from a Hive record
+   * @param record Hive record to parse
+   * @param columnIndex offset of column in row
+   * @return byte
+   */
+  public static byte parseByte(HiveReadableRecord record, int columnIndex) {
+    return (byte) record.getLong(columnIndex);
+  }
+
+  /**
+   * Parse an int from a Hive record
+   * @param record Hive record to parse
+   * @param columnIndex offset of column in row
+   * @return int
+   */
+  public static int parseInt(HiveReadableRecord record, int columnIndex) {
+    return (int) record.getLong(columnIndex);
+  }
+
+  /**
+   * Parse an Integer ID from a Hive record
+   * @param record Hive record to parse
+   * @param columnIndex offset of column in row
+   * @return IntWritable ID
+   */
+  public static IntWritable parseIntID(HiveReadableRecord record,
+      int columnIndex) {
+    return new IntWritable(parseInt(record, columnIndex));
+  }
+
+  /**
+   * Parse a Long ID from a Hive record
+   * @param record Hive record to parse
+   * @param columnIndex offset of column in row
+   * @return LongWritable ID
+   */
+  public static LongWritable parseLongID(HiveReadableRecord record,
+      int columnIndex) {
+    return new LongWritable(record.getLong(columnIndex));
+  }
+
+  /**
+   * Parse a weight from a Hive record
+   * @param record Hive record to parse
+   * @param columnIndex offset of column in row
+   * @return DoubleWritable weight
+   */
+  public static DoubleWritable parseDoubleWritable(HiveReadableRecord record,
+      int columnIndex) {
+    return new DoubleWritable(record.getDouble(columnIndex));
+  }
+
+  /**
+   * Parse edges as mappings of integer => double (id to weight)
+   * @param record Hive record to parse
+   * @param columnIndex offset of column in row
+   * @return edges
+   */
+  @SuppressWarnings("unchecked")
+  public static Iterable<Edge<IntWritable, DoubleWritable>> parseIntDoubleEdges(
+      HiveReadableRecord record, int columnIndex) {
+    Object edgesObj = record.get(columnIndex);
+    if (edgesObj == null) {
+      return ImmutableList.of();
+    }
+    Map<Long, Double> readEdges = (Map<Long, Double>) edgesObj;
+    List<Edge<IntWritable, DoubleWritable>> edges =
+        Lists.newArrayListWithCapacity(readEdges.size());
+    for (Map.Entry<Long, Double> entry : readEdges.entrySet()) {
+      edges.add(EdgeFactory.create(new IntWritable(entry.getKey().intValue()),
+          new DoubleWritable(entry.getValue())));
+    }
+    return edges;
+  }
+
+  /**
+   * Parse edges from a list
+   * @param record hive record
+   * @param index column index
+   * @return iterable of edges
+   */
+  public static Iterable<Edge<IntWritable, NullWritable>> parseIntNullEdges(
+      HiveReadableRecord record, int index) {
+    List<Long> ids = (List<Long>) record.get(index);
+    if (ids == null) {
+      return ImmutableList.of();
+    }
+    ImmutableList.Builder<Edge<IntWritable, NullWritable>> builder =
+        ImmutableList.builder();
+    for (long id : ids) {
+      builder.add(EdgeFactory.create(new IntWritable((int) id)));
+    }
+    return builder.build();
+  }
+
+  /**
+   * Parse edges as mappings of long => double (id to weight)
+   * @param record Hive record to parse
+   * @param columnIndex offset of column in row
+   * @return edges
+   */
+  @SuppressWarnings("unchecked")
+  public static Iterable<Edge<LongWritable, DoubleWritable>>
+  parseLongDoubleEdges(HiveReadableRecord record, int columnIndex) {
+    Object edgesObj = record.get(columnIndex);
+    if (edgesObj == null) {
+      return ImmutableList.of();
+    }
+    Map<Long, Double> readEdges = (Map<Long, Double>) edgesObj;
+    List<Edge<LongWritable, DoubleWritable>> edges =
+        Lists.newArrayListWithCapacity(readEdges.size());
+    for (Map.Entry<Long, Double> entry : readEdges.entrySet()) {
+      edges.add(EdgeFactory.create(new LongWritable(entry.getKey()),
+          new DoubleWritable(entry.getValue())));
+    }
+    return edges;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4b0f52cc/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java
index a8c88f5..2d2fc1e 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java
@@ -45,22 +45,14 @@ public class HiveUtils {
    * Initialize hive input, prepare Configuration parameters
    *
    * @param hiveInputFormat HiveApiInputFormat
-   * @param profileId Profile id
-   * @param dbName Database name
-   * @param tableName Table name
-   * @param partitionFilter Partition filter
-   * @param numSplits Number of splits
+   * @param inputDescription HiveInputDescription
+   * @param profileId profile ID
    * @param conf Configuration
    */
   public static void initializeHiveInput(HiveApiInputFormat hiveInputFormat,
-      String profileId, String dbName, String tableName, String partitionFilter,
-      int numSplits, Configuration conf) {
+      HiveInputDescription inputDescription, String profileId,
+      Configuration conf) {
     hiveInputFormat.setMyProfileId(profileId);
-    HiveInputDescription inputDescription = new HiveInputDescription();
-    inputDescription.setDbName(dbName);
-    inputDescription.setTableName(tableName);
-    inputDescription.setPartitionFilter(partitionFilter);
-    inputDescription.setNumSplits(numSplits);
     HiveApiInputFormat.setProfileInputDesc(conf, inputDescription, profileId);
     HiveTableSchemas.put(conf, profileId, inputDescription.hiveTableName());
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/4b0f52cc/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
index bdae1dd..f35b6ea 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
@@ -19,6 +19,7 @@
 package org.apache.giraph.hive.input.edge;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.hive.common.GiraphHiveConstants;
 import org.apache.giraph.hive.common.HiveUtils;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeReader;
@@ -36,11 +37,6 @@ import com.facebook.hiveio.record.HiveReadableRecord;
 import java.io.IOException;
 import java.util.List;
 
-import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT_DATABASE;
-import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT_PARTITION;
-import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT_PROFILE_ID;
-import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT_TABLE;
-import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_SPLITS;
 
 /**
  * {@link EdgeInputFormat} for reading edges from Hive.
@@ -66,11 +62,8 @@ public class HiveEdgeInputFormat<I extends WritableComparable,
     super.setConf(conf);
     HiveUtils.initializeHiveInput(
         hiveInputFormat,
-        HIVE_EDGE_INPUT_PROFILE_ID.get(conf),
-        HIVE_EDGE_INPUT_DATABASE.get(conf),
-        HIVE_EDGE_INPUT_TABLE.get(conf),
-        HIVE_EDGE_INPUT_PARTITION.get(conf),
-        HIVE_EDGE_SPLITS.get(conf),
+        GiraphHiveConstants.HIVE_EDGE_INPUT.makeInputDescription(conf),
+        GiraphHiveConstants.HIVE_EDGE_INPUT.getProfileID(conf),
         conf);
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/4b0f52cc/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
index 2891a17..ba99267 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
@@ -34,7 +34,7 @@ import com.facebook.hiveio.schema.HiveTableSchemas;
 
 import java.io.IOException;
 
-import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_TO_EDGE_CLASS;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT;
 
 /**
  * A reader for reading edges from Hive.
@@ -86,9 +86,10 @@ public class HiveEdgeReader<I extends WritableComparable, E extends Writable>
    * @throws IOException if anything goes wrong reading from Configuration
    */
   private void instantiateHiveToEdgeFromConf() throws IOException {
-    Class<? extends HiveToEdge> klass = HIVE_TO_EDGE_CLASS.get(getConf());
+    Class<? extends HiveToEdge> klass = HIVE_EDGE_INPUT.getClass(getConf());
     if (klass == null) {
-      throw new IOException(HIVE_TO_EDGE_CLASS.getKey() + " not set in conf");
+      throw new IOException(HIVE_EDGE_INPUT.getClassOpt().getKey() +
+          " not set in conf");
     }
     hiveToEdge = ReflectionUtils.newInstance(klass, getConf());
     HiveTableSchemas.configure(hiveToEdge, getTableSchema());

http://git-wip-us.apache.org/repos/asf/giraph/blob/4b0f52cc/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntDoubleEdge.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntDoubleEdge.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntDoubleEdge.java
new file mode 100644
index 0000000..76cf7e0
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntDoubleEdge.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.hive.input.edge.examples;
+
+import org.apache.giraph.hive.common.HiveParsing;
+import org.apache.giraph.hive.input.edge.SimpleHiveToEdge;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+
+import com.facebook.hiveio.record.HiveReadableRecord;
+
+/**
+ * {@link SimpleHiveToEdge} for edges with integer vertex IDs and double
+ * values. Takes the source ID from column 0, the target ID from column 1 and
+ * the edge value from column 2 of each Hive record.
+ */
+public class HiveIntDoubleEdge
+    extends SimpleHiveToEdge<IntWritable, DoubleWritable> {
+  /** Index of the Hive column holding the source vertex ID. */
+  private static final int SOURCE_ID_COLUMN = 0;
+  /** Index of the Hive column holding the target vertex ID. */
+  private static final int TARGET_ID_COLUMN = 1;
+  /** Index of the Hive column holding the edge value. */
+  private static final int EDGE_VALUE_COLUMN = 2;
+
+  @Override
+  public IntWritable getSourceVertexId(HiveReadableRecord hiveRecord) {
+    return HiveParsing.parseIntID(hiveRecord, SOURCE_ID_COLUMN);
+  }
+
+  @Override
+  public IntWritable getTargetVertexId(HiveReadableRecord hiveRecord) {
+    return HiveParsing.parseIntID(hiveRecord, TARGET_ID_COLUMN);
+  }
+
+  @Override
+  public DoubleWritable getEdgeValue(HiveReadableRecord hiveRecord) {
+    return HiveParsing.parseDoubleWritable(hiveRecord, EDGE_VALUE_COLUMN);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4b0f52cc/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntNullEdge.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntNullEdge.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntNullEdge.java
new file mode 100644
index 0000000..3de9680
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntNullEdge.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.hive.input.edge.examples;
+
+import org.apache.giraph.hive.common.HiveParsing;
+import org.apache.giraph.hive.input.edge.SimpleHiveToEdge;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import com.facebook.hiveio.record.HiveReadableRecord;
+
+/**
+ * {@link SimpleHiveToEdge} for edges with integer vertex IDs and no edge
+ * value. Assumes the Hive table is made up of [source,target] columns.
+ */
+public class HiveIntNullEdge
+    extends SimpleHiveToEdge<IntWritable, NullWritable> {
+  /** Index of the Hive column holding the source vertex ID. */
+  private static final int SOURCE_ID_COLUMN = 0;
+  /** Index of the Hive column holding the target vertex ID. */
+  private static final int TARGET_ID_COLUMN = 1;
+
+  @Override
+  public IntWritable getSourceVertexId(HiveReadableRecord hiveRecord) {
+    return HiveParsing.parseIntID(hiveRecord, SOURCE_ID_COLUMN);
+  }
+
+  @Override
+  public IntWritable getTargetVertexId(HiveReadableRecord hiveRecord) {
+    return HiveParsing.parseIntID(hiveRecord, TARGET_ID_COLUMN);
+  }
+
+  /** Edges carry no value, so the record is not read here. */
+  @Override
+  public NullWritable getEdgeValue(HiveReadableRecord hiveRecord) {
+    return NullWritable.get();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4b0f52cc/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/package-info.java
new file mode 100644
index 0000000..56ff60b
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Hive input edge examples.
+ */
+package org.apache.giraph.hive.input.edge.examples;

http://git-wip-us.apache.org/repos/asf/giraph/blob/4b0f52cc/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
index 55500cb..824d8a6 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
@@ -19,6 +19,7 @@
 package org.apache.giraph.hive.input.vertex;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.hive.common.GiraphHiveConstants;
 import org.apache.giraph.hive.common.HiveUtils;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexReader;
@@ -36,12 +37,6 @@ import com.facebook.hiveio.record.HiveReadableRecord;
 import java.io.IOException;
 import java.util.List;
 
-import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT_DATABASE;
-import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT_PARTITION;
-import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT_PROFILE_ID;
-import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT_TABLE;
-import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_SPLITS;
-
 /**
  * {@link VertexInputFormat} for reading vertices from Hive.
  *
@@ -68,11 +63,8 @@ public class HiveVertexInputFormat<I extends WritableComparable,
     super.setConf(conf);
     HiveUtils.initializeHiveInput(
         hiveInputFormat,
-        HIVE_VERTEX_INPUT_PROFILE_ID.get(conf),
-        HIVE_VERTEX_INPUT_DATABASE.get(conf),
-        HIVE_VERTEX_INPUT_TABLE.get(conf),
-        HIVE_VERTEX_INPUT_PARTITION.get(conf),
-        HIVE_VERTEX_SPLITS.get(conf),
+        GiraphHiveConstants.HIVE_VERTEX_INPUT.makeInputDescription(conf),
+        GiraphHiveConstants.HIVE_VERTEX_INPUT.getProfileID(conf),
         conf);
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/4b0f52cc/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
index d5e352d..719d8ad 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
@@ -34,7 +34,7 @@ import com.facebook.hiveio.schema.HiveTableSchemas;
 
 import java.io.IOException;
 
-import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_TO_VERTEX_CLASS;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT;
 
 /**
  * VertexReader using Hive
@@ -80,7 +80,7 @@ public class HiveVertexReader<I extends WritableComparable,
   public void initialize(InputSplit inputSplit,
       TaskAttemptContext context) throws IOException, InterruptedException {
     hiveRecordReader.initialize(inputSplit, context);
-    Class<? extends HiveToVertex> klass = HIVE_TO_VERTEX_CLASS.get(getConf());
+    Class<? extends HiveToVertex> klass = HIVE_VERTEX_INPUT.getClass(getConf());
     hiveToVertex = ReflectionUtils.newInstance(klass, getConf());
     HiveTableSchemas.configure(hiveToVertex, getTableSchema());
     hiveToVertex.initializeRecords(

http://git-wip-us.apache.org/repos/asf/giraph/blob/4b0f52cc/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntDoubleDoubleVertex.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntDoubleDoubleVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntDoubleDoubleVertex.java
new file mode 100644
index 0000000..ea2f419
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntDoubleDoubleVertex.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.hive.input.vertex.examples;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.hive.common.HiveParsing;
+import org.apache.giraph.hive.input.vertex.SimpleHiveToVertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+
+import com.facebook.hiveio.record.HiveReadableRecord;
+
+/**
+ * {@link SimpleHiveToVertex} for vertices with integer IDs, double vertex
+ * values and double edge values. Takes the ID from column 0, the value from
+ * column 1 and the edges from column 2 of each Hive record.
+ */
+public class HiveIntDoubleDoubleVertex
+    extends SimpleHiveToVertex<IntWritable, DoubleWritable, DoubleWritable> {
+  /** Index of the Hive column holding the vertex ID. */
+  private static final int VERTEX_ID_COLUMN = 0;
+  /** Index of the Hive column holding the vertex value. */
+  private static final int VERTEX_VALUE_COLUMN = 1;
+  /** Index of the Hive column holding the vertex's edges. */
+  private static final int EDGES_COLUMN = 2;
+
+  @Override
+  public IntWritable getVertexId(HiveReadableRecord record) {
+    return HiveParsing.parseIntID(record, VERTEX_ID_COLUMN);
+  }
+
+  @Override
+  public DoubleWritable getVertexValue(HiveReadableRecord record) {
+    return HiveParsing.parseDoubleWritable(record, VERTEX_VALUE_COLUMN);
+  }
+
+  @Override
+  public Iterable<Edge<IntWritable, DoubleWritable>> getEdges(
+      HiveReadableRecord record) {
+    return HiveParsing.parseIntDoubleEdges(record, EDGES_COLUMN);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4b0f52cc/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntNullNullVertex.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntNullNullVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntNullNullVertex.java
new file mode 100644
index 0000000..4e32039
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntNullNullVertex.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.hive.input.vertex.examples;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.hive.common.HiveParsing;
+import org.apache.giraph.hive.input.vertex.SimpleHiveToVertex;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import com.facebook.hiveio.record.HiveReadableRecord;
+
+/**
+ * {@link SimpleHiveToVertex} for vertices with integer IDs, no vertex value
+ * and edges without values. Takes the ID from column 0 and the edges from
+ * column 1 of each Hive record.
+ */
+public class HiveIntNullNullVertex
+    extends SimpleHiveToVertex<IntWritable, NullWritable, NullWritable> {
+  /** Index of the Hive column holding the vertex ID. */
+  private static final int VERTEX_ID_COLUMN = 0;
+  /** Index of the Hive column holding the vertex's edges. */
+  private static final int EDGES_COLUMN = 1;
+
+  @Override
+  public IntWritable getVertexId(HiveReadableRecord record) {
+    return HiveParsing.parseIntID(record, VERTEX_ID_COLUMN);
+  }
+
+  /** Vertices carry no value, so the record is not read here. */
+  @Override
+  public NullWritable getVertexValue(HiveReadableRecord record) {
+    return NullWritable.get();
+  }
+
+  @Override
+  public Iterable<Edge<IntWritable, NullWritable>> getEdges(
+      HiveReadableRecord record) {
+    return HiveParsing.parseIntNullEdges(record, EDGES_COLUMN);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4b0f52cc/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/package-info.java
new file mode 100644
index 0000000..ea15393
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Hive input vertex examples.
+ */
+package org.apache.giraph.hive.input.vertex.examples;

http://git-wip-us.apache.org/repos/asf/giraph/blob/4b0f52cc/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java
index 769c874..05441f4 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java
@@ -18,8 +18,6 @@
 
 package org.apache.giraph.hive.output;
 
-import java.io.IOException;
-
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.hive.common.HiveUtils;
 import org.apache.giraph.io.VertexOutputFormat;
@@ -34,6 +32,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import com.facebook.hiveio.output.HiveApiOutputFormat;
 import com.facebook.hiveio.record.HiveWritableRecord;
 
+import java.io.IOException;
+
 import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_DATABASE;
 import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_PARTITION;
 import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_PROFILE_ID;

http://git-wip-us.apache.org/repos/asf/giraph/blob/4b0f52cc/giraph-hive/src/main/java/org/apache/giraph/hive/output/examples/HiveOutputIntIntVertex.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/examples/HiveOutputIntIntVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/examples/HiveOutputIntIntVertex.java
new file mode 100644
index 0000000..7955915
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/examples/HiveOutputIntIntVertex.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.hive.output.examples;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.hive.output.SimpleVertexToHive;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import com.facebook.hiveio.record.HiveWritableRecord;
+
+/**
+ * {@link SimpleVertexToHive} that writes vertices with integer IDs and
+ * integer values. The ID is stored in column 0 and the value in column 1,
+ * both widened to long.
+ */
+public class HiveOutputIntIntVertex
+    extends SimpleVertexToHive<IntWritable, IntWritable, NullWritable> {
+  /** Index of the Hive column receiving the vertex ID. */
+  private static final int VERTEX_ID_COLUMN = 0;
+  /** Index of the Hive column receiving the vertex value. */
+  private static final int VERTEX_VALUE_COLUMN = 1;
+
+  @Override
+  public void fillRecord(
+      Vertex<IntWritable, IntWritable, NullWritable, ?> vertex,
+      HiveWritableRecord record) {
+    long id = vertex.getId().get();
+    record.set(VERTEX_ID_COLUMN, id);
+    long value = vertex.getValue().get();
+    record.set(VERTEX_VALUE_COLUMN, value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4b0f52cc/giraph-hive/src/main/java/org/apache/giraph/hive/output/examples/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/examples/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/examples/package-info.java
new file mode 100644
index 0000000..5195e69
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/examples/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Hive output examples.
+ */
+package org.apache.giraph.hive.output.examples;

http://git-wip-us.apache.org/repos/asf/giraph/blob/4b0f52cc/giraph-hive/src/test/java/org/apache/giraph/hive/Helpers.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/Helpers.java b/giraph-hive/src/test/java/org/apache/giraph/hive/Helpers.java
new file mode 100644
index 0000000..1103f78
--- /dev/null
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/Helpers.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.hive;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Helper methods shared by the giraph-hive tests.
+ */
+public class Helpers {
+  /**
+   * Strong reference to the DataNucleus logger. The LogManager only keeps
+   * weak references to named loggers, so without this field the level set in
+   * {@link #silenceDataNucleusLogger()} could be lost to garbage collection.
+   */
+  private static final Logger DATANUCLEUS_LOGGER =
+      Logger.getLogger("org.datanucleus");
+
+  /**
+   * Parses lines of the form "id value" into a map from int ID to double
+   * value. Fields may be separated by any run of whitespace; blank lines are
+   * skipped.
+   *
+   * @param results lines to parse
+   * @return map from ID to value; a later line overwrites an earlier one
+   *         with the same ID
+   * @throws IllegalArgumentException if a non-blank line has fewer than two
+   *         fields (NumberFormatException if a field is not a number)
+   */
+  public static Map<Integer, Double> parseIntDoubleResults(
+      Iterable<String> results) {
+    Map<Integer, Double> values = new HashMap<Integer, Double>();
+    for (String line : results) {
+      String[] tokens = splitFields(line);
+      if (tokens.length > 0) {
+        values.put(Integer.parseInt(tokens[0]), Double.parseDouble(tokens[1]));
+      }
+    }
+    return values;
+  }
+
+  /**
+   * Parses lines of the form "id value" into a map from int ID to int value.
+   * Fields may be separated by any run of whitespace; blank lines are
+   * skipped.
+   *
+   * @param results lines to parse
+   * @return map from ID to value; a later line overwrites an earlier one
+   *         with the same ID
+   * @throws IllegalArgumentException if a non-blank line has fewer than two
+   *         fields (NumberFormatException if a field is not a number)
+   */
+  public static Map<Integer, Integer> parseIntIntResults(
+      Iterable<String> results) {
+    Map<Integer, Integer> values = new HashMap<Integer, Integer>();
+    for (String line : results) {
+      String[] tokens = splitFields(line);
+      if (tokens.length > 0) {
+        values.put(Integer.parseInt(tokens[0]), Integer.parseInt(tokens[1]));
+      }
+    }
+    return values;
+  }
+
+  /**
+   * Sets the "org.datanucleus" java.util.logging logger to INFO, hiding
+   * anything below that level.
+   * NOTE(review): WARNING and SEVERE records still pass at this level; use a
+   * higher level if those should be hidden as well.
+   */
+  public static void silenceDataNucleusLogger() {
+    DATANUCLEUS_LOGGER.setLevel(Level.INFO);
+  }
+
+  /**
+   * Splits a result line into whitespace-separated fields.
+   *
+   * @param line line to split
+   * @return the fields, or an empty array for a blank line
+   * @throws IllegalArgumentException if the line has only one field
+   */
+  private static String[] splitFields(String line) {
+    // Trim first: a leading separator would otherwise yield an empty field.
+    String trimmed = line.trim();
+    if (trimmed.isEmpty()) {
+      return new String[0];
+    }
+    String[] tokens = trimmed.split("\\s+");
+    if (tokens.length < 2) {
+      throw new IllegalArgumentException(
+          "Expected '<id> <value>' but got '" + line + "'");
+    }
+    return tokens;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4b0f52cc/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveEdgeInputTest.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveEdgeInputTest.java b/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveEdgeInputTest.java
new file mode 100644
index 0000000..2ad87fa
--- /dev/null
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveEdgeInputTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.hive.input;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.hive.Helpers;
+import org.apache.giraph.hive.input.edge.HiveEdgeInputFormat;
+import org.apache.giraph.hive.input.edge.examples.HiveIntDoubleEdge;
+import org.apache.giraph.hive.input.edge.examples.HiveIntNullEdge;
+import org.apache.giraph.hive.vertexes.VertexCountEdges;
+import org.apache.giraph.hive.vertexes.VertexSumEdges;
+import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.thrift.TException;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.facebook.hiveio.common.HiveMetastores;
+import com.facebook.hiveio.testing.LocalHiveServer;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static junit.framework.Assert.assertEquals;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT;
+
+/**
+ * Tests reading edges from a local Hive server through
+ * {@link HiveEdgeInputFormat}.
+ */
+public class HiveEdgeInputTest {
+  /** Tolerance used when comparing double results. */
+  private static final double DELTA = 1e-9;
+
+  /** Local Hive server that the test tables are created in. */
+  private final LocalHiveServer hiveServer =
+      new LocalHiveServer("giraph-hive");
+
+  /** Adjusts the DataNucleus log level once, before any test runs. */
+  @BeforeClass
+  public static void hushDatanucleusWarnings() {
+    Helpers.silenceDataNucleusLogger();
+  }
+
+  /**
+   * Initializes the local Hive server and installs its client as the
+   * metastore test client before each test.
+   */
+  @Before
+  public void setUp() throws IOException, TException {
+    hiveServer.init();
+    HiveMetastores.setTestClient(hiveServer.getClient());
+  }
+
+  /**
+   * Loads four [source,target] edges from an unpartitioned table and checks
+   * the value written for each of the three source vertices.
+   */
+  @Test
+  public void testEdgeInput() throws Exception {
+    String tableName = "test1";
+    hiveServer.createTable("CREATE TABLE " + tableName +
+        " (i1 BIGINT, i2 BIGINT) " +
+        " ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'");
+    String[] rows = {
+        "1\t2",
+        "2\t3",
+        "2\t4",
+        "4\t1",
+    };
+    hiveServer.loadData(tableName, rows);
+
+    GiraphConfiguration conf = new GiraphConfiguration();
+    HIVE_EDGE_INPUT.setTable(conf, tableName);
+    HIVE_EDGE_INPUT.setClass(conf, HiveIntNullEdge.class);
+    conf.setVertexClass(VertexCountEdges.class);
+    conf.setEdgeInputFormatClass(HiveEdgeInputFormat.class);
+    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+    Iterable<String> output =
+        InternalVertexRunner.run(conf, new String[0], new String[0]);
+
+    Map<Integer, Integer> data = Helpers.parseIntIntResults(output);
+    assertEquals(3, data.size());
+    assertEquals(1, (int) data.get(1));
+    assertEquals(2, (int) data.get(2));
+    assertEquals(1, (int) data.get(4));
+  }
+
+  /**
+   * Same as {@link #testEdgeInput()}, but the edges are loaded into the
+   * ds='foobar' partition and the input is configured with that partition.
+   */
+  @Test
+  public void testEdgeInputWithPartitions() throws Exception {
+    String tableName = "test1";
+    String partition = "ds='foobar'";
+    hiveServer.createTable("CREATE TABLE " + tableName +
+        " (i1 BIGINT, i2 BIGINT) " +
+        " PARTITIONED BY (ds STRING) " +
+        " ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' ");
+    String[] rows = {
+        "1\t2",
+        "2\t3",
+        "2\t4",
+        "4\t1",
+    };
+    hiveServer.loadData(tableName, partition, rows);
+
+    GiraphConfiguration conf = new GiraphConfiguration();
+    HIVE_EDGE_INPUT.setTable(conf, tableName);
+    HIVE_EDGE_INPUT.setPartition(conf, partition);
+    HIVE_EDGE_INPUT.setClass(conf, HiveIntNullEdge.class);
+    conf.setVertexClass(VertexCountEdges.class);
+    conf.setEdgeInputFormatClass(HiveEdgeInputFormat.class);
+    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+    Iterable<String> output =
+        InternalVertexRunner.run(conf, new String[0], new String[0]);
+
+    Map<Integer, Integer> data = Helpers.parseIntIntResults(output);
+    assertEquals(3, data.size());
+    assertEquals(1, (int) data.get(1));
+    assertEquals(2, (int) data.get(2));
+    assertEquals(1, (int) data.get(4));
+  }
+
+  /**
+   * Loads four [source,target,value] edges and checks the double written for
+   * each source vertex; vertex 2 is expected to get 0.33 + 0.44.
+   */
+  @Test
+  public void testEdgeInputWithValues() throws Exception {
+    String tableName = "test1";
+    hiveServer.createTable("CREATE TABLE " + tableName +
+        " (i1 BIGINT, i2 BIGINT, d3 DOUBLE) " +
+        " ROW FORMAT DELIMITED " +
+        " FIELDS TERMINATED BY '\t' " +
+        " COLLECTION ITEMS TERMINATED BY ',' ");
+    String[] rows = {
+        "1\t2\t0.22",
+        "2\t3\t0.33",
+        "2\t4\t0.44",
+        "4\t1\t0.11",
+    };
+    hiveServer.loadData(tableName, rows);
+
+    GiraphConfiguration conf = new GiraphConfiguration();
+    HIVE_EDGE_INPUT.setTable(conf, tableName);
+    HIVE_EDGE_INPUT.setClass(conf, HiveIntDoubleEdge.class);
+    conf.setVertexClass(VertexSumEdges.class);
+    conf.setEdgeInputFormatClass(HiveEdgeInputFormat.class);
+    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+    Iterable<String> output =
+        InternalVertexRunner.run(conf, new String[0], new String[0]);
+
+    Map<Integer, Double> data = Helpers.parseIntDoubleResults(output);
+    assertEquals(3, data.size());
+    // Compare with a tolerance: 0.77 is a sum of doubles, not a literal.
+    assertEquals(0.22, data.get(1), DELTA);
+    assertEquals(0.77, data.get(2), DELTA);
+    assertEquals(0.11, data.get(4), DELTA);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4b0f52cc/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java b/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java
new file mode 100644
index 0000000..b6ab139
--- /dev/null
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.hive.input;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.hive.Helpers;
+import org.apache.giraph.hive.input.vertex.HiveVertexInputFormat;
+import org.apache.giraph.hive.input.vertex.examples.HiveIntDoubleDoubleVertex;
+import org.apache.giraph.hive.input.vertex.examples.HiveIntNullNullVertex;
+import org.apache.giraph.hive.vertexes.VertexCountEdges;
+import org.apache.giraph.hive.vertexes.VertexSumEdges;
+import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.thrift.TException;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.facebook.hiveio.common.HiveMetastores;
+import com.facebook.hiveio.testing.LocalHiveServer;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static junit.framework.Assert.assertEquals;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT;
+
+/** Runs Giraph jobs that read vertices from a local Hive server via HiveVertexInputFormat. */
+public class HiveVertexInputTest {
+  private final LocalHiveServer hiveServer = new LocalHiveServer("giraph-hive");
+
+  /** Silences the DataNucleus logger once, before any test in this class runs. */
+  @BeforeClass
+  public static void hushDatanucleusWarnings() {
+    Helpers.silenceDataNucleusLogger();
+  }
+
+  @Before
+  public void setUp() throws IOException, TException {
+    hiveServer.init();
+    HiveMetastores.setTestClient(hiveServer.getClient());
+  }
+
+  /** Reads id + neighbor-list rows from an unpartitioned table and checks edge counts. */
+  @Test
+  public void testVertexInput() throws Exception {
+    String tableName = "test1";
+    hiveServer.createTable("CREATE TABLE " + tableName +
+        " (i1 BIGINT, i2 ARRAY<BIGINT>) " +
+        " ROW FORMAT DELIMITED " + " FIELDS TERMINATED BY '\t' " +
+        " COLLECTION ITEMS TERMINATED BY ','");
+    String[] rows = {
+        "1\t2",
+        "2\t3,4",
+        "4\t1",
+    };
+    hiveServer.loadData(tableName, rows);
+
+    GiraphConfiguration conf = new GiraphConfiguration();
+    HIVE_VERTEX_INPUT.setTable(conf, tableName);
+    HIVE_VERTEX_INPUT.setClass(conf, HiveIntNullNullVertex.class);
+    conf.setVertexClass(VertexCountEdges.class);
+    conf.setVertexInputFormatClass(HiveVertexInputFormat.class);
+    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+    Iterable<String> output = InternalVertexRunner.run(conf, new String[0], new String[0]);
+
+    Map<Integer, Integer> data = Helpers.parseIntIntResults(output);
+    assertEquals(3, data.size());
+    assertEquals(1, (int) data.get(1));
+    assertEquals(2, (int) data.get(2));
+    assertEquals(1, (int) data.get(4));
+  }
+
+  /** Same as testVertexInput, but the rows live in partition ds='foobar' of the table. */
+  @Test
+  public void testVertexInputWithPartitions() throws Exception {
+    String tableName = "test1";
+    String partition = "ds='foobar'";
+    hiveServer.createTable("CREATE TABLE " + tableName +
+        " (i1 BIGINT, i2 ARRAY<BIGINT>) " + " PARTITIONED BY (ds STRING) " +
+        " ROW FORMAT DELIMITED " + " FIELDS TERMINATED BY '\t' " +
+        " COLLECTION ITEMS TERMINATED BY ','");
+    String[] rows = {
+        "1\t2",
+        "2\t3,4",
+        "4\t1",
+    };
+    hiveServer.loadData(tableName, partition, rows);
+
+    GiraphConfiguration conf = new GiraphConfiguration();
+    HIVE_VERTEX_INPUT.setTable(conf, tableName);
+    HIVE_VERTEX_INPUT.setPartition(conf, partition);
+    HIVE_VERTEX_INPUT.setClass(conf, HiveIntNullNullVertex.class);
+    conf.setVertexClass(VertexCountEdges.class);
+    conf.setVertexInputFormatClass(HiveVertexInputFormat.class);
+    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+    Iterable<String> output = InternalVertexRunner.run(conf, new String[0], new String[0]);
+
+    Map<Integer, Integer> data = Helpers.parseIntIntResults(output);
+    assertEquals(3, data.size());
+    assertEquals(1, (int) data.get(1));
+    assertEquals(2, (int) data.get(2));
+    assertEquals(1, (int) data.get(4));
+  }
+
+  /** Reads vertex values and weighted edges (MAP column) and checks summed edge weights. */
+  @Test
+  public void testValues() throws Exception {
+    String tableName = "test1";
+    hiveServer.createTable("CREATE TABLE " + tableName +
+        " (i1 BIGINT, d2 DOUBLE, m3 MAP<BIGINT,DOUBLE>) " +
+        " ROW FORMAT DELIMITED " + " FIELDS TERMINATED BY '\t' " +
+        " COLLECTION ITEMS TERMINATED BY ',' " + " MAP KEYS TERMINATED BY ':' ");
+    String[] rows = {
+        "1\t1.11\t2:0.22",
+        "2\t2.22\t3:0.33,4:0.44",
+        "4\t4.44\t1:0.11",
+    };
+    hiveServer.loadData(tableName, rows);
+
+    GiraphConfiguration conf = new GiraphConfiguration();
+    HIVE_VERTEX_INPUT.setTable(conf, tableName);
+    HIVE_VERTEX_INPUT.setClass(conf, HiveIntDoubleDoubleVertex.class);
+    conf.setVertexClass(VertexSumEdges.class);
+    conf.setVertexInputFormatClass(HiveVertexInputFormat.class);
+    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+    Iterable<String> output = InternalVertexRunner.run(conf, new String[0], new String[0]);
+
+    Map<Integer, Double> data = Helpers.parseIntDoubleResults(output);
+    assertEquals(3, data.size());
+    assertEquals(0.22, data.get(1), 1e-9);
+    assertEquals(0.77, data.get(2), 1e-9);  // 0.33 + 0.44: tolerate rounding
+    assertEquals(0.11, data.get(4), 1e-9);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4b0f52cc/giraph-hive/src/test/java/org/apache/giraph/hive/input/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/input/package-info.java b/giraph-hive/src/test/java/org/apache/giraph/hive/input/package-info.java
new file mode 100644
index 0000000..c2327ca
--- /dev/null
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/input/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Hive Input test.
+ */
+package org.apache.giraph.hive.input;

http://git-wip-us.apache.org/repos/asf/giraph/blob/4b0f52cc/giraph-hive/src/test/java/org/apache/giraph/hive/output/HiveOutputTest.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/output/HiveOutputTest.java b/giraph-hive/src/test/java/org/apache/giraph/hive/output/HiveOutputTest.java
new file mode 100644
index 0000000..5054728
--- /dev/null
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/output/HiveOutputTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.hive.output;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.ByteArrayEdges;
+import org.apache.giraph.hive.Helpers;
+import org.apache.giraph.hive.common.GiraphHiveConstants;
+import org.apache.giraph.hive.output.examples.HiveOutputIntIntVertex;
+import org.apache.giraph.hive.vertexes.VertexCountEdges;
+import org.apache.giraph.io.formats.IntNullTextEdgeInputFormat;
+import org.apache.giraph.io.internal.WrappedVertexOutputFormat;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.hadoop.mapred.HackJobContext;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.thrift.TException;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.facebook.hiveio.common.HiveMetastores;
+import com.facebook.hiveio.input.HiveInput;
+import com.facebook.hiveio.input.HiveInputDescription;
+import com.facebook.hiveio.record.HiveReadableRecord;
+import com.facebook.hiveio.testing.LocalHiveServer;
+import com.google.common.collect.Maps;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * End-to-end tests: run a small Giraph job with HiveVertexOutputFormat against a
+ * local Hive server, then read the table back and check the rows.
+ */
+public class HiveOutputTest {
+  private LocalHiveServer hiveServer = new LocalHiveServer("giraph-hive");
+
+  /** Silences the DataNucleus logger once, before any test in this class runs. */
+  @BeforeClass
+  public static void hushDatanucleusWarnings() {
+    Helpers.silenceDataNucleusLogger();
+  }
+
+  /** Initializes the local Hive server and hands its client to HiveMetastores. */
+  @Before
+  public void setUp() throws IOException, TException {
+    hiveServer.init();
+    HiveMetastores.setTestClient(hiveServer.getClient());
+  }
+
+  /** Writes to an unpartitioned table, then reads the whole table back. */
+  @Test
+  public void testHiveOutput() throws Exception {
+    String tableName = "test1";
+    hiveServer.createTable("CREATE TABLE " + tableName + " (i1 BIGINT, i2 BIGINT) ");
+
+    GiraphConfiguration conf = new GiraphConfiguration();
+    runJob(tableName, conf);
+
+    HiveInputDescription readBack = new HiveInputDescription();
+    readBack.setTableName(tableName);
+
+    verifyRecords(readBack);
+  }
+
+  /** Writes with output partition ds=foobar, then reads back with a matching filter. */
+  @Test
+  public void testHiveOutputWithPartitions() throws Exception {
+    String tableName = "test1";
+    hiveServer.createTable("CREATE TABLE " + tableName +
+        " (i1 BIGINT, i2 BIGINT) " + " PARTITIONED BY (ds STRING) ");
+
+    GiraphConfiguration conf = new GiraphConfiguration();
+    GiraphHiveConstants.HIVE_VERTEX_OUTPUT_PARTITION.set(conf, "ds=foobar");
+    runJob(tableName, conf);
+
+    HiveInputDescription readBack = new HiveInputDescription();
+    readBack.setTableName(tableName);
+    readBack.setPartitionFilter("ds='foobar'");
+
+    verifyRecords(readBack);
+  }
+
+  /**
+   * Runs VertexCountEdges over a fixed four-edge graph with Hive vertex output
+   * into the given table, then calls commitJob on the same configuration.
+   */
+  private void runJob(String tableName, GiraphConfiguration conf) throws Exception {
+    GiraphHiveConstants.HIVE_VERTEX_OUTPUT_TABLE.set(conf, tableName);
+    GiraphHiveConstants.VERTEX_TO_HIVE_CLASS.set(conf, HiveOutputIntIntVertex.class);
+
+    conf.setVertexClass(VertexCountEdges.class);
+    conf.setOutEdgesClass(ByteArrayEdges.class);
+    conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
+    conf.setVertexOutputFormatClass(HiveVertexOutputFormat.class);
+
+    String[] edgeLines = {"1 2", "2 3", "2 4", "4 1"};
+    InternalVertexRunner.run(conf, null, edgeLines);
+
+    commitJob(conf);
+  }
+
+  /** Builds task and job contexts from conf and asks the output committer to commit. */
+  private void commitJob(GiraphConfiguration conf)
+      throws IOException, InterruptedException {
+    WrappedVertexOutputFormat outputFormat =
+        new ImmutableClassesGiraphConfiguration(conf).createWrappedVertexOutputFormat();
+    TaskAttemptID attemptId = new TaskAttemptID();
+    TaskAttemptContext attemptContext = new TaskAttemptContext(new JobConf(conf), attemptId);
+    OutputCommitter committer = outputFormat.getOutputCommitter(attemptContext);
+    JobContext jobContext = new HackJobContext(new JobConf(conf), attemptId.getJobID());
+    committer.commitJob(jobContext);
+  }
+
+  /** Reads the rows described by inputDesc and checks the three expected id/value pairs. */
+  private void verifyRecords(HiveInputDescription inputDesc)
+      throws IOException, InterruptedException {
+    // Row order is not guaranteed, so index the rows by their first column.
+    Map<Long, Long> valuesById = Maps.newHashMap();
+    for (HiveReadableRecord row : HiveInput.readTable(inputDesc)) {
+      valuesById.put(row.getLong(0), row.getLong(1));
+    }
+
+    assertEquals(3, valuesById.size());
+    assertEquals(1L, valuesById.get(1L).longValue());
+    assertEquals(2L, valuesById.get(2L).longValue());
+    assertEquals(1L, valuesById.get(4L).longValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4b0f52cc/giraph-hive/src/test/java/org/apache/giraph/hive/output/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/output/package-info.java b/giraph-hive/src/test/java/org/apache/giraph/hive/output/package-info.java
new file mode 100644
index 0000000..35d2c63
--- /dev/null
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/output/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Hive output related tests.
+ */
+package org.apache.giraph.hive.output;

http://git-wip-us.apache.org/repos/asf/giraph/blob/4b0f52cc/giraph-hive/src/test/java/org/apache/giraph/hive/vertexes/VertexCountEdges.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/vertexes/VertexCountEdges.java b/giraph-hive/src/test/java/org/apache/giraph/hive/vertexes/VertexCountEdges.java
new file mode 100644
index 0000000..a6db2af
--- /dev/null
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/vertexes/VertexCountEdges.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.hive.vertexes;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import java.io.IOException;
+
+/** Test vertex that stores the result of getNumEdges() as its value, then votes to halt. */
+public class VertexCountEdges
+    extends Vertex<IntWritable, IntWritable, NullWritable, NullWritable> {
+  @Override public void compute(Iterable<NullWritable> messages) throws IOException {
+    setValue(new IntWritable(getNumEdges()));
+    voteToHalt();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/4b0f52cc/giraph-hive/src/test/java/org/apache/giraph/hive/vertexes/VertexSumEdges.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/vertexes/VertexSumEdges.java b/giraph-hive/src/test/java/org/apache/giraph/hive/vertexes/VertexSumEdges.java
new file mode 100644
index 0000000..d8527b4
--- /dev/null
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/vertexes/VertexSumEdges.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.hive.vertexes;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import java.io.IOException;
+
+/** Test vertex whose value becomes the sum of its edge values; then it votes to halt. */
+public class VertexSumEdges
+    extends Vertex<IntWritable, DoubleWritable, DoubleWritable, NullWritable> {
+  @Override public void compute(Iterable<NullWritable> messages) throws IOException {
+    double total = 0;
+    for (Edge<IntWritable, DoubleWritable> outEdge : getEdges()) {
+      total += outEdge.getValue().get();
+    }
+    setValue(new DoubleWritable(total));
+    voteToHalt();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4b0f52cc/giraph-hive/src/test/java/org/apache/giraph/hive/vertexes/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/vertexes/package-info.java b/giraph-hive/src/test/java/org/apache/giraph/hive/vertexes/package-info.java
new file mode 100644
index 0000000..552bb91
--- /dev/null
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/vertexes/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Vertex implementations for Hive tests.
+ */
+package org.apache.giraph.hive.vertexes;

http://git-wip-us.apache.org/repos/asf/giraph/blob/4b0f52cc/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6d2bdd1..7d7fec9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -249,13 +249,18 @@ under the License.
   <properties>
     <top.dir>${project.basedir}</top.dir>
     <lib.dir>${top.dir}/lib</lib.dir>
+
     <buildtype>test</buildtype>
+
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+
+    <dep.hiveio.version>0.9</dep.hiveio.version>
     <hbase.version>0.90.5</hbase.version>
     <codehaus-jackson.version>1.8.0</codehaus-jackson.version>
     <fasterxml-jackson.version>2.1.0</fasterxml-jackson.version>
     <slf4j.version>1.7.2</slf4j.version>
     <hive.version>0.10.0</hive.version>
+
     <facebook-hadoop.version>0.20.0</facebook-hadoop.version>
     <forHadoop>for-hadoop-${hadoop.version}</forHadoop>
   </properties>
@@ -964,7 +969,12 @@ under the License.
       <dependency>
         <groupId>com.facebook.hiveio</groupId>
         <artifactId>hive-io-exp-core</artifactId>
-        <version>0.8</version>
+        <version>${dep.hiveio.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>com.facebook.hiveio</groupId>
+        <artifactId>hive-io-exp-testing</artifactId>
+        <version>${dep.hiveio.version}</version>
       </dependency>
       <dependency>
         <groupId>com.google.guava</groupId>
@@ -1050,11 +1060,6 @@ under the License.
       </dependency>
       <dependency>
         <groupId>org.apache.hive</groupId>
-        <artifactId>hive-common</artifactId>
-        <version>${hive.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.hive</groupId>
         <artifactId>hive-exec</artifactId>
         <version>${hive.version}</version>
       </dependency>