You are viewing a plain-text version of this content. (The canonical link, shown as "here" on the original page, was not preserved in this plain-text copy.)
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/03/21 18:15:48 UTC
[2/6] git commit: GIRAPH-575: Update hive-io dependency (nitay)
GIRAPH-575: Update hive-io dependency (nitay)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/799711e1
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/799711e1
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/799711e1
Branch: refs/heads/partition-values-575
Commit: 799711e1d8ae6ae1bac523d0368f8b80106dee8f
Parents: 42b5ec9
Author: Nitay Joffe <ni...@apache.org>
Authored: Tue Mar 19 15:54:52 2013 -0400
Committer: Nitay Joffe <ni...@apache.org>
Committed: Tue Mar 19 15:54:52 2013 -0400
----------------------------------------------------------------------
.../hive/input/edge/HiveEdgeInputFormat.java | 6 +-
.../input/vertex/AbstractHiveToVertexEdges.java | 19 ++++++++-
.../input/vertex/AbstractHiveToVertexValue.java | 19 ++++++++-
.../giraph/hive/input/vertex/HiveToRecord.java | 33 +++++++++++++++
.../hive/input/vertex/HiveToVertexEdges.java | 13 ++++--
.../hive/input/vertex/HiveToVertexValue.java | 2 +-
.../hive/input/vertex/HiveVertexInputFormat.java | 9 ++--
.../giraph/hive/input/vertex/HiveVertexReader.java | 15 ++++---
pom.xml | 2 +-
9 files changed, 95 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/799711e1/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
index 3f40763..18b40c2 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
@@ -66,17 +66,17 @@ public class HiveEdgeInputFormat<I extends WritableComparable,
throws IOException {
Configuration conf = context.getConfiguration();
- RecordReader<WritableComparable, HiveRecord> baseReader;
HiveEdgeReader<I, E> reader = new HiveEdgeReader<I, E>();
reader.setTableSchema(hiveInputFormat.getTableSchema(conf));
+ RecordReader<WritableComparable, HiveRecord> baseReader;
try {
baseReader = hiveInputFormat.createRecordReader(split, context);
- reader.setHiveRecordReader(baseReader);
- reader.initialize(split, context);
} catch (InterruptedException e) {
throw new IllegalStateException("Could not create edge record reader", e);
}
+
+ reader.setHiveRecordReader(baseReader);
return reader;
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/799711e1/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexEdges.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexEdges.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexEdges.java
index 7b01dac..cb67749 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexEdges.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexEdges.java
@@ -21,8 +21,11 @@ import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import com.facebook.giraph.hive.HiveTableSchema;
import com.facebook.giraph.hive.HiveTableSchemaAware;
+import java.util.Map;
+
/**
* Base class for HiveToVertexEdges implementations
*
@@ -34,4 +37,18 @@ import com.facebook.giraph.hive.HiveTableSchemaAware;
public abstract class AbstractHiveToVertexEdges<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
extends DefaultImmutableClassesGiraphConfigurable<I, V, E, M>
- implements HiveTableSchemaAware, HiveToVertexEdges<I, E> { }
+ implements HiveTableSchemaAware, HiveToVertexEdges<I, E> {
+ /** Schema stored here */
+ private HiveTableSchema tableSchema;
+
+ @Override public void setTableSchema(HiveTableSchema tableSchema) {
+ this.tableSchema = tableSchema;
+ }
+
+ @Override public HiveTableSchema getTableSchema() {
+ return tableSchema;
+ }
+
+ @Override
+ public void readingPartition(Map<String, String> partitionValues) { }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/799711e1/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexValue.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexValue.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexValue.java
index 5c279b5..7707cd9 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexValue.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexValue.java
@@ -21,8 +21,11 @@ import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import com.facebook.giraph.hive.HiveTableSchema;
import com.facebook.giraph.hive.HiveTableSchemaAware;
+import java.util.Map;
+
/**
* Base class for HiveToVertex implementations
*
@@ -34,4 +37,18 @@ import com.facebook.giraph.hive.HiveTableSchemaAware;
public abstract class AbstractHiveToVertexValue<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
extends DefaultImmutableClassesGiraphConfigurable<I, V, E, M>
- implements HiveTableSchemaAware, HiveToVertexValue<I, V> { }
+ implements HiveTableSchemaAware, HiveToVertexValue<I, V> {
+ /** Schema stored here */
+ private HiveTableSchema tableSchema;
+
+ @Override public void setTableSchema(HiveTableSchema tableSchema) {
+ this.tableSchema = tableSchema;
+ }
+
+ @Override public HiveTableSchema getTableSchema() {
+ return tableSchema;
+ }
+
+ @Override
+ public void readingPartition(Map<String, String> partitionValues) { }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/799711e1/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToRecord.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToRecord.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToRecord.java
new file mode 100644
index 0000000..afcf4ad
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToRecord.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.vertex;
+
+import java.util.Map;
+
+/**
+ * Base interface for HiveTo{X} classes. Holds API common to both.
+ */
+public interface HiveToRecord {
+ /**
+ * Notification that we start reading a split.
+ *
+ * @param partitionValues Map of partition data.
+ */
+ void readingPartition(Map<String, String> partitionValues);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/799711e1/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexEdges.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexEdges.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexEdges.java
index cf7ea33..0d303d9 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexEdges.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexEdges.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.io.WritableComparable;
import com.facebook.giraph.hive.HiveReadableRecord;
import java.util.Collections;
+import java.util.Map;
/**
* Interface for creating edges for a vertex from a Hive record.
@@ -35,7 +36,7 @@ import java.util.Collections;
* @param <E> extends Writable
*/
public interface HiveToVertexEdges<I extends WritableComparable,
- E extends Writable> {
+ E extends Writable> extends HiveToRecord {
/**
* Read Vertex's edges from the HiveRecord given.
*
@@ -51,8 +52,8 @@ public interface HiveToVertexEdges<I extends WritableComparable,
/** Singleton */
private static final Empty INSTANCE = new Empty();
- /** Don't construct */
- private Empty() { }
+ /** Don't construct, allow inheritance */
+ protected Empty() { }
/**
* Get singleton instance
@@ -60,7 +61,11 @@ public interface HiveToVertexEdges<I extends WritableComparable,
*/
public static Empty get() { return INSTANCE; }
- @Override public Iterable getEdges(HiveReadableRecord record) {
+ @Override
+ public void readingPartition(Map<String, String> partitionValues) { }
+
+ @Override
+ public Iterable getEdges(HiveReadableRecord record) {
return Collections.emptyList();
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/799711e1/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexValue.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexValue.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexValue.java
index 593eb9a..382e295 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexValue.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexValue.java
@@ -30,7 +30,7 @@ import com.facebook.giraph.hive.HiveReadableRecord;
* @param <V> Vertex Value
*/
public interface HiveToVertexValue<I extends WritableComparable,
- V extends Writable> {
+ V extends Writable> extends HiveToRecord {
/**
* Read the Vertex's ID from the HiveRecord given.
*
http://git-wip-us.apache.org/repos/asf/giraph/blob/799711e1/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
index fb3b123..25c7a26 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
@@ -26,10 +26,9 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import com.facebook.giraph.hive.HiveRecord;
+import com.facebook.giraph.hive.impl.input.HiveApiRecordReader;
import com.facebook.giraph.hive.input.HiveApiInputFormat;
import java.io.IOException;
@@ -68,17 +67,17 @@ public class HiveVertexInputFormat<I extends WritableComparable,
TaskAttemptContext context) throws IOException {
Configuration conf = context.getConfiguration();
- RecordReader<WritableComparable, HiveRecord> baseReader;
HiveVertexReader reader = new HiveVertexReader();
reader.setTableSchema(hiveInputFormat.getTableSchema(conf));
+ HiveApiRecordReader baseReader;
try {
baseReader = hiveInputFormat.createRecordReader(split, context);
- reader.setHiveRecordReader(baseReader);
- reader.initialize(split, context);
} catch (InterruptedException e) {
throw new IOException("Could not create vertex reader", e);
}
+
+ reader.setHiveRecordReader(baseReader);
return reader;
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/799711e1/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
index 2311e72..da6e426 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
@@ -26,13 +26,13 @@ import org.apache.giraph.utils.ReflectionUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import com.facebook.giraph.hive.HiveRecord;
import com.facebook.giraph.hive.HiveTableSchema;
import com.facebook.giraph.hive.HiveTableSchemaAware;
import com.facebook.giraph.hive.HiveTableSchemas;
+import com.facebook.giraph.hive.impl.input.HiveApiRecordReader;
import java.io.IOException;
@@ -57,7 +57,7 @@ public class HiveVertexReader<I extends WritableComparable,
public static final String REUSE_VERTEX_KEY = "giraph.hive.reuse.vertex";
/** Underlying Hive RecordReader used */
- private RecordReader<WritableComparable, HiveRecord> hiveRecordReader;
+ private HiveApiRecordReader hiveRecordReader;
/** Schema for table in Hive */
private HiveTableSchema tableSchema;
@@ -80,7 +80,7 @@ public class HiveVertexReader<I extends WritableComparable,
*
* @return RecordReader from Hive.
*/
- public RecordReader<WritableComparable, HiveRecord> getHiveRecordReader() {
+ public HiveApiRecordReader getHiveRecordReader() {
return hiveRecordReader;
}
@@ -89,8 +89,7 @@ public class HiveVertexReader<I extends WritableComparable,
*
* @param hiveRecordReader RecordReader to read from Hive.
*/
- public void setHiveRecordReader(
- RecordReader<WritableComparable, HiveRecord> hiveRecordReader) {
+ public void setHiveRecordReader(HiveApiRecordReader hiveRecordReader) {
this.hiveRecordReader = hiveRecordReader;
}
@@ -118,11 +117,13 @@ public class HiveVertexReader<I extends WritableComparable,
throws IOException, InterruptedException {
hiveRecordReader.initialize(inputSplit, context);
conf = new ImmutableClassesGiraphConfiguration(context.getConfiguration());
- instantiateHiveToVertexFromConf();
+ instantiateHiveToVertexValueFromConf();
instantiateHiveToVertexEdgesFromConf();
if (conf.getBoolean(REUSE_VERTEX_KEY, false)) {
vertexToReuse = conf.createVertex();
}
+ hiveToVertexEdges.readingPartition(hiveRecordReader.getPartitionValues());
+ hiveToVertexValue.readingPartition(hiveRecordReader.getPartitionValues());
}
/**
@@ -130,7 +131,7 @@ public class HiveVertexReader<I extends WritableComparable,
*
* @throws IOException if anything goes wrong reading from Configuration.
*/
- private void instantiateHiveToVertexFromConf() throws IOException {
+ private void instantiateHiveToVertexValueFromConf() throws IOException {
Class<? extends HiveToVertexValue> klass = conf.getClass(HIVE_TO_VERTEX_KEY,
null, HiveToVertexValue.class);
if (klass == null) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/799711e1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 53c57ca..e576e4b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -881,7 +881,7 @@ under the License.
<dependency>
<groupId>com.facebook.giraph.hive</groupId>
<artifactId>hive-io-experimental</artifactId>
- <version>0.3</version>
+ <version>0.4-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>