You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/06/26 07:07:42 UTC

[2/2] git commit: updated refs/heads/trunk to 8fbade6

GIRAPH-693: Giraph-Hive check user code as soon as possible (nitay)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/8fbade6a
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/8fbade6a
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/8fbade6a

Branch: refs/heads/trunk
Commit: 8fbade6a7801afd6065007473585d9d2a761818a
Parents: 8944567
Author: Nitay Joffe <ni...@apache.org>
Authored: Wed Jun 26 01:06:51 2013 -0400
Committer: Nitay Joffe <ni...@apache.org>
Committed: Wed Jun 26 01:07:16 2013 -0400

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 .../edgemarker/AccumuloEdgeInputFormat.java     |   2 +
 .../apache/giraph/graph/GraphTaskManager.java   |  15 +++
 .../org/apache/giraph/io/GiraphInputFormat.java |  18 ++-
 .../io/formats/GeneratedVertexInputFormat.java  |   3 +
 .../io/formats/PseudoRandomEdgeInputFormat.java |   3 +
 .../PseudoRandomIntNullVertexInputFormat.java   |   3 +
 .../formats/PseudoRandomVertexInputFormat.java  |   3 +
 .../formats/SequenceFileVertexInputFormat.java  |   3 +
 .../giraph/io/formats/TextEdgeInputFormat.java  |   3 +
 .../io/formats/TextVertexInputFormat.java       |   3 +
 .../io/formats/TextVertexValueInputFormat.java  |   3 +
 .../io/formats/multi/MultiEdgeInputFormat.java  |   7 ++
 .../formats/multi/MultiVertexInputFormat.java   |   7 ++
 .../io/internal/WrappedEdgeInputFormat.java     |   7 ++
 .../io/internal/WrappedVertexInputFormat.java   |   7 ++
 .../giraph/utils/InMemoryVertexInputFormat.java |   3 +
 .../giraph/utils/InternalVertexRunner.java      |   8 +-
 .../examples/AggregatorsTestComputation.java    |   2 +
 .../hbase/edgemarker/TableEdgeInputFormat.java  |   3 +
 .../giraph/hive/common/HiveInputOptions.java    |   4 +-
 .../apache/giraph/hive/common/HiveParsing.java  |   4 +-
 .../apache/giraph/hive/common/HiveUtils.java    | 108 +++++++++++++++--
 .../giraph/hive/input/HiveInputChecker.java     |  37 ++++++
 .../hive/input/edge/HiveEdgeInputFormat.java    |  22 +++-
 .../giraph/hive/input/edge/HiveEdgeReader.java  |  22 +---
 .../giraph/hive/input/edge/HiveToEdge.java      |   4 +-
 .../input/edge/examples/HiveIntDoubleEdge.java  |  11 ++
 .../input/edge/examples/HiveIntNullEdge.java    |  14 +++
 .../giraph/hive/input/vertex/HiveToVertex.java  |   3 +-
 .../input/vertex/HiveVertexInputFormat.java     |  25 +++-
 .../hive/input/vertex/HiveVertexReader.java     |   9 +-
 .../examples/HiveIntDoubleDoubleVertex.java     |  11 ++
 .../vertex/examples/HiveIntNullNullVertex.java  |  10 ++
 .../hive/output/HiveVertexOutputFormat.java     |  29 ++++-
 .../giraph/hive/output/HiveVertexWriter.java    |  28 +----
 .../giraph/hive/output/SimpleVertexToHive.java  |   3 +-
 .../apache/giraph/hive/output/VertexToHive.java |  20 +++-
 .../output/examples/HiveOutputIntIntVertex.java |  10 ++
 .../apache/giraph/hive/GiraphHiveTestBase.java  |  10 ++
 .../java/org/apache/giraph/hive/Helpers.java    |  34 +++++-
 .../giraph/hive/input/CheckInputTest.java       | 115 +++++++++++++++++++
 .../giraph/hive/input/HiveEdgeInputTest.java    |  14 +--
 .../giraph/hive/input/HiveVertexInputTest.java  |  14 +--
 .../giraph/hive/output/CheckOutputTest.java     |  90 +++++++++++++++
 .../giraph/hive/output/HiveOutputTest.java      |  35 +-----
 pom.xml                                         |   2 +-
 47 files changed, 656 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 2a605d8..9166e2b 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-693: Giraph-Hive check user code as soon as possible (nitay)
+
   GIRAPH-697: Clean up message stores (majakabiljo)
 
   GIRAPH-696: Should be able to spill giraph metrics to a specified 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java b/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java
index fec4783..ff79d7a 100644
--- a/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java
+++ b/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java
@@ -24,6 +24,7 @@ import org.apache.giraph.edge.EdgeFactory;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.io.VertexReader;
 import org.apache.giraph.io.accumulo.AccumuloVertexInputFormat;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -40,6 +41,7 @@ import java.util.regex.Pattern;
  */
 public class AccumuloEdgeInputFormat
     extends AccumuloVertexInputFormat<Text, Text, Text> {
+  @Override public void checkInputSpecs(Configuration conf) { }
 
   private static final Text uselessEdgeValue = new Text();
   public VertexReader<Text, Text, Text>

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index b32c21b..f2ad8b6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -161,7 +161,20 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
   }
 
   /**
+   * Run the user's input checking code.
+   */
+  private void checkInput() {
+    if (conf.hasEdgeInputFormat()) {
+      conf.createWrappedEdgeInputFormat().checkInputSpecs(conf);
+    }
+    if (conf.hasVertexInputFormat()) {
+      conf.createWrappedVertexInputFormat().checkInputSpecs(conf);
+    }
+  }
+
+  /**
    * Called by owner of this GraphTaskManager on each compute node
+   *
    * @param zkPathList the path to the ZK jars we need to run the job
    */
   public void setup(Path[] zkPathList)
@@ -179,6 +192,8 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
     setupAndInitializeGiraphMetrics();
     // One time setup for computation factory
     conf.createComputationFactory().initComputation(conf);
+    // Check input
+    checkInput();
     // Do some task setup (possibly starting up a Zookeeper service)
     context.setStatus("setup: Initializing Zookeeper services.");
     locateZookeeperClasspath(zkPathList);

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java
index 86e86d8..50f3cb1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java
@@ -18,12 +18,8 @@
 
 package org.apache.giraph.io;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -31,6 +27,11 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.util.ReflectionUtils;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
 /**
  * Common interface for {@link VertexInputFormat} and {@link EdgeInputFormat}.
  *
@@ -42,6 +43,13 @@ public abstract class GiraphInputFormat<I extends WritableComparable,
     V extends Writable, E extends Writable> extends
     DefaultImmutableClassesGiraphConfigurable<I, V, E> {
   /**
+   * Check that input is valid.
+   *
+   * @param conf Configuration
+   */
+  public abstract void checkInputSpecs(Configuration conf);
+
+  /**
    * Get the list of input splits for the format.
    *
    * @param context The job context

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-core/src/main/java/org/apache/giraph/io/formats/GeneratedVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/GeneratedVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/GeneratedVertexInputFormat.java
index 44cfd5c..66137f1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/GeneratedVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/GeneratedVertexInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.giraph.io.formats;
 
 import org.apache.giraph.bsp.BspInputSplit;
 import org.apache.giraph.io.VertexInputFormat;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -41,6 +42,8 @@ import java.util.List;
 public abstract class GeneratedVertexInputFormat<
     I extends WritableComparable, V extends Writable, E extends Writable>
     extends VertexInputFormat<I, V, E> {
+  @Override public void checkInputSpecs(Configuration conf) { }
+
   @Override
   public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
     throws IOException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
index 48544b0..e379726 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
@@ -28,6 +28,7 @@ import org.apache.giraph.edge.Edge;
 import org.apache.giraph.edge.EdgeFactory;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeReader;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -42,6 +43,8 @@ import org.apache.log4j.Logger;
  */
 public class PseudoRandomEdgeInputFormat
     extends EdgeInputFormat<LongWritable, DoubleWritable> {
+  @Override public void checkInputSpecs(Configuration conf) { }
+
   @Override
   public final List<InputSplit> getSplits(final JobContext context,
                                           final int minSplitCountHint)

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullVertexInputFormat.java
index d8abfdb..b9f91e2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullVertexInputFormat.java
@@ -25,6 +25,7 @@ import org.apache.giraph.edge.ReusableEdge;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -46,6 +47,8 @@ import java.util.Random;
  */
 public class PseudoRandomIntNullVertexInputFormat extends
     VertexInputFormat<IntWritable, FloatWritable, NullWritable> {
+  @Override public void checkInputSpecs(Configuration conf) { }
+
   @Override
   public final List<InputSplit> getSplits(final JobContext context,
       final int minSplitCountHint) throws IOException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
index 121c18f..f55c5af 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
@@ -24,6 +24,7 @@ import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -46,6 +47,8 @@ import java.util.Set;
  */
 public class PseudoRandomVertexInputFormat extends
     VertexInputFormat<LongWritable, DoubleWritable, DoubleWritable> {
+  @Override public void checkInputSpecs(Configuration conf) { }
+
   @Override
   public final List<InputSplit> getSplits(final JobContext context,
       final int minSplitCountHint) throws IOException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java
index 949fe3b..4d8af1f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.giraph.io.formats;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -47,6 +48,8 @@ public class SequenceFileVertexInputFormat<I extends WritableComparable,
   protected SequenceFileInputFormat<I, X> sequenceFileInputFormat =
     new SequenceFileInputFormat<I, X>();
 
+  @Override public void checkInputSpecs(Configuration conf) { }
+
   @Override
   public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
     throws IOException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java
index db3d5e0..ae688c7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java
@@ -24,6 +24,7 @@ import org.apache.giraph.edge.Edge;
 import org.apache.giraph.edge.EdgeFactory;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeReader;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -46,6 +47,8 @@ public abstract class TextEdgeInputFormat<I extends WritableComparable,
   /** Underlying GiraphTextInputFormat. */
   protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat();
 
+  @Override public void checkInputSpecs(Configuration conf) { }
+
   @Override
   public List<InputSplit> getSplits(
       JobContext context, int minSplitCountHint) throws IOException,

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java
index 5abd01c..68039a6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java
@@ -22,6 +22,7 @@ import org.apache.giraph.edge.Edge;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -49,6 +50,8 @@ public abstract class TextVertexInputFormat<I extends WritableComparable,
   /** Uses the GiraphTextInputFormat to do everything */
   protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat();
 
+  @Override public void checkInputSpecs(Configuration conf) { }
+
   @Override
   public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
     throws IOException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java
index e960444..6a795a8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.giraph.io.formats;
 
 import org.apache.giraph.io.VertexValueInputFormat;
 import org.apache.giraph.io.VertexValueReader;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -47,6 +48,8 @@ public abstract class TextVertexValueInputFormat<I extends WritableComparable,
   /** Uses the GiraphTextInputFormat to do everything */
   protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat();
 
+  @Override public void checkInputSpecs(Configuration conf) { }
+
   @Override
   public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
     throws IOException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java
index fa8839b..a99c651 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java
@@ -22,6 +22,7 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeReader;
 import org.apache.giraph.io.internal.WrappedEdgeReader;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -46,6 +47,12 @@ public class MultiEdgeInputFormat<I extends WritableComparable,
   /** Edge input formats */
   private List<EdgeInputFormat<I, E>> edgeInputFormats;
 
+  @Override public void checkInputSpecs(Configuration conf) {
+    for (EdgeInputFormat edgeInputFormat : edgeInputFormats) {
+      edgeInputFormat.checkInputSpecs(conf);
+    }
+  }
+
   @Override
   public void setConf(
       ImmutableClassesGiraphConfiguration<I, Writable, E> conf) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java
index e851e38..5480619 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java
@@ -22,6 +22,7 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexReader;
 import org.apache.giraph.io.internal.WrappedVertexReader;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -47,6 +48,12 @@ public class MultiVertexInputFormat<I extends WritableComparable,
   /** Vertex input formats */
   private List<VertexInputFormat<I, V, E>> vertexInputFormats;
 
+  @Override public void checkInputSpecs(Configuration conf) {
+    for (VertexInputFormat vertexInputFormat : vertexInputFormats) {
+      vertexInputFormat.checkInputSpecs(conf);
+    }
+  }
+
   @Override
   public void setConf(
       ImmutableClassesGiraphConfiguration<I, V, E> conf) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java
index 9c209dd..c3adf4c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.giraph.io.internal;
 
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeReader;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -58,6 +59,12 @@ public class WrappedEdgeInputFormat<I extends WritableComparable,
   }
 
   @Override
+  public void checkInputSpecs(Configuration conf) {
+    getConf().updateConfiguration(conf);
+    originalInputFormat.checkInputSpecs(conf);
+  }
+
+  @Override
   public List<InputSplit> getSplits(JobContext context,
       int minSplitCountHint) throws IOException, InterruptedException {
     getConf().updateConfiguration(context.getConfiguration());

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java
index f5379c1..a58a32d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.giraph.io.internal;
 
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -60,6 +61,12 @@ public class WrappedVertexInputFormat<I extends WritableComparable,
   }
 
   @Override
+  public void checkInputSpecs(Configuration conf) {
+    getConf().updateConfiguration(conf);
+    originalInputFormat.checkInputSpecs(conf);
+  }
+
+  @Override
   public List<InputSplit> getSplits(JobContext context,
       int minSplitCountHint) throws IOException, InterruptedException {
     getConf().updateConfiguration(context.getConfiguration());

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-core/src/main/java/org/apache/giraph/utils/InMemoryVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/InMemoryVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/utils/InMemoryVertexInputFormat.java
index d14f7a7..777247b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/InMemoryVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/InMemoryVertexInputFormat.java
@@ -22,6 +22,7 @@ import org.apache.giraph.bsp.BspInputSplit;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -54,6 +55,8 @@ public class InMemoryVertexInputFormat<I extends WritableComparable,
     return GRAPH;
   }
 
+  @Override public void checkInputSpecs(Configuration conf) { }
+
   @Override
   public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
     throws IOException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
index 9fe7663..72fab83 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
@@ -73,7 +73,7 @@ public class InternalVertexRunner {
    *
    * @param conf GiraphClasses specifying which types to use
    * @param vertexInputData linewise vertex input data
-   * @return linewise output data
+   * @return linewise output data, or null if job fails
    * @throws Exception if anything goes wrong
    */
   public static Iterable<String> run(
@@ -91,7 +91,7 @@ public class InternalVertexRunner {
    * @param conf GiraphClasses specifying which types to use
    * @param vertexInputData linewise vertex input data
    * @param edgeInputData linewise edge input data
-   * @return linewise output data
+   * @return linewise output data, or null if job fails
    * @throws Exception if anything goes wrong
    */
   public static Iterable<String> run(
@@ -174,7 +174,9 @@ public class InternalVertexRunner {
         }
       });
       try {
-        job.run(true);
+        if (!job.run(true)) {
+          return null;
+        }
       } finally {
         executorService.shutdown();
         zookeeper.end();

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestComputation.java
index b054e9e..2251b57 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestComputation.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestComputation.java
@@ -29,6 +29,7 @@ import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeReader;
 import org.apache.giraph.io.VertexReader;
 import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -271,6 +272,7 @@ public class AggregatorsTestComputation extends
    */
   public static class SimpleEdgeInputFormat extends
     EdgeInputFormat<LongWritable, FloatWritable> {
+    @Override public void checkInputSpecs(Configuration conf) { }
 
     @Override
     public EdgeReader<LongWritable, FloatWritable> createEdgeReader(

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hbase/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hbase/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java b/giraph-hbase/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java
index f547172..8589de8 100644
--- a/giraph-hbase/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java
+++ b/giraph-hbase/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java
@@ -22,6 +22,7 @@ import org.apache.giraph.edge.EdgeFactory;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.io.VertexReader;
 import org.apache.giraph.io.hbase.HBaseVertexInputFormat;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Text;
@@ -45,6 +46,8 @@ public class TableEdgeInputFormat extends
       Logger.getLogger(TableEdgeInputFormat.class);
   private static final Text uselessEdgeValue = new Text();
 
+  @Override public void checkInputSpecs(Configuration conf) { }
+
   public VertexReader<Text, Text, Text>
   createVertexReader(InputSplit split,
                      TaskAttemptContext context) throws IOException {

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveInputOptions.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveInputOptions.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveInputOptions.java
index 5108730..a6993dd 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveInputOptions.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveInputOptions.java
@@ -172,8 +172,8 @@ public class HiveInputOptions<C> {
    */
   public HiveInputDescription makeInputDescription(Configuration conf) {
     HiveInputDescription inputDescription = new HiveInputDescription();
-    inputDescription.setDbName(databaseOpt.get(conf));
-    inputDescription.setTableName(tableOpt.get(conf));
+    inputDescription.getTableDesc().setDatabaseName(databaseOpt.get(conf));
+    inputDescription.getTableDesc().setTableName(tableOpt.get(conf));
     inputDescription.setPartitionFilter(partitionOpt.get(conf));
     inputDescription.setNumSplits(splitsOpt.get(conf));
     inputDescription.getMetastoreDesc().setHost(hostOpt.get(conf));

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveParsing.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveParsing.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveParsing.java
index bd28396..56f5119 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveParsing.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveParsing.java
@@ -45,7 +45,7 @@ public class HiveParsing {
    * @return byte
    */
   public static byte parseByte(HiveReadableRecord record, int columnIndex) {
-    return (byte) record.getLong(columnIndex);
+    return record.getByte(columnIndex);
   }
 
   /**
@@ -55,7 +55,7 @@ public class HiveParsing {
    * @return int
    */
   public static int parseInt(HiveReadableRecord record, int columnIndex) {
-    return (int) record.getLong(columnIndex);
+    return record.getInt(columnIndex);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java
index 2d2fc1e..11b060f 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java
@@ -18,21 +18,35 @@
 
 package org.apache.giraph.hive.common;
 
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.hive.input.edge.HiveToEdge;
+import org.apache.giraph.hive.input.vertex.HiveToVertex;
+import org.apache.giraph.hive.output.VertexToHive;
+import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.thrift.TException;
 
 import com.facebook.hiveio.input.HiveApiInputFormat;
 import com.facebook.hiveio.input.HiveInputDescription;
 import com.facebook.hiveio.output.HiveApiOutputFormat;
 import com.facebook.hiveio.output.HiveOutputDescription;
+import com.facebook.hiveio.schema.HiveTableSchema;
 import com.facebook.hiveio.schema.HiveTableSchemas;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.VERTEX_TO_HIVE_CLASS;
+import static org.apache.giraph.utils.ReflectionUtils.newInstance;
+
 /**
  * Utility methods for Hive IO
  */
@@ -54,34 +68,31 @@ public class HiveUtils {
       Configuration conf) {
     hiveInputFormat.setMyProfileId(profileId);
     HiveApiInputFormat.setProfileInputDesc(conf, inputDescription, profileId);
-    HiveTableSchemas.put(conf, profileId, inputDescription.hiveTableName());
+    HiveTableSchema schema = HiveTableSchemas.lookup(conf,
+        inputDescription.getTableDesc());
+    HiveTableSchemas.put(conf, profileId, schema);
   }
 
   /**
    * Initialize hive output, prepare Configuration parameters
    *
    * @param hiveOutputFormat HiveApiOutputFormat
+   * @param outputDesc HiveOutputDescription
    * @param profileId Profile id
-   * @param dbName Database name
-   * @param tableName Table name
-   * @param partition Partition
    * @param conf Configuration
    */
   public static void initializeHiveOutput(HiveApiOutputFormat hiveOutputFormat,
-      String profileId, String dbName, String tableName, String partition,
-      Configuration conf) {
+      HiveOutputDescription outputDesc, String profileId, Configuration conf) {
     hiveOutputFormat.setMyProfileId(profileId);
-    HiveOutputDescription outputDescription = new HiveOutputDescription();
-    outputDescription.setDbName(dbName);
-    outputDescription.setTableName(tableName);
-    outputDescription.setPartitionValues(parsePartitionValues(partition));
     try {
-      HiveApiOutputFormat.initProfile(conf, outputDescription, profileId);
+      HiveApiOutputFormat.initProfile(conf, outputDesc, profileId);
     } catch (TException e) {
       throw new IllegalStateException(
           "initializeHiveOutput: TException occurred", e);
     }
-    HiveTableSchemas.put(conf, profileId, outputDescription.hiveTableName());
+    HiveTableSchema schema = HiveTableSchemas.lookup(conf,
+        outputDesc.getTableDesc());
+    HiveTableSchemas.put(conf, profileId, schema);
   }
 
   /**
@@ -107,4 +118,77 @@ public class HiveUtils {
     }
     return partitionValues;
   }
+
+  /**
+   * Create a new VertexToHive
+   *
+   * @param <I> Vertex ID
+   * @param <V> Vertex Value
+   * @param <E> Edge Value
+   * @param conf Configuration
+   * @param schema Hive table schema
+   * @return VertexToHive
+   * @throws IOException on any instantiation errors
+   */
+  public static <I extends WritableComparable, V extends Writable,
+      E extends Writable> VertexToHive<I, V, E> newVertexToHive(
+      ImmutableClassesGiraphConfiguration<I, V, E> conf,
+      HiveTableSchema schema) throws IOException {
+    Class<? extends VertexToHive> klass = VERTEX_TO_HIVE_CLASS.get(conf);
+    if (klass == null) {
+      throw new IOException(VERTEX_TO_HIVE_CLASS.getKey() +
+          " not set in conf");
+    }
+    VertexToHive<I, V, E> vertexToHive = newInstance(klass, conf);
+    HiveTableSchemas.configure(vertexToHive, schema);
+    return vertexToHive;
+  }
+
+  /**
+   * Create a new HiveToEdge
+   *
+   * @param <I> Vertex ID
+   * @param <V> Vertex Value
+   * @param <E> Edge Value
+   * @param conf Configuration
+   * @param schema Hive table schema
+   * @return HiveToVertex
+   */
+  public static <I extends WritableComparable, V extends Writable,
+        E extends Writable> HiveToEdge<I, E> newHiveToEdge(
+      ImmutableClassesGiraphConfiguration<I, V, E> conf,
+      HiveTableSchema schema) {
+    Class<? extends HiveToEdge> klass = HIVE_EDGE_INPUT.getClass(conf);
+    if (klass == null) {
+      throw new IllegalArgumentException(
+          HIVE_EDGE_INPUT.getClassOpt().getKey() + " not set in conf");
+    }
+    HiveToEdge hiveToEdge = ReflectionUtils.newInstance(klass, conf);
+    HiveTableSchemas.configure(hiveToEdge, schema);
+    return hiveToEdge;
+  }
+
+  /**
+   * Create a new HiveToVertex
+   *
+   * @param <I> Vertex ID
+   * @param <V> Vertex Value
+   * @param <E> Edge Value
+   * @param conf Configuration
+   * @param schema Hive table schema
+   * @return HiveToVertex
+   */
+  public static <I extends WritableComparable, V extends Writable,
+        E extends Writable> HiveToVertex<I, V, E> newHiveToVertex(
+      ImmutableClassesGiraphConfiguration<I, V, E> conf,
+      HiveTableSchema schema) {
+    Class<? extends HiveToVertex> klass = HIVE_VERTEX_INPUT.getClass(conf);
+    if (klass == null) {
+      throw new IllegalArgumentException(
+          HIVE_VERTEX_INPUT.getClassOpt().getKey() + " not set in conf");
+    }
+    HiveToVertex hiveToVertex = ReflectionUtils.newInstance(klass, conf);
+    HiveTableSchemas.configure(hiveToVertex, schema);
+    return hiveToVertex;
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/input/HiveInputChecker.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/HiveInputChecker.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/HiveInputChecker.java
new file mode 100644
index 0000000..7572f0c
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/HiveInputChecker.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.hive.input;
+
+import com.facebook.hiveio.input.HiveInputDescription;
+import com.facebook.hiveio.schema.HiveTableSchema;
+
+/**
+ * Interface to check the validity of a Hive input configuration.
+ */
+public interface HiveInputChecker {
+  /**
+   * Check the input is valid. This method provides information to the user as
+   * early as possible so that they may validate they are using the correct
+   * input and fail the job early rather than getting into it and waiting a long
+   * time only to find out something was misconfigured.
+   *
+   * @param inputDesc HiveInputDescription
+   * @param schema Hive table schema
+   */
+  void checkInput(HiveInputDescription inputDesc, HiveTableSchema schema);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
index a0e9cf3..534a773 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
@@ -24,6 +24,7 @@ import org.apache.giraph.hive.common.HiveUtils;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeReader;
 import org.apache.giraph.io.iterables.EdgeReaderWrapper;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -32,7 +33,9 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 import com.facebook.hiveio.input.HiveApiInputFormat;
+import com.facebook.hiveio.input.HiveInputDescription;
 import com.facebook.hiveio.record.HiveReadableRecord;
+import com.facebook.hiveio.schema.HiveTableSchema;
 
 import java.io.IOException;
 import java.util.List;
@@ -56,6 +59,14 @@ public class HiveEdgeInputFormat<I extends WritableComparable,
     hiveInputFormat = new HiveApiInputFormat();
   }
 
+  @Override public void checkInputSpecs(Configuration conf) {
+    HiveInputDescription inputDesc =
+        GiraphHiveConstants.HIVE_VERTEX_INPUT.makeInputDescription(conf);
+    HiveTableSchema schema = getTableSchema();
+    HiveToEdge<I, E> hiveToEdge = HiveUtils.newHiveToEdge(getConf(), schema);
+    hiveToEdge.checkInput(inputDesc, schema);
+  }
+
   @Override
   public void setConf(
       ImmutableClassesGiraphConfiguration<I, Writable, E> conf) {
@@ -79,7 +90,7 @@ public class HiveEdgeInputFormat<I extends WritableComparable,
     throws IOException {
 
     HiveEdgeReader<I, E> reader = new HiveEdgeReader<I, E>();
-    reader.setTableSchema(hiveInputFormat.getTableSchema(getConf()));
+    reader.setTableSchema(getTableSchema());
 
     RecordReader<WritableComparable, HiveReadableRecord> baseReader;
     try {
@@ -91,4 +102,13 @@ public class HiveEdgeInputFormat<I extends WritableComparable,
     reader.setHiveRecordReader(baseReader);
     return new EdgeReaderWrapper<I, E>(reader);
   }
+
+  /**
+   * Get Hive table schema
+   *
+   * @return Hive table schema
+   */
+  private HiveTableSchema getTableSchema() {
+    return hiveInputFormat.getTableSchema(getConf());
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
index a010307..cc1dcd5 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
@@ -19,10 +19,10 @@
 package org.apache.giraph.hive.input.edge;
 
 import org.apache.giraph.hive.common.DefaultConfigurableAndTableSchemaAware;
+import org.apache.giraph.hive.common.HiveUtils;
 import org.apache.giraph.hive.input.RecordReaderWrapper;
 import org.apache.giraph.io.iterables.EdgeWithSource;
 import org.apache.giraph.io.iterables.GiraphReader;
-import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -30,12 +30,9 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 import com.facebook.hiveio.record.HiveReadableRecord;
-import com.facebook.hiveio.schema.HiveTableSchemas;
 
 import java.io.IOException;
 
-import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT;
-
 /**
  * A reader for reading edges from Hive.
  *
@@ -75,26 +72,11 @@ public class HiveEdgeReader<I extends WritableComparable, E extends Writable>
   public void initialize(InputSplit inputSplit, TaskAttemptContext context)
     throws IOException, InterruptedException {
     hiveRecordReader.initialize(inputSplit, context);
-    instantiateHiveToEdgeFromConf();
+    hiveToEdge = HiveUtils.newHiveToEdge(getConf(), getTableSchema());
     hiveToEdge.initializeRecords(
         new RecordReaderWrapper<HiveReadableRecord>(hiveRecordReader));
   }
 
-  /**
-   * Retrieve the user's {@link HiveToEdge} from the Configuration.
-   *
-   * @throws IOException if anything goes wrong reading from Configuration
-   */
-  private void instantiateHiveToEdgeFromConf() throws IOException {
-    Class<? extends HiveToEdge> klass = HIVE_EDGE_INPUT.getClass(getConf());
-    if (klass == null) {
-      throw new IOException(HIVE_EDGE_INPUT.getClassOpt().getKey() +
-          " not set in conf");
-    }
-    hiveToEdge = ReflectionUtils.newInstance(klass, getConf());
-    HiveTableSchemas.configure(hiveToEdge, getTableSchema());
-  }
-
   @Override
   public void close() throws IOException {
     hiveRecordReader.close();

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java
index 61b56d1..1782114 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.hive.input.edge;
 
+import org.apache.giraph.hive.input.HiveInputChecker;
 import org.apache.giraph.io.iterables.EdgeWithSource;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -37,7 +38,8 @@ import java.util.Iterator;
  * @param <E> Edge Value
  */
 public interface HiveToEdge<I extends WritableComparable,
-    E extends Writable> extends Iterator<EdgeWithSource<I, E>> {
+    E extends Writable> extends Iterator<EdgeWithSource<I, E>>,
+    HiveInputChecker {
   /**
    * Set the records which contain edge input data
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntDoubleEdge.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntDoubleEdge.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntDoubleEdge.java
index 76cf7e0..9f95da2 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntDoubleEdge.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntDoubleEdge.java
@@ -22,13 +22,24 @@ import org.apache.giraph.hive.input.edge.SimpleHiveToEdge;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.IntWritable;
 
+import com.facebook.hiveio.common.HiveType;
+import com.facebook.hiveio.input.HiveInputDescription;
 import com.facebook.hiveio.record.HiveReadableRecord;
+import com.facebook.hiveio.schema.HiveTableSchema;
+import com.google.common.base.Preconditions;
 
 /**
  * A simple HiveToEdge with integer IDs and double edge values.
  */
 public class HiveIntDoubleEdge
     extends SimpleHiveToEdge<IntWritable, DoubleWritable> {
+  @Override public void checkInput(HiveInputDescription inputDesc,
+      HiveTableSchema schema) {
+    Preconditions.checkArgument(schema.columnType(0) == HiveType.INT);
+    Preconditions.checkArgument(schema.columnType(1) == HiveType.INT);
+    Preconditions.checkArgument(schema.columnType(2) == HiveType.DOUBLE);
+  }
+
   @Override
   public DoubleWritable getEdgeValue(HiveReadableRecord hiveRecord) {
     return HiveParsing.parseDoubleWritable(hiveRecord, 2);

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntNullEdge.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntNullEdge.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntNullEdge.java
index 3de9680..87e1bd5 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntNullEdge.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntNullEdge.java
@@ -21,8 +21,13 @@ import org.apache.giraph.hive.common.HiveParsing;
 import org.apache.giraph.hive.input.edge.SimpleHiveToEdge;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.log4j.Logger;
 
+import com.facebook.hiveio.common.HiveType;
+import com.facebook.hiveio.input.HiveInputDescription;
 import com.facebook.hiveio.record.HiveReadableRecord;
+import com.facebook.hiveio.schema.HiveTableSchema;
+import com.google.common.base.Preconditions;
 
 /**
  * A simple HiveToEdge with integer IDs, no edge value, that assumes the Hive
@@ -30,6 +35,15 @@ import com.facebook.hiveio.record.HiveReadableRecord;
  */
 public class HiveIntNullEdge
     extends SimpleHiveToEdge<IntWritable, NullWritable> {
+  /** Logger */
+  private static final Logger LOG = Logger.getLogger(HiveIntNullEdge.class);
+
+  @Override public void checkInput(HiveInputDescription inputDesc,
+      HiveTableSchema schema) {
+    Preconditions.checkArgument(schema.columnType(0) == HiveType.INT);
+    Preconditions.checkArgument(schema.columnType(1) == HiveType.INT);
+  }
+
   @Override public NullWritable getEdgeValue(HiveReadableRecord hiveRecord) {
     return NullWritable.get();
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java
index a9736c6..ad2c244 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java
@@ -19,6 +19,7 @@
 package org.apache.giraph.hive.input.vertex;
 
 import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.hive.input.HiveInputChecker;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -39,7 +40,7 @@ import java.util.Iterator;
  */
 public interface HiveToVertex<I extends WritableComparable,
     V extends Writable, E extends Writable> extends
-    Iterator<Vertex<I, V, E>> {
+    Iterator<Vertex<I, V, E>>, HiveInputChecker {
   /**
    * Set the records which contain vertex input data
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
index 063c472..d5c1279 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
@@ -24,6 +24,7 @@ import org.apache.giraph.hive.common.HiveUtils;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexReader;
 import org.apache.giraph.io.iterables.VertexReaderWrapper;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -32,11 +33,15 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 import com.facebook.hiveio.input.HiveApiInputFormat;
+import com.facebook.hiveio.input.HiveInputDescription;
 import com.facebook.hiveio.record.HiveReadableRecord;
+import com.facebook.hiveio.schema.HiveTableSchema;
 
 import java.io.IOException;
 import java.util.List;
 
+import static org.apache.giraph.hive.common.HiveUtils.newHiveToVertex;
+
 /**
  * {@link VertexInputFormat} for reading vertices from Hive.
  *
@@ -58,6 +63,15 @@ public class HiveVertexInputFormat<I extends WritableComparable,
   }
 
   @Override
+  public void checkInputSpecs(Configuration conf) {
+    HiveInputDescription inputDesc =
+        GiraphHiveConstants.HIVE_VERTEX_INPUT.makeInputDescription(conf);
+    HiveTableSchema schema = getTableSchema();
+    HiveToVertex<I, V, E> hiveToVertex = newHiveToVertex(getConf(), schema);
+    hiveToVertex.checkInput(inputDesc, schema);
+  }
+
+  @Override
   public void setConf(
       ImmutableClassesGiraphConfiguration<I, V, E> conf) {
     super.setConf(conf);
@@ -78,7 +92,7 @@ public class HiveVertexInputFormat<I extends WritableComparable,
   public VertexReader<I, V, E> createVertexReader(InputSplit split,
       TaskAttemptContext context) throws IOException {
     HiveVertexReader<I, V, E> reader = new HiveVertexReader<I, V, E>();
-    reader.setTableSchema(hiveInputFormat.getTableSchema(getConf()));
+    reader.setTableSchema(getTableSchema());
 
     RecordReader<WritableComparable, HiveReadableRecord> baseReader;
     try {
@@ -90,4 +104,13 @@ public class HiveVertexInputFormat<I extends WritableComparable,
     reader.setHiveRecordReader(baseReader);
     return new VertexReaderWrapper<I, V, E>(reader);
   }
+
+  /**
+   * Get Hive table schema
+   *
+   * @return Hive table schema
+   */
+  private HiveTableSchema getTableSchema() {
+    return hiveInputFormat.getTableSchema(getConf());
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
index db61aff..679a3e8 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
@@ -20,9 +20,9 @@ package org.apache.giraph.hive.input.vertex;
 
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.hive.common.DefaultConfigurableAndTableSchemaAware;
+import org.apache.giraph.hive.common.HiveUtils;
 import org.apache.giraph.hive.input.RecordReaderWrapper;
 import org.apache.giraph.io.iterables.GiraphReader;
-import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -30,12 +30,9 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 import com.facebook.hiveio.record.HiveReadableRecord;
-import com.facebook.hiveio.schema.HiveTableSchemas;
 
 import java.io.IOException;
 
-import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT;
-
 /**
  * VertexReader using Hive
  *
@@ -80,9 +77,7 @@ public class HiveVertexReader<I extends WritableComparable,
   public void initialize(InputSplit inputSplit,
       TaskAttemptContext context) throws IOException, InterruptedException {
     hiveRecordReader.initialize(inputSplit, context);
-    Class<? extends HiveToVertex> klass = HIVE_VERTEX_INPUT.getClass(getConf());
-    hiveToVertex = ReflectionUtils.newInstance(klass, getConf());
-    HiveTableSchemas.configure(hiveToVertex, getTableSchema());
+    hiveToVertex = HiveUtils.newHiveToVertex(getConf(), getTableSchema());
     hiveToVertex.initializeRecords(
         new RecordReaderWrapper<HiveReadableRecord>(hiveRecordReader));
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntDoubleDoubleVertex.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntDoubleDoubleVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntDoubleDoubleVertex.java
index ea2f419..1af01d4 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntDoubleDoubleVertex.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntDoubleDoubleVertex.java
@@ -23,7 +23,11 @@ import org.apache.giraph.hive.input.vertex.SimpleHiveToVertex;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.IntWritable;
 
+import com.facebook.hiveio.common.HiveType;
+import com.facebook.hiveio.input.HiveInputDescription;
 import com.facebook.hiveio.record.HiveReadableRecord;
+import com.facebook.hiveio.schema.HiveTableSchema;
+import com.google.common.base.Preconditions;
 
 /**
  * Simple HiveToVertex that reads vertices with integer IDs, Double vertex
@@ -31,6 +35,13 @@ import com.facebook.hiveio.record.HiveReadableRecord;
  */
 public class HiveIntDoubleDoubleVertex extends SimpleHiveToVertex<IntWritable,
     DoubleWritable, DoubleWritable> {
+  @Override public void checkInput(HiveInputDescription inputDesc,
+      HiveTableSchema schema) {
+    Preconditions.checkArgument(schema.columnType(0) == HiveType.INT);
+    Preconditions.checkArgument(schema.columnType(1) == HiveType.DOUBLE);
+    Preconditions.checkArgument(schema.columnType(2) == HiveType.MAP);
+  }
+
   @Override public Iterable<Edge<IntWritable, DoubleWritable>> getEdges(
       HiveReadableRecord record) {
     return HiveParsing.parseIntDoubleEdges(record, 2);

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntNullNullVertex.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntNullNullVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntNullNullVertex.java
index 4e32039..02ad6a1 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntNullNullVertex.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntNullNullVertex.java
@@ -23,7 +23,11 @@ import org.apache.giraph.hive.input.vertex.SimpleHiveToVertex;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 
+import com.facebook.hiveio.common.HiveType;
+import com.facebook.hiveio.input.HiveInputDescription;
 import com.facebook.hiveio.record.HiveReadableRecord;
+import com.facebook.hiveio.schema.HiveTableSchema;
+import com.google.common.base.Preconditions;
 
 /**
  * Simple HiveToVertex that reads vertices with integer IDs, no vertex values,
@@ -31,6 +35,12 @@ import com.facebook.hiveio.record.HiveReadableRecord;
  */
 public class HiveIntNullNullVertex
     extends SimpleHiveToVertex<IntWritable, NullWritable, NullWritable> {
+  @Override public void checkInput(HiveInputDescription inputDesc,
+      HiveTableSchema schema) {
+    Preconditions.checkArgument(schema.columnType(0) == HiveType.INT);
+    Preconditions.checkArgument(schema.columnType(1) == HiveType.LIST);
+  }
+
   @Override
   public Iterable<Edge<IntWritable, NullWritable>> getEdges(
       HiveReadableRecord record) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java
index 6968eef..c4813fb 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java
@@ -29,8 +29,12 @@ import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
+import com.facebook.hiveio.common.HiveTableDesc;
 import com.facebook.hiveio.output.HiveApiOutputFormat;
+import com.facebook.hiveio.output.HiveOutputDescription;
+import com.facebook.hiveio.record.HiveRecordFactory;
 import com.facebook.hiveio.record.HiveWritableRecord;
+import com.facebook.hiveio.schema.HiveTableSchema;
 
 import java.io.IOException;
 
@@ -38,6 +42,8 @@ import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTP
 import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_PARTITION;
 import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_PROFILE_ID;
 import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_TABLE;
+import static org.apache.giraph.hive.common.HiveUtils.newVertexToHive;
+import static org.apache.giraph.hive.common.HiveUtils.parsePartitionValues;
 
 /**
  * VertexOutputFormat using Hive
@@ -59,16 +65,29 @@ public class HiveVertexOutputFormat<I extends WritableComparable,
     hiveOutputFormat = new HiveApiOutputFormat();
   }
 
+  /**
+   * Create HiveOutputDescription from Configuration
+   *
+   * @return HiveOutputDescription
+   */
+  private HiveOutputDescription makeOutputDesc() {
+    HiveOutputDescription outputDesc = new HiveOutputDescription();
+    HiveTableDesc tableDesc = outputDesc.getTableDesc();
+    tableDesc.setDatabaseName(HIVE_VERTEX_OUTPUT_DATABASE.get(getConf()));
+    tableDesc.setTableName(HIVE_VERTEX_OUTPUT_TABLE.get(getConf()));
+    outputDesc.setPartitionValues(
+        parsePartitionValues(HIVE_VERTEX_OUTPUT_PARTITION.get(getConf())));
+    return outputDesc;
+  }
+
   @Override
   public void setConf(
       ImmutableClassesGiraphConfiguration<I, V, E> conf) {
     super.setConf(conf);
     HiveUtils.initializeHiveOutput(
         hiveOutputFormat,
+        makeOutputDesc(),
         HIVE_VERTEX_OUTPUT_PROFILE_ID.get(conf),
-        HIVE_VERTEX_OUTPUT_DATABASE.get(conf),
-        HIVE_VERTEX_OUTPUT_TABLE.get(conf),
-        HIVE_VERTEX_OUTPUT_PARTITION.get(conf),
         conf);
   }
 
@@ -87,6 +106,10 @@ public class HiveVertexOutputFormat<I extends WritableComparable,
   public void checkOutputSpecs(JobContext context)
     throws IOException, InterruptedException {
     hiveOutputFormat.checkOutputSpecs(context);
+    HiveTableSchema schema = hiveOutputFormat.getTableSchema(getConf());
+    VertexToHive<I, V, E> vertexToHive = newVertexToHive(getConf(), schema);
+    vertexToHive.checkOutput(makeOutputDesc(), schema,
+        HiveRecordFactory.newWritableRecord(schema));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java
index 352dee3..bb27f25 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java
@@ -19,8 +19,8 @@
 package org.apache.giraph.hive.output;
 
 import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.hive.common.HiveUtils;
 import org.apache.giraph.io.VertexWriter;
-import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -28,16 +28,12 @@ import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.log4j.Logger;
 
-import com.facebook.hiveio.input.parser.hive.DefaultRecord;
-import com.facebook.hiveio.record.HiveRecord;
+import com.facebook.hiveio.record.HiveRecordFactory;
 import com.facebook.hiveio.record.HiveWritableRecord;
 import com.facebook.hiveio.schema.HiveTableSchema;
-import com.facebook.hiveio.schema.HiveTableSchemas;
 
 import java.io.IOException;
 
-import static org.apache.giraph.hive.common.GiraphHiveConstants.VERTEX_TO_HIVE_CLASS;
-
 /**
  * Vertex writer using Hive.
  *
@@ -55,7 +51,7 @@ public class HiveVertexWriter<I extends WritableComparable, V extends Writable,
   /** Schema for table in Hive */
   private HiveTableSchema tableSchema;
   /** Reusable {@link HiveRecord} */
-  private HiveRecord reusableRecord;
+  private HiveWritableRecord reusableRecord;
   /** User class to write vertices from a HiveRecord */
   private VertexToHive<I, V, E> vertexToHive;
 
@@ -94,27 +90,13 @@ public class HiveVertexWriter<I extends WritableComparable, V extends Writable,
    */
   public void setTableSchema(HiveTableSchema tableSchema) {
     this.tableSchema = tableSchema;
-    reusableRecord = new DefaultRecord(tableSchema.numColumns(),
-        new String[0]);
+    reusableRecord = HiveRecordFactory.newWritableRecord(tableSchema);
   }
 
   @Override
   public void initialize(TaskAttemptContext context)
     throws IOException, InterruptedException {
-    instantiateVertexToHiveFromConf();
-  }
-
-  /**
-   * Initialize VertexToHive instance from our configuration.
-   * @throws IOException errors instantiating
-   */
-  private void instantiateVertexToHiveFromConf() throws IOException {
-    Class<? extends VertexToHive> klass = VERTEX_TO_HIVE_CLASS.get(getConf());
-    if (klass == null) {
-      throw new IOException(VERTEX_TO_HIVE_CLASS.getKey() + " not set in conf");
-    }
-    vertexToHive = ReflectionUtils.newInstance(klass, getConf());
-    HiveTableSchemas.configure(vertexToHive, tableSchema);
+    vertexToHive = HiveUtils.newVertexToHive(getConf(), tableSchema);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/output/SimpleVertexToHive.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/SimpleVertexToHive.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/SimpleVertexToHive.java
index c3fb6b6..f6de715 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/SimpleVertexToHive.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/SimpleVertexToHive.java
@@ -22,7 +22,6 @@ import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import com.facebook.hiveio.record.HiveRecord;
 import com.facebook.hiveio.record.HiveWritableRecord;
 
 import java.io.IOException;
@@ -51,7 +50,7 @@ public abstract class SimpleVertexToHive<I extends WritableComparable,
   @Override
   public final void saveVertex(
       Vertex<I, V, E> vertex,
-      HiveRecord reusableRecord,
+      HiveWritableRecord reusableRecord,
       HiveRecordSaver recordSaver) throws IOException, InterruptedException {
     fillRecord(vertex, reusableRecord);
     recordSaver.save(reusableRecord);

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java
index 28f987e..f9537a7 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java
@@ -22,7 +22,9 @@ import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import com.facebook.hiveio.record.HiveRecord;
+import com.facebook.hiveio.output.HiveOutputDescription;
+import com.facebook.hiveio.record.HiveWritableRecord;
+import com.facebook.hiveio.schema.HiveTableSchema;
 
 import java.io.IOException;
 
@@ -36,6 +38,20 @@ import java.io.IOException;
 public interface VertexToHive<I extends WritableComparable, V extends Writable,
     E extends Writable> {
   /**
+   * Check the output is valid. This method provides information to the user as
+   * early as possible so that they may validate they are using the correct
+   * input and fail the job early rather than getting into it and waiting a long
+   * time only to find out something was misconfigured.
+   *
+   * @param outputDesc HiveOutputDescription
+   * @param schema Hive table schema
+   * @param emptyRecord Example Hive record that you can write to as if you were
+   *                    writing a Vertex. This record will check column types.
+   */
+  void checkOutput(HiveOutputDescription outputDesc, HiveTableSchema schema,
+      HiveWritableRecord emptyRecord);
+
+  /**
    * Save vertex to the output. One vertex can be stored to multiple rows in
    * the output.
    *
@@ -50,6 +66,6 @@ public interface VertexToHive<I extends WritableComparable, V extends Writable,
    * @param reusableRecord Record to use for writing data to it.
    * @param recordSaver Saver of records
    */
-  void saveVertex(Vertex<I, V, E> vertex, HiveRecord reusableRecord,
+  void saveVertex(Vertex<I, V, E> vertex, HiveWritableRecord reusableRecord,
       HiveRecordSaver recordSaver) throws IOException, InterruptedException;
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/output/examples/HiveOutputIntIntVertex.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/examples/HiveOutputIntIntVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/examples/HiveOutputIntIntVertex.java
index 975d03d..29af71f 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/examples/HiveOutputIntIntVertex.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/examples/HiveOutputIntIntVertex.java
@@ -22,13 +22,23 @@ import org.apache.giraph.hive.output.SimpleVertexToHive;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 
+import com.facebook.hiveio.common.HiveType;
+import com.facebook.hiveio.output.HiveOutputDescription;
 import com.facebook.hiveio.record.HiveWritableRecord;
+import com.facebook.hiveio.schema.HiveTableSchema;
+import com.google.common.base.Preconditions;
 
 /**
  * VertexToHive that writes Vertexes with integer IDs and integer values
  */
 public class HiveOutputIntIntVertex extends SimpleVertexToHive<IntWritable,
     IntWritable, NullWritable> {
+  @Override public void checkOutput(HiveOutputDescription outputDesc,
+      HiveTableSchema schema, HiveWritableRecord emptyRecord) {
+    Preconditions.checkArgument(schema.columnType(0) == HiveType.LONG);
+    Preconditions.checkArgument(schema.columnType(1) == HiveType.LONG);
+  }
+
   @Override public void fillRecord(
       Vertex<IntWritable, IntWritable, NullWritable> vertex,
       HiveWritableRecord record) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/test/java/org/apache/giraph/hive/GiraphHiveTestBase.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/GiraphHiveTestBase.java b/giraph-hive/src/test/java/org/apache/giraph/hive/GiraphHiveTestBase.java
new file mode 100644
index 0000000..49a36d0
--- /dev/null
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/GiraphHiveTestBase.java
@@ -0,0 +1,10 @@
+package org.apache.giraph.hive;
+
+import org.junit.BeforeClass;
+
+public class GiraphHiveTestBase {
+  @BeforeClass
+  public static void silenceLoggers() {
+    com.facebook.hiveio.testing.Helpers.silenceLoggers();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/test/java/org/apache/giraph/hive/Helpers.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/Helpers.java b/giraph-hive/src/test/java/org/apache/giraph/hive/Helpers.java
index 1103f78..00c00ca 100644
--- a/giraph-hive/src/test/java/org/apache/giraph/hive/Helpers.java
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/Helpers.java
@@ -17,8 +17,21 @@
  */
 package org.apache.giraph.hive;
 
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.internal.WrappedVertexOutputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.HackJobContext;
+import org.apache.hadoop.mapred.HackTaskAttemptContext;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
 import com.google.common.collect.Maps;
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -46,8 +59,23 @@ public class Helpers {
     return values;
   }
 
-  public static void silenceDataNucleusLogger() {
-    Logger logger = Logger.getLogger("org.datanucleus");
-    logger.setLevel(Level.INFO);
+  public static void commitJob(GiraphConfiguration conf)
+    throws IOException, InterruptedException {
+    ImmutableClassesGiraphConfiguration iconf = new ImmutableClassesGiraphConfiguration(conf);
+    WrappedVertexOutputFormat outputFormat = iconf.createWrappedVertexOutputFormat();
+    JobConf jobConf = new JobConf(conf);
+    TaskAttemptContext
+        taskContext = new HackTaskAttemptContext(jobConf, new TaskAttemptID());
+    OutputCommitter outputCommitter = outputFormat.getOutputCommitter(
+        taskContext);
+    JobContext jobContext = new HackJobContext(jobConf, taskContext.getJobID());
+    outputCommitter.commitJob(jobContext);
+  }
+
+  public static JobContext makeJobContext(Configuration conf) {
+    JobConf jobConf = new JobConf(conf);
+    TaskAttemptContext
+        taskContext = new HackTaskAttemptContext(jobConf, new TaskAttemptID());
+    return new HackJobContext(jobConf, taskContext.getJobID());
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/test/java/org/apache/giraph/hive/input/CheckInputTest.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/input/CheckInputTest.java b/giraph-hive/src/test/java/org/apache/giraph/hive/input/CheckInputTest.java
new file mode 100644
index 0000000..43255a7
--- /dev/null
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/input/CheckInputTest.java
@@ -0,0 +1,115 @@
+package org.apache.giraph.hive.input;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.hive.GiraphHiveTestBase;
+import org.apache.giraph.hive.computations.ComputationCountEdges;
+import org.apache.giraph.hive.input.edge.HiveEdgeInputFormat;
+import org.apache.giraph.hive.input.edge.HiveToEdge;
+import org.apache.giraph.hive.input.edge.examples.HiveIntNullEdge;
+import org.apache.giraph.hive.input.vertex.HiveToVertex;
+import org.apache.giraph.hive.input.vertex.examples.HiveIntNullNullVertex;
+import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.thrift.TException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.facebook.hiveio.common.HiveMetastores;
+import com.facebook.hiveio.common.HiveType;
+import com.facebook.hiveio.input.HiveInputDescription;
+import com.facebook.hiveio.schema.HiveTableSchema;
+import com.facebook.hiveio.schema.TestSchema;
+import com.facebook.hiveio.testing.LocalHiveServer;
+
+import java.io.IOException;
+
+import static junit.framework.Assert.assertNull;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT;
+
+public class CheckInputTest extends GiraphHiveTestBase {
+  private LocalHiveServer hiveServer = new LocalHiveServer("giraph-hive");
+
+  @Before
+  public void setUp() throws IOException, TException {
+    hiveServer.init();
+    HiveMetastores.setTestClient(hiveServer.getClient());
+  }
+
+  @Test
+  public void testCheckEdge() throws Exception {
+    HiveToEdge hiveToEdge = new HiveIntNullEdge();
+    HiveInputDescription inputDesc = new HiveInputDescription();
+    HiveTableSchema schema = TestSchema.builder()
+        .addColumn("foo", HiveType.INT)
+        .addColumn("bar", HiveType.INT)
+        .build();
+    hiveToEdge.checkInput(inputDesc, schema);
+
+    schema = TestSchema.builder()
+            .addColumn("foo", HiveType.INT)
+            .addColumn("bar", HiveType.LONG)
+            .build();
+    checkEdgeThrows(hiveToEdge, inputDesc, schema);
+  }
+
+  private void checkEdgeThrows(HiveToEdge hiveToEdge,
+      HiveInputDescription inputDesc, HiveTableSchema schema) {
+    try {
+      hiveToEdge.checkInput(inputDesc, schema);
+    } catch (IllegalArgumentException e) {
+      return;
+    }
+    Assert.fail();
+  }
+
+  @Test
+  public void testCheckVertex() throws Exception {
+    HiveToVertex hiveToVertex = new HiveIntNullNullVertex();
+    HiveInputDescription inputDesc = new HiveInputDescription();
+    HiveTableSchema schema = TestSchema.builder()
+        .addColumn("foo", HiveType.INT)
+        .addColumn("bar", HiveType.LIST)
+        .build();
+    hiveToVertex.checkInput(inputDesc, schema);
+
+    schema = TestSchema.builder()
+            .addColumn("foo", HiveType.INT)
+            .addColumn("bar", HiveType.STRING)
+            .build();
+    checkVertexThrows(hiveToVertex, inputDesc, schema);
+  }
+
+  private void checkVertexThrows(HiveToVertex hiveToVertex,
+      HiveInputDescription inputDesc, HiveTableSchema schema) {
+    try {
+      hiveToVertex.checkInput(inputDesc, schema);
+    } catch (IllegalArgumentException e) {
+      return;
+    }
+    Assert.fail();
+  }
+
+  @Test
+  public void testCheckJobThrows() throws Exception {
+    String tableName = "test1";
+    hiveServer.createTable("CREATE TABLE " + tableName +
+        " (i1 BIGINT, i2 INT) " +
+        " ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'");
+    String[] rows = {
+        "1\t2",
+        "2\t3",
+        "2\t4",
+        "4\t1",
+    };
+    hiveServer.loadData(tableName, rows);
+
+    GiraphConfiguration conf = new GiraphConfiguration();
+    HIVE_EDGE_INPUT.setTable(conf, tableName);
+    HIVE_EDGE_INPUT.setClass(conf, HiveIntNullEdge.class);
+    conf.setComputationClass(ComputationCountEdges.class);
+    conf.setEdgeInputFormatClass(HiveEdgeInputFormat.class);
+    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+    assertNull(InternalVertexRunner.run(conf, new String[0], new String[0]));
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveEdgeInputTest.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveEdgeInputTest.java b/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveEdgeInputTest.java
index 0bb083c..d5bbb95 100644
--- a/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveEdgeInputTest.java
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveEdgeInputTest.java
@@ -18,6 +18,7 @@
 package org.apache.giraph.hive.input;
 
 import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.hive.GiraphHiveTestBase;
 import org.apache.giraph.hive.Helpers;
 import org.apache.giraph.hive.computations.ComputationCountEdges;
 import org.apache.giraph.hive.computations.ComputationSumEdges;
@@ -40,14 +41,9 @@ import java.util.Map;
 import static junit.framework.Assert.assertEquals;
 import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT;
 
-public class HiveEdgeInputTest {
+public class HiveEdgeInputTest extends GiraphHiveTestBase {
   private LocalHiveServer hiveServer = new LocalHiveServer("giraph-hive");
 
-  @BeforeClass
-  public static void hushDatanucleusWarnings() {
-    Helpers.silenceDataNucleusLogger();
-  }
-
   @Before
   public void setUp() throws IOException, TException {
     hiveServer.init();
@@ -58,7 +54,7 @@ public class HiveEdgeInputTest {
   public void testEdgeInput() throws Exception {
     String tableName = "test1";
     hiveServer.createTable("CREATE TABLE " + tableName +
-        " (i1 BIGINT, i2 BIGINT) " +
+        " (i1 INT, i2 INT) " +
         " ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'");
     String[] rows = {
         "1\t2",
@@ -88,7 +84,7 @@ public class HiveEdgeInputTest {
     String tableName = "test1";
     String partition = "ds='foobar'";
     hiveServer.createTable("CREATE TABLE " + tableName +
-        " (i1 BIGINT, i2 BIGINT) " +
+        " (i1 INT, i2 INT) " +
         " PARTITIONED BY (ds STRING) " +
         " ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' ");
     String[] rows = {
@@ -119,7 +115,7 @@ public class HiveEdgeInputTest {
   public void testEdgeInputWithValues() throws Exception {
     String tableName = "test1";
     hiveServer.createTable("CREATE TABLE " + tableName +
-        " (i1 BIGINT, i2 BIGINT, d3 DOUBLE) " +
+        " (i1 INT, i2 INT, d3 DOUBLE) " +
         " ROW FORMAT DELIMITED " +
         " FIELDS TERMINATED BY '\t' " +
         " COLLECTION ITEMS TERMINATED BY ',' ");

http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java b/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java
index ec2b7b1..af850d5 100644
--- a/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java
@@ -18,6 +18,7 @@
 package org.apache.giraph.hive.input;
 
 import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.hive.GiraphHiveTestBase;
 import org.apache.giraph.hive.Helpers;
 import org.apache.giraph.hive.computations.ComputationCountEdges;
 import org.apache.giraph.hive.computations.ComputationSumEdges;
@@ -40,14 +41,9 @@ import java.util.Map;
 import static junit.framework.Assert.assertEquals;
 import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT;
 
-public class HiveVertexInputTest {
+public class HiveVertexInputTest extends GiraphHiveTestBase {
   private LocalHiveServer hiveServer = new LocalHiveServer("giraph-hive");
 
-  @BeforeClass
-  public static void hushDatanucleusWarnings() {
-    Helpers.silenceDataNucleusLogger();
-  }
-
   @Before
   public void setUp() throws IOException, TException {
     hiveServer.init();
@@ -58,7 +54,7 @@ public class HiveVertexInputTest {
   public void testVertexInput() throws Exception {
     String tableName = "test1";
     hiveServer.createTable("CREATE TABLE " + tableName +
-        " (i1 BIGINT, i2 ARRAY<BIGINT>) " +
+        " (i1 INT, i2 ARRAY<BIGINT>) " +
         " ROW FORMAT DELIMITED " +
         " FIELDS TERMINATED BY '\t' " +
         " COLLECTION ITEMS TERMINATED BY ','");
@@ -89,7 +85,7 @@ public class HiveVertexInputTest {
     String tableName = "test1";
     String partition = "ds='foobar'";
     hiveServer.createTable("CREATE TABLE " + tableName +
-        " (i1 BIGINT, i2 ARRAY<BIGINT>) " +
+        " (i1 INT, i2 ARRAY<BIGINT>) " +
         " PARTITIONED BY (ds STRING) " +
         " ROW FORMAT DELIMITED " +
         " FIELDS TERMINATED BY '\t' " +
@@ -121,7 +117,7 @@ public class HiveVertexInputTest {
   public void testValues() throws Exception {
     String tableName = "test1";
     hiveServer.createTable("CREATE TABLE " + tableName +
-        " (i1 BIGINT, d2 DOUBLE, m3 MAP<BIGINT,DOUBLE>) " +
+        " (i1 INT, d2 DOUBLE, m3 MAP<BIGINT,DOUBLE>) " +
         " ROW FORMAT DELIMITED " +
         " FIELDS TERMINATED BY '\t' " +
         " COLLECTION ITEMS TERMINATED BY ',' " +