You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/03/28 01:27:07 UTC
git commit: GIRAPH-588: GIRAPH-588: More flexible Hive input
(majakabiljo)
Updated Branches:
refs/heads/trunk 01c527e22 -> 95e122676
GIRAPH-588: GIRAPH-588: More flexible Hive input (majakabiljo)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/95e12267
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/95e12267
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/95e12267
Branch: refs/heads/trunk
Commit: 95e1226766fb7df16e699cd54cf8ba0d043dc69d
Parents: 01c527e
Author: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Authored: Wed Mar 27 17:09:02 2013 -0700
Committer: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Committed: Wed Mar 27 17:26:24 2013 -0700
----------------------------------------------------------------------
CHANGELOG | 2 +
.../giraph/io/iterables/EdgeReaderWrapper.java | 83 ++++++++++++
.../apache/giraph/io/iterables/EdgeWithSource.java | 96 ++++++++++++++
.../apache/giraph/io/iterables/GiraphReader.java | 59 +++++++++
.../io/iterables/IteratorToReaderWrapper.java | 68 ++++++++++
.../giraph/io/iterables/VertexReaderWrapper.java | 82 ++++++++++++
.../apache/giraph/io/iterables/package-info.java | 21 +++
.../org/apache/giraph/hive/HiveGiraphRunner.java | 76 +++---------
.../DefaultConfigurableAndTableSchemaAware.java | 52 ++++++++
.../giraph/hive/input/RecordReaderWrapper.java | 61 +++++++++
.../giraph/hive/input/edge/AbstractHiveToEdge.java | 13 +-
.../hive/input/edge/HiveEdgeInputFormat.java | 3 +-
.../giraph/hive/input/edge/HiveEdgeReader.java | 61 +++------
.../apache/giraph/hive/input/edge/HiveToEdge.java | 35 ++---
.../giraph/hive/input/edge/SimpleHiveToEdge.java | 89 +++++++++++++
.../hive/input/vertex/AbstractHiveToVertex.java | 41 ++++++
.../input/vertex/AbstractHiveToVertexEdges.java | 49 -------
.../input/vertex/AbstractHiveToVertexValue.java | 49 -------
.../giraph/hive/input/vertex/HiveToVertex.java | 50 +++++++
.../hive/input/vertex/HiveToVertexEdges.java | 68 ----------
.../hive/input/vertex/HiveToVertexValue.java | 49 -------
.../hive/input/vertex/HiveVertexInputFormat.java | 5 +-
.../giraph/hive/input/vertex/HiveVertexReader.java | 102 ++++----------
.../hive/input/vertex/SimpleHiveToVertex.java | 93 +++++++++++++
.../input/vertex/SimpleNoEdgesHiveToVertex.java | 44 ++++++
.../giraph/hive/output/AbstractVertexToHive.java | 20 +---
26 files changed, 934 insertions(+), 437 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 051c9a7..ff20214 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-588: More flexible Hive input (majakabiljo)
+
GIRAPH-587: Refactor configuration options (nitay)
GIRAPH-581: More flexible Hive output (majakabiljo)
http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-core/src/main/java/org/apache/giraph/io/iterables/EdgeReaderWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/iterables/EdgeReaderWrapper.java b/giraph-core/src/main/java/org/apache/giraph/io/iterables/EdgeReaderWrapper.java
new file mode 100644
index 0000000..4af221a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/iterables/EdgeReaderWrapper.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.iterables;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.io.EdgeReader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Adapts a {@link GiraphReader} of {@link EdgeWithSource} objects to the
+ * {@link EdgeReader} interface.
+ * <p>
+ * Iteration goes through an {@link IteratorToReaderWrapper} built over the
+ * wrapped reader; {@code initialize}, {@code close} and {@code getProgress}
+ * are forwarded to the wrapped reader as-is.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge data
+ */
+public class EdgeReaderWrapper<I extends WritableComparable,
+    E extends Writable> implements EdgeReader<I, E> {
+  /** Wrapped reader producing edges together with their source id */
+  private final GiraphReader<EdgeWithSource<I, E>> wrappedReader;
+  /** {@link EdgeReader}-like cursor over {@link #wrappedReader} */
+  private final IteratorToReaderWrapper<EdgeWithSource<I, E>> cursor;
+
+  /**
+   * Constructor
+   *
+   * @param edgeReader GiraphReader for edges to wrap
+   */
+  public EdgeReaderWrapper(GiraphReader<EdgeWithSource<I, E>> edgeReader) {
+    wrappedReader = edgeReader;
+    cursor = new IteratorToReaderWrapper<EdgeWithSource<I, E>>(edgeReader);
+  }
+
+  @Override
+  public void initialize(InputSplit inputSplit,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    wrappedReader.initialize(inputSplit, context);
+  }
+
+  @Override
+  public boolean nextEdge() throws IOException, InterruptedException {
+    return cursor.nextObject();
+  }
+
+  @Override
+  public I getCurrentSourceId() throws IOException, InterruptedException {
+    return current().getSourceVertexId();
+  }
+
+  @Override
+  public Edge<I, E> getCurrentEdge() throws IOException, InterruptedException {
+    return current().getEdge();
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return wrappedReader.getProgress();
+  }
+
+  @Override
+  public void close() throws IOException {
+    wrappedReader.close();
+  }
+
+  /**
+   * Object the cursor is currently positioned on.
+   *
+   * @return Current edge with its source id, or null when there is none
+   */
+  private EdgeWithSource<I, E> current() {
+    return cursor.getCurrentObject();
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-core/src/main/java/org/apache/giraph/io/iterables/EdgeWithSource.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/iterables/EdgeWithSource.java b/giraph-core/src/main/java/org/apache/giraph/io/iterables/EdgeWithSource.java
new file mode 100644
index 0000000..6f72de3
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/iterables/EdgeWithSource.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.iterables;
+
+import org.apache.giraph.edge.ReusableEdge;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Pairs a {@link ReusableEdge} with the id of the vertex it starts from.
+ * <p>
+ * The target id and edge value accessors delegate to the held edge, so an
+ * edge must be set (via the two-argument constructor or {@link #setEdge})
+ * before they are used.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge data
+ */
+public class EdgeWithSource<I extends WritableComparable,
+    E extends Writable> {
+  /** Id of the vertex this edge starts from */
+  private I sourceVertexId;
+  /** The edge itself (target id and value) */
+  private ReusableEdge<I, E> edge;
+
+  /**
+   * Constructor leaving both the source id and the edge unset (null)
+   */
+  public EdgeWithSource() {
+    this(null, null);
+  }
+
+  /**
+   * Constructor with source id and edge
+   *
+   * @param sourceVertexId Source id
+   * @param edge Edge
+   */
+  public EdgeWithSource(I sourceVertexId, ReusableEdge<I, E> edge) {
+    this.sourceVertexId = sourceVertexId;
+    this.edge = edge;
+  }
+
+  /**
+   * Get the source vertex id
+   *
+   * @return Source vertex id
+   */
+  public I getSourceVertexId() {
+    return sourceVertexId;
+  }
+
+  /**
+   * Set the source vertex id
+   *
+   * @param sourceVertexId Source vertex id
+   */
+  public void setSourceVertexId(I sourceVertexId) {
+    this.sourceVertexId = sourceVertexId;
+  }
+
+  /**
+   * Get the wrapped edge
+   *
+   * @return Edge
+   */
+  public ReusableEdge<I, E> getEdge() {
+    return edge;
+  }
+
+  /**
+   * Replace the wrapped edge
+   *
+   * @param edge Edge
+   */
+  public void setEdge(ReusableEdge<I, E> edge) {
+    this.edge = edge;
+  }
+
+  /**
+   * Get the target vertex id of the wrapped edge
+   *
+   * @return Target vertex id
+   */
+  public I getTargetVertexId() {
+    return edge.getTargetVertexId();
+  }
+
+  /**
+   * Set target vertex id of the wrapped edge
+   *
+   * @param targetVertexId Target vertex id
+   */
+  public void setTargetVertexId(I targetVertexId) {
+    edge.setTargetVertexId(targetVertexId);
+  }
+
+  /**
+   * Get the value of the wrapped edge
+   *
+   * @return Edge value
+   */
+  public E getEdgeValue() {
+    return edge.getValue();
+  }
+
+  /**
+   * Set the value of the wrapped edge
+   *
+   * @param value Edge value
+   */
+  public void setEdgeValue(E value) {
+    edge.setValue(value);
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-core/src/main/java/org/apache/giraph/io/iterables/GiraphReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/iterables/GiraphReader.java b/giraph-core/src/main/java/org/apache/giraph/io/iterables/GiraphReader.java
new file mode 100644
index 0000000..736ee9f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/iterables/GiraphReader.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.iterables;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Reader for some kind of data, exposed as an {@link Iterator} over the
+ * objects read.
+ * <p>
+ * {@link #initialize} is guaranteed to be called before any other method;
+ * {@link #close} ends reading and {@link #getProgress} reports how much of
+ * the input has been consumed so far.
+ *
+ * @param <T> Type of data which we are reading (can be vertex, edges, etc)
+ */
+public interface GiraphReader<T> extends Iterator<T> {
+  /**
+   * Use the input split and context to setup reading.
+   * Guaranteed to be called prior to any other function.
+   *
+   * @param inputSplit Input split to be used for reading.
+   * @param context Context from the task.
+   * @throws IOException If an I/O error occurs during setup
+   * @throws InterruptedException If interrupted during setup
+   */
+  void initialize(InputSplit inputSplit, TaskAttemptContext context) throws
+      IOException, InterruptedException;
+
+  /**
+   * Close this {@link GiraphReader} to future operations.
+   *
+   * @throws IOException If an I/O error occurs while closing
+   */
+  void close() throws IOException;
+
+  /**
+   * How much of the input has this {@link GiraphReader} consumed, i.e. what
+   * fraction of it has already been processed?
+   *
+   * @return Progress from <code>0.0</code> to <code>1.0</code>.
+   * @throws IOException If an I/O error occurs while computing progress
+   * @throws InterruptedException If interrupted while computing progress
+   */
+  float getProgress() throws IOException, InterruptedException;
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-core/src/main/java/org/apache/giraph/io/iterables/IteratorToReaderWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/iterables/IteratorToReaderWrapper.java b/giraph-core/src/main/java/org/apache/giraph/io/iterables/IteratorToReaderWrapper.java
new file mode 100644
index 0000000..409bb99
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/iterables/IteratorToReaderWrapper.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.iterables;
+
+import java.util.Iterator;
+
+/**
+ * Wraps {@link Iterator} into object which provides iteration like in
+ * {@link org.apache.giraph.io.VertexReader} or
+ * {@link org.apache.giraph.io.EdgeReader}: advance with
+ * {@link #nextObject()}, then read the element with
+ * {@link #getCurrentObject()}.
+ *
+ * @param <T> Type of objects returned by the wrapped iterator
+ */
+public class IteratorToReaderWrapper<T> {
+  /** Wrapped iterator */
+  private final Iterator<T> iterator;
+  /** Current object, null before the first read and after the last one */
+  private T currentObject = null;
+
+  /**
+   * Constructor
+   *
+   * @param iterator Iterator to wrap
+   */
+  public IteratorToReaderWrapper(Iterator<T> iterator) {
+    this.iterator = iterator;
+  }
+
+  /**
+   * Read next object
+   *
+   * @return False iff there are no more objects
+   */
+  public boolean nextObject() {
+    boolean hasNext = iterator.hasNext();
+    // Clear the current object once exhausted so stale data is not returned
+    currentObject = hasNext ? iterator.next() : null;
+    return hasNext;
+  }
+
+  /**
+   * Get the current object
+   *
+   * @return Current object, or null if {@link #nextObject()} has not been
+   *         called yet or returned false
+   */
+  public T getCurrentObject() {
+    return currentObject;
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-core/src/main/java/org/apache/giraph/io/iterables/VertexReaderWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/iterables/VertexReaderWrapper.java b/giraph-core/src/main/java/org/apache/giraph/io/iterables/VertexReaderWrapper.java
new file mode 100644
index 0000000..7493942
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/iterables/VertexReaderWrapper.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.iterables;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Wraps {@link GiraphReader} for vertices into {@link VertexReader}.
+ * <p>
+ * Lifecycle calls are forwarded to the wrapped reader; iteration goes
+ * through an {@link IteratorToReaderWrapper} built over the same reader.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public class VertexReaderWrapper<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> implements
+    VertexReader<I, V, E, M> {
+  /** Wrapped vertex reader */
+  private final GiraphReader<Vertex<I, V, E, M>> vertexReader;
+  /** {@link VertexReader}-like wrapper of {@link #vertexReader} */
+  private final IteratorToReaderWrapper<Vertex<I, V, E, M>> iterator;
+
+  /**
+   * Constructor
+   *
+   * @param vertexReader GiraphReader for vertices to wrap
+   */
+  public VertexReaderWrapper(GiraphReader<Vertex<I, V, E, M>> vertexReader) {
+    this.vertexReader = vertexReader;
+    iterator = new IteratorToReaderWrapper<Vertex<I, V, E, M>>(vertexReader);
+  }
+
+  @Override
+  public boolean nextVertex() throws IOException, InterruptedException {
+    return iterator.nextObject();
+  }
+
+  @Override
+  public Vertex<I, V, E, M> getCurrentVertex() throws IOException,
+      InterruptedException {
+    return iterator.getCurrentObject();
+  }
+
+  @Override
+  public void initialize(InputSplit inputSplit,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    vertexReader.initialize(inputSplit, context);
+  }
+
+  @Override
+  public void close() throws IOException {
+    vertexReader.close();
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return vertexReader.getProgress();
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-core/src/main/java/org/apache/giraph/io/iterables/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/iterables/package-info.java b/giraph-core/src/main/java/org/apache/giraph/io/iterables/package-info.java
new file mode 100644
index 0000000..187cc4e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/iterables/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Iterable wrappers for IO readers/writers
+ */
+package org.apache.giraph.io.iterables;
http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
index efc08d3..0039dd6 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
@@ -30,8 +30,7 @@ import org.apache.giraph.hive.common.HiveProfiles;
import org.apache.giraph.hive.input.edge.HiveEdgeInputFormat;
import org.apache.giraph.hive.input.edge.HiveEdgeReader;
import org.apache.giraph.hive.input.edge.HiveToEdge;
-import org.apache.giraph.hive.input.vertex.HiveToVertexValue;
-import org.apache.giraph.hive.input.vertex.HiveToVertexEdges;
+import org.apache.giraph.hive.input.vertex.HiveToVertex;
import org.apache.giraph.hive.input.vertex.HiveVertexInputFormat;
import org.apache.giraph.hive.input.vertex.HiveVertexReader;
import org.apache.giraph.hive.output.HiveVertexOutputFormat;
@@ -78,9 +77,7 @@ public class HiveGiraphRunner implements Tool {
private Class<? extends Vertex> vertexClass;
/** Vertex creator from hive records. */
- private Class<? extends HiveToVertexValue> hiveToVertexClass;
- /** Vertex edges creator from hive records. */
- private Class<? extends HiveToVertexEdges> hiveToVertexEdgesClass;
+ private Class<? extends HiveToVertex> hiveToVertexClass;
/** hive vertex input information */
private final HiveInputDescription hiveVertexInputDescription;
@@ -127,56 +124,31 @@ public class HiveGiraphRunner implements Tool {
return hiveEdgeInputDescription;
}
- public Class<? extends HiveToVertexValue> getHiveToVertexValueClass() {
+ public Class<? extends HiveToVertex> getHiveToVertexClass() {
return hiveToVertexClass;
}
/**
- * Set HiveVertexCreator used with HiveVertexInputFormat
+ * Set HiveToVertex used with HiveVertexInputFormat
*
- * @param hiveToVertexClass HiveVertexCreator
+ * @param hiveToVertexClass HiveToVertex
*/
- public void setHiveToVertexValueClass(
- Class<? extends HiveToVertexValue> hiveToVertexClass) {
+ public void setHiveToVertexClass(
+ Class<? extends HiveToVertex> hiveToVertexClass) {
this.hiveToVertexClass = hiveToVertexClass;
conf.setClass(HiveVertexReader.HIVE_TO_VERTEX_KEY, hiveToVertexClass,
- HiveToVertexValue.class);
+ HiveToVertex.class);
}
/**
* Whether to use vertex input.
*
- * @return true if vertex input enabled (HiveVertexCreator is set).
+ * @return true if vertex input enabled (HiveToVertex is set).
*/
public boolean hasVertexValueInput() {
return hiveToVertexClass != null;
}
- public Class<? extends HiveToVertexEdges> getHiveToVertexEdgesClass() {
- return hiveToVertexEdgesClass;
- }
-
- /**
- * Whether we have a class for reading per-vertex edges from Hive.
- *
- * @return true if user set class for reading vertex edges.
- */
- public boolean hasHiveToVertexEdgesClass() {
- return hiveToVertexEdgesClass != null;
- }
-
- /**
- * Set class for reading per-vertex edges from hive.
- *
- * @param klass Class to use
- */
- public void setHiveToVertexEdgesClass(
- Class<? extends HiveToVertexEdges> klass) {
- this.hiveToVertexEdgesClass = klass;
- conf.setClass(HiveVertexReader.HIVE_TO_VERTEX_EDGES_KEY, klass,
- HiveToVertexEdges.class);
- }
-
public Class<? extends HiveToEdge> getHiveToEdgeClass() {
return hiveToEdgeClass;
}
@@ -184,16 +156,16 @@ public class HiveGiraphRunner implements Tool {
/**
* Whether to use edge input.
*
- * @return true if edge input enabled (HiveEdgeCreator is set).
+ * @return true if edge input enabled (HiveToEdge is set).
*/
public boolean hasEdgeInput() {
return hiveToEdgeClass != null;
}
/**
- * Set HiveEdgeCreator used with HiveEdgeInputFormat
+ * Set HiveToEdge used with HiveEdgeInputFormat
*
- * @param hiveToEdgeClass HiveEdgeCreator
+ * @param hiveToEdgeClass HiveToEdge
*/
public void setHiveToEdgeClass(Class<? extends HiveToEdge> hiveToEdgeClass) {
this.hiveToEdgeClass = hiveToEdgeClass;
@@ -364,15 +336,8 @@ public class HiveGiraphRunner implements Tool {
String hiveToVertexClassStr = cmdln.getOptionValue("hiveToVertexClass");
if (hiveToVertexClassStr != null) {
- setHiveToVertexValueClass(findClass(hiveToVertexClassStr,
- HiveToVertexValue.class));
- }
-
- String hiveToVertexEdgesClassStr =
- cmdln.getOptionValue("hiveToVertexEdgesClass");
- if (hiveToVertexEdgesClassStr != null) {
- setHiveToVertexEdgesClass(findClass(hiveToVertexEdgesClassStr,
- HiveToVertexEdges.class));
+ setHiveToVertexClass(findClass(hiveToVertexClassStr,
+ HiveToVertex.class));
}
String hiveToEdgeClassStr = cmdln.getOptionValue("hiveToEdgeClass");
@@ -392,7 +357,7 @@ public class HiveGiraphRunner implements Tool {
if (hiveToVertexClass == null && hiveToEdgeClass == null) {
throw new IllegalArgumentException(
"Need at least one of Giraph " +
- HiveToVertexValue.class.getSimpleName() +
+ HiveToVertex.class.getSimpleName() +
" class name (-hiveToVertexClass) and " +
HiveToEdge.class.getSimpleName() +
" class name (-hiveToEdgeClass)");
@@ -522,14 +487,9 @@ public class HiveGiraphRunner implements Tool {
// Vertex input settings
if (hiveToVertexClass == null) {
options.addOption(null, "hiveToVertexClass", true,
- "Giraph " + HiveToVertexValue.class.getSimpleName() +
+ "Giraph " + HiveToVertex.class.getSimpleName() +
" class to use");
}
- if (hiveToVertexEdgesClass == null) {
- options.addOption(null, "hiveToVertexEdgesClass", true,
- "Giraph " + HiveToVertexEdges.class.getSimpleName() +
- " class to use");
- }
options.addOption("vi", "vertexInputTable", true,
"Vertex input table name");
options.addOption("VI", "vertexInputFilter", true,
@@ -654,10 +614,6 @@ public class HiveGiraphRunner implements Tool {
LOG.info(LOG_PREFIX + "-hiveToVertexClass=" +
hiveToVertexClass.getCanonicalName());
}
- if (hiveToVertexEdgesClass != null) {
- LOG.info(LOG_PREFIX + "-hiveToVertexEdgesClass=" +
- hiveToVertexEdgesClass.getCanonicalName());
- }
if (classes.getVertexInputFormatClass() != null) {
LOG.info(LOG_PREFIX + "-vertexInputFormatClass=" +
classes.getVertexInputFormatClass().getCanonicalName());
http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/common/DefaultConfigurableAndTableSchemaAware.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/DefaultConfigurableAndTableSchemaAware.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/DefaultConfigurableAndTableSchemaAware.java
new file mode 100644
index 0000000..c8b201f
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/common/DefaultConfigurableAndTableSchemaAware.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.common;
+
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.facebook.giraph.hive.HiveTableSchema;
+import com.facebook.giraph.hive.HiveTableSchemaAware;
+
+/**
+ * Base class combining {@link DefaultImmutableClassesGiraphConfigurable}
+ * with a simple {@link HiveTableSchemaAware} implementation that just
+ * remembers the schema it was given.
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ * @param <M> Message Value
+ */
+public class DefaultConfigurableAndTableSchemaAware<
+    I extends WritableComparable, V extends Writable, E extends Writable,
+    M extends Writable>
+    extends DefaultImmutableClassesGiraphConfigurable<I, V, E, M>
+    implements HiveTableSchemaAware {
+  /** Schema last passed to {@link #setTableSchema}, null until then */
+  private HiveTableSchema schema;
+
+  @Override
+  public HiveTableSchema getTableSchema() {
+    return schema;
+  }
+
+  @Override
+  public void setTableSchema(HiveTableSchema tableSchema) {
+    schema = tableSchema;
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/input/RecordReaderWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/RecordReaderWrapper.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/RecordReaderWrapper.java
new file mode 100644
index 0000000..7b64a40
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/RecordReaderWrapper.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.RecordReader;
+
+import com.google.common.collect.AbstractIterator;
+
+import java.io.IOException;
+
+/**
+ * Wraps {@link RecordReader} into {@link java.util.Iterator} over the values
+ * it reads (keys are ignored).
+ * <p>
+ * Since {@link java.util.Iterator} methods cannot throw checked exceptions,
+ * failures of the wrapped reader are rethrown as
+ * {@link IllegalStateException} with the original exception as the cause.
+ *
+ * @param <T> Data of record reader
+ */
+public class RecordReaderWrapper<T> extends AbstractIterator<T> {
+  /** Wrapped {@link RecordReader} */
+  private final RecordReader<WritableComparable, T> recordReader;
+
+  /**
+   * Constructor
+   *
+   * @param recordReader {@link RecordReader} to wrap
+   */
+  public RecordReaderWrapper(RecordReader<WritableComparable, T> recordReader) {
+    this.recordReader = recordReader;
+  }
+
+  @Override
+  protected T computeNext() {
+    try {
+      if (!recordReader.nextKeyValue()) {
+        return endOfData();
+      }
+      return recordReader.getCurrentValue();
+    } catch (IOException e) {
+      // Keep the original exception as cause so the failure can be diagnosed
+      throw new IllegalStateException("computeNext: IOException occurred", e);
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag swallowed by catching InterruptedException
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(
+          "computeNext: InterruptedException occurred", e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/AbstractHiveToEdge.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/AbstractHiveToEdge.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/AbstractHiveToEdge.java
index f29fea7..c16e808 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/AbstractHiveToEdge.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/AbstractHiveToEdge.java
@@ -17,12 +17,10 @@
*/
package org.apache.giraph.hive.input.edge;
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.hive.common.DefaultConfigurableAndTableSchemaAware;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import com.facebook.giraph.hive.HiveTableSchemaAware;
-
/**
* Base class for HiveToEdge implementations
*
@@ -33,5 +31,10 @@ import com.facebook.giraph.hive.HiveTableSchemaAware;
*/
public abstract class AbstractHiveToEdge<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
- extends DefaultImmutableClassesGiraphConfigurable<I, V, E, M>
- implements HiveTableSchemaAware, HiveToEdge<I, E> { }
+ extends DefaultConfigurableAndTableSchemaAware<I, V, E, M>
+ implements HiveToEdge<I, E> {
+ @Override
+ public final void remove() {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
index 18b40c2..68edbfc 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
@@ -19,6 +19,7 @@
package org.apache.giraph.hive.input.edge;
import org.apache.giraph.hive.common.HiveProfiles;
+import org.apache.giraph.io.iterables.EdgeReaderWrapper;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.EdgeReader;
import org.apache.hadoop.conf.Configuration;
@@ -77,6 +78,6 @@ public class HiveEdgeInputFormat<I extends WritableComparable,
}
reader.setHiveRecordReader(baseReader);
- return reader;
+ return new EdgeReaderWrapper<I, E>(reader); // adapt GiraphReader
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
index 6fb183a..275f8f7 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
@@ -19,9 +19,9 @@
package org.apache.giraph.hive.input.edge;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.edge.ReusableEdge;
-import org.apache.giraph.io.EdgeReader;
+import org.apache.giraph.io.iterables.EdgeWithSource;
+import org.apache.giraph.io.iterables.GiraphReader;
+import org.apache.giraph.hive.input.RecordReaderWrapper;
import org.apache.giraph.utils.ReflectionUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import com.facebook.giraph.hive.HiveRecord;
import com.facebook.giraph.hive.HiveTableSchema;
+import com.facebook.giraph.hive.HiveTableSchemaAware;
import com.facebook.giraph.hive.HiveTableSchemas;
import java.io.IOException;
@@ -42,11 +43,9 @@ import java.io.IOException;
* @param <E> Edge Value
*/
public class HiveEdgeReader<I extends WritableComparable, E extends Writable>
- implements EdgeReader<I, E> {
+ implements GiraphReader<EdgeWithSource<I, E>>, HiveTableSchemaAware {
/** Configuration key for edge creator class */
public static final String HIVE_TO_EDGE_KEY = "giraph.hive.to.edge.class";
- /** Configuration key for whether to reuse edge */
- public static final String REUSE_EDGE_KEY = "giraph.hive.reuse.edge";
/** Underlying Hive RecordReader used */
private RecordReader<WritableComparable, HiveRecord> hiveRecordReader;
@@ -58,11 +57,6 @@ public class HiveEdgeReader<I extends WritableComparable, E extends Writable>
/** User class to create edges from a HiveRecord */
private HiveToEdge<I, E> hiveToEdge;
- /**
- * If we are reusing edges this will be the single edge to read into.
- * Otherwise if it's null we will create a new edge each time.
- */
- private ReusableEdge<I, E> edgeToReuse = null;
/**
* Get underlying Hive record reader used.
@@ -83,20 +77,12 @@ public class HiveEdgeReader<I extends WritableComparable, E extends Writable>
this.hiveRecordReader = hiveRecordReader;
}
- /**
- * Get Hive table schema for table being read from.
- *
- * @return Hive table schema for table
- */
+ @Override
public HiveTableSchema getTableSchema() {
return tableSchema;
}
- /**
- * Set Hive schema for table being read from.
- *
- * @param tableSchema Hive table schema
- */
+ @Override
public void setTableSchema(HiveTableSchema tableSchema) {
this.tableSchema = tableSchema;
}
@@ -116,13 +102,12 @@ public class HiveEdgeReader<I extends WritableComparable, E extends Writable>
hiveRecordReader.initialize(inputSplit, context);
conf = new ImmutableClassesGiraphConfiguration(context.getConfiguration());
instantiateHiveToEdgeFromConf();
- if (conf.getBoolean(REUSE_EDGE_KEY, false)) {
- edgeToReuse = conf.createReusableEdge();
- }
+ hiveToEdge.initializeRecords( // feed Hive rows to user's HiveToEdge
+ new RecordReaderWrapper<HiveRecord>(hiveRecordReader));
}
/**
- * Retrieve the user's HiveEdgeCreator from the Configuration.
+ * Retrieve the user's {@link HiveToEdge} from the Configuration.
*
* @throws IOException if anything goes wrong reading from Configuration
*/
@@ -137,11 +122,6 @@ public class HiveEdgeReader<I extends WritableComparable, E extends Writable>
}
@Override
- public boolean nextEdge() throws IOException, InterruptedException {
- return hiveRecordReader.nextKeyValue();
- }
-
- @Override
public void close() throws IOException {
hiveRecordReader.close();
}
@@ -152,20 +132,17 @@ public class HiveEdgeReader<I extends WritableComparable, E extends Writable>
}
@Override
- public I getCurrentSourceId() throws IOException, InterruptedException {
- return hiveToEdge.getSourceVertexId(hiveRecordReader.getCurrentValue());
+ public boolean hasNext() {
+ return hiveToEdge.hasNext();
}
@Override
- public Edge<I, E> getCurrentEdge() throws IOException,
- InterruptedException {
- HiveRecord record = hiveRecordReader.getCurrentValue();
- ReusableEdge<I, E> edge = edgeToReuse;
- if (edge == null) {
- edge = conf.createReusableEdge();
- }
- edge.setValue(hiveToEdge.getEdgeValue(record));
- edge.setTargetVertexId(hiveToEdge.getTargetVertexId(record));
- return edge;
+ public EdgeWithSource<I, E> next() {
+ return hiveToEdge.next();
+ }
+
+ @Override
+ public void remove() {
+ hiveToEdge.remove(); // e.g. AbstractHiveToEdge always throws
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java
index 2205b82..8b22a8f 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java
@@ -18,39 +18,30 @@
package org.apache.giraph.hive.input.edge;
+import org.apache.giraph.io.iterables.EdgeWithSource;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import com.facebook.giraph.hive.HiveReadableRecord;
+import com.facebook.giraph.hive.HiveRecord;
+
+import java.util.Iterator;
/**
* An interface used to create edges from Hive records.
*
+ * It is initialized with a HiveRecord iterator and must provide an
+ * iterator over edges, which makes it possible to skip input rows,
+ * combine several rows together, etc.
+ *
* @param <I> Vertex ID
* @param <E> Edge Value
*/
-public interface HiveToEdge<I extends WritableComparable, E extends Writable> {
- /**
- * Read source vertex ID from Hive record
- *
- * @param hiveRecord HiveRecord to read from
- * @return source vertex ID
- */
- I getSourceVertexId(HiveReadableRecord hiveRecord);
-
- /**
- * Read target vertex ID from Hive record
- *
- * @param hiveRecord HiveRecord to read from
- * @return target vertex ID
- */
- I getTargetVertexId(HiveReadableRecord hiveRecord);
-
+public interface HiveToEdge<I extends WritableComparable,
+ E extends Writable> extends Iterator<EdgeWithSource<I, E>> {
/**
- * Read edge value from the Hive record.
+ * Set the records which contain edge input data
*
- * @param hiveRecord HiveRecord to read from
- * @return Edge value
+ * @param records Hive records
*/
- E getEdgeValue(HiveReadableRecord hiveRecord);
+ void initializeRecords(Iterator<HiveRecord> records);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/SimpleHiveToEdge.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/SimpleHiveToEdge.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/SimpleHiveToEdge.java
new file mode 100644
index 0000000..0b76683
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/SimpleHiveToEdge.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.edge;
+
+import org.apache.giraph.io.iterables.EdgeWithSource;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.facebook.giraph.hive.HiveReadableRecord;
+import com.facebook.giraph.hive.HiveRecord;
+
+import java.util.Iterator;
+
+/**
+ * Simple implementation of {@link HiveToEdge} for inputs in which every
+ * row holds exactly one edge.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public abstract class SimpleHiveToEdge<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable>
+ extends AbstractHiveToEdge<I, V, E, M> {
+ /** Iterator over input records */
+ private Iterator<HiveRecord> records;
+ /** Edge object filled in and returned by every {@link #next()} call */
+ private final EdgeWithSource<I, E> reusableEdge = new EdgeWithSource<I, E>();
+
+ /**
+ * Read source vertex ID from Hive record
+ *
+ * @param hiveRecord HiveRecord to read from
+ * @return source vertex ID
+ */
+ public abstract I getSourceVertexId(HiveReadableRecord hiveRecord);
+
+ /**
+ * Read target vertex ID from Hive record
+ *
+ * @param hiveRecord HiveRecord to read from
+ * @return target vertex ID
+ */
+ public abstract I getTargetVertexId(HiveReadableRecord hiveRecord);
+
+ /**
+ * Read edge value from the Hive record.
+ *
+ * @param hiveRecord HiveRecord to read from
+ * @return Edge value
+ */
+ public abstract E getEdgeValue(HiveReadableRecord hiveRecord);
+
+ @Override
+ public final void initializeRecords(Iterator<HiveRecord> records) {
+ this.records = records;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return records.hasNext();
+ }
+
+ /**
+ * Convert the next Hive record into an edge.
+ *
+ * The returned {@link EdgeWithSource} is a single reused instance whose
+ * contents are overwritten by the following call; copy it to retain it.
+ *
+ * @return edge built from the next input record
+ */
+ @Override
+ public EdgeWithSource<I, E> next() {
+ HiveRecord record = records.next();
+ reusableEdge.setSourceVertexId(getSourceVertexId(record));
+ reusableEdge.setTargetVertexId(getTargetVertexId(record));
+ reusableEdge.setEdgeValue(getEdgeValue(record));
+ return reusableEdge;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertex.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertex.java
new file mode 100644
index 0000000..a138846
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertex.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.vertex;
+
+import org.apache.giraph.hive.common.DefaultConfigurableAndTableSchemaAware;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Base class for HiveToVertex implementations
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ * @param <M> Message Value
+ */
+public abstract class AbstractHiveToVertex<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable>
+ extends DefaultConfigurableAndTableSchemaAware<I, V, E, M>
+ implements HiveToVertex<I, V, E, M> {
+ @Override
+ public final void remove() { // final: removal is never supported
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexEdges.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexEdges.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexEdges.java
deleted file mode 100644
index d0668f6..0000000
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexEdges.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.hive.input.vertex;
-
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import com.facebook.giraph.hive.HiveTableSchema;
-import com.facebook.giraph.hive.HiveTableSchemaAware;
-
-/**
- * Base class for HiveToVertexEdges implementations
- *
- * @param <I> Vertex ID
- * @param <V> Vertex Value
- * @param <E> Edge Value
- * @param <M> Message Value
- */
-public abstract class AbstractHiveToVertexEdges<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- extends DefaultImmutableClassesGiraphConfigurable<I, V, E, M>
- implements HiveTableSchemaAware, HiveToVertexEdges<I, E> {
- /** Schema stored here */
- private HiveTableSchema tableSchema;
-
- @Override public void setTableSchema(HiveTableSchema tableSchema) {
- this.tableSchema = tableSchema;
- }
-
- @Override public HiveTableSchema getTableSchema() {
- return tableSchema;
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexValue.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexValue.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexValue.java
deleted file mode 100644
index 9ab316f..0000000
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexValue.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.hive.input.vertex;
-
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import com.facebook.giraph.hive.HiveTableSchema;
-import com.facebook.giraph.hive.HiveTableSchemaAware;
-
-/**
- * Base class for HiveToVertex implementations
- *
- * @param <I> Vertex ID
- * @param <V> Vertex Value
- * @param <E> Edge Value
- * @param <M> Message Value
- */
-public abstract class AbstractHiveToVertexValue<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- extends DefaultImmutableClassesGiraphConfigurable<I, V, E, M>
- implements HiveTableSchemaAware, HiveToVertexValue<I, V> {
- /** Schema stored here */
- private HiveTableSchema tableSchema;
-
- @Override public void setTableSchema(HiveTableSchema tableSchema) {
- this.tableSchema = tableSchema;
- }
-
- @Override public HiveTableSchema getTableSchema() {
- return tableSchema;
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java
new file mode 100644
index 0000000..1179961
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.vertex;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.facebook.giraph.hive.HiveRecord;
+
+import java.util.Iterator;
+
+/**
+ * An interface used to create vertices from Hive records.
+ *
+ * It is initialized with a HiveRecord iterator and must provide an
+ * iterator over vertices, which makes it possible to skip input rows,
+ * combine several rows together, etc.
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ * @param <M> Message Value
+ */
+public interface HiveToVertex<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable> extends
+ Iterator<Vertex<I, V, E, M>> {
+ /**
+ * Set the records which contain vertex input data
+ *
+ * @param records Hive records
+ */
+ void initializeRecords(Iterator<HiveRecord> records);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexEdges.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexEdges.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexEdges.java
deleted file mode 100644
index 8076a8a..0000000
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexEdges.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.hive.input.vertex;
-
-import org.apache.giraph.edge.Edge;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import com.facebook.giraph.hive.HiveReadableRecord;
-
-import java.util.Collections;
-
-/**
- * Interface for creating edges for a vertex from a Hive record.
- * Used with HiveToVertex if you want to also read edges per vertex, as opposed
- * to using {@link org.apache.giraph.hive.input.edge.HiveEdgeInputFormat}
- *
- * @param <I> Vertex ID
- * @param <E> extends Writable
- */
-public interface HiveToVertexEdges<I extends WritableComparable,
- E extends Writable> {
- /**
- * Read Vertex's edges from the HiveRecord given.
- *
- * @param record HiveRecord to read from.
- * @return iterable of edges
- */
- Iterable<Edge<I, E>> getEdges(HiveReadableRecord record);
-
- /**
- * Default implementation that returns an empty list of edges.
- */
- public class Empty implements HiveToVertexEdges {
- /** Singleton */
- private static final Empty INSTANCE = new Empty();
-
- /** Don't construct, allow inheritance */
- protected Empty() { }
-
- /**
- * Get singleton instance
- * @return Empty
- */
- public static Empty get() { return INSTANCE; }
-
- @Override
- public Iterable getEdges(HiveReadableRecord record) {
- return Collections.emptyList();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexValue.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexValue.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexValue.java
deleted file mode 100644
index 593eb9a..0000000
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexValue.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.hive.input.vertex;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import com.facebook.giraph.hive.HiveReadableRecord;
-
-/**
- * Interface for creating vertices from a Hive record.
- *
- * @param <I> Vertex ID
- * @param <V> Vertex Value
- */
-public interface HiveToVertexValue<I extends WritableComparable,
- V extends Writable> {
- /**
- * Read the Vertex's ID from the HiveRecord given.
- *
- * @param record HiveRecord to read from.
- * @return Vertex ID
- */
- I getVertexId(HiveReadableRecord record);
-
- /**
- * Read the Vertex's Value from the HiveRecord given.
- *
- * @param record HiveRecord to read from.
- * @return Vertex Value
- */
- V getVertexValue(HiveReadableRecord record);
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
index 25c7a26..4f70750 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
@@ -21,6 +21,7 @@ package org.apache.giraph.hive.input.vertex;
import org.apache.giraph.hive.common.HiveProfiles;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexReader;
+import org.apache.giraph.io.iterables.VertexReaderWrapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -67,7 +68,7 @@ public class HiveVertexInputFormat<I extends WritableComparable,
TaskAttemptContext context) throws IOException {
Configuration conf = context.getConfiguration();
- HiveVertexReader reader = new HiveVertexReader();
+ HiveVertexReader<I, V, E, M> reader = new HiveVertexReader<I, V, E, M>();
reader.setTableSchema(hiveInputFormat.getTableSchema(conf));
HiveApiRecordReader baseReader;
@@ -78,6 +79,6 @@ public class HiveVertexInputFormat<I extends WritableComparable,
}
reader.setHiveRecordReader(baseReader);
- return reader;
+ return new VertexReaderWrapper<I, V, E, M>(reader); // adapt GiraphReader
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
index 541176f..442c796 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
@@ -19,9 +19,9 @@
package org.apache.giraph.hive.input.vertex;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.edge.Edge;
import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.io.VertexReader;
+import org.apache.giraph.hive.input.RecordReaderWrapper;
+import org.apache.giraph.io.iterables.GiraphReader;
import org.apache.giraph.utils.ReflectionUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -46,16 +46,10 @@ import java.io.IOException;
*/
public class HiveVertexReader<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
- implements VertexReader<I, V, E, M>, HiveTableSchemaAware {
- /** Configuration key for {@link HiveToVertexValue} class */
+ implements GiraphReader<Vertex<I, V, E, M>>, HiveTableSchemaAware {
+ /** Configuration key for {@link HiveToVertex} class */
public static final String HIVE_TO_VERTEX_KEY =
- "giraph.hive.to.vertex.value.class";
- /** Configuration key for {@link HiveToVertexEdges} class */
- public static final String HIVE_TO_VERTEX_EDGES_KEY =
- "giraph.hive.to.vertex.edges.class";
- /** Configuration key for whether to reuse vertex */
- public static final String REUSE_VERTEX_KEY = "giraph.hive.reuse.vertex";
-
+ "giraph.hive.to.vertex.class";
/** Underlying Hive RecordReader used */
private HiveApiRecordReader hiveRecordReader;
/** Schema for table in Hive */
@@ -64,16 +58,11 @@ public class HiveVertexReader<I extends WritableComparable,
/** Configuration */
private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
- /** User class to create vertices from a HiveRecord */
- private HiveToVertexValue<I, V> hiveToVertexValue;
- /** User class to create vertex edges from HiveRecord - optional */
- private HiveToVertexEdges<I, E> hiveToVertexEdges;
-
/**
- * If we are reusing vertices this will be the single vertex to read into.
- * Otherwise if it's null we will create a new vertex each time.
+ * {@link HiveToVertex} chosen by the user through
+ * {@link #HIVE_TO_VERTEX_KEY}
*/
- private Vertex<I, V, E, M> vertexToReuse = null;
+ private HiveToVertex<I, V, E, M> hiveToVertex;
/**
* Get underlying Hive record reader used.
@@ -113,74 +102,45 @@ public class HiveVertexReader<I extends WritableComparable,
}
@Override
- public void initialize(InputSplit inputSplit, TaskAttemptContext context)
- throws IOException, InterruptedException {
+ public void initialize(InputSplit inputSplit,
+ TaskAttemptContext context) throws IOException, InterruptedException {
hiveRecordReader.initialize(inputSplit, context);
- conf = new ImmutableClassesGiraphConfiguration(context.getConfiguration());
- instantiateHiveToVertexValueFromConf();
- instantiateHiveToVertexEdgesFromConf();
- if (conf.getBoolean(REUSE_VERTEX_KEY, false)) {
- vertexToReuse = conf.createVertex();
- }
+ conf = new ImmutableClassesGiraphConfiguration<I, V, E,
+ M>(context.getConfiguration());
+ Class<? extends HiveToVertex> klass = conf.getClass(HIVE_TO_VERTEX_KEY,
+ null, HiveToVertex.class);
+ if (klass == null) {
+ // Require an explicit class; fail fast with a clear message.
+ throw new IOException(HIVE_TO_VERTEX_KEY + " not set in conf");
+ }
+ hiveToVertex = ReflectionUtils.newInstance(klass, conf);
+ HiveTableSchemas.configure(hiveToVertex, tableSchema);
+ hiveToVertex.initializeRecords(
+ new RecordReaderWrapper<HiveRecord>(hiveRecordReader));
}
- /**
- * Retrieve the user's HiveToVertex from our configuration.
- *
- * @throws IOException if anything goes wrong reading from Configuration.
- */
- private void instantiateHiveToVertexValueFromConf() throws IOException {
- Class<? extends HiveToVertexValue> klass = conf.getClass(HIVE_TO_VERTEX_KEY,
- null, HiveToVertexValue.class);
- if (klass == null) {
- throw new IOException(HIVE_TO_VERTEX_KEY + " not set in conf");
- }
- hiveToVertexValue = ReflectionUtils.newInstance(klass, conf);
- HiveTableSchemas.configure(hiveToVertexValue, tableSchema);
- }
-
- /**
- * Retrieve the user's HiveToVertexEdges from our configuration. This class
- * is optional. If not specified will just use HiveToVertexEdges.Empty.
- */
- private void instantiateHiveToVertexEdgesFromConf() {
- Class<? extends HiveToVertexEdges> klass = conf.getClass(
- HIVE_TO_VERTEX_EDGES_KEY, null, HiveToVertexEdges.class);
- if (klass == null) {
- hiveToVertexEdges = HiveToVertexEdges.Empty.get();
- } else {
- hiveToVertexEdges = ReflectionUtils.newInstance(klass, conf);
- }
- HiveTableSchemas.configure(hiveToVertexEdges, tableSchema);
+ @Override
+ public void close() throws IOException {
+ hiveRecordReader.close();
}
@Override
- public boolean nextVertex() throws IOException, InterruptedException {
- return hiveRecordReader.nextKeyValue();
+ public float getProgress() throws IOException, InterruptedException {
+ return hiveRecordReader.getProgress();
}
@Override
- public void close() throws IOException {
- hiveRecordReader.close();
+ public boolean hasNext() {
+ return hiveToVertex.hasNext();
}
@Override
- public float getProgress() throws IOException, InterruptedException {
- return hiveRecordReader.getProgress();
+ public Vertex<I, V, E, M> next() {
+ return hiveToVertex.next();
}
@Override
- public final Vertex<I, V, E, M> getCurrentVertex()
- throws IOException, InterruptedException {
- HiveRecord hiveRecord = hiveRecordReader.getCurrentValue();
- Vertex<I, V, E, M> vertex = vertexToReuse;
- if (vertex == null) {
- vertex = conf.createVertex();
- }
- I id = hiveToVertexValue.getVertexId(hiveRecord);
- V value = hiveToVertexValue.getVertexValue(hiveRecord);
- Iterable<Edge<I, E>> edges = hiveToVertexEdges.getEdges(hiveRecord);
- vertex.initialize(id, value, edges);
- return vertex;
+ public void remove() {
+ hiveToVertex.remove();
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleHiveToVertex.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleHiveToVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleHiveToVertex.java
new file mode 100644
index 0000000..f42536d
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleHiveToVertex.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.vertex;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.facebook.giraph.hive.HiveReadableRecord;
+import com.facebook.giraph.hive.HiveRecord;
+
+import java.util.Iterator;
+
+/**
+ * Simple implementation of {@link HiveToVertex} for inputs in which each
+ * row holds exactly one vertex.
+ *
+ * <p>A single {@link Vertex} object is reused: every call to {@link #next()}
+ * re-initializes and returns that same instance, so callers must finish with
+ * (or copy) a returned vertex before calling {@link #next()} again.
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ * @param <M> Message Value
+ */
+public abstract class SimpleHiveToVertex<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends AbstractHiveToVertex<I, V, E, M> {
+  /** Hive records which we are reading from; null until initialized */
+  private Iterator<HiveRecord> records;
+
+  /** Vertex object re-initialized and returned by every call to next() */
+  private Vertex<I, V, E, M> reusableVertex;
+
+  /**
+   * Read the Vertex's ID from the HiveRecord given.
+   *
+   * @param record HiveRecord to read from.
+   * @return Vertex ID
+   */
+  public abstract I getVertexId(HiveReadableRecord record);
+
+  /**
+   * Read the Vertex's Value from the HiveRecord given.
+   *
+   * @param record HiveRecord to read from.
+   * @return Vertex Value
+   */
+  public abstract V getVertexValue(HiveReadableRecord record);
+
+  /**
+   * Read Vertex's edges from the HiveRecord given.
+   *
+   * @param record HiveRecord to read from.
+   * @return iterable of edges
+   */
+  public abstract Iterable<Edge<I, E>> getEdges(HiveReadableRecord record);
+
+  @Override
+  public void initializeRecords(Iterator<HiveRecord> records) {
+    this.records = records;
+    reusableVertex = getConf().createVertex();
+  }
+
+  @Override
+  public boolean hasNext() {
+    return initializedRecords().hasNext();
+  }
+
+  @Override
+  public Vertex<I, V, E, M> next() {
+    HiveRecord record = initializedRecords().next();
+    I id = getVertexId(record);
+    V value = getVertexValue(record);
+    Iterable<Edge<I, E>> edges = getEdges(record);
+    // Overwrites the vertex handed out by the previous call.
+    reusableVertex.initialize(id, value, edges);
+    return reusableVertex;
+  }
+
+  /**
+   * Get the records iterator, failing fast if
+   * {@link #initializeRecords(Iterator)} has not set one yet.
+   *
+   * @return records passed to initializeRecords
+   * @throws IllegalStateException if no records iterator has been set
+   */
+  private Iterator<HiveRecord> initializedRecords() {
+    if (records == null) {
+      throw new IllegalStateException(
+          "initializeRecords() must be called before iterating");
+    }
+    return records;
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleNoEdgesHiveToVertex.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleNoEdgesHiveToVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleNoEdgesHiveToVertex.java
new file mode 100644
index 0000000..195a69e
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleNoEdgesHiveToVertex.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.vertex;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.facebook.giraph.hive.HiveReadableRecord;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * {@link SimpleHiveToVertex} for tables with one vertex per row and no edge
+ * data: every vertex is initialized with an empty edge list.
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ * @param <M> Message Value
+ */
+public abstract class SimpleNoEdgesHiveToVertex<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends SimpleHiveToVertex<I, V, E, M> {
+  @Override
+  public Iterable<Edge<I, E>> getEdges(HiveReadableRecord record) {
+    return ImmutableList.<Edge<I, E>>of();
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/95e12267/giraph-hive/src/main/java/org/apache/giraph/hive/output/AbstractVertexToHive.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/AbstractVertexToHive.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/AbstractVertexToHive.java
index fe0771d..a688bf5 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/AbstractVertexToHive.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/AbstractVertexToHive.java
@@ -17,13 +17,10 @@
*/
package org.apache.giraph.hive.output;
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.hive.common.DefaultConfigurableAndTableSchemaAware;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import com.facebook.giraph.hive.HiveTableSchema;
-import com.facebook.giraph.hive.HiveTableSchemaAware;
-
/**
* Base class for VertexToHive implementations
*
@@ -34,16 +31,8 @@ import com.facebook.giraph.hive.HiveTableSchemaAware;
*/
public abstract class AbstractVertexToHive<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
- extends DefaultImmutableClassesGiraphConfigurable<I, V, E, M>
- implements HiveTableSchemaAware, VertexToHive<I, V, E> {
- /** Schema stored here */
- private HiveTableSchema tableSchema;
-
- @Override public void setTableSchema(HiveTableSchema tableSchema) {
- this.tableSchema = tableSchema;
- }
-
- @Override public HiveTableSchema getTableSchema() {
- return tableSchema;
- }
-}
+    extends DefaultConfigurableAndTableSchemaAware<I, V, E, M>
+    implements VertexToHive<I, V, E> {
+  // No members of its own: it only extends
+  // DefaultConfigurableAndTableSchemaAware and implements VertexToHive.
+}