You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/01/02 20:04:01 UTC

[1/4] GIRAPH-458: split formats module into accumulo, hbase, hcatalog (nitay)

http://git-wip-us.apache.org/repos/asf/giraph/blob/57ea5561/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java
new file mode 100644
index 0000000..2e91cba
--- /dev/null
+++ b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.hcatalog;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatBaseInputFormat;
+import org.apache.hcatalog.mapreduce.HCatSplit;
+import org.apache.hcatalog.mapreduce.HCatStorageHandler;
+import org.apache.hcatalog.mapreduce.HCatUtils;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.PartInfo;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Provides functionality similar to
+ * {@link org.apache.hcatalog.mapreduce.HCatInputFormat},
+ * but allows for different data sources (vertex and edge data).
+ */
+public class GiraphHCatInputFormat extends HCatBaseInputFormat {
+  /** Vertex input job info for HCatalog. */
+  public static final String VERTEX_INPUT_JOB_INFO =
+      "giraph.hcat.vertex.input.job.info";
+  /** Edge input job info for HCatalog. */
+  public static final String EDGE_INPUT_JOB_INFO =
+      "giraph.hcat.edge.input.job.info";
+
+  /**
+   * Register the HCatalog table that vertices are read from.
+   * <p>
+   * A new {@link InputJobInfo} is created from the database name, table name
+   * and filter of the given one, its properties are copied over, and the
+   * result of {@code HCatUtils.getInputJobInfo} is serialized into the job
+   * configuration under {@link #VERTEX_INPUT_JOB_INFO}.
+   *
+   * @param job The job whose configuration receives the serialized info
+   * @param inputJobInfo Vertex input job info
+   * @throws IOException if thrown by the HCatalog helpers used here
+   */
+  public static void setVertexInput(Job job, InputJobInfo inputJobInfo)
+    throws IOException {
+    InputJobInfo copy = InputJobInfo.create(
+        inputJobInfo.getDatabaseName(),
+        inputJobInfo.getTableName(),
+        inputJobInfo.getFilter());
+    copy.getProperties().putAll(inputJobInfo.getProperties());
+
+    Configuration jobConfiguration = job.getConfiguration();
+    String serialized = HCatUtil.serialize(
+        HCatUtils.getInputJobInfo(jobConfiguration, copy));
+    jobConfiguration.set(VERTEX_INPUT_JOB_INFO, serialized);
+  }
+
+  /**
+   * Register the HCatalog table that edges are read from.
+   * <p>
+   * A new {@link InputJobInfo} is created from the database name, table name
+   * and filter of the given one, its properties are copied over, and the
+   * result of {@code HCatUtils.getInputJobInfo} is serialized into the job
+   * configuration under {@link #EDGE_INPUT_JOB_INFO}.
+   *
+   * @param job The job whose configuration receives the serialized info
+   * @param inputJobInfo Edge input job info
+   * @throws IOException if thrown by the HCatalog helpers used here
+   */
+  public static void setEdgeInput(Job job, InputJobInfo inputJobInfo)
+    throws IOException {
+    InputJobInfo copy = InputJobInfo.create(
+        inputJobInfo.getDatabaseName(),
+        inputJobInfo.getTableName(),
+        inputJobInfo.getFilter());
+    copy.getProperties().putAll(inputJobInfo.getProperties());
+
+    Configuration jobConfiguration = job.getConfiguration();
+    String serialized = HCatUtil.serialize(
+        HCatUtils.getInputJobInfo(jobConfiguration, copy));
+    jobConfiguration.set(EDGE_INPUT_JOB_INFO, serialized);
+  }
+
+  /**
+   * Get table schema from input job info.
+   * <p>
+   * The returned schema contains the table's data columns first, followed by
+   * its partition columns, each group in the order reported by the table
+   * info.
+   *
+   * @param inputJobInfo Input job info
+   * @return Input table schema (data columns, then partition columns)
+   * @throws IOException declared because appending a field to the schema
+   *                     may throw
+   */
+  private static HCatSchema getTableSchema(InputJobInfo inputJobInfo)
+    throws IOException {
+    // Start from an empty schema and append every column to it.
+    HCatSchema allCols = new HCatSchema(new LinkedList<HCatFieldSchema>());
+    // Data columns come first...
+    for (HCatFieldSchema field :
+        inputJobInfo.getTableInfo().getDataColumns().getFields()) {
+      allCols.append(field);
+    }
+    // ...followed by the partition columns.
+    for (HCatFieldSchema field :
+        inputJobInfo.getTableInfo().getPartitionColumns().getFields()) {
+      allCols.append(field);
+    }
+    return allCols;
+  }
+
+  /**
+   * Get the schema of the vertex input table, as recorded in the
+   * configuration by {@link #setVertexInput(Job, InputJobInfo)}.
+   *
+   * @param conf Job configuration
+   * @return Vertex input table schema
+   * @throws IOException if the vertex input job info is missing from the
+   *                     configuration or the schema cannot be built
+   */
+  public static HCatSchema getVertexTableSchema(Configuration conf)
+    throws IOException {
+    InputJobInfo vertexJobInfo = getVertexJobInfo(conf);
+    return getTableSchema(vertexJobInfo);
+  }
+
+  /**
+   * Get the schema of the edge input table, as recorded in the
+   * configuration by {@link #setEdgeInput(Job, InputJobInfo)}.
+   *
+   * @param conf Job configuration
+   * @return Edge input table schema
+   * @throws IOException if the edge input job info is missing from the
+   *                     configuration or the schema cannot be built
+   */
+  public static HCatSchema getEdgeTableSchema(Configuration conf)
+    throws IOException {
+    InputJobInfo edgeJobInfo = getEdgeJobInfo(conf);
+    return getTableSchema(edgeJobInfo);
+  }
+
+  /**
+   * Set input path for job.
+   * <p>
+   * The location string is split into individual paths on every comma that
+   * is not enclosed in curly braces (so a glob such as <code>{a,b}</code>
+   * stays in one piece). Each path is qualified against the file system
+   * obtained from the job configuration, escaped, and the results are joined
+   * with {@link StringUtils#COMMA_STR} and stored under
+   * <code>mapred.input.dir</code>.
+   *
+   * @param jobConf Job configuration, updated in place
+   * @param location Location of input files; one path or several separated
+   *                 by commas
+   * @throws IOException if the file system cannot be obtained
+   */
+  private void setInputPath(JobConf jobConf, String location)
+    throws IOException {
+    int length = location.length();
+    // Current nesting depth of '{' ... '}' groups.
+    int curlyOpen = 0;
+    // Index at which the path currently being scanned starts.
+    int pathStart = 0;
+    // True while the scan position is inside a curly-brace group.
+    boolean globPattern = false;
+    List<String> pathStrings = new ArrayList<String>();
+
+    for (int i = 0; i < length; i++) {
+      char ch = location.charAt(i);
+      switch (ch) {
+      case '{':
+        curlyOpen++;
+        if (!globPattern) {
+          globPattern = true;
+        }
+        break;
+      case '}':
+        curlyOpen--;
+        // Leave glob mode only once the outermost brace has been closed.
+        if (curlyOpen == 0 && globPattern) {
+          globPattern = false;
+        }
+        break;
+      case ',':
+        // A comma outside of braces terminates the current path.
+        if (!globPattern) {
+          pathStrings.add(location.substring(pathStart, i));
+          pathStart = i + 1;
+        }
+        break;
+      default:
+      }
+    }
+    // Whatever follows the last separator is the final path.
+    pathStrings.add(location.substring(pathStart, length));
+
+    Path[] paths = StringUtils.stringToPath(pathStrings.toArray(new String[0]));
+
+    // Qualify and escape every path, then join them with commas.
+    FileSystem fs = FileSystem.get(jobConf);
+    Path path = paths[0].makeQualified(fs);
+    StringBuilder str = new StringBuilder(StringUtils.escapeString(
+        path.toString()));
+    for (int i = 1; i < paths.length; i++) {
+      str.append(StringUtils.COMMA_STR);
+      path = paths[i].makeQualified(fs);
+      str.append(StringUtils.escapeString(path.toString()));
+    }
+
+    jobConf.set("mapred.input.dir", str.toString());
+  }
+
+  /**
+   * Get input splits for job.
+   * <p>
+   * Each partition listed in the input job info is passed to the (old-API)
+   * input format of its storage handler, and every split that format returns
+   * is wrapped in an {@link HCatSplit} together with the table schema.
+   *
+   * @param jobContext Job context
+   * @param inputJobInfo Input job info
+   * @return List of input splits; empty if the job info has no partitions
+   * @throws IOException if the table schema, the input paths or the
+   *                     underlying splits cannot be computed
+   * @throws InterruptedException
+   */
+  private List<InputSplit> getSplits(JobContext jobContext,
+                                     InputJobInfo inputJobInfo)
+    throws IOException, InterruptedException {
+    Configuration conf = jobContext.getConfiguration();
+
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+    List<PartInfo> partitionInfoList = inputJobInfo.getPartitions();
+    if (partitionInfoList == null) {
+      //No partitions match the specified partition filter
+      return splits;
+    }
+
+    // The table schema (data columns followed by partition columns) is the
+    // same for every partition, so build it once and share it between all
+    // splits created below.
+    HCatSchema allCols = getTableSchema(inputJobInfo);
+
+    //When the desired number of input splits is missing,
+    //use a default number (denoted by zero).
+    //TODO: Currently each partition is split independently into
+    //a desired number. However, we want the union of all partitions to be
+    //split into a desired number while maintaining balanced sizes of input
+    //splits.
+    int desiredNumSplits =
+        conf.getInt(HCatConstants.HCAT_DESIRED_PARTITION_NUM_SPLITS, 0);
+
+    //For each matching partition, call getSplits on the underlying InputFormat
+    for (PartInfo partitionInfo : partitionInfoList) {
+      JobConf jobConf = HCatUtil.getJobConfFromContext(jobContext);
+      setInputPath(jobConf, partitionInfo.getLocation());
+      Map<String, String> jobProperties = partitionInfo.getJobProperties();
+      HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
+
+      HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(
+          jobConf, partitionInfo);
+
+      //Get the input format
+      Class inputFormatClass = storageHandler.getInputFormatClass();
+      org.apache.hadoop.mapred.InputFormat inputFormat =
+          getMapRedInputFormat(jobConf, inputFormatClass);
+
+      //Call getSplit on the InputFormat, create an HCatSplit for each
+      //underlying split.
+      org.apache.hadoop.mapred.InputSplit[] baseSplits =
+          inputFormat.getSplits(jobConf, desiredNumSplits);
+
+      for (org.apache.hadoop.mapred.InputSplit split : baseSplits) {
+        splits.add(new HCatSplit(partitionInfo, split, allCols));
+      }
+    }
+
+    return splits;
+  }
+
+  /**
+   * Read back the vertex {@link InputJobInfo} that was serialized into the
+   * configuration under {@link #VERTEX_INPUT_JOB_INFO}.
+   *
+   * @param conf Configuration
+   * @return Vertex input job info
+   * @throws IOException if no vertex input job info is stored in the
+   *                     configuration, or it cannot be deserialized
+   */
+  private static InputJobInfo getVertexJobInfo(Configuration conf)
+    throws IOException {
+    String serialized = conf.get(VERTEX_INPUT_JOB_INFO);
+    if (serialized != null) {
+      return (InputJobInfo) HCatUtil.deserialize(serialized);
+    }
+    throw new IOException("Vertex job information not found in JobContext." +
+        " GiraphHCatInputFormat.setVertexInput() not called?");
+  }
+
+  /**
+   * Read back the edge {@link InputJobInfo} that was serialized into the
+   * configuration under {@link #EDGE_INPUT_JOB_INFO}.
+   *
+   * @param conf Configuration
+   * @return Edge input job info
+   * @throws IOException if no edge input job info is stored in the
+   *                     configuration, or it cannot be deserialized
+   */
+  private static InputJobInfo getEdgeJobInfo(Configuration conf)
+    throws IOException {
+    String serialized = conf.get(EDGE_INPUT_JOB_INFO);
+    if (serialized != null) {
+      return (InputJobInfo) HCatUtil.deserialize(serialized);
+    }
+    throw new IOException("Edge job information not found in JobContext." +
+        " GiraphHCatInputFormat.setEdgeInput() not called?");
+  }
+
+  /**
+   * Compute the input splits of the vertex input table.
+   *
+   * @param jobContext Job context
+   * @return List of vertex {@link InputSplit}s
+   * @throws IOException if the vertex input job info is missing from the
+   *                     configuration or the splits cannot be computed
+   * @throws InterruptedException
+   */
+  public List<InputSplit> getVertexSplits(JobContext jobContext)
+    throws IOException, InterruptedException {
+    Configuration conf = jobContext.getConfiguration();
+    InputJobInfo vertexJobInfo = getVertexJobInfo(conf);
+    return getSplits(jobContext, vertexJobInfo);
+  }
+
+  /**
+   * Compute the input splits of the edge input table.
+   *
+   * @param jobContext Job context
+   * @return List of edge {@link InputSplit}s
+   * @throws IOException if the edge input job info is missing from the
+   *                     configuration or the splits cannot be computed
+   * @throws InterruptedException
+   */
+  public List<InputSplit> getEdgeSplits(JobContext jobContext)
+    throws IOException, InterruptedException {
+    Configuration conf = jobContext.getConfiguration();
+    InputJobInfo edgeJobInfo = getEdgeJobInfo(conf);
+    return getSplits(jobContext, edgeJobInfo);
+  }
+
+  /**
+   * Create an {@link org.apache.hcatalog.mapreduce.HCatRecordReader} for a
+   * split of the table described by the given schema.
+   *
+   * @param split Input split; converted with
+   *              {@code HCatUtils.castToHCatSplit}
+   * @param schema Table schema
+   * @param taskContext Context
+   * @return Record reader
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private RecordReader<WritableComparable, HCatRecord>
+  createRecordReader(InputSplit split,
+                     HCatSchema schema,
+                     TaskAttemptContext taskContext)
+    throws IOException, InterruptedException {
+    PartInfo partInfo = HCatUtils.castToHCatSplit(split).getPartitionInfo();
+    JobContext asJobContext = taskContext;
+
+    HCatStorageHandler handler = HCatUtil.getStorageHandler(
+        asJobContext.getConfiguration(), partInfo);
+
+    // NOTE(review): this JobConf is populated but not referenced again in
+    // this method; kept as is -- confirm whether it is still needed.
+    JobConf jobConf = HCatUtil.getJobConfFromContext(asJobContext);
+    HCatUtil.copyJobPropertiesToJobConf(partInfo.getJobProperties(), jobConf);
+
+    // Schema fields that are not stored in the partition's data.
+    Map<String, String> valuesNotInData =
+        getColValsNotInDataColumns(schema, partInfo);
+    return HCatUtils.newHCatReader(handler, valuesNotInData);
+  }
+
+  /**
+   * Create a {@link RecordReader} over the vertex input table.
+   *
+   * @param split Input split
+   * @param taskContext Context
+   * @return Record reader
+   * @throws IOException if the vertex table schema cannot be obtained or
+   *                     the reader cannot be created
+   * @throws InterruptedException
+   */
+  public RecordReader<WritableComparable, HCatRecord>
+  createVertexRecordReader(InputSplit split, TaskAttemptContext taskContext)
+    throws IOException, InterruptedException {
+    HCatSchema vertexSchema =
+        getVertexTableSchema(taskContext.getConfiguration());
+    return createRecordReader(split, vertexSchema, taskContext);
+  }
+
+  /**
+   * Create a {@link RecordReader} over the edge input table.
+   *
+   * @param split Input split
+   * @param taskContext Context
+   * @return Record reader
+   * @throws IOException if the edge table schema cannot be obtained or
+   *                     the reader cannot be created
+   * @throws InterruptedException
+   */
+  public RecordReader<WritableComparable, HCatRecord>
+  createEdgeRecordReader(InputSplit split, TaskAttemptContext taskContext)
+    throws IOException, InterruptedException {
+    HCatSchema edgeSchema =
+        getEdgeTableSchema(taskContext.getConfiguration());
+    return createRecordReader(split, edgeSchema, taskContext);
+  }
+
+  /**
+   * Get values for fields requested by output schema which will not be in the
+   * data.
+   * <p>
+   * Every field of the output schema that has no position in the
+   * partition's data schema is mapped to its partition value, or to
+   * <code>null</code> when the partition has no value for it.
+   *
+   * @param outputSchema Output schema
+   * @param partInfo Partition info
+   * @return Values not in data columns, keyed by field name
+   */
+  private static Map<String, String> getColValsNotInDataColumns(
+      HCatSchema outputSchema,
+      PartInfo partInfo) {
+    HCatSchema dataSchema = partInfo.getPartitionSchema();
+    Map<String, String> vals = new HashMap<String, String>();
+    for (String fieldName : outputSchema.getFieldNames()) {
+      if (dataSchema.getPosition(fieldName) == null) {
+        // This output field is not present in the data schema, so take its
+        // value from the partition values. Map.get() yields null when the
+        // field is not a partition column either, which is the value we
+        // want to record in that case.
+        vals.put(fieldName, partInfo.getPartitionValues().get(fieldName));
+      }
+    }
+    return vals;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/57ea5561/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java
new file mode 100644
index 0000000..2112df3
--- /dev/null
+++ b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.hcatalog;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.EdgeInputFormat;
+import org.apache.giraph.graph.EdgeReader;
+import org.apache.giraph.graph.EdgeWithSource;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.data.HCatRecord;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * HCatalog {@link EdgeInputFormat} for reading edges from Hive/Pig.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public abstract class HCatalogEdgeInputFormat<
+    I extends WritableComparable,
+    E extends Writable>
+    extends EdgeInputFormat<I, E> {
+  /**
+   * HCatalog input format.
+   */
+  private GiraphHCatInputFormat hCatInputFormat = new GiraphHCatInputFormat();
+
+  @Override
+  public final List<InputSplit> getSplits(JobContext context, int numWorkers)
+    throws IOException, InterruptedException {
+    // numWorkers is not consulted; the splits come from the HCatalog format.
+    List<InputSplit> edgeSplits = hCatInputFormat.getEdgeSplits(context);
+    return edgeSplits;
+  }
+
+  /**
+   * {@link EdgeReader} for {@link HCatalogEdgeInputFormat}.
+   * <p>
+   * Wraps a {@link RecordReader} over {@link HCatRecord}s obtained from the
+   * enclosing format's {@link GiraphHCatInputFormat}. Subclasses turn the
+   * current record into an edge.
+   */
+  protected abstract class HCatalogEdgeReader implements EdgeReader<I, E> {
+    /** Internal {@link RecordReader}; set by initialize. */
+    private RecordReader<WritableComparable, HCatRecord> hCatRecordReader;
+    /** Context passed to initialize. */
+    private TaskAttemptContext context;
+
+    /**
+     * Create the internal record reader for the split, initialize it and
+     * remember the context.
+     */
+    @Override
+    public final void initialize(InputSplit inputSplit,
+                                 TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      hCatRecordReader =
+          hCatInputFormat.createEdgeRecordReader(inputSplit, context);
+      hCatRecordReader.initialize(inputSplit, context);
+      this.context = context;
+    }
+
+    /** Advance the internal record reader by one record. */
+    @Override
+    public boolean nextEdge() throws IOException, InterruptedException {
+      return hCatRecordReader.nextKeyValue();
+    }
+
+    /** Close the internal record reader. */
+    @Override
+    public final void close() throws IOException {
+      hCatRecordReader.close();
+    }
+
+    /** Report the progress of the internal record reader. */
+    @Override
+    public final float getProgress() throws IOException, InterruptedException {
+      return hCatRecordReader.getProgress();
+    }
+
+    /**
+     * Get the record reader.
+     *
+     * @return Record reader to be used for reading; null until
+     *         initialize has been called.
+     */
+    protected final RecordReader<WritableComparable, HCatRecord>
+    getRecordReader() {
+      return hCatRecordReader;
+    }
+
+    /**
+     * Get the context.
+     *
+     * @return Context passed to initialize.
+     */
+    protected final TaskAttemptContext getContext() {
+      return context;
+    }
+  }
+
+  /**
+   * Create {@link EdgeReader}.
+   * <p>
+   * The returned reader is initialized by
+   * {@link #createEdgeReader(InputSplit, TaskAttemptContext)}.
+   *
+   * @return {@link HCatalogEdgeReader} instance.
+   */
+  protected abstract HCatalogEdgeReader createEdgeReader();
+
+  /**
+   * Create an {@link EdgeReader} for the given split and initialize it.
+   * <p>
+   * NOTE(review): unlike
+   * <code>HCatalogVertexInputFormat.createVertexReader</code>, the reader
+   * is fully initialized here; confirm that callers do not initialize it a
+   * second time.
+   *
+   * @param split Input split to read
+   * @param context Task attempt context
+   * @return Initialized edge reader
+   * @throws IOException if the reader cannot be initialized
+   * @throws IllegalStateException if interrupted while initializing; the
+   *                               thread's interrupt status is restored
+   */
+  @Override
+  public EdgeReader<I, E>
+  createEdgeReader(InputSplit split, TaskAttemptContext context)
+    throws IOException {
+    try {
+      HCatalogEdgeReader reader = createEdgeReader();
+      reader.initialize(split, context);
+      return reader;
+    } catch (InterruptedException e) {
+      // The interruption is converted to an unchecked exception, so restore
+      // the interrupt status for code further up the stack.
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(
+          "createEdgeReader: Interrupted creating reader.", e);
+    }
+  }
+
+  /**
+   * {@link HCatalogEdgeReader} for tables in which every row describes one
+   * complete edge: source vertex, target vertex and edge value.
+   */
+  protected abstract class SingleRowHCatalogEdgeReader
+      extends HCatalogEdgeReader {
+    /**
+     * Extract the id of the edge's source vertex.
+     *
+     * @param record Input record
+     * @return I Source vertex id
+     */
+    protected abstract I getSourceVertexId(HCatRecord record);
+
+    /**
+     * Extract the id of the edge's target vertex.
+     *
+     * @param record Input record
+     * @return I Target vertex id
+     */
+    protected abstract I getTargetVertexId(HCatRecord record);
+
+    /**
+     * Extract the value attached to the edge.
+     *
+     * @param record Input record
+     * @return E Edge value
+     */
+    protected abstract E getEdgeValue(HCatRecord record);
+
+    @Override
+    public EdgeWithSource<I, E> getCurrentEdge() throws IOException,
+        InterruptedException {
+      HCatRecord row = getRecordReader().getCurrentValue();
+      I sourceId = getSourceVertexId(row);
+      I targetId = getTargetVertexId(row);
+      E edgeValue = getEdgeValue(row);
+      Edge<I, E> edge = new Edge<I, E>(targetId, edgeValue);
+      return new EdgeWithSource<I, E>(sourceId, edge);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/57ea5561/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java
new file mode 100644
index 0000000..ec49137
--- /dev/null
+++ b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.hcatalog;
+
+import com.google.common.collect.Lists;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexInputFormat;
+import org.apache.giraph.graph.VertexReader;
+import org.apache.giraph.utils.TimedLogger;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Abstract class that users should subclass to load data from a Hive or Pig
+ * table. You can easily implement a {@link HCatalogVertexReader} by extending
+ * either {@link SingleRowHCatalogVertexReader} or
+ * {@link MultiRowHCatalogVertexReader} depending on how data for each vertex is
+ * stored in the input table.
+ * <p>
+ * The desired database and table name to load from can be specified via
+ * {@link GiraphHCatInputFormat#setVertexInput(org.apache.hadoop.mapreduce.Job,
+ * org.apache.hcatalog.mapreduce.InputJobInfo)}
+ * as you setup your vertex input format with
+ * {@link org.apache.giraph.conf.GiraphConfiguration#
+ * setVertexInputFormatClass(Class)}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+
+@SuppressWarnings("rawtypes")
+public abstract class HCatalogVertexInputFormat<
+    I extends WritableComparable,
+    V extends Writable,
+    E extends Writable,
+    M extends Writable>
+    extends VertexInputFormat<I, V, E, M> {
+  /**
+   * HCatalog input format.
+   */
+  private GiraphHCatInputFormat hCatInputFormat = new GiraphHCatInputFormat();
+
+  @Override
+  public final List<InputSplit> getSplits(
+      final JobContext context, final int numWorkers)
+    throws IOException, InterruptedException {
+    // numWorkers is not consulted; the splits come from the HCatalog format.
+    List<InputSplit> vertexSplits = hCatInputFormat.getVertexSplits(context);
+    return vertexSplits;
+  }
+
+  /**
+   * Abstract class that users should subclass
+   * based on their specific vertex
+   * input. HCatRecord can be parsed to get the
+   * required data for implementing
+   * getCurrentVertex(). If the vertex spans more
+   * than one HCatRecord,
+   * nextVertex() should be overwritten to handle that logic as well.
+   * <p>
+   * Initialization happens in two steps: the internal record reader is
+   * attached first, and only then can
+   * {@link #initialize(InputSplit, TaskAttemptContext)} be called, because
+   * it initializes that record reader.
+   */
+  protected abstract class HCatalogVertexReader implements
+      VertexReader<I, V, E, M> {
+    /** Giraph configuration */
+    private ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+    /** Internal HCatRecordReader. */
+    private RecordReader<WritableComparable,
+        HCatRecord> hCatRecordReader;
+    /** Context passed to initialize. */
+    private TaskAttemptContext context;
+
+    /**
+     * Get the Giraph configuration.
+     *
+     * @return Configuration built from the context passed to initialize;
+     *         null until
+     *         {@link #initialize(InputSplit, TaskAttemptContext)} has run.
+     */
+    public ImmutableClassesGiraphConfiguration<I, V, E, M> getConfiguration() {
+      return configuration;
+    }
+
+    /**
+     * Initialize with the HCatRecordReader.
+     *
+     * @param recordReader internal reader
+     */
+    private void initialize(
+        final RecordReader<
+            WritableComparable, HCatRecord>
+            recordReader) {
+      this.hCatRecordReader = recordReader;
+    }
+
+    /**
+     * Initialize the internal record reader with the split and context, then
+     * remember the context and derive the Giraph configuration from it.
+     */
+    @Override
+    public final void initialize(
+        final InputSplit inputSplit,
+        final TaskAttemptContext ctxt)
+      throws IOException, InterruptedException {
+      // The record reader must already have been attached at this point.
+      hCatRecordReader.initialize(inputSplit, ctxt);
+      this.context = ctxt;
+      this.configuration = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
+          context.getConfiguration());
+    }
+
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+      // Users can override this if desired,
+      // and a vertex is bigger than
+      // a single row.
+      return hCatRecordReader.nextKeyValue();
+    }
+
+    /** Close the internal record reader. */
+    @Override
+    public final void close() throws IOException {
+      hCatRecordReader.close();
+    }
+
+    /** Report the progress of the internal record reader. */
+    @Override
+    public final float getProgress() throws IOException, InterruptedException {
+      return hCatRecordReader.getProgress();
+    }
+
+    /**
+     * Get the record reader.
+     * @return Record reader to be used for reading.
+     */
+    protected final RecordReader<WritableComparable, HCatRecord>
+    getRecordReader() {
+      return hCatRecordReader;
+    }
+
+    /**
+     * Get the context.
+     *
+     * @return Context passed to initialize.
+     */
+    protected final TaskAttemptContext getContext() {
+      return context;
+    }
+  }
+
+  /**
+   * Create vertex reader instance.
+   * <p>
+   * The HCatalog record reader is attached to the returned instance by
+   * {@link #createVertexReader(InputSplit, TaskAttemptContext)}.
+   *
+   * @return HCatalogVertexReader
+   */
+  protected abstract HCatalogVertexReader createVertexReader();
+
+  /**
+   * Create a {@link VertexReader} for the given split.
+   * <p>
+   * Only the internal HCatalog record reader is attached here; the reader's
+   * <code>initialize(InputSplit, TaskAttemptContext)</code> is not invoked
+   * by this method.
+   *
+   * @param split Input split to read
+   * @param context Task attempt context
+   * @return Vertex reader with its record reader attached
+   * @throws IOException if the record reader cannot be created
+   * @throws IllegalStateException if interrupted while creating the record
+   *                               reader; the thread's interrupt status is
+   *                               restored
+   */
+  @Override
+  public final VertexReader<I, V, E, M>
+  createVertexReader(final InputSplit split,
+                     final TaskAttemptContext context)
+    throws IOException {
+    try {
+      HCatalogVertexReader reader = createVertexReader();
+      reader.initialize(hCatInputFormat.
+          createVertexRecordReader(split, context));
+      return reader;
+    } catch (InterruptedException e) {
+      // The interruption is converted to an unchecked exception, so restore
+      // the interrupt status for code further up the stack.
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(
+          "createVertexReader: " +
+              "Interrupted creating reader.", e);
+    }
+  }
+
+  /**
+   * HCatalogVertexReader for tables holding
+   * complete vertex info within each
+   * row.
+   */
+  protected abstract class SingleRowHCatalogVertexReader
+      extends HCatalogVertexReader {
+    /** 1024; cubed to convert a byte count to gigabytes. */
+    private static final int BYTE_CONST = 1024;
+    /** Memory usage is considered for logging once per this many records. */
+    private static final int RECORD_MOD_LIMIT = 1000;
+    /** Logger */
+    private final Logger log =
+        Logger.getLogger(SingleRowHCatalogVertexReader.class);
+    /** Number of vertices returned by getCurrentVertex so far. */
+    private int recordCount = 0;
+    /**
+     * Timed logger to print every 30 seconds
+     */
+    private final TimedLogger timedLogger = new TimedLogger(30 * 1000,
+        log);
+
+    /**
+     * Get vertex id.
+     * @param record hcat record
+     * @return I id
+     */
+    protected abstract I getVertexId(HCatRecord record);
+
+    /**
+     * Get vertex value.
+     * @param record hcat record
+     * @return V value
+     */
+    protected abstract V getVertexValue(HCatRecord record);
+
+    /**
+     * Get edges.
+     * @param record hcat record
+     * @return Edges
+     */
+    protected abstract Iterable<Edge<I, E>> getEdges(HCatRecord record);
+
+    /**
+     * Build a vertex from the record reader's current record. Every
+     * {@link #RECORD_MOD_LIMIT} records the JVM memory usage is passed to
+     * the timed logger.
+     */
+    @Override
+    public final Vertex<I, V, E, M> getCurrentVertex()
+      throws IOException, InterruptedException {
+      HCatRecord record = getRecordReader().getCurrentValue();
+      Vertex<I, V, E, M> vertex = getConfiguration().createVertex();
+      vertex.initialize(getVertexId(record), getVertexValue(record),
+          getEdges(record));
+      ++recordCount;
+      if (log.isInfoEnabled() &&
+          ((recordCount % RECORD_MOD_LIMIT) == 0)) {
+        // Sample each figure once so that the logged
+        // "total = used + free" actually adds up.
+        Runtime runtime = Runtime.getRuntime();
+        long totalBytes = runtime.totalMemory();
+        long freeBytes = runtime.freeMemory();
+        double gb = (double) BYTE_CONST * BYTE_CONST * BYTE_CONST;
+        timedLogger.info(
+            "read " + recordCount + " records. Memory: " +
+            (totalBytes / gb) +
+            "GB total = " +
+            ((totalBytes - freeBytes) / gb) +
+            "GB used + " + (freeBytes / gb) +
+            "GB free, " + (runtime.maxMemory() / gb) + "GB max");
+      }
+      return vertex;
+    }
+  }
+  /**
+   * HCatalogVertexReader for tables
+   * holding vertex info across multiple rows
+   * sorted by vertex id column,
+   * so that they appear consecutively to the
+   * RecordReader.
+   */
+  protected abstract class MultiRowHCatalogVertexReader extends
+      HCatalogVertexReader {
+    /**
+     * modulus check counter.
+     */
+    private static final int RECORD_MOD_LIMIT = 1000;
+    /**
+     *  logger
+     */
+    private final Logger log =
+        Logger.getLogger(MultiRowHCatalogVertexReader.class);
+    /**
+     * current vertex id.
+     */
+    private I currentVertexId = null;
+    /**
+     * current vertex edges.
+     */
+    private List<Edge<I, E>> currentEdges = Lists.newLinkedList();
+    /**
+     * record for vertex.
+     */
+    private List<HCatRecord> recordsForVertex = Lists.newArrayList();
+    /**
+     * record count.
+     */
+    private int recordCount = 0;
+    /**
+     * vertex.
+     */
+    private Vertex<I, V, E, M> vertex = null;
+    /**
+     * Timed logger to print every 30 seconds
+     */
+    private final TimedLogger timedLogger = new TimedLogger(30 * 1000,
+        log);
+
+
+    /**
+     * get vertex id from record.
+     *
+     * @param record hcat
+     * @return I vertex id
+     */
+    protected abstract I getVertexId(HCatRecord record);
+
+    /**
+     * Get vertex value from the records of one vertex.
+     *
+     * @param records records collected for the current vertex
+     * @return V vertex value
+     */
+    protected abstract V getVertexValue(
+        Iterable<HCatRecord> records);
+
+    /**
+     * get target vertex id from record.
+     *
+     * @param record hcat
+     * @return I vertex id of target.
+     */
+    protected abstract I getTargetVertexId(HCatRecord record);
+
+    /**
+     * get edge value from record.
+     *
+     * @param record hcat.
+     * @return E edge value.
+     */
+    protected abstract E getEdgeValue(HCatRecord record);
+
+    @Override
+    public final Vertex<I, V, E, M>
+    getCurrentVertex() throws IOException, InterruptedException {
+      return vertex;
+    }
+
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+      while (getRecordReader().nextKeyValue()) {
+        HCatRecord record =
+            getRecordReader().getCurrentValue();
+        if (currentVertexId == null) {
+          currentVertexId = getVertexId(record);
+        }
+        if (currentVertexId.equals(getVertexId(record))) {
+          currentEdges.add(new Edge<I, E>(
+                  getTargetVertexId(record),
+                  getEdgeValue(record)));
+          recordsForVertex.add(record);
+        } else {
+          createCurrentVertex();
+          if (log.isInfoEnabled() && (recordCount % RECORD_MOD_LIMIT) == 0) {
+            timedLogger.info("read " + recordCount);
+          }
+          currentVertexId = getVertexId(record);
+          recordsForVertex.add(record);
+          return true;
+        }
+      }
+
+      if (currentEdges.isEmpty()) {
+        return false;
+      } else {
+        createCurrentVertex();
+        return true;
+      }
+    }
+
+    /**
+     * Create the current vertex from the buffered edges and records, then
+     * start new, empty buffers for the next vertex.
+     */
+    private void createCurrentVertex() {
+      vertex = getConfiguration().createVertex();
+      vertex.initialize(currentVertexId, getVertexValue(recordsForVertex),
+          currentEdges);
+      // Hand the buffers over instead of clearing them: should the vertex
+      // (or getVertexValue) keep a reference to the list it was given,
+      // clearing it here would silently empty the vertex as well.
+      currentEdges = Lists.newLinkedList();
+      recordsForVertex = Lists.newArrayList();
+      ++recordCount;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/57ea5561/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexOutputFormat.java b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexOutputFormat.java
new file mode 100644
index 0000000..94c7b85
--- /dev/null
+++ b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexOutputFormat.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.hcatalog;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexOutputFormat;
+import org.apache.giraph.graph.VertexWriter;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+
+import java.io.IOException;
+
+/**
+ * Abstract class that users should subclass to store data to Hive or Pig table.
+ * You can easily implement a {@link HCatalogVertexWriter} by extending
+ * {@link SingleRowHCatalogVertexWriter} or {@link MultiRowHCatalogVertexWriter}
+ * depending on how you want to fit your vertices into the output table.
+ * <p>
+ * The desired database and table name to store to can be specified via
+ * {@link HCatOutputFormat#setOutput(org.apache.hadoop.mapreduce.Job,
+ * org.apache.hcatalog.mapreduce.OutputJobInfo)}
+ * as you setup your vertex output format with
+ * {@link
+ * org.apache.giraph.conf.GiraphConfiguration#setVertexOutputFormatClass(Class)}.
+ * You must create the output table.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class HCatalogVertexOutputFormat<
+        I extends WritableComparable,
+        V extends Writable,
+        E extends Writable>
+        extends VertexOutputFormat<I, V, E> {
+  /**
+   * Wrapped HCatalog output format; output spec checks, committer creation
+   * and record writer creation are all delegated to it.
+   */
+  protected HCatOutputFormat hCatOutputFormat = new HCatOutputFormat();
+
+  /**
+   * Check the output specification by delegating to the wrapped
+   * {@link HCatOutputFormat}.
+   *
+   * @param context Job context
+   */
+  @Override
+  public final void checkOutputSpecs(JobContext context) throws IOException,
+      InterruptedException {
+    hCatOutputFormat.checkOutputSpecs(context);
+  }
+
+  /**
+   * Get the output committer from the wrapped {@link HCatOutputFormat}.
+   *
+   * @param context Task attempt context
+   * @return Output committer supplied by HCatalog
+   */
+  @Override
+  public final OutputCommitter getOutputCommitter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    return hCatOutputFormat.getOutputCommitter(context);
+  }
+
+  /**
+   * Abstract class that users should subclass based on their specific vertex
+   * output. Users should implement writeVertex to create an HCatRecord that
+   * is valid for writing by the HCatalog record writer.
+   */
+  protected abstract class HCatalogVertexWriter implements
+            VertexWriter<I, V, E> {
+
+    /** Internal HCatRecordWriter */
+    private RecordWriter<WritableComparable<?>, HCatRecord> hCatRecordWriter;
+    /** Context passed to initialize */
+    private TaskAttemptContext context;
+
+    /**
+     * Initialize with the HCatRecordWriter. Called by the enclosing output
+     * format right after the writer is created.
+     *
+     * @param hCatRecordWriter Internal writer
+     */
+    private void initialize(
+                    RecordWriter<WritableComparable<?>,
+                    HCatRecord> hCatRecordWriter) {
+      this.hCatRecordWriter = hCatRecordWriter;
+    }
+
+    /**
+     * Get the record writer.
+     *
+     * @return Record writer to be used for writing.
+     */
+    protected RecordWriter<WritableComparable<?>,
+            HCatRecord> getRecordWriter() {
+      return hCatRecordWriter;
+    }
+
+    /**
+     * Get the context.
+     *
+     * @return Context passed to initialize.
+     */
+    protected TaskAttemptContext getContext() {
+      return context;
+    }
+
+    /**
+     * Remember the task attempt context so that subclasses can retrieve it
+     * through {@link #getContext()}.
+     *
+     * @param context Task attempt context
+     */
+    @Override
+    public void initialize(TaskAttemptContext context) throws IOException {
+      this.context = context;
+    }
+
+    /**
+     * Close the internal HCatalog record writer.
+     *
+     * @param context Task attempt context handed to the record writer
+     */
+    @Override
+    public void close(TaskAttemptContext context) throws IOException,
+        InterruptedException {
+      hCatRecordWriter.close(context);
+    }
+
+  }
+
+  /**
+   * Create the user-defined vertex writer. The HCatalog record writer is
+   * attached to it afterwards by
+   * {@link #createVertexWriter(TaskAttemptContext)}.
+   *
+   * @return HCatalogVertexWriter
+   */
+  protected abstract HCatalogVertexWriter createVertexWriter();
+
+  /**
+   * Create a vertex writer through {@link #createVertexWriter()} and hand it
+   * the record writer obtained from the wrapped {@link HCatOutputFormat}.
+   *
+   * @param context Task attempt context used to obtain the record writer
+   * @return Vertex writer backed by an HCatalog record writer
+   */
+  @Override
+  public final VertexWriter<I, V, E> createVertexWriter(
+    TaskAttemptContext context) throws IOException,
+    InterruptedException {
+    HCatalogVertexWriter writer = createVertexWriter();
+    writer.initialize(hCatOutputFormat.getRecordWriter(context));
+    return writer;
+  }
+
+  /**
+   * HCatalogVertexWriter to write each vertex in each row.
+   */
+  protected abstract class SingleRowHCatalogVertexWriter extends
+            HCatalogVertexWriter {
+    /**
+     * Get the number of columns of the records to create.
+     *
+     * @return Number of columns
+     */
+    protected abstract int getNumColumns();
+
+    /**
+     * Fill a freshly created record with the data of a vertex.
+     *
+     * @param record Record to fill
+     * @param vertex Vertex to populate the record from
+     */
+    protected abstract void fillRecord(HCatRecord record,
+                                    Vertex<I, V, E, ?> vertex);
+
+    /**
+     * Create a record sized by {@link #getNumColumns()} and fill it through
+     * {@link #fillRecord(HCatRecord, Vertex)}.
+     *
+     * @param vertex Vertex to populate the record from
+     * @return HCatRecord newly created
+     */
+    protected HCatRecord createRecord(Vertex<I, V, E, ?> vertex) {
+      HCatRecord record = new DefaultHCatRecord(getNumColumns());
+      fillRecord(record, vertex);
+      return record;
+    }
+
+    @Override
+    // XXX It is important not to put generic type signature <I,V,E,?> after
+    // Vertex. Otherwise, any class that extends this will not compile
+    // because of not implementing the VertexWriter#writeVertex. Mystery of
+    // Java Generics :(
+    // The record is written with a null key.
+    @SuppressWarnings("unchecked")
+    public final void writeVertex(Vertex vertex) throws IOException,
+        InterruptedException {
+      getRecordWriter().write(null, createRecord(vertex));
+    }
+
+  }
+
+  /**
+   * HCatalogVertexWriter to write each vertex in multiple rows.
+   */
+  public abstract class MultiRowHCatalogVertexWriter extends
+    HCatalogVertexWriter {
+    /**
+     * Create the records that represent a vertex; each one is written as a
+     * separate row.
+     *
+     * @param vertex Vertex to populate the records from
+     * @return Iterable of records
+     */
+    protected abstract Iterable<HCatRecord> createRecords(
+        Vertex<I, V, E, ?> vertex);
+
+    @Override
+    // XXX Same thing here. No Generics for Vertex here.
+    // Every record is written with a null key.
+    @SuppressWarnings("unchecked")
+    public final void writeVertex(Vertex vertex) throws IOException,
+        InterruptedException {
+      Iterable<HCatRecord> records = createRecords(vertex);
+      for (HCatRecord record : records) {
+        getRecordWriter().write(null, record);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/57ea5561/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexValueInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexValueInputFormat.java b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexValueInputFormat.java
new file mode 100644
index 0000000..d08179d
--- /dev/null
+++ b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexValueInputFormat.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.hcatalog;
+
+import org.apache.giraph.graph.VertexValueInputFormat;
+import org.apache.giraph.graph.VertexValueReader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.data.HCatRecord;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * HCatalog {@link VertexValueInputFormat} for reading vertex values from
+ * Hive/Pig.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public abstract class HCatalogVertexValueInputFormat<I extends
+    WritableComparable,
+    V extends Writable,
+    E extends Writable,
+    M extends Writable>
+    extends VertexValueInputFormat<I, V, E, M> {
+  /**
+   * HCatalog input format.
+   */
+  private GiraphHCatInputFormat hCatInputFormat = new GiraphHCatInputFormat();
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context, int numWorkers)
+    throws IOException, InterruptedException {
+    return hCatInputFormat.getVertexSplits(context);
+  }
+
+  /**
+   * {@link VertexValueReader} for {@link HCatalogVertexValueInputFormat}.
+   */
+  protected abstract class HCatalogVertexValueReader
+      extends VertexValueReader<I, V, E, M> {
+    /** Internal {@link RecordReader}. */
+    private RecordReader<WritableComparable, HCatRecord> hCatRecordReader;
+    /** Context passed to initialize. */
+    private TaskAttemptContext context;
+
+    @Override
+    public final void initialize(InputSplit inputSplit,
+                                 TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      super.initialize(inputSplit, context);
+      hCatRecordReader =
+          hCatInputFormat.createVertexRecordReader(inputSplit, context);
+      hCatRecordReader.initialize(inputSplit, context);
+      this.context = context;
+    }
+
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+      return hCatRecordReader.nextKeyValue();
+    }
+
+    @Override
+    public final void close() throws IOException {
+      hCatRecordReader.close();
+    }
+
+    @Override
+    public final float getProgress() throws IOException, InterruptedException {
+      return hCatRecordReader.getProgress();
+    }
+
+    /**
+     * Get the record reader.
+     *
+     * @return Record reader to be used for reading.
+     */
+    protected final RecordReader<WritableComparable, HCatRecord>
+    getRecordReader() {
+      return hCatRecordReader;
+    }
+
+    /**
+     * Get the context.
+     *
+     * @return Context passed to initialize.
+     */
+    protected final TaskAttemptContext getContext() {
+      return context;
+    }
+  }
+
+  /**
+   * Create {@link VertexValueReader}.
+
+   * @return {@link HCatalogVertexValueReader} instance.
+   */
+  protected abstract HCatalogVertexValueReader createVertexValueReader();
+
+  @Override
+  public final VertexValueReader<I, V, E, M>
+  createVertexValueReader(InputSplit split, TaskAttemptContext context)
+    throws IOException {
+    try {
+      HCatalogVertexValueReader reader = createVertexValueReader();
+      reader.initialize(split, context);
+      return reader;
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "createVertexValueReader: Interrupted creating reader.", e);
+    }
+  }
+
+  /**
+   * {@link HCatalogVertexValueReader} for tables holding a complete vertex
+   * value in each row.
+   */
+  protected abstract class SingleRowHCatalogVertexValueReader
+      extends HCatalogVertexValueReader {
+    /**
+     * Get vertex id from a record.
+     *
+     * @param record Input record
+     * @return I Vertex id
+     */
+    protected abstract I getVertexId(HCatRecord record);
+
+    /**
+     * Get vertex value from a record.
+     *
+     * @param record Input record
+     * @return V Vertex value
+     */
+    protected abstract V getVertexValue(HCatRecord record);
+
+    @Override
+    public final I getCurrentVertexId() throws IOException,
+        InterruptedException {
+      return getVertexId(getRecordReader().getCurrentValue());
+    }
+
+    @Override
+    public final V getCurrentVertexValue() throws IOException,
+        InterruptedException {
+      return getVertexValue(getRecordReader().getCurrentValue());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/57ea5561/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveGiraphRunner.java
----------------------------------------------------------------------
diff --git a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveGiraphRunner.java b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveGiraphRunner.java
new file mode 100644
index 0000000..7a7c2f8
--- /dev/null
+++ b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveGiraphRunner.java
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.hcatalog;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.giraph.graph.EdgeInputFormat;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexInputFormat;
+import org.apache.giraph.graph.VertexOutputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Hive Giraph Runner
+ */
+public class HiveGiraphRunner implements Tool {
+  /**
+   * logger
+   */
+  private static final Logger LOG = Logger.getLogger(HiveGiraphRunner.class);
+  /**
+   * workers
+   */
+  protected int workers;
+  /**
+   * is verbose
+   */
+  protected boolean isVerbose;
+  /**
+   * output table partitions
+   */
+  protected Map<String, String> outputTablePartitionValues;
+  /**
+   * dbName
+   */
+  protected String dbName;
+  /**
+   * vertex input table name
+   */
+  protected String vertexInputTableName;
+  /**
+   * vertex input table filter
+   */
+  protected String vertexInputTableFilterExpr;
+  /**
+   * edge input table name
+   */
+  protected String edgeInputTableName;
+  /**
+   * edge input table filter
+   */
+  protected String edgeInputTableFilterExpr;
+  /**
+   * output table name
+   */
+  protected String outputTableName;
+  /** Configuration */
+  private Configuration conf;
+  /** Skip output? (Useful for testing without writing) */
+  private boolean skipOutput = false;
+
+  /**
+  * vertex class.
+  */
+  private Class<? extends Vertex> vertexClass;
+  /**
+   * vertex input format internal.
+   */
+  private Class<? extends VertexInputFormat> vertexInputFormatClass;
+  /**
+   * edge input format internal.
+   */
+  private Class<? extends EdgeInputFormat> edgeInputFormatClass;
+  /**
+  * vertex output format internal.
+  */
+  private Class<? extends VertexOutputFormat> vertexOutputFormatClass;
+
+  /**
+  * Giraph runner class.
+   *
+  * @param vertexClass Vertex class
+  * @param vertexInputFormatClass Vertex input format
+  * @param edgeInputFormatClass Edge input format
+  * @param vertexOutputFormatClass Output format
+  */
+  protected HiveGiraphRunner(
+      Class<? extends Vertex> vertexClass,
+      Class<? extends VertexInputFormat> vertexInputFormatClass,
+      Class<? extends EdgeInputFormat> edgeInputFormatClass,
+      Class<? extends VertexOutputFormat> vertexOutputFormatClass) {
+    this.vertexClass = vertexClass;
+    this.vertexInputFormatClass = vertexInputFormatClass;
+    this.edgeInputFormatClass = edgeInputFormatClass;
+    this.vertexOutputFormatClass = vertexOutputFormatClass;
+    this.conf = new HiveConf(getClass());
+  }
+
+  /**
+  * main method
+  * @param args system arguments
+  * @throws Exception any errors from Hive Giraph Runner
+  */
+  public static void main(String[] args) throws Exception {
+    System.exit(ToolRunner.run(
+        new HiveGiraphRunner(null, null, null, null), args));
+  }
+
+  @Override
+  public final int run(String[] args) throws Exception {
+    // process args
+    try {
+      processArguments(args);
+    } catch (InterruptedException e) {
+      return 0;
+    } catch (IllegalArgumentException e) {
+      System.err.println(e.getMessage());
+      return -1;
+    }
+
+    // additional configuration for Hive
+    adjustConfigurationForHive(getConf());
+
+    // setup GiraphJob
+    GiraphJob job = new GiraphJob(getConf(), getClass().getName());
+    job.getConfiguration().setVertexClass(vertexClass);
+
+    // setup input from Hive
+    if (vertexInputFormatClass != null) {
+      InputJobInfo vertexInputJobInfo = InputJobInfo.create(dbName,
+          vertexInputTableName, vertexInputTableFilterExpr);
+      GiraphHCatInputFormat.setVertexInput(job.getInternalJob(),
+          vertexInputJobInfo);
+      job.getConfiguration().setVertexInputFormatClass(vertexInputFormatClass);
+    }
+    if (edgeInputFormatClass != null) {
+      InputJobInfo edgeInputJobInfo = InputJobInfo.create(dbName,
+          edgeInputTableName, edgeInputTableFilterExpr);
+      GiraphHCatInputFormat.setEdgeInput(job.getInternalJob(),
+          edgeInputJobInfo);
+      job.getConfiguration().setEdgeInputFormatClass(edgeInputFormatClass);
+    }
+
+    // setup output to Hive
+    HCatOutputFormat.setOutput(job.getInternalJob(), OutputJobInfo.create(
+        dbName, outputTableName, outputTablePartitionValues));
+    HCatOutputFormat.setSchema(job.getInternalJob(),
+        HCatOutputFormat.getTableSchema(job.getInternalJob()));
+    if (skipOutput) {
+      LOG.warn("run: Warning - Output will be skipped!");
+    } else {
+      job.getConfiguration().setVertexOutputFormatClass(
+          vertexOutputFormatClass);
+    }
+
+    job.getConfiguration().setWorkerConfiguration(workers, workers, 100.0f);
+    initGiraphJob(job);
+
+    return job.run(isVerbose) ? 0 : -1;
+  }
+
+  /**
+  * set hive configuration
+  * @param conf Configuration argument
+  */
+  private static void adjustConfigurationForHive(Configuration conf) {
+    // when output partitions are used, workers register them to the
+    // metastore at cleanup stage, and on HiveConf's initialization, it
+    // looks for hive-site.xml from.
+    addToStringCollection(conf, "tmpfiles", conf.getClassLoader()
+        .getResource("hive-site.xml").toString());
+
+    // Also, you need hive.aux.jars as well
+    // addToStringCollection(conf, "tmpjars",
+    // conf.getStringCollection("hive.aux.jars.path"));
+
+    // Or, more effectively, we can provide all the jars client needed to
+    // the workers as well
+    String[] hadoopJars = System.getenv("HADOOP_CLASSPATH").split(
+        File.pathSeparator);
+    List<String> hadoopJarURLs = Lists.newArrayList();
+    for (String jarPath : hadoopJars) {
+      File file = new File(jarPath);
+      if (file.exists() && file.isFile()) {
+        String jarURL = file.toURI().toString();
+        hadoopJarURLs.add(jarURL);
+      }
+    }
+    addToStringCollection(conf, "tmpjars", hadoopJarURLs);
+  }
+
+  /**
+  * process arguments
+  * @param args to process
+  * @return CommandLine instance
+  * @throws ParseException error parsing arguments
+  * @throws InterruptedException interrupted
+  */
+  private CommandLine processArguments(String[] args) throws ParseException,
+            InterruptedException {
+    Options options = new Options();
+    options.addOption("h", "help", false, "Help");
+    options.addOption("v", "verbose", false, "Verbose");
+    options.addOption("D", "hiveconf", true,
+                "property=value for Hive/Hadoop configuration");
+    options.addOption("w", "workers", true, "Number of workers");
+    if (vertexClass == null) {
+      options.addOption(null, "vertexClass", true,
+          "Giraph Vertex class to use");
+    }
+    if (vertexInputFormatClass == null) {
+      options.addOption(null, "vertexInputFormatClass", true,
+          "Giraph HCatalogVertexInputFormat class to use");
+    }
+    if (edgeInputFormatClass == null) {
+      options.addOption(null, "edgeInputFormatClass", true,
+          "Giraph HCatalogEdgeInputFormat class to use");
+    }
+
+    if (vertexOutputFormatClass == null) {
+      options.addOption(null, "vertexOutputFormatClass", true,
+          "Giraph HCatalogVertexOutputFormat class to use");
+    }
+
+    options.addOption("db", "database", true, "Hive database name");
+    options.addOption("vi", "vertexInputTable", true,
+        "Vertex input table name");
+    options.addOption("VI", "vertexInputFilter", true,
+        "Vertex input table filter expression (e.g., \"a<2 AND b='two'\"");
+    options.addOption("ei", "edgeInputTable", true,
+        "Edge input table name");
+    options.addOption("EI", "edgeInputFilter", true,
+        "Edge input table filter expression (e.g., \"a<2 AND b='two'\"");
+    options.addOption("o", "outputTable", true, "Output table name");
+    options.addOption("O", "outputPartition", true,
+        "Output table partition values (e.g., \"a=1,b=two\")");
+    options.addOption("s", "skipOutput", false, "Skip output?");
+
+    addMoreOptions(options);
+
+    CommandLineParser parser = new GnuParser();
+    final CommandLine cmdln = parser.parse(options, args);
+    if (args.length == 0 || cmdln.hasOption("help")) {
+      new HelpFormatter().printHelp(getClass().getName(), options, true);
+      throw new InterruptedException();
+    }
+
+    // Giraph classes
+    if (cmdln.hasOption("vertexClass")) {
+      vertexClass = findClass(cmdln.getOptionValue("vertexClass"),
+          Vertex.class);
+    }
+    if (cmdln.hasOption("vertexInputFormatClass")) {
+      vertexInputFormatClass = findClass(
+          cmdln.getOptionValue("vertexInputFormatClass"),
+          HCatalogVertexInputFormat.class);
+    }
+    if (cmdln.hasOption("edgeInputFormatClass")) {
+      edgeInputFormatClass = findClass(
+          cmdln.getOptionValue("edgeInputFormatClass"),
+          HCatalogEdgeInputFormat.class);
+    }
+
+    if (cmdln.hasOption("vertexOutputFormatClass")) {
+      vertexOutputFormatClass = findClass(
+          cmdln.getOptionValue("vertexOutputFormatClass"),
+          HCatalogVertexOutputFormat.class);
+    }
+
+    if (cmdln.hasOption("skipOutput")) {
+      skipOutput = true;
+    }
+
+    if (vertexClass == null) {
+      throw new IllegalArgumentException(
+          "Need the Giraph Vertex class name (-vertexClass) to use");
+    }
+    if (vertexInputFormatClass == null && edgeInputFormatClass == null) {
+      throw new IllegalArgumentException(
+          "Need at least one of Giraph VertexInputFormat " +
+              "class name (-vertexInputFormatClass) and " +
+              "EdgeInputFormat class name (-edgeInputFormatClass)");
+    }
+    if (vertexOutputFormatClass == null) {
+      throw new IllegalArgumentException(
+          "Need the Giraph VertexOutputFormat " +
+              "class name (-vertexOutputFormatClass) to use");
+    }
+    if (!cmdln.hasOption("workers")) {
+      throw new IllegalArgumentException(
+          "Need to choose the number of workers (-w)");
+    }
+    if (!cmdln.hasOption("vertexInputTable") &&
+        vertexInputFormatClass != null) {
+      throw new IllegalArgumentException(
+          "Need to set the vertex input table name (-vi)");
+    }
+    if (!cmdln.hasOption("edgeInputTable") &&
+        edgeInputFormatClass != null) {
+      throw new IllegalArgumentException(
+          "Need to set the edge input table name (-ei)");
+    }
+    if (!cmdln.hasOption("outputTable")) {
+      throw new IllegalArgumentException(
+          "Need to set the output table name (-o)");
+    }
+    dbName = cmdln.getOptionValue("dbName", "default");
+    vertexInputTableName = cmdln.getOptionValue("vertexInputTable");
+    vertexInputTableFilterExpr = cmdln.getOptionValue("vertexInputFilter");
+    edgeInputTableName = cmdln.getOptionValue("edgeInputTable");
+    edgeInputTableFilterExpr = cmdln.getOptionValue("edgeInputFilter");
+    outputTableName = cmdln.getOptionValue("outputTable");
+    outputTablePartitionValues = HiveUtils.parsePartitionValues(cmdln
+                .getOptionValue("outputPartition"));
+    workers = Integer.parseInt(cmdln.getOptionValue("workers"));
+    isVerbose = cmdln.hasOption("verbose");
+
+    // pick up -hiveconf arguments
+    for (String hiveconf : cmdln.getOptionValues("hiveconf")) {
+      String[] keyval = hiveconf.split("=", 2);
+      if (keyval.length == 2) {
+        String name = keyval[0];
+        String value = keyval[1];
+        if (name.equals("tmpjars") || name.equals("tmpfiles")) {
+          addToStringCollection(
+                  conf, name, value);
+        } else {
+          conf.set(name, value);
+        }
+      }
+    }
+
+    processMoreArguments(cmdln);
+
+    return cmdln;
+  }
+
+  /**
+  * add string to collection
+  * @param conf Configuration
+  * @param name name to add
+  * @param values values for collection
+  */
+  private static void addToStringCollection(Configuration conf, String name,
+                                              String... values) {
+    addToStringCollection(conf, name, Arrays.asList(values));
+  }
+
+  /**
+  * add string to collection
+  * @param conf Configuration
+  * @param name to add
+  * @param values values for collection
+  */
+  private static void addToStringCollection(
+          Configuration conf, String name, Collection
+          <? extends String> values) {
+    Collection<String> tmpfiles = conf.getStringCollection(name);
+    tmpfiles.addAll(values);
+    conf.setStrings(name, tmpfiles.toArray(new String[tmpfiles.size()]));
+  }
+
+  /**
+  *
+  * @param className to find
+  * @param base  base class
+  * @param <T> class type found
+  * @return type found
+  */
+  private <T> Class<? extends T> findClass(String className, Class<T> base) {
+    try {
+      Class<?> cls = Class.forName(className);
+      if (base.isAssignableFrom(cls)) {
+        return cls.asSubclass(base);
+      }
+      return null;
+    } catch (ClassNotFoundException e) {
+      throw new IllegalArgumentException(className + ": Invalid class name");
+    }
+  }
+
+  @Override
+  public final Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public final void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+  * Override this method to add more command-line options. You can process
+  * them by also overriding {@link #processMoreArguments(CommandLine)}.
+  *
+  * @param options Options
+  */
+  protected void addMoreOptions(Options options) {
+  }
+
+  /**
+  * Override this method to process additional command-line arguments. You
+  * may want to declare additional options by also overriding
+  * {@link #addMoreOptions(Options)}.
+  *
+  * @param cmd Command
+  */
+  protected void processMoreArguments(CommandLine cmd) {
+  }
+
+  /**
+  * Override this method to do additional setup with the GiraphJob that will
+  * run.
+  *
+  * @param job
+  *            GiraphJob that is going to run
+  */
+  protected void initGiraphJob(GiraphJob job) {
+    LOG.info(getClass().getSimpleName() + " with");
+    String prefix = "\t";
+    LOG.info(prefix + "-vertexClass=" +
+         vertexClass.getCanonicalName());
+    if (vertexInputFormatClass != null) {
+      LOG.info(prefix + "-vertexInputFormatClass=" +
+          vertexInputFormatClass.getCanonicalName());
+    }
+    if (edgeInputFormatClass != null) {
+      LOG.info(prefix + "-edgeInputFormatClass=" +
+          edgeInputFormatClass.getCanonicalName());
+    }
+    LOG.info(prefix + "-vertexOutputFormatClass=" +
+        vertexOutputFormatClass.getCanonicalName());
+    if (vertexInputTableName != null) {
+      LOG.info(prefix + "-vertexInputTable=" + vertexInputTableName);
+    }
+    if (vertexInputTableFilterExpr != null) {
+      LOG.info(prefix + "-vertexInputFilter=\"" +
+          vertexInputTableFilterExpr + "\"");
+    }
+    if (edgeInputTableName != null) {
+      LOG.info(prefix + "-edgeInputTable=" + edgeInputTableName);
+    }
+    if (edgeInputTableFilterExpr != null) {
+      LOG.info(prefix + "-edgeInputFilter=\"" +
+          edgeInputTableFilterExpr + "\"");
+    }
+    LOG.info(prefix + "-outputTable=" + outputTableName);
+    if (outputTablePartitionValues != null) {
+      LOG.info(prefix + "-outputPartition=\"" +
+          outputTablePartitionValues + "\"");
+    }
+    LOG.info(prefix + "-workers=" + workers);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/57ea5561/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveUtils.java
----------------------------------------------------------------------
diff --git a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveUtils.java b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveUtils.java
new file mode 100644
index 0000000..c1f76f1
--- /dev/null
+++ b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HiveUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.hcatalog;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utilities and helpers for working with Hive tables.
+ */
+public class HiveUtils {
+  // TODO use Hive util class if this is already provided by it
+
+  /** Not instantiable: this class only holds static helpers. */
+  private HiveUtils() { }
+
+  /**
+   * Parse a "k1=v1,k2=v2" style string into a key to value map.
+   * Empty entries are skipped and surrounding whitespace is trimmed.
+   * @param outputTablePartitionString comma-separated key=value pairs
+   * @return map of key to value, or null if the input is null
+   * @throws IllegalArgumentException if an entry is not one key=value pair
+   */
+  public static Map<String, String> parsePartitionValues(
+            String outputTablePartitionString) {
+    if (outputTablePartitionString == null) {
+      return null;
+    }
+    Splitter pairSplitter = Splitter.on(',').omitEmptyStrings().trimResults();
+    Splitter kvSplitter = Splitter.on('=').omitEmptyStrings().trimResults();
+    Map<String, String> parsed = Maps.newHashMap();
+    for (String pair : pairSplitter.split(outputTablePartitionString)) {
+      List<String> parts = Lists.newArrayList(kvSplitter.split(pair));
+      if (parts.size() == 2) {
+        parsed.put(parts.get(0), parts.get(1));
+      } else {
+        throw new IllegalArgumentException(
+            "Unrecognized partition value format: " +
+            outputTablePartitionString);
+      }
+    }
+    return parsed;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/57ea5561/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/package-info.java b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/package-info.java
new file mode 100644
index 0000000..b01e254
--- /dev/null
+++ b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of input and output format classes
+ * for loading and storing Hive/Pig data using HCatalog.
+ */
+package org.apache.giraph.io.hcatalog;
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/57ea5561/giraph-hcatalog/src/main/java/org/apache/hcatalog/mapreduce/HCatUtils.java
----------------------------------------------------------------------
diff --git a/giraph-hcatalog/src/main/java/org/apache/hcatalog/mapreduce/HCatUtils.java b/giraph-hcatalog/src/main/java/org/apache/hcatalog/mapreduce/HCatUtils.java
new file mode 100644
index 0000000..1f25709
--- /dev/null
+++ b/giraph-hcatalog/src/main/java/org/apache/hcatalog/mapreduce/HCatUtils.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import org.apache.giraph.io.hcatalog.GiraphHCatInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Utility methods copied from HCatalog because of visibility restrictions.
+ */
+public class HCatUtils {
+  /**
+   * Don't instantiate.
+   */
+  private HCatUtils() { }
+
+  /**
+   * Returns the given InputJobInfo after populating with data queried from the
+   * metadata service.
+   *
+   * @param conf Configuration; if null, a new HiveConf backs the client
+   * @param inputJobInfo Input job info naming database, table and filter
+   * @return The same inputJobInfo, with table info and partitions set
+   * @throws IOException if a metastore or Hive call fails
+   */
+  public static InputJobInfo getInputJobInfo(
+      Configuration conf, InputJobInfo inputJobInfo)
+    throws IOException {
+    HiveMetaStoreClient client = null;
+    HiveConf hiveConf;
+    try {
+      if (conf != null) {
+        hiveConf = HCatUtil.getHiveConf(conf);
+      } else {
+        hiveConf = new HiveConf(GiraphHCatInputFormat.class);
+      }
+      client = HCatUtil.getHiveClient(hiveConf);
+      Table table = HCatUtil.getTable(client, inputJobInfo.getDatabaseName(),
+          inputJobInfo.getTableName());
+
+      List<PartInfo> partInfoList = new ArrayList<PartInfo>();
+
+      inputJobInfo.setTableInfo(HCatTableInfo.valueOf(table.getTTable()));
+      if (table.getPartitionKeys().size() != 0) {
+        // Partitioned table: one PartInfo per partition matching the filter
+        List<Partition> parts = client.listPartitionsByFilter(
+            inputJobInfo.getDatabaseName(),
+            inputJobInfo.getTableName(),
+            inputJobInfo.getFilter(),
+            (short) -1);
+
+        if (parts != null) {
+          // Default to 100,000 partitions if hcat.metastore.maxpartitions
+          // is not defined
+          int maxPart = hiveConf.getInt("hcat.metastore.maxpartitions", 100000);
+          if (parts.size() > maxPart) {
+            throw new HCatException(ErrorType.ERROR_EXCEED_MAXPART,
+                "total number of partitions is " + parts.size());
+          }
+
+          // Populate partition info
+          for (Partition ptn : parts) {
+            HCatSchema schema = HCatUtil.extractSchema(
+                new org.apache.hadoop.hive.ql.metadata.Partition(table, ptn));
+            PartInfo partInfo = extractPartInfo(schema, ptn.getSd(),
+                ptn.getParameters(), conf, inputJobInfo);
+            partInfo.setPartitionValues(InternalUtil.createPtnKeyValueMap(table,
+                ptn));
+            partInfoList.add(partInfo);
+          }
+        }
+      } else {
+        // Non partitioned table: a single PartInfo, no partition values
+        HCatSchema schema = HCatUtil.extractSchema(table);
+        PartInfo partInfo = extractPartInfo(schema, table.getTTable().getSd(),
+            table.getParameters(), conf, inputJobInfo);
+        partInfo.setPartitionValues(new HashMap<String, String>());
+        partInfoList.add(partInfo);
+      }
+      inputJobInfo.setPartitions(partInfoList);
+    } catch (MetaException e) {
+      throw new IOException("Got MetaException", e);
+    } catch (NoSuchObjectException e) {
+      throw new IOException("Got NoSuchObjectException", e);
+    } catch (TException e) {
+      throw new IOException("Got TException", e);
+    } catch (HiveException e) {
+      throw new IOException("Got HiveException", e);
+    } finally {
+      HCatUtil.closeHiveClientQuietly(client);
+    }
+    return inputJobInfo;
+  }
+
+  /**
+   * Build the PartInfo for one partition, or for an unpartitioned table.
+   *
+   * @param schema Table schema
+   * @param sd Storage descriptor
+   * @param parameters Partition or table parameters, copied as properties
+   * @param conf Configuration
+   * @param inputJobInfo Input job info
+   * @return Partition info
+   * @throws IOException propagated from the HCatalog calls made here
+   */
+  private static PartInfo extractPartInfo(
+      HCatSchema schema, StorageDescriptor sd, Map<String, String> parameters,
+      Configuration conf, InputJobInfo inputJobInfo) throws IOException {
+    StorerInfo storerInfo = InternalUtil.extractStorerInfo(sd, parameters);
+
+    Properties hcatProperties = new Properties();
+    HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(conf,
+        storerInfo);
+
+    // Copy the properties from storageHandler to jobProperties
+    Map<String, String> jobProperties =
+        HCatUtil.getInputJobProperties(storageHandler, inputJobInfo);
+
+    for (Map.Entry<String, String> param : parameters.entrySet()) {
+      hcatProperties.put(param.getKey(), param.getValue());
+    }
+
+    return new PartInfo(schema, storageHandler, sd.getLocation(),
+        hcatProperties, jobProperties, inputJobInfo.getTableInfo());
+  }
+
+  /**
+   * Create a new {@link HCatRecordReader}.
+   *
+   * @param storageHandler Storage handler
+   * @param valuesNotInDataCols Values not in data columns
+   * @return Record reader
+   */
+  public static RecordReader newHCatReader(
+      HCatStorageHandler storageHandler,
+      Map<String, String> valuesNotInDataCols) {
+    return new HCatRecordReader(storageHandler, valuesNotInDataCols);
+  }
+
+  /**
+   * Cast an {@link InputSplit} to {@link HCatSplit}.
+   *
+   * @param split Input split
+   * @return {@link HCatSplit}
+   * @throws IOException propagated from InternalUtil.castToHCatSplit
+   */
+  public static HCatSplit castToHCatSplit(InputSplit split)
+    throws IOException {
+    return InternalUtil.castToHCatSplit(split);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/57ea5561/giraph-hcatalog/src/main/java/org/apache/hcatalog/mapreduce/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-hcatalog/src/main/java/org/apache/hcatalog/mapreduce/package-info.java b/giraph-hcatalog/src/main/java/org/apache/hcatalog/mapreduce/package-info.java
new file mode 100644
index 0000000..e236aaf
--- /dev/null
+++ b/giraph-hcatalog/src/main/java/org/apache/hcatalog/mapreduce/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package for HCatalog helper utilities.
+ */
+package org.apache.hcatalog.mapreduce;

http://git-wip-us.apache.org/repos/asf/giraph/blob/57ea5561/giraph-hcatalog/src/test/java/org/apache/giraph/io/hcatalog/TestHiveUtils.java
----------------------------------------------------------------------
diff --git a/giraph-hcatalog/src/test/java/org/apache/giraph/io/hcatalog/TestHiveUtils.java b/giraph-hcatalog/src/test/java/org/apache/giraph/io/hcatalog/TestHiveUtils.java
new file mode 100644
index 0000000..421cc28
--- /dev/null
+++ b/giraph-hcatalog/src/test/java/org/apache/giraph/io/hcatalog/TestHiveUtils.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.hcatalog;
+
+import junit.framework.TestCase;
+
+import java.util.Map;
+import org.junit.Test;
+
+public class TestHiveUtils extends TestCase {
+  @Test
+  public void testParsePartition() {
+    Map<String, String> parsed = HiveUtils.parsePartitionValues(
+        "feature1=2012-10-09, feature2=a1+b2, feature3=ff-gg");
+    assertEquals(3, parsed.size());
+    assertEquals("2012-10-09", parsed.get("feature1"));
+    assertEquals("a1+b2", parsed.get("feature2"));
+    assertEquals("ff-gg", parsed.get("feature3"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/57ea5561/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0be32c5..9f6770e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -224,7 +224,6 @@ under the License.
     <lib.dir>${top.dir}/lib</lib.dir>
     <buildtype>test</buildtype>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <formats.module>giraph-formats</formats.module>
     <hbase.version>0.90.5</hbase.version>
     <jackson.version>1.8.0</jackson.version>
     <slf4j.version>1.7.2</slf4j.version>
@@ -501,7 +500,9 @@ under the License.
     <profile>
       <id>hadoop_0.20.203</id>
       <modules>
-        <module>${formats.module}</module>
+        <module>giraph-accumulo</module>
+        <module>giraph-hbase</module>
+        <module>giraph-hcatalog</module>
       </modules>
       <activation>
         <activeByDefault>true</activeByDefault>
@@ -528,7 +529,9 @@ under the License.
     <profile>
       <id>hadoop_1.0</id>
       <modules>
-        <module>${formats.module}</module>
+        <module>giraph-accumulo</module>
+        <module>giraph-hbase</module>
+        <module>giraph-hcatalog</module>
       </modules>
       <properties>
         <hadoop.version>1.0.2</hadoop.version>
@@ -552,7 +555,9 @@ under the License.
     <profile>
       <id>hadoop_non_secure</id>
       <modules>
-        <module>${formats.module}</module>
+        <module>giraph-accumulo</module>
+        <module>giraph-hbase</module>
+        <module>giraph-hcatalog</module>
       </modules>
       <properties>
         <hadoop.version>0.20.2</hadoop.version>
@@ -576,7 +581,9 @@ under the License.
     <profile>
       <id>hadoop_facebook</id>
       <modules>
-        <module>${formats.module}</module>
+        <module>giraph-accumulo</module>
+        <module>giraph-hbase</module>
+        <module>giraph-hcatalog</module>
       </modules>
       <properties>
         <hadoop.version>0.20.1</hadoop.version>