You are viewing a plain text version of this content. The canonical link for it is not preserved in this plain text copy.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/06/26 07:07:42 UTC
[2/2] git commit: updated refs/heads/trunk to 8fbade6
GIRAPH-693: Giraph-Hive check user code as soon as possible (nitay)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/8fbade6a
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/8fbade6a
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/8fbade6a
Branch: refs/heads/trunk
Commit: 8fbade6a7801afd6065007473585d9d2a761818a
Parents: 8944567
Author: Nitay Joffe <ni...@apache.org>
Authored: Wed Jun 26 01:06:51 2013 -0400
Committer: Nitay Joffe <ni...@apache.org>
Committed: Wed Jun 26 01:07:16 2013 -0400
----------------------------------------------------------------------
CHANGELOG | 2 +
.../edgemarker/AccumuloEdgeInputFormat.java | 2 +
.../apache/giraph/graph/GraphTaskManager.java | 15 +++
.../org/apache/giraph/io/GiraphInputFormat.java | 18 ++-
.../io/formats/GeneratedVertexInputFormat.java | 3 +
.../io/formats/PseudoRandomEdgeInputFormat.java | 3 +
.../PseudoRandomIntNullVertexInputFormat.java | 3 +
.../formats/PseudoRandomVertexInputFormat.java | 3 +
.../formats/SequenceFileVertexInputFormat.java | 3 +
.../giraph/io/formats/TextEdgeInputFormat.java | 3 +
.../io/formats/TextVertexInputFormat.java | 3 +
.../io/formats/TextVertexValueInputFormat.java | 3 +
.../io/formats/multi/MultiEdgeInputFormat.java | 7 ++
.../formats/multi/MultiVertexInputFormat.java | 7 ++
.../io/internal/WrappedEdgeInputFormat.java | 7 ++
.../io/internal/WrappedVertexInputFormat.java | 7 ++
.../giraph/utils/InMemoryVertexInputFormat.java | 3 +
.../giraph/utils/InternalVertexRunner.java | 8 +-
.../examples/AggregatorsTestComputation.java | 2 +
.../hbase/edgemarker/TableEdgeInputFormat.java | 3 +
.../giraph/hive/common/HiveInputOptions.java | 4 +-
.../apache/giraph/hive/common/HiveParsing.java | 4 +-
.../apache/giraph/hive/common/HiveUtils.java | 108 +++++++++++++++--
.../giraph/hive/input/HiveInputChecker.java | 37 ++++++
.../hive/input/edge/HiveEdgeInputFormat.java | 22 +++-
.../giraph/hive/input/edge/HiveEdgeReader.java | 22 +---
.../giraph/hive/input/edge/HiveToEdge.java | 4 +-
.../input/edge/examples/HiveIntDoubleEdge.java | 11 ++
.../input/edge/examples/HiveIntNullEdge.java | 14 +++
.../giraph/hive/input/vertex/HiveToVertex.java | 3 +-
.../input/vertex/HiveVertexInputFormat.java | 25 +++-
.../hive/input/vertex/HiveVertexReader.java | 9 +-
.../examples/HiveIntDoubleDoubleVertex.java | 11 ++
.../vertex/examples/HiveIntNullNullVertex.java | 10 ++
.../hive/output/HiveVertexOutputFormat.java | 29 ++++-
.../giraph/hive/output/HiveVertexWriter.java | 28 +----
.../giraph/hive/output/SimpleVertexToHive.java | 3 +-
.../apache/giraph/hive/output/VertexToHive.java | 20 +++-
.../output/examples/HiveOutputIntIntVertex.java | 10 ++
.../apache/giraph/hive/GiraphHiveTestBase.java | 10 ++
.../java/org/apache/giraph/hive/Helpers.java | 34 +++++-
.../giraph/hive/input/CheckInputTest.java | 115 +++++++++++++++++++
.../giraph/hive/input/HiveEdgeInputTest.java | 14 +--
.../giraph/hive/input/HiveVertexInputTest.java | 14 +--
.../giraph/hive/output/CheckOutputTest.java | 90 +++++++++++++++
.../giraph/hive/output/HiveOutputTest.java | 35 +-----
pom.xml | 2 +-
47 files changed, 656 insertions(+), 137 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 2a605d8..9166e2b 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 1.1.0 - unreleased
+ GIRAPH-693: Giraph-Hive check user code as soon as possible (nitay)
+
GIRAPH-697: Clean up message stores (majakabiljo)
GIRAPH-696: Should be able to spill giraph metrics to a specified
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java b/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java
index fec4783..ff79d7a 100644
--- a/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java
+++ b/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java
@@ -24,6 +24,7 @@ import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.VertexReader;
import org.apache.giraph.io.accumulo.AccumuloVertexInputFormat;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
@@ -40,6 +41,7 @@ import java.util.regex.Pattern;
*/
public class AccumuloEdgeInputFormat
extends AccumuloVertexInputFormat<Text, Text, Text> {
+ @Override public void checkInputSpecs(Configuration conf) { }
private static final Text uselessEdgeValue = new Text();
public VertexReader<Text, Text, Text>
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index b32c21b..f2ad8b6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -161,7 +161,20 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
}
/**
+ * Run the user's input checking code.
+ */
+ private void checkInput() {
+ if (conf.hasEdgeInputFormat()) {
+ conf.createWrappedEdgeInputFormat().checkInputSpecs(conf);
+ }
+ if (conf.hasVertexInputFormat()) {
+ conf.createWrappedVertexInputFormat().checkInputSpecs(conf);
+ }
+ }
+
+ /**
* Called by owner of this GraphTaskManager on each compute node
+ *
* @param zkPathList the path to the ZK jars we need to run the job
*/
public void setup(Path[] zkPathList)
@@ -179,6 +192,8 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
setupAndInitializeGiraphMetrics();
// One time setup for computation factory
conf.createComputationFactory().initComputation(conf);
+ // Check input
+ checkInput();
// Do some task setup (possibly starting up a Zookeeper service)
context.setStatus("setup: Initializing Zookeeper services.");
locateZookeeperClasspath(zkPathList);
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java
index 86e86d8..50f3cb1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java
@@ -18,12 +18,8 @@
package org.apache.giraph.io;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -31,6 +27,11 @@ import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.util.ReflectionUtils;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
/**
* Common interface for {@link VertexInputFormat} and {@link EdgeInputFormat}.
*
@@ -42,6 +43,13 @@ public abstract class GiraphInputFormat<I extends WritableComparable,
V extends Writable, E extends Writable> extends
DefaultImmutableClassesGiraphConfigurable<I, V, E> {
/**
+ * Check that input is valid.
+ *
+ * @param conf Configuration
+ */
+ public abstract void checkInputSpecs(Configuration conf);
+
+ /**
* Get the list of input splits for the format.
*
* @param context The job context
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-core/src/main/java/org/apache/giraph/io/formats/GeneratedVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/GeneratedVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/GeneratedVertexInputFormat.java
index 44cfd5c..66137f1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/GeneratedVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/GeneratedVertexInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.giraph.io.formats;
import org.apache.giraph.bsp.BspInputSplit;
import org.apache.giraph.io.VertexInputFormat;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -41,6 +42,8 @@ import java.util.List;
public abstract class GeneratedVertexInputFormat<
I extends WritableComparable, V extends Writable, E extends Writable>
extends VertexInputFormat<I, V, E> {
+ @Override public void checkInputSpecs(Configuration conf) { }
+
@Override
public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
throws IOException, InterruptedException {
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
index 48544b0..e379726 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
@@ -28,6 +28,7 @@ import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.EdgeReader;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -42,6 +43,8 @@ import org.apache.log4j.Logger;
*/
public class PseudoRandomEdgeInputFormat
extends EdgeInputFormat<LongWritable, DoubleWritable> {
+ @Override public void checkInputSpecs(Configuration conf) { }
+
@Override
public final List<InputSplit> getSplits(final JobContext context,
final int minSplitCountHint)
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullVertexInputFormat.java
index d8abfdb..b9f91e2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullVertexInputFormat.java
@@ -25,6 +25,7 @@ import org.apache.giraph.edge.ReusableEdge;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
@@ -46,6 +47,8 @@ import java.util.Random;
*/
public class PseudoRandomIntNullVertexInputFormat extends
VertexInputFormat<IntWritable, FloatWritable, NullWritable> {
+ @Override public void checkInputSpecs(Configuration conf) { }
+
@Override
public final List<InputSplit> getSplits(final JobContext context,
final int minSplitCountHint) throws IOException, InterruptedException {
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
index 121c18f..f55c5af 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
@@ -24,6 +24,7 @@ import org.apache.giraph.edge.OutEdges;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -46,6 +47,8 @@ import java.util.Set;
*/
public class PseudoRandomVertexInputFormat extends
VertexInputFormat<LongWritable, DoubleWritable, DoubleWritable> {
+ @Override public void checkInputSpecs(Configuration conf) { }
+
@Override
public final List<InputSplit> getSplits(final JobContext context,
final int minSplitCountHint) throws IOException, InterruptedException {
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java
index 949fe3b..4d8af1f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.giraph.io.formats;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -47,6 +48,8 @@ public class SequenceFileVertexInputFormat<I extends WritableComparable,
protected SequenceFileInputFormat<I, X> sequenceFileInputFormat =
new SequenceFileInputFormat<I, X>();
+ @Override public void checkInputSpecs(Configuration conf) { }
+
@Override
public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
throws IOException, InterruptedException {
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java
index db3d5e0..ae688c7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java
@@ -24,6 +24,7 @@ import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.EdgeReader;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -46,6 +47,8 @@ public abstract class TextEdgeInputFormat<I extends WritableComparable,
/** Underlying GiraphTextInputFormat. */
protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat();
+ @Override public void checkInputSpecs(Configuration conf) { }
+
@Override
public List<InputSplit> getSplits(
JobContext context, int minSplitCountHint) throws IOException,
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java
index 5abd01c..68039a6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java
@@ -22,6 +22,7 @@ import org.apache.giraph.edge.Edge;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -49,6 +50,8 @@ public abstract class TextVertexInputFormat<I extends WritableComparable,
/** Uses the GiraphTextInputFormat to do everything */
protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat();
+ @Override public void checkInputSpecs(Configuration conf) { }
+
@Override
public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
throws IOException, InterruptedException {
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java
index e960444..6a795a8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.giraph.io.formats;
import org.apache.giraph.io.VertexValueInputFormat;
import org.apache.giraph.io.VertexValueReader;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -47,6 +48,8 @@ public abstract class TextVertexValueInputFormat<I extends WritableComparable,
/** Uses the GiraphTextInputFormat to do everything */
protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat();
+ @Override public void checkInputSpecs(Configuration conf) { }
+
@Override
public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
throws IOException, InterruptedException {
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java
index fa8839b..a99c651 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java
@@ -22,6 +22,7 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.EdgeReader;
import org.apache.giraph.io.internal.WrappedEdgeReader;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -46,6 +47,12 @@ public class MultiEdgeInputFormat<I extends WritableComparable,
/** Edge input formats */
private List<EdgeInputFormat<I, E>> edgeInputFormats;
+ @Override public void checkInputSpecs(Configuration conf) {
+ for (EdgeInputFormat edgeInputFormat : edgeInputFormats) {
+ edgeInputFormat.checkInputSpecs(conf);
+ }
+ }
+
@Override
public void setConf(
ImmutableClassesGiraphConfiguration<I, Writable, E> conf) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java
index e851e38..5480619 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java
@@ -22,6 +22,7 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexReader;
import org.apache.giraph.io.internal.WrappedVertexReader;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -47,6 +48,12 @@ public class MultiVertexInputFormat<I extends WritableComparable,
/** Vertex input formats */
private List<VertexInputFormat<I, V, E>> vertexInputFormats;
+ @Override public void checkInputSpecs(Configuration conf) {
+ for (VertexInputFormat vertexInputFormat : vertexInputFormats) {
+ vertexInputFormat.checkInputSpecs(conf);
+ }
+ }
+
@Override
public void setConf(
ImmutableClassesGiraphConfiguration<I, V, E> conf) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java
index 9c209dd..c3adf4c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.giraph.io.internal;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.EdgeReader;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -58,6 +59,12 @@ public class WrappedEdgeInputFormat<I extends WritableComparable,
}
@Override
+ public void checkInputSpecs(Configuration conf) {
+ getConf().updateConfiguration(conf);
+ originalInputFormat.checkInputSpecs(conf);
+ }
+
+ @Override
public List<InputSplit> getSplits(JobContext context,
int minSplitCountHint) throws IOException, InterruptedException {
getConf().updateConfiguration(context.getConfiguration());
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java
index f5379c1..a58a32d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.giraph.io.internal;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -60,6 +61,12 @@ public class WrappedVertexInputFormat<I extends WritableComparable,
}
@Override
+ public void checkInputSpecs(Configuration conf) {
+ getConf().updateConfiguration(conf);
+ originalInputFormat.checkInputSpecs(conf);
+ }
+
+ @Override
public List<InputSplit> getSplits(JobContext context,
int minSplitCountHint) throws IOException, InterruptedException {
getConf().updateConfiguration(context.getConfiguration());
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-core/src/main/java/org/apache/giraph/utils/InMemoryVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/InMemoryVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/utils/InMemoryVertexInputFormat.java
index d14f7a7..777247b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/InMemoryVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/InMemoryVertexInputFormat.java
@@ -22,6 +22,7 @@ import org.apache.giraph.bsp.BspInputSplit;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -54,6 +55,8 @@ public class InMemoryVertexInputFormat<I extends WritableComparable,
return GRAPH;
}
+ @Override public void checkInputSpecs(Configuration conf) { }
+
@Override
public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
throws IOException, InterruptedException {
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
index 9fe7663..72fab83 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
@@ -73,7 +73,7 @@ public class InternalVertexRunner {
*
* @param conf GiraphClasses specifying which types to use
* @param vertexInputData linewise vertex input data
- * @return linewise output data
+ * @return linewise output data, or null if job fails
* @throws Exception if anything goes wrong
*/
public static Iterable<String> run(
@@ -91,7 +91,7 @@ public class InternalVertexRunner {
* @param conf GiraphClasses specifying which types to use
* @param vertexInputData linewise vertex input data
* @param edgeInputData linewise edge input data
- * @return linewise output data
+ * @return linewise output data, or null if job fails
* @throws Exception if anything goes wrong
*/
public static Iterable<String> run(
@@ -174,7 +174,9 @@ public class InternalVertexRunner {
}
});
try {
- job.run(true);
+ if (!job.run(true)) {
+ return null;
+ }
} finally {
executorService.shutdown();
zookeeper.end();
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestComputation.java
index b054e9e..2251b57 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestComputation.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestComputation.java
@@ -29,6 +29,7 @@ import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.EdgeReader;
import org.apache.giraph.io.VertexReader;
import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
@@ -271,6 +272,7 @@ public class AggregatorsTestComputation extends
*/
public static class SimpleEdgeInputFormat extends
EdgeInputFormat<LongWritable, FloatWritable> {
+ @Override public void checkInputSpecs(Configuration conf) { }
@Override
public EdgeReader<LongWritable, FloatWritable> createEdgeReader(
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hbase/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hbase/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java b/giraph-hbase/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java
index f547172..8589de8 100644
--- a/giraph-hbase/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java
+++ b/giraph-hbase/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java
@@ -22,6 +22,7 @@ import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.VertexReader;
import org.apache.giraph.io.hbase.HBaseVertexInputFormat;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
@@ -45,6 +46,8 @@ public class TableEdgeInputFormat extends
Logger.getLogger(TableEdgeInputFormat.class);
private static final Text uselessEdgeValue = new Text();
+ @Override public void checkInputSpecs(Configuration conf) { }
+
public VertexReader<Text, Text, Text>
createVertexReader(InputSplit split,
TaskAttemptContext context) throws IOException {
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveInputOptions.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveInputOptions.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveInputOptions.java
index 5108730..a6993dd 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveInputOptions.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveInputOptions.java
@@ -172,8 +172,8 @@ public class HiveInputOptions<C> {
*/
public HiveInputDescription makeInputDescription(Configuration conf) {
HiveInputDescription inputDescription = new HiveInputDescription();
- inputDescription.setDbName(databaseOpt.get(conf));
- inputDescription.setTableName(tableOpt.get(conf));
+ inputDescription.getTableDesc().setDatabaseName(databaseOpt.get(conf));
+ inputDescription.getTableDesc().setTableName(tableOpt.get(conf));
inputDescription.setPartitionFilter(partitionOpt.get(conf));
inputDescription.setNumSplits(splitsOpt.get(conf));
inputDescription.getMetastoreDesc().setHost(hostOpt.get(conf));
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveParsing.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveParsing.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveParsing.java
index bd28396..56f5119 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveParsing.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveParsing.java
@@ -45,7 +45,7 @@ public class HiveParsing {
* @return byte
*/
public static byte parseByte(HiveReadableRecord record, int columnIndex) {
- return (byte) record.getLong(columnIndex);
+ return record.getByte(columnIndex);
}
/**
@@ -55,7 +55,7 @@ public class HiveParsing {
* @return int
*/
public static int parseInt(HiveReadableRecord record, int columnIndex) {
- return (int) record.getLong(columnIndex);
+ return record.getInt(columnIndex);
}
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java
index 2d2fc1e..11b060f 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java
@@ -18,21 +18,35 @@
package org.apache.giraph.hive.common;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.hive.input.edge.HiveToEdge;
+import org.apache.giraph.hive.input.vertex.HiveToVertex;
+import org.apache.giraph.hive.output.VertexToHive;
+import org.apache.giraph.utils.ReflectionUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.thrift.TException;
import com.facebook.hiveio.input.HiveApiInputFormat;
import com.facebook.hiveio.input.HiveInputDescription;
import com.facebook.hiveio.output.HiveApiOutputFormat;
import com.facebook.hiveio.output.HiveOutputDescription;
+import com.facebook.hiveio.schema.HiveTableSchema;
import com.facebook.hiveio.schema.HiveTableSchemas;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.VERTEX_TO_HIVE_CLASS;
+import static org.apache.giraph.utils.ReflectionUtils.newInstance;
+
/**
* Utility methods for Hive IO
*/
@@ -54,34 +68,31 @@ public class HiveUtils {
Configuration conf) {
hiveInputFormat.setMyProfileId(profileId);
HiveApiInputFormat.setProfileInputDesc(conf, inputDescription, profileId);
- HiveTableSchemas.put(conf, profileId, inputDescription.hiveTableName());
+ HiveTableSchema schema = HiveTableSchemas.lookup(conf,
+ inputDescription.getTableDesc());
+ HiveTableSchemas.put(conf, profileId, schema);
}
/**
* Initialize hive output, prepare Configuration parameters
*
* @param hiveOutputFormat HiveApiOutputFormat
+ * @param outputDesc HiveOutputDescription
* @param profileId Profile id
- * @param dbName Database name
- * @param tableName Table name
- * @param partition Partition
* @param conf Configuration
*/
public static void initializeHiveOutput(HiveApiOutputFormat hiveOutputFormat,
- String profileId, String dbName, String tableName, String partition,
- Configuration conf) {
+ HiveOutputDescription outputDesc, String profileId, Configuration conf) {
hiveOutputFormat.setMyProfileId(profileId);
- HiveOutputDescription outputDescription = new HiveOutputDescription();
- outputDescription.setDbName(dbName);
- outputDescription.setTableName(tableName);
- outputDescription.setPartitionValues(parsePartitionValues(partition));
try {
- HiveApiOutputFormat.initProfile(conf, outputDescription, profileId);
+ HiveApiOutputFormat.initProfile(conf, outputDesc, profileId);
} catch (TException e) {
throw new IllegalStateException(
"initializeHiveOutput: TException occurred", e);
}
- HiveTableSchemas.put(conf, profileId, outputDescription.hiveTableName());
+ HiveTableSchema schema = HiveTableSchemas.lookup(conf,
+ outputDesc.getTableDesc());
+ HiveTableSchemas.put(conf, profileId, schema);
}
/**
@@ -107,4 +118,77 @@ public class HiveUtils {
}
return partitionValues;
}
+
+ /**
+ * Create a new VertexToHive
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ * @param conf Configuration
+ * @param schema Hive table schema
+ * @return VertexToHive
+ * @throws IOException on any instantiation errors
+ */
+ public static <I extends WritableComparable, V extends Writable,
+ E extends Writable> VertexToHive<I, V, E> newVertexToHive(
+ ImmutableClassesGiraphConfiguration<I, V, E> conf,
+ HiveTableSchema schema) throws IOException {
+ Class<? extends VertexToHive> klass = VERTEX_TO_HIVE_CLASS.get(conf);
+ if (klass == null) {
+ throw new IOException(VERTEX_TO_HIVE_CLASS.getKey() +
+ " not set in conf");
+ }
+ VertexToHive<I, V, E> vertexToHive = newInstance(klass, conf);
+ HiveTableSchemas.configure(vertexToHive, schema);
+ return vertexToHive;
+ }
+
+ /**
+ * Create a new HiveToEdge
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ * @param conf Configuration
+ * @param schema Hive table schema
+ * @return HiveToEdge
+ */
+ public static <I extends WritableComparable, V extends Writable,
+ E extends Writable> HiveToEdge<I, E> newHiveToEdge(
+ ImmutableClassesGiraphConfiguration<I, V, E> conf,
+ HiveTableSchema schema) {
+ Class<? extends HiveToEdge> klass = HIVE_EDGE_INPUT.getClass(conf);
+ if (klass == null) {
+ throw new IllegalArgumentException(
+ HIVE_EDGE_INPUT.getClassOpt().getKey() + " not set in conf");
+ }
+ HiveToEdge hiveToEdge = ReflectionUtils.newInstance(klass, conf);
+ HiveTableSchemas.configure(hiveToEdge, schema);
+ return hiveToEdge;
+ }
+
+ /**
+ * Create a new HiveToVertex
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ * @param conf Configuration
+ * @param schema Hive table schema
+ * @return HiveToVertex
+ */
+ public static <I extends WritableComparable, V extends Writable,
+ E extends Writable> HiveToVertex<I, V, E> newHiveToVertex(
+ ImmutableClassesGiraphConfiguration<I, V, E> conf,
+ HiveTableSchema schema) {
+ Class<? extends HiveToVertex> klass = HIVE_VERTEX_INPUT.getClass(conf);
+ if (klass == null) {
+ throw new IllegalArgumentException(
+ HIVE_VERTEX_INPUT.getClassOpt().getKey() + " not set in conf");
+ }
+ HiveToVertex hiveToVertex = ReflectionUtils.newInstance(klass, conf);
+ HiveTableSchemas.configure(hiveToVertex, schema);
+ return hiveToVertex;
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/input/HiveInputChecker.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/HiveInputChecker.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/HiveInputChecker.java
new file mode 100644
index 0000000..7572f0c
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/HiveInputChecker.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.hive.input;
+
+import com.facebook.hiveio.input.HiveInputDescription;
+import com.facebook.hiveio.schema.HiveTableSchema;
+
+/**
+ * Interface to check the validity of a Hive input configuration.
+ */
+public interface HiveInputChecker {
+ /**
+ * Check the input is valid. This method provides information to the user as
+ * early as possible so that they may validate they are using the correct
+ * input and fail the job early rather than getting into it and waiting a long
+ * time only to find out something was misconfigured.
+ *
+ * @param inputDesc HiveInputDescription
+ * @param schema Hive table schema
+ */
+ void checkInput(HiveInputDescription inputDesc, HiveTableSchema schema);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
index a0e9cf3..534a773 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
@@ -24,6 +24,7 @@ import org.apache.giraph.hive.common.HiveUtils;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.EdgeReader;
import org.apache.giraph.io.iterables.EdgeReaderWrapper;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -32,7 +33,9 @@ import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import com.facebook.hiveio.input.HiveApiInputFormat;
+import com.facebook.hiveio.input.HiveInputDescription;
import com.facebook.hiveio.record.HiveReadableRecord;
+import com.facebook.hiveio.schema.HiveTableSchema;
import java.io.IOException;
import java.util.List;
@@ -56,6 +59,14 @@ public class HiveEdgeInputFormat<I extends WritableComparable,
hiveInputFormat = new HiveApiInputFormat();
}
+ @Override public void checkInputSpecs(Configuration conf) {
+ HiveInputDescription inputDesc =
+ GiraphHiveConstants.HIVE_EDGE_INPUT.makeInputDescription(conf);
+ HiveTableSchema schema = getTableSchema();
+ HiveToEdge<I, E> hiveToEdge = HiveUtils.newHiveToEdge(getConf(), schema);
+ hiveToEdge.checkInput(inputDesc, schema);
+ }
+
@Override
public void setConf(
ImmutableClassesGiraphConfiguration<I, Writable, E> conf) {
@@ -79,7 +90,7 @@ public class HiveEdgeInputFormat<I extends WritableComparable,
throws IOException {
HiveEdgeReader<I, E> reader = new HiveEdgeReader<I, E>();
- reader.setTableSchema(hiveInputFormat.getTableSchema(getConf()));
+ reader.setTableSchema(getTableSchema());
RecordReader<WritableComparable, HiveReadableRecord> baseReader;
try {
@@ -91,4 +102,13 @@ public class HiveEdgeInputFormat<I extends WritableComparable,
reader.setHiveRecordReader(baseReader);
return new EdgeReaderWrapper<I, E>(reader);
}
+
+ /**
+ * Get Hive table schema
+ *
+ * @return Hive table schema
+ */
+ private HiveTableSchema getTableSchema() {
+ return hiveInputFormat.getTableSchema(getConf());
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
index a010307..cc1dcd5 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
@@ -19,10 +19,10 @@
package org.apache.giraph.hive.input.edge;
import org.apache.giraph.hive.common.DefaultConfigurableAndTableSchemaAware;
+import org.apache.giraph.hive.common.HiveUtils;
import org.apache.giraph.hive.input.RecordReaderWrapper;
import org.apache.giraph.io.iterables.EdgeWithSource;
import org.apache.giraph.io.iterables.GiraphReader;
-import org.apache.giraph.utils.ReflectionUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -30,12 +30,9 @@ import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import com.facebook.hiveio.record.HiveReadableRecord;
-import com.facebook.hiveio.schema.HiveTableSchemas;
import java.io.IOException;
-import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT;
-
/**
* A reader for reading edges from Hive.
*
@@ -75,26 +72,11 @@ public class HiveEdgeReader<I extends WritableComparable, E extends Writable>
public void initialize(InputSplit inputSplit, TaskAttemptContext context)
throws IOException, InterruptedException {
hiveRecordReader.initialize(inputSplit, context);
- instantiateHiveToEdgeFromConf();
+ hiveToEdge = HiveUtils.newHiveToEdge(getConf(), getTableSchema());
hiveToEdge.initializeRecords(
new RecordReaderWrapper<HiveReadableRecord>(hiveRecordReader));
}
- /**
- * Retrieve the user's {@link HiveToEdge} from the Configuration.
- *
- * @throws IOException if anything goes wrong reading from Configuration
- */
- private void instantiateHiveToEdgeFromConf() throws IOException {
- Class<? extends HiveToEdge> klass = HIVE_EDGE_INPUT.getClass(getConf());
- if (klass == null) {
- throw new IOException(HIVE_EDGE_INPUT.getClassOpt().getKey() +
- " not set in conf");
- }
- hiveToEdge = ReflectionUtils.newInstance(klass, getConf());
- HiveTableSchemas.configure(hiveToEdge, getTableSchema());
- }
-
@Override
public void close() throws IOException {
hiveRecordReader.close();
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java
index 61b56d1..1782114 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java
@@ -18,6 +18,7 @@
package org.apache.giraph.hive.input.edge;
+import org.apache.giraph.hive.input.HiveInputChecker;
import org.apache.giraph.io.iterables.EdgeWithSource;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -37,7 +38,8 @@ import java.util.Iterator;
* @param <E> Edge Value
*/
public interface HiveToEdge<I extends WritableComparable,
- E extends Writable> extends Iterator<EdgeWithSource<I, E>> {
+ E extends Writable> extends Iterator<EdgeWithSource<I, E>>,
+ HiveInputChecker {
/**
* Set the records which contain edge input data
*
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntDoubleEdge.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntDoubleEdge.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntDoubleEdge.java
index 76cf7e0..9f95da2 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntDoubleEdge.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntDoubleEdge.java
@@ -22,13 +22,24 @@ import org.apache.giraph.hive.input.edge.SimpleHiveToEdge;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
+import com.facebook.hiveio.common.HiveType;
+import com.facebook.hiveio.input.HiveInputDescription;
import com.facebook.hiveio.record.HiveReadableRecord;
+import com.facebook.hiveio.schema.HiveTableSchema;
+import com.google.common.base.Preconditions;
/**
* A simple HiveToEdge with integer IDs and double edge values.
*/
public class HiveIntDoubleEdge
extends SimpleHiveToEdge<IntWritable, DoubleWritable> {
+ @Override public void checkInput(HiveInputDescription inputDesc,
+ HiveTableSchema schema) {
+ Preconditions.checkArgument(schema.columnType(0) == HiveType.INT);
+ Preconditions.checkArgument(schema.columnType(1) == HiveType.INT);
+ Preconditions.checkArgument(schema.columnType(2) == HiveType.DOUBLE);
+ }
+
@Override
public DoubleWritable getEdgeValue(HiveReadableRecord hiveRecord) {
return HiveParsing.parseDoubleWritable(hiveRecord, 2);
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntNullEdge.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntNullEdge.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntNullEdge.java
index 3de9680..87e1bd5 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntNullEdge.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntNullEdge.java
@@ -21,8 +21,13 @@ import org.apache.giraph.hive.common.HiveParsing;
import org.apache.giraph.hive.input.edge.SimpleHiveToEdge;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
+import org.apache.log4j.Logger;
+import com.facebook.hiveio.common.HiveType;
+import com.facebook.hiveio.input.HiveInputDescription;
import com.facebook.hiveio.record.HiveReadableRecord;
+import com.facebook.hiveio.schema.HiveTableSchema;
+import com.google.common.base.Preconditions;
/**
* A simple HiveToEdge with integer IDs, no edge value, that assumes the Hive
@@ -30,6 +35,15 @@ import com.facebook.hiveio.record.HiveReadableRecord;
*/
public class HiveIntNullEdge
extends SimpleHiveToEdge<IntWritable, NullWritable> {
+ /** Logger */
+ private static final Logger LOG = Logger.getLogger(HiveIntNullEdge.class);
+
+ @Override public void checkInput(HiveInputDescription inputDesc,
+ HiveTableSchema schema) {
+ Preconditions.checkArgument(schema.columnType(0) == HiveType.INT);
+ Preconditions.checkArgument(schema.columnType(1) == HiveType.INT);
+ }
+
@Override public NullWritable getEdgeValue(HiveReadableRecord hiveRecord) {
return NullWritable.get();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java
index a9736c6..ad2c244 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java
@@ -19,6 +19,7 @@
package org.apache.giraph.hive.input.vertex;
import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.hive.input.HiveInputChecker;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -39,7 +40,7 @@ import java.util.Iterator;
*/
public interface HiveToVertex<I extends WritableComparable,
V extends Writable, E extends Writable> extends
- Iterator<Vertex<I, V, E>> {
+ Iterator<Vertex<I, V, E>>, HiveInputChecker {
/**
* Set the records which contain vertex input data
*
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
index 063c472..d5c1279 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
@@ -24,6 +24,7 @@ import org.apache.giraph.hive.common.HiveUtils;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexReader;
import org.apache.giraph.io.iterables.VertexReaderWrapper;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -32,11 +33,15 @@ import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import com.facebook.hiveio.input.HiveApiInputFormat;
+import com.facebook.hiveio.input.HiveInputDescription;
import com.facebook.hiveio.record.HiveReadableRecord;
+import com.facebook.hiveio.schema.HiveTableSchema;
import java.io.IOException;
import java.util.List;
+import static org.apache.giraph.hive.common.HiveUtils.newHiveToVertex;
+
/**
* {@link VertexInputFormat} for reading vertices from Hive.
*
@@ -58,6 +63,15 @@ public class HiveVertexInputFormat<I extends WritableComparable,
}
@Override
+ public void checkInputSpecs(Configuration conf) {
+ HiveInputDescription inputDesc =
+ GiraphHiveConstants.HIVE_VERTEX_INPUT.makeInputDescription(conf);
+ HiveTableSchema schema = getTableSchema();
+ HiveToVertex<I, V, E> hiveToVertex = newHiveToVertex(getConf(), schema);
+ hiveToVertex.checkInput(inputDesc, schema);
+ }
+
+ @Override
public void setConf(
ImmutableClassesGiraphConfiguration<I, V, E> conf) {
super.setConf(conf);
@@ -78,7 +92,7 @@ public class HiveVertexInputFormat<I extends WritableComparable,
public VertexReader<I, V, E> createVertexReader(InputSplit split,
TaskAttemptContext context) throws IOException {
HiveVertexReader<I, V, E> reader = new HiveVertexReader<I, V, E>();
- reader.setTableSchema(hiveInputFormat.getTableSchema(getConf()));
+ reader.setTableSchema(getTableSchema());
RecordReader<WritableComparable, HiveReadableRecord> baseReader;
try {
@@ -90,4 +104,13 @@ public class HiveVertexInputFormat<I extends WritableComparable,
reader.setHiveRecordReader(baseReader);
return new VertexReaderWrapper<I, V, E>(reader);
}
+
+ /**
+ * Get Hive table schema
+ *
+ * @return Hive table schema
+ */
+ private HiveTableSchema getTableSchema() {
+ return hiveInputFormat.getTableSchema(getConf());
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
index db61aff..679a3e8 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
@@ -20,9 +20,9 @@ package org.apache.giraph.hive.input.vertex;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.hive.common.DefaultConfigurableAndTableSchemaAware;
+import org.apache.giraph.hive.common.HiveUtils;
import org.apache.giraph.hive.input.RecordReaderWrapper;
import org.apache.giraph.io.iterables.GiraphReader;
-import org.apache.giraph.utils.ReflectionUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -30,12 +30,9 @@ import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import com.facebook.hiveio.record.HiveReadableRecord;
-import com.facebook.hiveio.schema.HiveTableSchemas;
import java.io.IOException;
-import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT;
-
/**
* VertexReader using Hive
*
@@ -80,9 +77,7 @@ public class HiveVertexReader<I extends WritableComparable,
public void initialize(InputSplit inputSplit,
TaskAttemptContext context) throws IOException, InterruptedException {
hiveRecordReader.initialize(inputSplit, context);
- Class<? extends HiveToVertex> klass = HIVE_VERTEX_INPUT.getClass(getConf());
- hiveToVertex = ReflectionUtils.newInstance(klass, getConf());
- HiveTableSchemas.configure(hiveToVertex, getTableSchema());
+ hiveToVertex = HiveUtils.newHiveToVertex(getConf(), getTableSchema());
hiveToVertex.initializeRecords(
new RecordReaderWrapper<HiveReadableRecord>(hiveRecordReader));
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntDoubleDoubleVertex.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntDoubleDoubleVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntDoubleDoubleVertex.java
index ea2f419..1af01d4 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntDoubleDoubleVertex.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntDoubleDoubleVertex.java
@@ -23,7 +23,11 @@ import org.apache.giraph.hive.input.vertex.SimpleHiveToVertex;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
+import com.facebook.hiveio.common.HiveType;
+import com.facebook.hiveio.input.HiveInputDescription;
import com.facebook.hiveio.record.HiveReadableRecord;
+import com.facebook.hiveio.schema.HiveTableSchema;
+import com.google.common.base.Preconditions;
/**
* Simple HiveToVertex that reads vertices with integer IDs, Double vertex
@@ -31,6 +35,13 @@ import com.facebook.hiveio.record.HiveReadableRecord;
*/
public class HiveIntDoubleDoubleVertex extends SimpleHiveToVertex<IntWritable,
DoubleWritable, DoubleWritable> {
+ @Override public void checkInput(HiveInputDescription inputDesc,
+ HiveTableSchema schema) {
+ Preconditions.checkArgument(schema.columnType(0) == HiveType.INT);
+ Preconditions.checkArgument(schema.columnType(1) == HiveType.DOUBLE);
+ Preconditions.checkArgument(schema.columnType(2) == HiveType.MAP);
+ }
+
@Override public Iterable<Edge<IntWritable, DoubleWritable>> getEdges(
HiveReadableRecord record) {
return HiveParsing.parseIntDoubleEdges(record, 2);
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntNullNullVertex.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntNullNullVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntNullNullVertex.java
index 4e32039..02ad6a1 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntNullNullVertex.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntNullNullVertex.java
@@ -23,7 +23,11 @@ import org.apache.giraph.hive.input.vertex.SimpleHiveToVertex;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
+import com.facebook.hiveio.common.HiveType;
+import com.facebook.hiveio.input.HiveInputDescription;
import com.facebook.hiveio.record.HiveReadableRecord;
+import com.facebook.hiveio.schema.HiveTableSchema;
+import com.google.common.base.Preconditions;
/**
* Simple HiveToVertex that reads vertices with integer IDs, no vertex values,
@@ -31,6 +35,12 @@ import com.facebook.hiveio.record.HiveReadableRecord;
*/
public class HiveIntNullNullVertex
extends SimpleHiveToVertex<IntWritable, NullWritable, NullWritable> {
+ @Override public void checkInput(HiveInputDescription inputDesc,
+ HiveTableSchema schema) {
+ Preconditions.checkArgument(schema.columnType(0) == HiveType.INT);
+ Preconditions.checkArgument(schema.columnType(1) == HiveType.LIST);
+ }
+
@Override
public Iterable<Edge<IntWritable, NullWritable>> getEdges(
HiveReadableRecord record) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java
index 6968eef..c4813fb 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java
@@ -29,8 +29,12 @@ import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import com.facebook.hiveio.common.HiveTableDesc;
import com.facebook.hiveio.output.HiveApiOutputFormat;
+import com.facebook.hiveio.output.HiveOutputDescription;
+import com.facebook.hiveio.record.HiveRecordFactory;
import com.facebook.hiveio.record.HiveWritableRecord;
+import com.facebook.hiveio.schema.HiveTableSchema;
import java.io.IOException;
@@ -38,6 +42,8 @@ import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTP
import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_PARTITION;
import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_PROFILE_ID;
import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_TABLE;
+import static org.apache.giraph.hive.common.HiveUtils.newVertexToHive;
+import static org.apache.giraph.hive.common.HiveUtils.parsePartitionValues;
/**
* VertexOutputFormat using Hive
@@ -59,16 +65,29 @@ public class HiveVertexOutputFormat<I extends WritableComparable,
hiveOutputFormat = new HiveApiOutputFormat();
}
+ /**
+ * Create HiveOutputDescription from Configuration
+ *
+ * @return HiveOutputDescription
+ */
+ private HiveOutputDescription makeOutputDesc() {
+ HiveOutputDescription outputDesc = new HiveOutputDescription();
+ HiveTableDesc tableDesc = outputDesc.getTableDesc();
+ tableDesc.setDatabaseName(HIVE_VERTEX_OUTPUT_DATABASE.get(getConf()));
+ tableDesc.setTableName(HIVE_VERTEX_OUTPUT_TABLE.get(getConf()));
+ outputDesc.setPartitionValues(
+ parsePartitionValues(HIVE_VERTEX_OUTPUT_PARTITION.get(getConf())));
+ return outputDesc;
+ }
+
@Override
public void setConf(
ImmutableClassesGiraphConfiguration<I, V, E> conf) {
super.setConf(conf);
HiveUtils.initializeHiveOutput(
hiveOutputFormat,
+ makeOutputDesc(),
HIVE_VERTEX_OUTPUT_PROFILE_ID.get(conf),
- HIVE_VERTEX_OUTPUT_DATABASE.get(conf),
- HIVE_VERTEX_OUTPUT_TABLE.get(conf),
- HIVE_VERTEX_OUTPUT_PARTITION.get(conf),
conf);
}
@@ -87,6 +106,10 @@ public class HiveVertexOutputFormat<I extends WritableComparable,
public void checkOutputSpecs(JobContext context)
throws IOException, InterruptedException {
hiveOutputFormat.checkOutputSpecs(context);
+ HiveTableSchema schema = hiveOutputFormat.getTableSchema(getConf());
+ VertexToHive<I, V, E> vertexToHive = newVertexToHive(getConf(), schema);
+ vertexToHive.checkOutput(makeOutputDesc(), schema,
+ HiveRecordFactory.newWritableRecord(schema));
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java
index 352dee3..bb27f25 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java
@@ -19,8 +19,8 @@
package org.apache.giraph.hive.output;
import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.hive.common.HiveUtils;
import org.apache.giraph.io.VertexWriter;
-import org.apache.giraph.utils.ReflectionUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -28,16 +28,12 @@ import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
-import com.facebook.hiveio.input.parser.hive.DefaultRecord;
-import com.facebook.hiveio.record.HiveRecord;
+import com.facebook.hiveio.record.HiveRecordFactory;
import com.facebook.hiveio.record.HiveWritableRecord;
import com.facebook.hiveio.schema.HiveTableSchema;
-import com.facebook.hiveio.schema.HiveTableSchemas;
import java.io.IOException;
-import static org.apache.giraph.hive.common.GiraphHiveConstants.VERTEX_TO_HIVE_CLASS;
-
/**
* Vertex writer using Hive.
*
@@ -55,7 +51,7 @@ public class HiveVertexWriter<I extends WritableComparable, V extends Writable,
/** Schema for table in Hive */
private HiveTableSchema tableSchema;
/** Reusable {@link HiveRecord} */
- private HiveRecord reusableRecord;
+ private HiveWritableRecord reusableRecord;
/** User class to write vertices from a HiveRecord */
private VertexToHive<I, V, E> vertexToHive;
@@ -94,27 +90,13 @@ public class HiveVertexWriter<I extends WritableComparable, V extends Writable,
*/
public void setTableSchema(HiveTableSchema tableSchema) {
this.tableSchema = tableSchema;
- reusableRecord = new DefaultRecord(tableSchema.numColumns(),
- new String[0]);
+ reusableRecord = HiveRecordFactory.newWritableRecord(tableSchema);
}
@Override
public void initialize(TaskAttemptContext context)
throws IOException, InterruptedException {
- instantiateVertexToHiveFromConf();
- }
-
- /**
- * Initialize VertexToHive instance from our configuration.
- * @throws IOException errors instantiating
- */
- private void instantiateVertexToHiveFromConf() throws IOException {
- Class<? extends VertexToHive> klass = VERTEX_TO_HIVE_CLASS.get(getConf());
- if (klass == null) {
- throw new IOException(VERTEX_TO_HIVE_CLASS.getKey() + " not set in conf");
- }
- vertexToHive = ReflectionUtils.newInstance(klass, getConf());
- HiveTableSchemas.configure(vertexToHive, tableSchema);
+ vertexToHive = HiveUtils.newVertexToHive(getConf(), tableSchema);
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/output/SimpleVertexToHive.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/SimpleVertexToHive.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/SimpleVertexToHive.java
index c3fb6b6..f6de715 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/SimpleVertexToHive.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/SimpleVertexToHive.java
@@ -22,7 +22,6 @@ import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import com.facebook.hiveio.record.HiveRecord;
import com.facebook.hiveio.record.HiveWritableRecord;
import java.io.IOException;
@@ -51,7 +50,7 @@ public abstract class SimpleVertexToHive<I extends WritableComparable,
@Override
public final void saveVertex(
Vertex<I, V, E> vertex,
- HiveRecord reusableRecord,
+ HiveWritableRecord reusableRecord,
HiveRecordSaver recordSaver) throws IOException, InterruptedException {
fillRecord(vertex, reusableRecord);
recordSaver.save(reusableRecord);
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java
index 28f987e..f9537a7 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java
@@ -22,7 +22,9 @@ import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import com.facebook.hiveio.record.HiveRecord;
+import com.facebook.hiveio.output.HiveOutputDescription;
+import com.facebook.hiveio.record.HiveWritableRecord;
+import com.facebook.hiveio.schema.HiveTableSchema;
import java.io.IOException;
@@ -36,6 +38,20 @@ import java.io.IOException;
public interface VertexToHive<I extends WritableComparable, V extends Writable,
E extends Writable> {
/**
+ * Check the output is valid. This method provides information to the user as
+ * early as possible so that they may validate they are using the correct
+ * output and fail the job early rather than getting into it and waiting a long
+ * time only to find out something was misconfigured.
+ *
+ * @param outputDesc HiveOutputDescription
+ * @param schema Hive table schema
+ * @param emptyRecord Example Hive record that you can write to as if you were
+ * writing a Vertex. This record will check column types.
+ */
+ void checkOutput(HiveOutputDescription outputDesc, HiveTableSchema schema,
+ HiveWritableRecord emptyRecord);
+
+ /**
* Save vertex to the output. One vertex can be stored to multiple rows in
* the output.
*
@@ -50,6 +66,6 @@ public interface VertexToHive<I extends WritableComparable, V extends Writable,
* @param reusableRecord Record to use for writing data to it.
* @param recordSaver Saver of records
*/
- void saveVertex(Vertex<I, V, E> vertex, HiveRecord reusableRecord,
+ void saveVertex(Vertex<I, V, E> vertex, HiveWritableRecord reusableRecord,
HiveRecordSaver recordSaver) throws IOException, InterruptedException;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/main/java/org/apache/giraph/hive/output/examples/HiveOutputIntIntVertex.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/examples/HiveOutputIntIntVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/examples/HiveOutputIntIntVertex.java
index 975d03d..29af71f 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/examples/HiveOutputIntIntVertex.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/examples/HiveOutputIntIntVertex.java
@@ -22,13 +22,23 @@ import org.apache.giraph.hive.output.SimpleVertexToHive;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
+import com.facebook.hiveio.common.HiveType;
+import com.facebook.hiveio.output.HiveOutputDescription;
import com.facebook.hiveio.record.HiveWritableRecord;
+import com.facebook.hiveio.schema.HiveTableSchema;
+import com.google.common.base.Preconditions;
/**
* VertexToHive that writes Vertexes with integer IDs and integer values
*/
public class HiveOutputIntIntVertex extends SimpleVertexToHive<IntWritable,
IntWritable, NullWritable> {
+ @Override public void checkOutput(HiveOutputDescription outputDesc,
+ HiveTableSchema schema, HiveWritableRecord emptyRecord) {
+ Preconditions.checkArgument(schema.columnType(0) == HiveType.LONG);
+ Preconditions.checkArgument(schema.columnType(1) == HiveType.LONG);
+ }
+
@Override public void fillRecord(
Vertex<IntWritable, IntWritable, NullWritable> vertex,
HiveWritableRecord record) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/test/java/org/apache/giraph/hive/GiraphHiveTestBase.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/GiraphHiveTestBase.java b/giraph-hive/src/test/java/org/apache/giraph/hive/GiraphHiveTestBase.java
new file mode 100644
index 0000000..49a36d0
--- /dev/null
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/GiraphHiveTestBase.java
@@ -0,0 +1,10 @@
+package org.apache.giraph.hive;
+
+import org.junit.BeforeClass;
+
+public class GiraphHiveTestBase {
+ @BeforeClass
+ public static void silenceLoggers() {
+ com.facebook.hiveio.testing.Helpers.silenceLoggers();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/test/java/org/apache/giraph/hive/Helpers.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/Helpers.java b/giraph-hive/src/test/java/org/apache/giraph/hive/Helpers.java
index 1103f78..00c00ca 100644
--- a/giraph-hive/src/test/java/org/apache/giraph/hive/Helpers.java
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/Helpers.java
@@ -17,8 +17,21 @@
*/
package org.apache.giraph.hive;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.internal.WrappedVertexOutputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.HackJobContext;
+import org.apache.hadoop.mapred.HackTaskAttemptContext;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
import com.google.common.collect.Maps;
+import java.io.IOException;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -46,8 +59,23 @@ public class Helpers {
return values;
}
- public static void silenceDataNucleusLogger() {
- Logger logger = Logger.getLogger("org.datanucleus");
- logger.setLevel(Level.INFO);
+ public static void commitJob(GiraphConfiguration conf)
+ throws IOException, InterruptedException {
+ ImmutableClassesGiraphConfiguration iconf = new ImmutableClassesGiraphConfiguration(conf);
+ WrappedVertexOutputFormat outputFormat = iconf.createWrappedVertexOutputFormat();
+ JobConf jobConf = new JobConf(conf);
+ TaskAttemptContext
+ taskContext = new HackTaskAttemptContext(jobConf, new TaskAttemptID());
+ OutputCommitter outputCommitter = outputFormat.getOutputCommitter(
+ taskContext);
+ JobContext jobContext = new HackJobContext(jobConf, taskContext.getJobID());
+ outputCommitter.commitJob(jobContext);
+ }
+
+ public static JobContext makeJobContext(Configuration conf) {
+ JobConf jobConf = new JobConf(conf);
+ TaskAttemptContext
+ taskContext = new HackTaskAttemptContext(jobConf, new TaskAttemptID());
+ return new HackJobContext(jobConf, taskContext.getJobID());
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/test/java/org/apache/giraph/hive/input/CheckInputTest.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/input/CheckInputTest.java b/giraph-hive/src/test/java/org/apache/giraph/hive/input/CheckInputTest.java
new file mode 100644
index 0000000..43255a7
--- /dev/null
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/input/CheckInputTest.java
@@ -0,0 +1,115 @@
+package org.apache.giraph.hive.input;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.hive.GiraphHiveTestBase;
+import org.apache.giraph.hive.computations.ComputationCountEdges;
+import org.apache.giraph.hive.input.edge.HiveEdgeInputFormat;
+import org.apache.giraph.hive.input.edge.HiveToEdge;
+import org.apache.giraph.hive.input.edge.examples.HiveIntNullEdge;
+import org.apache.giraph.hive.input.vertex.HiveToVertex;
+import org.apache.giraph.hive.input.vertex.examples.HiveIntNullNullVertex;
+import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.thrift.TException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.facebook.hiveio.common.HiveMetastores;
+import com.facebook.hiveio.common.HiveType;
+import com.facebook.hiveio.input.HiveInputDescription;
+import com.facebook.hiveio.schema.HiveTableSchema;
+import com.facebook.hiveio.schema.TestSchema;
+import com.facebook.hiveio.testing.LocalHiveServer;
+
+import java.io.IOException;
+
+import static junit.framework.Assert.assertNull;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT;
+
+public class CheckInputTest extends GiraphHiveTestBase {
+ private LocalHiveServer hiveServer = new LocalHiveServer("giraph-hive");
+
+ @Before
+ public void setUp() throws IOException, TException {
+ hiveServer.init();
+ HiveMetastores.setTestClient(hiveServer.getClient());
+ }
+
+ @Test
+ public void testCheckEdge() throws Exception {
+ HiveToEdge hiveToEdge = new HiveIntNullEdge();
+ HiveInputDescription inputDesc = new HiveInputDescription();
+ HiveTableSchema schema = TestSchema.builder()
+ .addColumn("foo", HiveType.INT)
+ .addColumn("bar", HiveType.INT)
+ .build();
+ hiveToEdge.checkInput(inputDesc, schema);
+
+ schema = TestSchema.builder()
+ .addColumn("foo", HiveType.INT)
+ .addColumn("bar", HiveType.LONG)
+ .build();
+ checkEdgeThrows(hiveToEdge, inputDesc, schema);
+ }
+
+ private void checkEdgeThrows(HiveToEdge hiveToEdge,
+ HiveInputDescription inputDesc, HiveTableSchema schema) {
+ try {
+ hiveToEdge.checkInput(inputDesc, schema);
+ } catch (IllegalArgumentException e) {
+ return;
+ }
+ Assert.fail();
+ }
+
+ @Test
+ public void testCheckVertex() throws Exception {
+ HiveToVertex hiveToVertex = new HiveIntNullNullVertex();
+ HiveInputDescription inputDesc = new HiveInputDescription();
+ HiveTableSchema schema = TestSchema.builder()
+ .addColumn("foo", HiveType.INT)
+ .addColumn("bar", HiveType.LIST)
+ .build();
+ hiveToVertex.checkInput(inputDesc, schema);
+
+ schema = TestSchema.builder()
+ .addColumn("foo", HiveType.INT)
+ .addColumn("bar", HiveType.STRING)
+ .build();
+ checkVertexThrows(hiveToVertex, inputDesc, schema);
+ }
+
+ private void checkVertexThrows(HiveToVertex hiveToVertex,
+ HiveInputDescription inputDesc, HiveTableSchema schema) {
+ try {
+ hiveToVertex.checkInput(inputDesc, schema);
+ } catch (IllegalArgumentException e) {
+ return;
+ }
+ Assert.fail();
+ }
+
+ @Test
+ public void testCheckJobThrows() throws Exception {
+ String tableName = "test1";
+ hiveServer.createTable("CREATE TABLE " + tableName +
+ " (i1 BIGINT, i2 INT) " +
+ " ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'");
+ String[] rows = {
+ "1\t2",
+ "2\t3",
+ "2\t4",
+ "4\t1",
+ };
+ hiveServer.loadData(tableName, rows);
+
+ GiraphConfiguration conf = new GiraphConfiguration();
+ HIVE_EDGE_INPUT.setTable(conf, tableName);
+ HIVE_EDGE_INPUT.setClass(conf, HiveIntNullEdge.class);
+ conf.setComputationClass(ComputationCountEdges.class);
+ conf.setEdgeInputFormatClass(HiveEdgeInputFormat.class);
+ conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+ assertNull(InternalVertexRunner.run(conf, new String[0], new String[0]));
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveEdgeInputTest.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveEdgeInputTest.java b/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveEdgeInputTest.java
index 0bb083c..d5bbb95 100644
--- a/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveEdgeInputTest.java
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveEdgeInputTest.java
@@ -18,6 +18,7 @@
package org.apache.giraph.hive.input;
import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.hive.GiraphHiveTestBase;
import org.apache.giraph.hive.Helpers;
import org.apache.giraph.hive.computations.ComputationCountEdges;
import org.apache.giraph.hive.computations.ComputationSumEdges;
@@ -40,14 +41,9 @@ import java.util.Map;
import static junit.framework.Assert.assertEquals;
import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT;
-public class HiveEdgeInputTest {
+public class HiveEdgeInputTest extends GiraphHiveTestBase {
private LocalHiveServer hiveServer = new LocalHiveServer("giraph-hive");
- @BeforeClass
- public static void hushDatanucleusWarnings() {
- Helpers.silenceDataNucleusLogger();
- }
-
@Before
public void setUp() throws IOException, TException {
hiveServer.init();
@@ -58,7 +54,7 @@ public class HiveEdgeInputTest {
public void testEdgeInput() throws Exception {
String tableName = "test1";
hiveServer.createTable("CREATE TABLE " + tableName +
- " (i1 BIGINT, i2 BIGINT) " +
+ " (i1 INT, i2 INT) " +
" ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'");
String[] rows = {
"1\t2",
@@ -88,7 +84,7 @@ public class HiveEdgeInputTest {
String tableName = "test1";
String partition = "ds='foobar'";
hiveServer.createTable("CREATE TABLE " + tableName +
- " (i1 BIGINT, i2 BIGINT) " +
+ " (i1 INT, i2 INT) " +
" PARTITIONED BY (ds STRING) " +
" ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' ");
String[] rows = {
@@ -119,7 +115,7 @@ public class HiveEdgeInputTest {
public void testEdgeInputWithValues() throws Exception {
String tableName = "test1";
hiveServer.createTable("CREATE TABLE " + tableName +
- " (i1 BIGINT, i2 BIGINT, d3 DOUBLE) " +
+ " (i1 INT, i2 INT, d3 DOUBLE) " +
" ROW FORMAT DELIMITED " +
" FIELDS TERMINATED BY '\t' " +
" COLLECTION ITEMS TERMINATED BY ',' ");
http://git-wip-us.apache.org/repos/asf/giraph/blob/8fbade6a/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java b/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java
index ec2b7b1..af850d5 100644
--- a/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java
@@ -18,6 +18,7 @@
package org.apache.giraph.hive.input;
import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.hive.GiraphHiveTestBase;
import org.apache.giraph.hive.Helpers;
import org.apache.giraph.hive.computations.ComputationCountEdges;
import org.apache.giraph.hive.computations.ComputationSumEdges;
@@ -40,14 +41,9 @@ import java.util.Map;
import static junit.framework.Assert.assertEquals;
import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT;
-public class HiveVertexInputTest {
+public class HiveVertexInputTest extends GiraphHiveTestBase {
private LocalHiveServer hiveServer = new LocalHiveServer("giraph-hive");
- @BeforeClass
- public static void hushDatanucleusWarnings() {
- Helpers.silenceDataNucleusLogger();
- }
-
@Before
public void setUp() throws IOException, TException {
hiveServer.init();
@@ -58,7 +54,7 @@ public class HiveVertexInputTest {
public void testVertexInput() throws Exception {
String tableName = "test1";
hiveServer.createTable("CREATE TABLE " + tableName +
- " (i1 BIGINT, i2 ARRAY<BIGINT>) " +
+ " (i1 INT, i2 ARRAY<BIGINT>) " +
" ROW FORMAT DELIMITED " +
" FIELDS TERMINATED BY '\t' " +
" COLLECTION ITEMS TERMINATED BY ','");
@@ -89,7 +85,7 @@ public class HiveVertexInputTest {
String tableName = "test1";
String partition = "ds='foobar'";
hiveServer.createTable("CREATE TABLE " + tableName +
- " (i1 BIGINT, i2 ARRAY<BIGINT>) " +
+ " (i1 INT, i2 ARRAY<BIGINT>) " +
" PARTITIONED BY (ds STRING) " +
" ROW FORMAT DELIMITED " +
" FIELDS TERMINATED BY '\t' " +
@@ -121,7 +117,7 @@ public class HiveVertexInputTest {
public void testValues() throws Exception {
String tableName = "test1";
hiveServer.createTable("CREATE TABLE " + tableName +
- " (i1 BIGINT, d2 DOUBLE, m3 MAP<BIGINT,DOUBLE>) " +
+ " (i1 INT, d2 DOUBLE, m3 MAP<BIGINT,DOUBLE>) " +
" ROW FORMAT DELIMITED " +
" FIELDS TERMINATED BY '\t' " +
" COLLECTION ITEMS TERMINATED BY ',' " +