You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/02/22 05:03:28 UTC

[1/2] GIRAPH-453: Pure Hive I/O (nitay)

http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/package-info.java
new file mode 100644
index 0000000..092ea39
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Hive Edge input related things.
+ */
+package org.apache.giraph.hive.input.edge;

http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/java/org/apache/giraph/hive/input/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/package-info.java
new file mode 100644
index 0000000..7f4e5d6
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Hive input things.
+ */
+package org.apache.giraph.hive.input;

http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java
new file mode 100644
index 0000000..31f0e64
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.vertex;
+
+import org.apache.giraph.vertex.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.facebook.giraph.hive.HiveReadableRecord;
+import com.facebook.giraph.hive.HiveTableSchemaAware;
+
+/**
+ * Interface for creating vertices from a Hive record.
+ * Also used for reading vertex values.
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ */
+public interface HiveToVertex<I extends WritableComparable,
+    V extends Writable, E extends Writable> extends HiveTableSchemaAware {
+  /**
+   * Fill the Vertex from the HiveRecord given.
+   *
+   * @param record HiveRecord to read from.
+   * @param vertex Vertex to fill.
+   */
+  void fillVertex(HiveReadableRecord record, Vertex<I, V, E, ?> vertex);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
new file mode 100644
index 0000000..1d43055
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.vertex;
+
+import org.apache.giraph.hive.common.HiveProfiles;
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.facebook.giraph.hive.HiveRecord;
+import com.facebook.giraph.hive.input.HiveApiInputFormat;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link VertexInputFormat} for reading vertices from Hive.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public class HiveVertexInputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends VertexInputFormat<I, V, E, M> {
+  /** Underlying Hive InputFormat used */
+  private final HiveApiInputFormat hiveInputFormat;
+
+  /**
+   * Create vertex input format
+   */
+  public HiveVertexInputFormat() {
+    hiveInputFormat = new HiveApiInputFormat();
+    hiveInputFormat.setMyProfileId(HiveProfiles.VERTEX_INPUT_PROFILE_ID);
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context, int numWorkers)
+    throws IOException, InterruptedException {
+    return hiveInputFormat.getSplits(context);
+  }
+
+  @Override
+  public VertexReader<I, V, E, M> createVertexReader(InputSplit split,
+      TaskAttemptContext context) throws IOException {
+    Configuration conf = context.getConfiguration();
+
+    RecordReader<WritableComparable, HiveRecord> baseReader;
+    HiveVertexReader reader = new HiveVertexReader();
+    reader.setTableSchema(hiveInputFormat.getTableSchema(conf));
+
+    try {
+      baseReader = hiveInputFormat.createRecordReader(split, context);
+      reader.setHiveRecordReader(baseReader);
+      reader.initialize(split, context);
+    } catch (InterruptedException e) {
+      throw new IOException("Could not create vertex reader", e);
+    }
+    return reader;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
new file mode 100644
index 0000000..c5974de
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.vertex;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.VertexReader;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.facebook.giraph.hive.HiveRecord;
+import com.facebook.giraph.hive.HiveTableSchema;
+import com.facebook.giraph.hive.HiveTableSchemaAware;
+
+import java.io.IOException;
+
+/**
+ * VertexReader using Hive
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ * @param <M> Message Value
+ */
+public class HiveVertexReader<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements VertexReader<I, V, E, M>, HiveTableSchemaAware {
+  /** Key in Configuration for HiveToVertex class */
+  public static final String HIVE_TO_VERTEX_KEY = "giraph.hive.to.vertex.class";
+
+  /** Underlying Hive RecordReader used */
+  private RecordReader<WritableComparable, HiveRecord> hiveRecordReader;
+  /** Schema for table in Hive */
+  private HiveTableSchema tableSchema;
+
+  /** Configuration */
+  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+
+  /** User class to create vertices from a HiveRecord */
+  private HiveToVertex<I, V, E> hiveToVertex;
+
+  /**
+   * Get underlying Hive record reader used.
+   *
+   * @return RecordReader from Hive.
+   */
+  public RecordReader<WritableComparable, HiveRecord> getHiveRecordReader() {
+    return hiveRecordReader;
+  }
+
+  /**
+   * Set underlying Hive record reader used.
+   *
+   * @param hiveRecordReader RecordReader to read from Hive.
+   */
+  public void setHiveRecordReader(
+      RecordReader<WritableComparable, HiveRecord> hiveRecordReader) {
+    this.hiveRecordReader = hiveRecordReader;
+  }
+
+  @Override
+  public HiveTableSchema getTableSchema() {
+    return tableSchema;
+  }
+
+  @Override
+  public void setTableSchema(HiveTableSchema tableSchema) {
+    this.tableSchema = tableSchema;
+  }
+
+  /**
+   * Get our Configuration.
+   *
+   * @return ImmutableClassesGiraphConfiguration
+   */
+  public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
+    return conf;
+  }
+
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    hiveRecordReader.initialize(inputSplit, context);
+    conf = new ImmutableClassesGiraphConfiguration(context.getConfiguration());
+    instantiateHiveToVertexFromConf();
+  }
+
+  /**
+   * Instantiate the user's HiveToVertex class named in our configuration.
+   *
+   * @throws IOException if HIVE_TO_VERTEX_KEY is not set in the configuration.
+   */
+  private void instantiateHiveToVertexFromConf() throws IOException {
+    Class<? extends HiveToVertex> klass = conf.getClass(HIVE_TO_VERTEX_KEY,
+        null, HiveToVertex.class);
+    if (klass == null) {
+      throw new IOException(HIVE_TO_VERTEX_KEY + " not set in conf");
+    }
+    hiveToVertex = ReflectionUtils.newInstance(klass, conf);
+    hiveToVertex.setTableSchema(tableSchema);
+  }
+
+  @Override
+  public boolean nextVertex() throws IOException, InterruptedException {
+    return hiveRecordReader.nextKeyValue();
+  }
+
+  @Override
+  public void close() throws IOException {
+    hiveRecordReader.close();
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return hiveRecordReader.getProgress();
+  }
+
+  @Override
+  public final Vertex<I, V, E, M> getCurrentVertex()
+    throws IOException, InterruptedException {
+    HiveRecord hiveRecord = hiveRecordReader.getCurrentValue();
+    Vertex vertex = conf.createVertex();
+    hiveToVertex.fillVertex(hiveRecord, vertex);
+    return vertex;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/package-info.java
new file mode 100644
index 0000000..9027962
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Hive vertex input related things.
+ */
+package org.apache.giraph.hive.input.vertex;

http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java
new file mode 100644
index 0000000..641a298
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.output;
+
+import org.apache.giraph.hive.common.HiveProfiles;
+import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.io.VertexWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.facebook.giraph.hive.HiveRecord;
+import com.facebook.giraph.hive.output.HiveApiOutputFormat;
+
+import java.io.IOException;
+
+/**
+ * VertexOutputFormat using Hive
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ */
+public class HiveVertexOutputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    extends VertexOutputFormat<I, V, E> {
+  /** Underlying Hive OutputFormat used */
+  private final HiveApiOutputFormat hiveOutputFormat;
+
+  /**
+   * Create vertex output format
+   */
+  public HiveVertexOutputFormat() {
+    hiveOutputFormat = new HiveApiOutputFormat();
+    hiveOutputFormat.setMyProfileId(HiveProfiles.VERTEX_OUTPUT_PROFILE_ID);
+  }
+
+  @Override
+  public VertexWriter<I, V, E> createVertexWriter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+
+    RecordWriter<WritableComparable, HiveRecord> baseWriter =
+        hiveOutputFormat.getRecordWriter(context);
+    HiveVertexWriter writer = new HiveVertexWriter();
+    writer.setBaseWriter(baseWriter);
+    writer.setTableSchema(hiveOutputFormat.getTableSchema(conf));
+    writer.initialize(context);
+    return writer;
+  }
+
+  @Override
+  public void checkOutputSpecs(JobContext context)
+    throws IOException, InterruptedException {
+    hiveOutputFormat.checkOutputSpecs(context);
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    return hiveOutputFormat.getOutputCommitter(context);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java
new file mode 100644
index 0000000..ea24fc5
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.output;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.VertexWriter;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Logger;
+
+import com.facebook.giraph.hive.HiveRecord;
+import com.facebook.giraph.hive.HiveTableSchema;
+import com.facebook.giraph.hive.impl.HiveApiRecord;
+
+import java.io.IOException;
+
+/**
+ * Vertex writer using Hive.
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ */
+public class HiveVertexWriter<I extends WritableComparable, V extends Writable,
+    E extends Writable> implements VertexWriter<I, V, E> {
+  /** Key in configuration for VertexToHive class */
+  public static final String VERTEX_TO_HIVE_KEY = "giraph.vertex.to.hive.class";
+
+  /** Logger */
+  private static final Logger LOG = Logger.getLogger(HiveVertexWriter.class);
+
+  /** Underlying Hive RecordWriter used */
+  private RecordWriter<WritableComparable, HiveRecord>  hiveRecordWriter;
+  /** Schema for table in Hive */
+  private HiveTableSchema tableSchema;
+
+  /** Configuration */
+  private ImmutableClassesGiraphConfiguration<I, V, E, ?> conf;
+
+  /** User class to write vertices to a HiveRecord */
+  private VertexToHive<I, V, E> vertexToHive;
+
+  /**
+   * Get underlying Hive record writer used.
+   *
+   * @return RecordWriter for Hive.
+   */
+  public RecordWriter<WritableComparable, HiveRecord> getBaseWriter() {
+    return hiveRecordWriter;
+  }
+
+  /**
+   * Set underlying Hive record writer used.
+   *
+   * @param hiveRecordWriter RecordWriter to write to Hive.
+   */
+  public void setBaseWriter(
+      RecordWriter<WritableComparable, HiveRecord> hiveRecordWriter) {
+    this.hiveRecordWriter = hiveRecordWriter;
+  }
+
+  /**
+   * Get Hive table schema for table being written to.
+   *
+   * @return Hive table schema for table
+   */
+  public HiveTableSchema getTableSchema() {
+    return tableSchema;
+  }
+
+  /**
+   * Set Hive table schema for table being written to.
+   *
+   * @param tableSchema Hive table schema
+   */
+  public void setTableSchema(HiveTableSchema tableSchema) {
+    this.tableSchema = tableSchema;
+  }
+
+  @Override
+  public void initialize(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    conf = new ImmutableClassesGiraphConfiguration<I, V, E, Writable>(
+        context.getConfiguration());
+    instantiateVertexToHiveFromConf();
+  }
+
+  /**
+   * Initialize VertexToHive instance from our configuration.
+   * @throws IOException if VERTEX_TO_HIVE_KEY is not set in the configuration
+   */
+  private void instantiateVertexToHiveFromConf() throws IOException {
+    Class<? extends VertexToHive> klass = conf.getClass(VERTEX_TO_HIVE_KEY,
+        null, VertexToHive.class);
+    if (klass == null) {
+      throw new IOException(VERTEX_TO_HIVE_KEY + " not set in conf");
+    }
+    vertexToHive = ReflectionUtils.newInstance(klass, conf);
+    vertexToHive.setTableSchema(tableSchema);
+  }
+
+  @Override
+  public void writeVertex(Vertex<I, V, E, ?> vertex)
+    throws IOException, InterruptedException {
+    HiveRecord record = new HiveApiRecord(tableSchema.numColumns());
+    vertexToHive.fillRecord(vertex, record);
+    hiveRecordWriter.write(NullWritable.get(), record);
+  }
+
+  @Override
+  public void close(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    hiveRecordWriter.close(context);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java
new file mode 100644
index 0000000..a72cf0b
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.output;
+
+import org.apache.giraph.vertex.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.facebook.giraph.hive.HiveTableSchemaAware;
+import com.facebook.giraph.hive.HiveWritableRecord;
+
+/**
+ * Interface for writing vertices to a Hive record.
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ */
+public interface VertexToHive<I extends WritableComparable, V extends Writable,
+    E extends Writable> extends HiveTableSchemaAware {
+  /**
+   * Fill the HiveRecord from the Vertex given.
+   *
+   * @param vertex Vertex to read from.
+   * @param record HiveRecord to write to.
+   */
+  void fillRecord(Vertex<I, V, E, ?> vertex, HiveWritableRecord record);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/java/org/apache/giraph/hive/output/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/package-info.java
new file mode 100644
index 0000000..65d87e3
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Hive output things.
+ */
+package org.apache.giraph.hive.output;

http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/java/org/apache/giraph/hive/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/package-info.java
new file mode 100644
index 0000000..d828d2a
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of Giraph Hive input/output related things.
+ */
+package org.apache.giraph.hive;

http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c075762..4be0bd0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -235,7 +235,9 @@ under the License.
     <buildtype>test</buildtype>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <hbase.version>0.90.5</hbase.version>
-    <jackson.version>1.8.0</jackson.version>
+    <hive.version>0.9.0</hive.version>
+    <codehaus-jackson.version>1.8.0</codehaus-jackson.version>
+    <fasterxml-jackson.version>2.1.0</fasterxml-jackson.version>
     <slf4j.version>1.7.2</slf4j.version>
     <hive.version>0.10.0</hive.version>
     <forHadoop>for-hadoop-${hadoop.version}</forHadoop>
@@ -836,6 +838,11 @@ under the License.
     <dependencies>
       <!-- compile dependencies. sorted lexicographically. -->
       <dependency>
+        <groupId>com.facebook.giraph.hive</groupId>
+        <artifactId>hive-io-experimental</artifactId>
+        <version>0.1</version>
+      </dependency>
+      <dependency>
         <groupId>com.google.guava</groupId>
         <artifactId>guava</artifactId>
         <version>12.0</version>
@@ -843,7 +850,7 @@ under the License.
       <dependency>
         <groupId>com.yammer.metrics</groupId>
         <artifactId>metrics-core</artifactId>
-        <version>2.1.2</version>
+        <version>2.2.0</version>
         <exclusions>
           <exclusion>
             <groupId>org.slf4j</groupId>
@@ -852,6 +859,21 @@ under the License.
         </exclusions>
       </dependency>
       <dependency>
+        <groupId>com.fasterxml.jackson.core</groupId>
+        <artifactId>jackson-core</artifactId>
+        <version>${fasterxml-jackson.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>com.fasterxml.jackson.core</groupId>
+        <artifactId>jackson-databind</artifactId>
+        <version>${fasterxml-jackson.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>com.github.spullara.cli-parser</groupId>
+        <artifactId>cli-parser</artifactId>
+        <version>1.1</version>
+      </dependency>
+      <dependency>
         <groupId>commons-io</groupId>
         <artifactId>commons-io</artifactId>
         <version>1.3.2</version>
@@ -877,6 +899,16 @@ under the License.
         <version>${project.version}</version>
       </dependency>
       <dependency>
+        <groupId>org.apache.giraph</groupId>
+        <artifactId>giraph-hcatalog</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.giraph</groupId>
+        <artifactId>giraph-hive</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
         <groupId>org.apache.hcatalog</groupId>
         <artifactId>hcatalog-core</artifactId>
         <version>0.5.0-incubating</version>
@@ -929,12 +961,12 @@ under the License.
       <dependency>
         <groupId>org.codehaus.jackson</groupId>
         <artifactId>jackson-core-asl</artifactId>
-        <version>${jackson.version}</version>
+        <version>${codehaus-jackson.version}</version>
       </dependency>
       <dependency>
         <groupId>org.codehaus.jackson</groupId>
         <artifactId>jackson-mapper-asl</artifactId>
-        <version>${jackson.version}</version>
+        <version>${codehaus-jackson.version}</version>
       </dependency>
       <dependency>
         <groupId>org.json</groupId>
@@ -1030,6 +1062,12 @@ under the License.
         <scope>test</scope>
       </dependency>
       <dependency>
+        <groupId>org.apache.giraph</groupId>
+        <artifactId>giraph-core</artifactId>
+        <type>test-jar</type>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
         <groupId>org.apache.hbase</groupId>
         <artifactId>hbase</artifactId>
         <type>test-jar</type>
@@ -1047,6 +1085,7 @@ under the License.
 
   <modules>
     <module>giraph-core</module>
+    <module>giraph-hive</module>
     <module>giraph-examples</module>
   </modules>