You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/03/27 17:44:29 UTC

git commit: GIRAPH-581: More flexible Hive output (majakabiljo)

Updated Branches:
  refs/heads/trunk 9cbbf99a3 -> 460198af9


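GIRAPH-581: More flexible Hive output (majakabiljo)

Replace VertexToHive.fillRecord() with saveVertex(), which receives a
reusable HiveRecord and a HiveRecordSaver, so one vertex can be written as
zero or more Hive rows. Add SimpleVertexToHive for the one-row-per-vertex
case, let AbstractVertexToHive store the table schema it is given, and bump
hive-io-experimental from 0.4-SNAPSHOT to 0.4.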


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/460198af
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/460198af
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/460198af

Branch: refs/heads/trunk
Commit: 460198af9da68e0e723ae9237031cfae2cf8bfc0
Parents: 9cbbf99
Author: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Authored: Wed Mar 27 09:43:26 2013 -0700
Committer: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Committed: Wed Mar 27 09:43:26 2013 -0700

----------------------------------------------------------------------
 CHANGELOG                                          |    2 +
 .../giraph/hive/output/AbstractVertexToHive.java   |   16 ++++-
 .../apache/giraph/hive/output/HiveRecordSaver.java |   37 +++++++++
 .../giraph/hive/output/HiveVertexWriter.java       |   17 +++-
 .../giraph/hive/output/SimpleVertexToHive.java     |   60 +++++++++++++++
 .../apache/giraph/hive/output/VertexToHive.java    |   24 +++++--
 pom.xml                                            |    2 +-
 7 files changed, 144 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/460198af/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 674d15e..530d9ad 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-581: More flexible Hive output (majakabiljo)
+
   GIRAPH-579: Make it possible to use different out-edges data structures
   for input and computation (apresta)
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/460198af/giraph-hive/src/main/java/org/apache/giraph/hive/output/AbstractVertexToHive.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/AbstractVertexToHive.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/AbstractVertexToHive.java
index 8e3f1ca..fe0771d 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/AbstractVertexToHive.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/AbstractVertexToHive.java
@@ -21,10 +21,11 @@ import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
+import com.facebook.giraph.hive.HiveTableSchema;
 import com.facebook.giraph.hive.HiveTableSchemaAware;
 
 /**
- * Base class for HiveToVertexEdges implementations
+ * Base class for VertexToHive implementations
  *
  * @param <I> Vertex ID
  * @param <V> Vertex Value
@@ -34,4 +35,15 @@ import com.facebook.giraph.hive.HiveTableSchemaAware;
 public abstract class AbstractVertexToHive<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
     extends DefaultImmutableClassesGiraphConfigurable<I, V, E, M>
-    implements HiveTableSchemaAware, VertexToHive<I, V, E> { }
+    implements HiveTableSchemaAware, VertexToHive<I, V, E> {
+  /** Schema stored here */
+  private HiveTableSchema tableSchema;
+
+  @Override public void setTableSchema(HiveTableSchema tableSchema) {
+    this.tableSchema = tableSchema;
+  }
+
+  @Override public HiveTableSchema getTableSchema() {
+    return tableSchema;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/460198af/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveRecordSaver.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveRecordSaver.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveRecordSaver.java
new file mode 100644
index 0000000..70de517
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveRecordSaver.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.output;
+
+import com.facebook.giraph.hive.HiveRecord;
+
+import java.io.IOException;
+
+/**
+ * Interface which can save {@link HiveRecord}s.
+ */
+public interface HiveRecordSaver {
+  /**
+   * Save the record.
+   *
+   * @param record Record to save.
+   * @throws IOException If saving the record fails.
+   * @throws InterruptedException If interrupted while saving the record.
+   */
+  void save(HiveRecord record) throws IOException, InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/460198af/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java
index a97d40a..3eef1f4 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java
@@ -45,7 +45,7 @@ import java.util.Collections;
  * @param <E> Edge Value
  */
 public class HiveVertexWriter<I extends WritableComparable, V extends Writable,
-    E extends Writable> implements VertexWriter<I, V, E> {
+    E extends Writable> implements VertexWriter<I, V, E>, HiveRecordSaver {
   /** Key in configuration for VertexToHive class */
   public static final String VERTEX_TO_HIVE_KEY = "giraph.vertex.to.hive.class";
 
@@ -57,6 +57,9 @@ public class HiveVertexWriter<I extends WritableComparable, V extends Writable,
   /** Schema for table in Hive */
   private HiveTableSchema tableSchema;
 
+  /** Reusable {@link HiveRecord} */
+  private HiveRecord reusableRecord;
+
   /** Configuration */
   private ImmutableClassesGiraphConfiguration<I, V, E, ?> conf;
 
@@ -98,6 +101,8 @@ public class HiveVertexWriter<I extends WritableComparable, V extends Writable,
    */
   public void setTableSchema(HiveTableSchema tableSchema) {
     this.tableSchema = tableSchema;
+    reusableRecord = new HiveApiRecord(tableSchema.numColumns(),
+        Collections.<String>emptyList());
   }
 
   @Override
@@ -125,10 +130,7 @@ public class HiveVertexWriter<I extends WritableComparable, V extends Writable,
   @Override
   public void writeVertex(Vertex<I, V, E, ?> vertex)
     throws IOException, InterruptedException {
-    HiveRecord record = new HiveApiRecord(tableSchema.numColumns(),
-        Collections.<String>emptyList());
-    vertexToHive.fillRecord(vertex, record);
-    hiveRecordWriter.write(NullWritable.get(), record);
+    vertexToHive.saveVertex(vertex, reusableRecord, this);
   }
 
   @Override
@@ -136,4 +138,9 @@ public class HiveVertexWriter<I extends WritableComparable, V extends Writable,
     throws IOException, InterruptedException {
     hiveRecordWriter.close(context);
   }
+
+  @Override
+  public void save(HiveRecord record) throws IOException, InterruptedException {
+    hiveRecordWriter.write(NullWritable.get(), record);
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/460198af/giraph-hive/src/main/java/org/apache/giraph/hive/output/SimpleVertexToHive.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/SimpleVertexToHive.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/SimpleVertexToHive.java
new file mode 100644
index 0000000..3e76d87
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/SimpleVertexToHive.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.output;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.facebook.giraph.hive.HiveRecord;
+import com.facebook.giraph.hive.HiveWritableRecord;
+
+import java.io.IOException;
+
+/**
+ * Simple implementation of {@link VertexToHive} for the case where each
+ * {@link Vertex} is stored as exactly one row in the output.
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ * @param <M> Message Value
+ */
+public abstract class SimpleVertexToHive<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> extends
+    AbstractVertexToHive<I, V, E, M> {
+
+  /**
+   * Fill the HiveRecord from the Vertex given.
+   *
+   * @param vertex Vertex to read from.
+   * @param record HiveRecord to write to.
+   */
+  public abstract void fillRecord(Vertex<I, V, E, ?> vertex,
+      HiveWritableRecord record);
+
+  @Override
+  public final void saveVertex(
+      Vertex<I, V, E, ?> vertex,
+      HiveRecord reusableRecord,
+      HiveRecordSaver recordSaver) throws IOException, InterruptedException {
+    fillRecord(vertex, reusableRecord);
+    recordSaver.save(reusableRecord);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/460198af/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java
index 6d323bd..ff5869d 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java
@@ -22,10 +22,12 @@ import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import com.facebook.giraph.hive.HiveWritableRecord;
+import com.facebook.giraph.hive.HiveRecord;
+
+import java.io.IOException;
 
 /**
- * Interface for writing vertices to a Hive record.
+ * Interface for writing vertices to Hive.
  *
  * @param <I> Vertex ID
  * @param <V> Vertex Value
@@ -34,10 +36,20 @@ import com.facebook.giraph.hive.HiveWritableRecord;
 public interface VertexToHive<I extends WritableComparable, V extends Writable,
     E extends Writable> {
   /**
-   * Fill the HiveRecord from the Vertex given.
+   * Save a vertex to the output. One vertex can be stored as zero, one or
+   * multiple rows in the output.
+   *
+   * The record is reused and may keep values from earlier saves, so:
+   * <ul><li>fill every column of {@code reusableRecord} with your data,</li>
+   * <li>call {@code recordSaver.save(reusableRecord)},</li>
+   * <li>repeat for each additional row.</li></ul>
+   * If you don't call save() at all then there won't be any output for this
+   * vertex.
    *
-   * @param vertex Vertex to read from.
-   * @param record HiveRecord to write to.
+   * @param vertex Vertex which we want to save.
+   * @param reusableRecord Record to use for writing data to it.
+   * @param recordSaver Saver of records
    */
-  void fillRecord(Vertex<I, V, E, ?> vertex, HiveWritableRecord record);
+  void saveVertex(Vertex<I, V, E, ?> vertex, HiveRecord reusableRecord,
+      HiveRecordSaver recordSaver) throws IOException, InterruptedException;
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/460198af/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e576e4b..799bb0a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -881,7 +881,7 @@ under the License.
       <dependency>
         <groupId>com.facebook.giraph.hive</groupId>
         <artifactId>hive-io-experimental</artifactId>
-        <version>0.4-SNAPSHOT</version>
+        <version>0.4</version>
       </dependency>
       <dependency>
         <groupId>com.google.guava</groupId>