You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/03/21 18:15:48 UTC

[2/6] git commit: GIRAPH-575: Update hive-io dependency (nitay)

GIRAPH-575: Update hive-io dependency (nitay)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/799711e1
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/799711e1
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/799711e1

Branch: refs/heads/partition-values-575
Commit: 799711e1d8ae6ae1bac523d0368f8b80106dee8f
Parents: 42b5ec9
Author: Nitay Joffe <ni...@apache.org>
Authored: Tue Mar 19 15:54:52 2013 -0400
Committer: Nitay Joffe <ni...@apache.org>
Committed: Tue Mar 19 15:54:52 2013 -0400

----------------------------------------------------------------------
 .../hive/input/edge/HiveEdgeInputFormat.java       |    6 +-
 .../input/vertex/AbstractHiveToVertexEdges.java    |   19 ++++++++-
 .../input/vertex/AbstractHiveToVertexValue.java    |   19 ++++++++-
 .../giraph/hive/input/vertex/HiveToRecord.java     |   33 +++++++++++++++
 .../hive/input/vertex/HiveToVertexEdges.java       |   13 ++++--
 .../hive/input/vertex/HiveToVertexValue.java       |    2 +-
 .../hive/input/vertex/HiveVertexInputFormat.java   |    9 ++--
 .../giraph/hive/input/vertex/HiveVertexReader.java |   15 ++++---
 pom.xml                                            |    2 +-
 9 files changed, 95 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/799711e1/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
index 3f40763..18b40c2 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
@@ -66,17 +66,17 @@ public class HiveEdgeInputFormat<I extends WritableComparable,
     throws IOException {
     Configuration conf = context.getConfiguration();
 
-    RecordReader<WritableComparable, HiveRecord> baseReader;
     HiveEdgeReader<I, E> reader = new HiveEdgeReader<I, E>();
     reader.setTableSchema(hiveInputFormat.getTableSchema(conf));
 
+    RecordReader<WritableComparable, HiveRecord> baseReader;
     try {
       baseReader = hiveInputFormat.createRecordReader(split, context);
-      reader.setHiveRecordReader(baseReader);
-      reader.initialize(split, context);
     } catch (InterruptedException e) {
       throw new IllegalStateException("Could not create edge record reader", e);
     }
+
+    reader.setHiveRecordReader(baseReader);
     return reader;
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/799711e1/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexEdges.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexEdges.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexEdges.java
index 7b01dac..cb67749 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexEdges.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexEdges.java
@@ -21,8 +21,11 @@ import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
+import com.facebook.giraph.hive.HiveTableSchema;
 import com.facebook.giraph.hive.HiveTableSchemaAware;
 
+import java.util.Map;
+
 /**
  * Base class for HiveToVertexEdges implementations
  *
@@ -34,4 +37,18 @@ import com.facebook.giraph.hive.HiveTableSchemaAware;
 public abstract class AbstractHiveToVertexEdges<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
     extends DefaultImmutableClassesGiraphConfigurable<I, V, E, M>
-    implements HiveTableSchemaAware, HiveToVertexEdges<I, E> { }
+    implements HiveTableSchemaAware, HiveToVertexEdges<I, E> {
+  /** Schema stored here */
+  private HiveTableSchema tableSchema;
+
+  @Override public void setTableSchema(HiveTableSchema tableSchema) {
+    this.tableSchema = tableSchema;
+  }
+
+  @Override public HiveTableSchema getTableSchema() {
+    return tableSchema;
+  }
+
+  @Override
+  public void readingPartition(Map<String, String> partitionValues) { }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/799711e1/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexValue.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexValue.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexValue.java
index 5c279b5..7707cd9 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexValue.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexValue.java
@@ -21,8 +21,11 @@ import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
+import com.facebook.giraph.hive.HiveTableSchema;
 import com.facebook.giraph.hive.HiveTableSchemaAware;
 
+import java.util.Map;
+
 /**
  * Base class for HiveToVertex implementations
  *
@@ -34,4 +37,18 @@ import com.facebook.giraph.hive.HiveTableSchemaAware;
 public abstract class AbstractHiveToVertexValue<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
     extends DefaultImmutableClassesGiraphConfigurable<I, V, E, M>
-    implements HiveTableSchemaAware, HiveToVertexValue<I, V> { }
+    implements HiveTableSchemaAware, HiveToVertexValue<I, V> {
+  /** Schema stored here */
+  private HiveTableSchema tableSchema;
+
+  @Override public void setTableSchema(HiveTableSchema tableSchema) {
+    this.tableSchema = tableSchema;
+  }
+
+  @Override public HiveTableSchema getTableSchema() {
+    return tableSchema;
+  }
+
+  @Override
+  public void readingPartition(Map<String, String> partitionValues) { }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/799711e1/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToRecord.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToRecord.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToRecord.java
new file mode 100644
index 0000000..afcf4ad
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToRecord.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.vertex;
+
+import java.util.Map;
+
+/**
+ * Base interface for HiveTo{X} classes. Holds API common to both.
+ */
+public interface HiveToRecord {
+  /**
+   * Notification that we start reading a split.
+   *
+   * @param partitionValues Map of partition data.
+   */
+  void readingPartition(Map<String, String> partitionValues);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/799711e1/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexEdges.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexEdges.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexEdges.java
index cf7ea33..0d303d9 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexEdges.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexEdges.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.io.WritableComparable;
 import com.facebook.giraph.hive.HiveReadableRecord;
 
 import java.util.Collections;
+import java.util.Map;
 
 /**
  * Interface for creating edges for a vertex from a Hive record.
@@ -35,7 +36,7 @@ import java.util.Collections;
  * @param <E> extends Writable
  */
 public interface HiveToVertexEdges<I extends WritableComparable,
-    E extends Writable> {
+    E extends Writable> extends HiveToRecord {
   /**
    * Read Vertex's edges from the HiveRecord given.
    *
@@ -51,8 +52,8 @@ public interface HiveToVertexEdges<I extends WritableComparable,
     /** Singleton */
     private static final Empty INSTANCE = new Empty();
 
-    /** Don't construct */
-    private Empty() { }
+    /** Don't construct, allow inheritance */
+    protected Empty() { }
 
     /**
      * Get singleton instance
@@ -60,7 +61,11 @@ public interface HiveToVertexEdges<I extends WritableComparable,
      */
     public static Empty get() { return INSTANCE; }
 
-    @Override public Iterable getEdges(HiveReadableRecord record) {
+    @Override
+    public void readingPartition(Map<String, String> partitionValues) { }
+
+    @Override
+    public Iterable getEdges(HiveReadableRecord record) {
       return Collections.emptyList();
     }
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/799711e1/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexValue.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexValue.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexValue.java
index 593eb9a..382e295 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexValue.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexValue.java
@@ -30,7 +30,7 @@ import com.facebook.giraph.hive.HiveReadableRecord;
  * @param <V> Vertex Value
  */
 public interface HiveToVertexValue<I extends WritableComparable,
-    V extends Writable> {
+    V extends Writable> extends HiveToRecord {
   /**
    * Read the Vertex's ID from the HiveRecord given.
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/799711e1/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
index fb3b123..25c7a26 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
@@ -26,10 +26,9 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-import com.facebook.giraph.hive.HiveRecord;
+import com.facebook.giraph.hive.impl.input.HiveApiRecordReader;
 import com.facebook.giraph.hive.input.HiveApiInputFormat;
 
 import java.io.IOException;
@@ -68,17 +67,17 @@ public class HiveVertexInputFormat<I extends WritableComparable,
       TaskAttemptContext context) throws IOException {
     Configuration conf = context.getConfiguration();
 
-    RecordReader<WritableComparable, HiveRecord> baseReader;
     HiveVertexReader reader = new HiveVertexReader();
     reader.setTableSchema(hiveInputFormat.getTableSchema(conf));
 
+    HiveApiRecordReader baseReader;
     try {
       baseReader = hiveInputFormat.createRecordReader(split, context);
-      reader.setHiveRecordReader(baseReader);
-      reader.initialize(split, context);
     } catch (InterruptedException e) {
       throw new IOException("Could not create vertex reader", e);
     }
+
+    reader.setHiveRecordReader(baseReader);
     return reader;
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/799711e1/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
index 2311e72..da6e426 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
@@ -26,13 +26,13 @@ import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 import com.facebook.giraph.hive.HiveRecord;
 import com.facebook.giraph.hive.HiveTableSchema;
 import com.facebook.giraph.hive.HiveTableSchemaAware;
 import com.facebook.giraph.hive.HiveTableSchemas;
+import com.facebook.giraph.hive.impl.input.HiveApiRecordReader;
 
 import java.io.IOException;
 
@@ -57,7 +57,7 @@ public class HiveVertexReader<I extends WritableComparable,
   public static final String REUSE_VERTEX_KEY = "giraph.hive.reuse.vertex";
 
   /** Underlying Hive RecordReader used */
-  private RecordReader<WritableComparable, HiveRecord> hiveRecordReader;
+  private HiveApiRecordReader hiveRecordReader;
   /** Schema for table in Hive */
   private HiveTableSchema tableSchema;
 
@@ -80,7 +80,7 @@ public class HiveVertexReader<I extends WritableComparable,
    *
    * @return RecordReader from Hive.
    */
-  public RecordReader<WritableComparable, HiveRecord> getHiveRecordReader() {
+  public HiveApiRecordReader getHiveRecordReader() {
     return hiveRecordReader;
   }
 
@@ -89,8 +89,7 @@ public class HiveVertexReader<I extends WritableComparable,
    *
    * @param hiveRecordReader RecordReader to read from Hive.
    */
-  public void setHiveRecordReader(
-      RecordReader<WritableComparable, HiveRecord> hiveRecordReader) {
+  public void setHiveRecordReader(HiveApiRecordReader hiveRecordReader) {
     this.hiveRecordReader = hiveRecordReader;
   }
 
@@ -118,11 +117,13 @@ public class HiveVertexReader<I extends WritableComparable,
     throws IOException, InterruptedException {
     hiveRecordReader.initialize(inputSplit, context);
     conf = new ImmutableClassesGiraphConfiguration(context.getConfiguration());
-    instantiateHiveToVertexFromConf();
+    instantiateHiveToVertexValueFromConf();
     instantiateHiveToVertexEdgesFromConf();
     if (conf.getBoolean(REUSE_VERTEX_KEY, false)) {
       vertexToReuse = conf.createVertex();
     }
+    hiveToVertexEdges.readingPartition(hiveRecordReader.getPartitionValues());
+    hiveToVertexValue.readingPartition(hiveRecordReader.getPartitionValues());
   }
 
   /**
@@ -130,7 +131,7 @@ public class HiveVertexReader<I extends WritableComparable,
    *
    * @throws IOException if anything goes wrong reading from Configuration.
    */
-  private void instantiateHiveToVertexFromConf() throws IOException {
+  private void instantiateHiveToVertexValueFromConf() throws IOException {
     Class<? extends HiveToVertexValue> klass = conf.getClass(HIVE_TO_VERTEX_KEY,
         null, HiveToVertexValue.class);
     if (klass == null) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/799711e1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 53c57ca..e576e4b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -881,7 +881,7 @@ under the License.
       <dependency>
         <groupId>com.facebook.giraph.hive</groupId>
         <artifactId>hive-io-experimental</artifactId>
-        <version>0.3</version>
+        <version>0.4-SNAPSHOT</version>
       </dependency>
       <dependency>
         <groupId>com.google.guava</groupId>