Posted to commits@giraph.apache.org by ap...@apache.org on 2012/11/20 06:59:53 UTC

svn commit: r1411554 - in /giraph/trunk/giraph-formats-contrib/src/main/java/org/apache: giraph/io/hcatalog/ hcatalog/ hcatalog/mapreduce/

Author: apresta
Date: Tue Nov 20 05:59:52 2012
New Revision: 1411554

URL: http://svn.apache.org/viewvc?rev=1411554&view=rev
Log:
Adding missing files from GIRAPH-405 (apresta)

Added:
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexValueInputFormat.java
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/hcatalog/
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/hcatalog/mapreduce/
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/hcatalog/mapreduce/HCatUtils.java
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/hcatalog/mapreduce/package-info.java

Added: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java?rev=1411554&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java Tue Nov 20 05:59:52 2012
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.hcatalog;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatBaseInputFormat;
+import org.apache.hcatalog.mapreduce.HCatSplit;
+import org.apache.hcatalog.mapreduce.HCatStorageHandler;
+import org.apache.hcatalog.mapreduce.HCatUtils;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.PartInfo;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Provides functionality similar to
+ * {@link org.apache.hcatalog.mapreduce.HCatInputFormat},
+ * but allows for different data sources (vertex and edge data).
+ */
+public class GiraphHCatInputFormat extends HCatBaseInputFormat {
+  /** Vertex input job info for HCatalog. */
+  public static final String VERTEX_INPUT_JOB_INFO =
+      "giraph.hcat.vertex.input.job.info";
+  /** Edge input job info for HCatalog. */
+  public static final String EDGE_INPUT_JOB_INFO =
+      "giraph.hcat.edge.input.job.info";
+
+  /**
+   * Set vertex {@link InputJobInfo}.
+   *
+   * @param job The job
+   * @param inputJobInfo Vertex input job info
+   * @throws IOException
+   */
+  public static void setVertexInput(Job job,
+                                    InputJobInfo inputJobInfo)
+    throws IOException {
+    InputJobInfo vertexInputJobInfo = InputJobInfo.create(
+        inputJobInfo.getDatabaseName(),
+        inputJobInfo.getTableName(),
+        inputJobInfo.getFilter());
+    vertexInputJobInfo.getProperties().putAll(inputJobInfo.getProperties());
+    Configuration conf = job.getConfiguration();
+    conf.set(VERTEX_INPUT_JOB_INFO, HCatUtil.serialize(
+        HCatUtils.getInputJobInfo(conf, vertexInputJobInfo)));
+  }
+
+  /**
+   * Set edge {@link InputJobInfo}.
+   *
+   * @param job The job
+   * @param inputJobInfo Edge input job info
+   * @throws IOException
+   */
+  public static void setEdgeInput(Job job,
+                                  InputJobInfo inputJobInfo)
+    throws IOException {
+    InputJobInfo edgeInputJobInfo = InputJobInfo.create(
+        inputJobInfo.getDatabaseName(),
+        inputJobInfo.getTableName(),
+        inputJobInfo.getFilter());
+    edgeInputJobInfo.getProperties().putAll(inputJobInfo.getProperties());
+    Configuration conf = job.getConfiguration();
+    conf.set(EDGE_INPUT_JOB_INFO, HCatUtil.serialize(
+        HCatUtils.getInputJobInfo(conf, edgeInputJobInfo)));
+  }
+
+  /**
+   * Get table schema from input job info.
+   *
+   * @param inputJobInfo Input job info
+   * @return Input table schema
+   * @throws IOException
+   */
+  private static HCatSchema getTableSchema(InputJobInfo inputJobInfo)
+    throws IOException {
+    HCatSchema allCols = new HCatSchema(new LinkedList<HCatFieldSchema>());
+    for (HCatFieldSchema field :
+        inputJobInfo.getTableInfo().getDataColumns().getFields()) {
+      allCols.append(field);
+    }
+    for (HCatFieldSchema field :
+        inputJobInfo.getTableInfo().getPartitionColumns().getFields()) {
+      allCols.append(field);
+    }
+    return allCols;
+  }
+
+  /**
+   * Get vertex input table schema.
+   *
+   * @param conf Job configuration
+   * @return Vertex input table schema
+   * @throws IOException
+   */
+  public static HCatSchema getVertexTableSchema(Configuration conf)
+    throws IOException {
+    return getTableSchema(getVertexJobInfo(conf));
+  }
+
+  /**
+   * Get edge input table schema.
+   *
+   * @param conf Job configuration
+   * @return Edge input table schema
+   * @throws IOException
+   */
+  public static HCatSchema getEdgeTableSchema(Configuration conf)
+    throws IOException {
+    return getTableSchema(getEdgeJobInfo(conf));
+  }
+
+  /**
+   * Set input path for job.
+   *
+   * @param jobConf Job configuration
+   * @param location Location of input files
+   * @throws IOException
+   */
+  private void setInputPath(JobConf jobConf, String location)
+    throws IOException {
+    int length = location.length();
+    int curlyOpen = 0;
+    int pathStart = 0;
+    boolean globPattern = false;
+    List<String> pathStrings = new ArrayList<String>();
+
+    for (int i = 0; i < length; i++) {
+      char ch = location.charAt(i);
+      switch (ch) {
+      case '{':
+        curlyOpen++;
+        if (!globPattern) {
+          globPattern = true;
+        }
+        break;
+      case '}':
+        curlyOpen--;
+        if (curlyOpen == 0 && globPattern) {
+          globPattern = false;
+        }
+        break;
+      case ',':
+        if (!globPattern) {
+          pathStrings.add(location.substring(pathStart, i));
+          pathStart = i + 1;
+        }
+        break;
+      default:
+      }
+    }
+    pathStrings.add(location.substring(pathStart, length));
+
+    Path[] paths = StringUtils.stringToPath(pathStrings.toArray(new String[0]));
+
+    FileSystem fs = FileSystem.get(jobConf);
+    Path path = paths[0].makeQualified(fs);
+    StringBuilder str = new StringBuilder(StringUtils.escapeString(
+        path.toString()));
+    for (int i = 1; i < paths.length; i++) {
+      str.append(StringUtils.COMMA_STR);
+      path = paths[i].makeQualified(fs);
+      str.append(StringUtils.escapeString(path.toString()));
+    }
+
+    jobConf.set("mapred.input.dir", str.toString());
+  }
+
+  /**
+   * Get input splits for job.
+   *
+   * @param jobContext Job context
+   * @param inputJobInfo Input job info
+   * @return List of input splits
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private List<InputSplit> getSplits(JobContext jobContext,
+                                     InputJobInfo inputJobInfo)
+    throws IOException, InterruptedException {
+    Configuration conf = jobContext.getConfiguration();
+
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+    List<PartInfo> partitionInfoList = inputJobInfo.getPartitions();
+    if (partitionInfoList == null) {
+      //No partitions match the specified partition filter
+      return splits;
+    }
+
+    HCatStorageHandler storageHandler;
+    JobConf jobConf;
+    //For each matching partition, call getSplits on the underlying InputFormat
+    for (PartInfo partitionInfo : partitionInfoList) {
+      jobConf = HCatUtil.getJobConfFromContext(jobContext);
+      setInputPath(jobConf, partitionInfo.getLocation());
+      Map<String, String> jobProperties = partitionInfo.getJobProperties();
+
+      HCatSchema allCols = new HCatSchema(new LinkedList<HCatFieldSchema>());
+      for (HCatFieldSchema field :
+          inputJobInfo.getTableInfo().getDataColumns().getFields()) {
+        allCols.append(field);
+      }
+      for (HCatFieldSchema field :
+          inputJobInfo.getTableInfo().getPartitionColumns().getFields()) {
+        allCols.append(field);
+      }
+
+      HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
+
+      storageHandler = HCatUtil.getStorageHandler(
+          jobConf, partitionInfo);
+
+      //Get the input format
+      Class inputFormatClass = storageHandler.getInputFormatClass();
+      org.apache.hadoop.mapred.InputFormat inputFormat =
+          getMapRedInputFormat(jobConf, inputFormatClass);
+
+      //Call getSplits on the InputFormat, and create an HCatSplit for each
+      //underlying split. If the desired number of input splits is not
+      //configured, zero is passed to request the default.
+      //TODO: Currently each partition is split independently into
+      //a desired number. However, we want the union of all partitions to be
+      //split into a desired number while maintaining balanced sizes of input
+      //splits.
+      int desiredNumSplits =
+          conf.getInt(HCatConstants.HCAT_DESIRED_PARTITION_NUM_SPLITS, 0);
+      org.apache.hadoop.mapred.InputSplit[] baseSplits =
+          inputFormat.getSplits(jobConf, desiredNumSplits);
+
+      for (org.apache.hadoop.mapred.InputSplit split : baseSplits) {
+        splits.add(new HCatSplit(partitionInfo, split, allCols));
+      }
+    }
+
+    return splits;
+  }
+
+  /**
+   * Get vertex {@link InputJobInfo}.
+   *
+   * @param conf Configuration
+   * @return Vertex input job info
+   * @throws IOException
+   */
+  private static InputJobInfo getVertexJobInfo(Configuration conf)
+    throws IOException {
+    String jobString = conf.get(VERTEX_INPUT_JOB_INFO);
+    if (jobString == null) {
+      throw new IOException("Vertex job information not found in JobContext." +
+          " GiraphHCatInputFormat.setVertexInput() not called?");
+    }
+    return (InputJobInfo) HCatUtil.deserialize(jobString);
+  }
+
+  /**
+   * Get edge {@link InputJobInfo}.
+   *
+   * @param conf Configuration
+   * @return Edge input job info
+   * @throws IOException
+   */
+  private static InputJobInfo getEdgeJobInfo(Configuration conf)
+    throws IOException {
+    String jobString = conf.get(EDGE_INPUT_JOB_INFO);
+    if (jobString == null) {
+      throw new IOException("Edge job information not found in JobContext." +
+          " GiraphHCatInputFormat.setEdgeInput() not called?");
+    }
+    return (InputJobInfo) HCatUtil.deserialize(jobString);
+  }
+
+  /**
+   * Get vertex input splits.
+   *
+   * @param jobContext Job context
+   * @return List of vertex {@link InputSplit}s
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public List<InputSplit> getVertexSplits(JobContext jobContext)
+    throws IOException, InterruptedException {
+    return getSplits(jobContext,
+        getVertexJobInfo(jobContext.getConfiguration()));
+  }
+
+  /**
+   * Get edge input splits.
+   *
+   * @param jobContext Job context
+   * @return List of edge {@link InputSplit}s
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public List<InputSplit> getEdgeSplits(JobContext jobContext)
+    throws IOException, InterruptedException {
+    return getSplits(jobContext,
+        getEdgeJobInfo(jobContext.getConfiguration()));
+  }
+
+  /**
+   * Create an {@link org.apache.hcatalog.mapreduce.HCatRecordReader}.
+   *
+   * @param split Input split
+   * @param schema Table schema
+   * @param taskContext Context
+   * @return Record reader
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private RecordReader<WritableComparable, HCatRecord>
+  createRecordReader(InputSplit split,
+                     HCatSchema schema,
+                     TaskAttemptContext taskContext)
+    throws IOException, InterruptedException {
+    HCatSplit hcatSplit = HCatUtils.castToHCatSplit(split);
+    PartInfo partitionInfo = hcatSplit.getPartitionInfo();
+    JobContext jobContext = taskContext;
+    Configuration conf = jobContext.getConfiguration();
+
+    HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(
+        conf, partitionInfo);
+
+    JobConf jobConf = HCatUtil.getJobConfFromContext(jobContext);
+    Map<String, String> jobProperties = partitionInfo.getJobProperties();
+    HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
+
+    Map<String, String> valuesNotInDataCols = getColValsNotInDataColumns(
+        schema, partitionInfo);
+
+    return HCatUtils.newHCatReader(storageHandler, valuesNotInDataCols);
+  }
+
+  /**
+   * Create a {@link RecordReader} for vertices.
+   *
+   * @param split Input split
+   * @param taskContext Context
+   * @return Record reader
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public RecordReader<WritableComparable, HCatRecord>
+  createVertexRecordReader(InputSplit split, TaskAttemptContext taskContext)
+    throws IOException, InterruptedException {
+    return createRecordReader(split, getVertexTableSchema(
+        taskContext.getConfiguration()), taskContext);
+  }
+
+  /**
+   * Create a {@link RecordReader} for edges.
+   *
+   * @param split Input split
+   * @param taskContext Context
+   * @return Record reader
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public RecordReader<WritableComparable, HCatRecord>
+  createEdgeRecordReader(InputSplit split, TaskAttemptContext taskContext)
+    throws IOException, InterruptedException {
+    return createRecordReader(split, getEdgeTableSchema(
+        taskContext.getConfiguration()), taskContext);
+  }
+
+  /**
+   * Get values for fields requested by the output schema that are not
+   * present in the data columns.
+   *
+   * @param outputSchema Output schema
+   * @param partInfo Partition info
+   * @return Values not in data columns
+   */
+  private static Map<String, String> getColValsNotInDataColumns(
+      HCatSchema outputSchema,
+      PartInfo partInfo) {
+    HCatSchema dataSchema = partInfo.getPartitionSchema();
+    Map<String, String> vals = new HashMap<String, String>();
+    for (String fieldName : outputSchema.getFieldNames()) {
+      if (dataSchema.getPosition(fieldName) == null) {
+        // This field of the output schema is not present in the data
+        // columns, so check the partition values to see if it is a
+        // partition column.
+        if (partInfo.getPartitionValues().containsKey(fieldName)) {
+          vals.put(fieldName, partInfo.getPartitionValues().get(fieldName));
+        } else {
+          vals.put(fieldName, null);
+        }
+      }
+    }
+    return vals;
+  }
+}
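
For context, a minimal driver sketch (not part of this commit) showing how the two input sources above might be wired onto a single job; the database and table names are placeholders:

import org.apache.giraph.io.hcatalog.GiraphHCatInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.mapreduce.InputJobInfo;

public class HCatInputSetupExample {
  /** Configure separate HCatalog tables for vertex values and edges. */
  public static void configure(Job job) throws java.io.IOException {
    // Each call serializes its own InputJobInfo under a distinct
    // configuration key (VERTEX_INPUT_JOB_INFO / EDGE_INPUT_JOB_INFO).
    GiraphHCatInputFormat.setVertexInput(job,
        InputJobInfo.create("default", "vertex_values", null));
    GiraphHCatInputFormat.setEdgeInput(job,
        InputJobInfo.create("default", "graph_edges", null));
  }
}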

Added: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java?rev=1411554&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java Tue Nov 20 05:59:52 2012
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.hcatalog;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.EdgeInputFormat;
+import org.apache.giraph.graph.EdgeReader;
+import org.apache.giraph.graph.EdgeWithSource;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.data.HCatRecord;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * HCatalog {@link EdgeInputFormat} for reading edges from Hive/Pig.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public abstract class HCatalogEdgeInputFormat<
+    I extends WritableComparable,
+    E extends Writable>
+    extends EdgeInputFormat<I, E> {
+  /**
+   * HCatalog input format.
+   */
+  private GiraphHCatInputFormat hCatInputFormat = new GiraphHCatInputFormat();
+
+  @Override
+  public final List<InputSplit> getSplits(JobContext context, int numWorkers)
+    throws IOException, InterruptedException {
+    return hCatInputFormat.getEdgeSplits(context);
+  }
+
+  /**
+   * {@link EdgeReader} for {@link HCatalogEdgeInputFormat}.
+   */
+  protected abstract class HCatalogEdgeReader implements EdgeReader<I, E> {
+    /** Internal {@link RecordReader}. */
+    private RecordReader<WritableComparable, HCatRecord> hCatRecordReader;
+    /** Context passed to initialize. */
+    private TaskAttemptContext context;
+
+    @Override
+    public final void initialize(InputSplit inputSplit,
+                                 TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      hCatRecordReader =
+          hCatInputFormat.createEdgeRecordReader(inputSplit, context);
+      hCatRecordReader.initialize(inputSplit, context);
+      this.context = context;
+    }
+
+    @Override
+    public boolean nextEdge() throws IOException, InterruptedException {
+      return hCatRecordReader.nextKeyValue();
+    }
+
+    @Override
+    public final void close() throws IOException {
+      hCatRecordReader.close();
+    }
+
+    @Override
+    public final float getProgress() throws IOException, InterruptedException {
+      return hCatRecordReader.getProgress();
+    }
+
+    /**
+     * Get the record reader.
+     *
+     * @return Record reader to be used for reading.
+     */
+    protected final RecordReader<WritableComparable, HCatRecord>
+    getRecordReader() {
+      return hCatRecordReader;
+    }
+
+    /**
+     * Get the context.
+     *
+     * @return Context passed to initialize.
+     */
+    protected final TaskAttemptContext getContext() {
+      return context;
+    }
+  }
+
+  /**
+   * Create {@link EdgeReader}.
+
+   * @return {@link HCatalogEdgeReader} instance.
+   */
+  protected abstract HCatalogEdgeReader createEdgeReader();
+
+  @Override
+  public final EdgeReader<I, E>
+  createEdgeReader(InputSplit split, TaskAttemptContext context)
+    throws IOException {
+    try {
+      HCatalogEdgeReader reader = createEdgeReader();
+      reader.initialize(split, context);
+      return reader;
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "createEdgeReader: Interrupted creating reader.", e);
+    }
+  }
+
+  /**
+   * {@link HCatalogEdgeReader} for tables holding a complete edge
+   * in each row.
+   */
+  protected abstract class SingleRowHCatalogEdgeReader
+      extends HCatalogEdgeReader {
+    /**
+     * Get source vertex id from a record.
+     *
+     * @param record Input record
+     * @return I Source vertex id
+     */
+    protected abstract I getSourceVertexId(HCatRecord record);
+
+    /**
+     * Get target vertex id from a record.
+     *
+     * @param record Input record
+     * @return I Target vertex id
+     */
+    protected abstract I getTargetVertexId(HCatRecord record);
+
+    /**
+     * Get edge value from a record.
+     *
+     * @param record Input record
+     * @return E Edge value
+     */
+    protected abstract E getEdgeValue(HCatRecord record);
+
+    @Override
+    public EdgeWithSource<I, E> getCurrentEdge() throws IOException,
+        InterruptedException {
+      HCatRecord record = getRecordReader().getCurrentValue();
+      return new EdgeWithSource<I, E>(
+          getSourceVertexId(record),
+          new Edge<I, E>(getTargetVertexId(record), getEdgeValue(record)));
+    }
+  }
+}
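
As an illustration only (not part of this commit), a concrete subclass of the format above might look like the following, assuming a hypothetical edge table whose first three columns are source id, target id, and edge weight:

import org.apache.giraph.io.hcatalog.HCatalogEdgeInputFormat;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hcatalog.data.HCatRecord;

public class LongDoubleHCatalogEdgeInputFormat
    extends HCatalogEdgeInputFormat<LongWritable, DoubleWritable> {
  @Override
  protected HCatalogEdgeReader createEdgeReader() {
    return new SingleRowHCatalogEdgeReader() {
      @Override
      protected LongWritable getSourceVertexId(HCatRecord record) {
        // Column 0: source vertex id (assumed bigint).
        return new LongWritable((Long) record.get(0));
      }

      @Override
      protected LongWritable getTargetVertexId(HCatRecord record) {
        // Column 1: target vertex id (assumed bigint).
        return new LongWritable((Long) record.get(1));
      }

      @Override
      protected DoubleWritable getEdgeValue(HCatRecord record) {
        // Column 2: edge weight (assumed double).
        return new DoubleWritable((Double) record.get(2));
      }
    };
  }
}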

Added: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexValueInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexValueInputFormat.java?rev=1411554&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexValueInputFormat.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexValueInputFormat.java Tue Nov 20 05:59:52 2012
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.hcatalog;
+
+import org.apache.giraph.graph.VertexValueInputFormat;
+import org.apache.giraph.graph.VertexValueReader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.data.HCatRecord;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * HCatalog {@link VertexValueInputFormat} for reading vertex values from
+ * Hive/Pig.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public abstract class HCatalogVertexValueInputFormat<I extends
+    WritableComparable,
+    V extends Writable,
+    E extends Writable,
+    M extends Writable>
+    extends VertexValueInputFormat<I, V, E, M> {
+  /**
+   * HCatalog input format.
+   */
+  private GiraphHCatInputFormat hCatInputFormat = new GiraphHCatInputFormat();
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context, int numWorkers)
+    throws IOException, InterruptedException {
+    return hCatInputFormat.getVertexSplits(context);
+  }
+
+  /**
+   * {@link VertexValueReader} for {@link HCatalogVertexValueInputFormat}.
+   */
+  protected abstract class HCatalogVertexValueReader
+      extends VertexValueReader<I, V, E, M> {
+    /** Internal {@link RecordReader}. */
+    private RecordReader<WritableComparable, HCatRecord> hCatRecordReader;
+    /** Context passed to initialize. */
+    private TaskAttemptContext context;
+
+    @Override
+    public final void initialize(InputSplit inputSplit,
+                                 TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      super.initialize(inputSplit, context);
+      hCatRecordReader =
+          hCatInputFormat.createVertexRecordReader(inputSplit, context);
+      hCatRecordReader.initialize(inputSplit, context);
+      this.context = context;
+    }
+
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+      return hCatRecordReader.nextKeyValue();
+    }
+
+    @Override
+    public final void close() throws IOException {
+      hCatRecordReader.close();
+    }
+
+    @Override
+    public final float getProgress() throws IOException, InterruptedException {
+      return hCatRecordReader.getProgress();
+    }
+
+    /**
+     * Get the record reader.
+     *
+     * @return Record reader to be used for reading.
+     */
+    protected final RecordReader<WritableComparable, HCatRecord>
+    getRecordReader() {
+      return hCatRecordReader;
+    }
+
+    /**
+     * Get the context.
+     *
+     * @return Context passed to initialize.
+     */
+    protected final TaskAttemptContext getContext() {
+      return context;
+    }
+  }
+
+  /**
+   * Create {@link VertexValueReader}.
+
+   * @return {@link HCatalogVertexValueReader} instance.
+   */
+  protected abstract HCatalogVertexValueReader createVertexValueReader();
+
+  @Override
+  public final VertexValueReader<I, V, E, M>
+  createVertexValueReader(InputSplit split, TaskAttemptContext context)
+    throws IOException {
+    try {
+      HCatalogVertexValueReader reader = createVertexValueReader();
+      reader.initialize(split, context);
+      return reader;
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "createVertexValueReader: Interrupted creating reader.", e);
+    }
+  }
+
+  /**
+   * {@link HCatalogVertexValueReader} for tables holding a complete vertex
+   * value in each row.
+   */
+  protected abstract class SingleRowHCatalogVertexValueReader
+      extends HCatalogVertexValueReader {
+    /**
+     * Get vertex id from a record.
+     *
+     * @param record Input record
+     * @return I Vertex id
+     */
+    protected abstract I getVertexId(HCatRecord record);
+
+    /**
+     * Get vertex value from a record.
+     *
+     * @param record Input record
+     * @return V Vertex value
+     */
+    protected abstract V getVertexValue(HCatRecord record);
+
+    @Override
+    public final I getCurrentVertexId() throws IOException,
+        InterruptedException {
+      return getVertexId(getRecordReader().getCurrentValue());
+    }
+
+    @Override
+    public final V getCurrentVertexValue() throws IOException,
+        InterruptedException {
+      return getVertexValue(getRecordReader().getCurrentValue());
+    }
+  }
+}
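
Similarly, a minimal sketch (not in this commit) of a concrete vertex-value format, assuming a hypothetical two-column table of vertex id and value:

import org.apache.giraph.io.hcatalog.HCatalogVertexValueInputFormat;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hcatalog.data.HCatRecord;

public class LongDoubleHCatalogVertexValueInputFormat<E extends Writable,
    M extends Writable>
    extends HCatalogVertexValueInputFormat<LongWritable, DoubleWritable, E, M> {
  @Override
  protected HCatalogVertexValueReader createVertexValueReader() {
    return new SingleRowHCatalogVertexValueReader() {
      @Override
      protected LongWritable getVertexId(HCatRecord record) {
        // Column 0: vertex id (assumed bigint).
        return new LongWritable((Long) record.get(0));
      }

      @Override
      protected DoubleWritable getVertexValue(HCatRecord record) {
        // Column 1: vertex value (assumed double).
        return new DoubleWritable((Double) record.get(1));
      }
    };
  }
}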

Added: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/hcatalog/mapreduce/HCatUtils.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/hcatalog/mapreduce/HCatUtils.java?rev=1411554&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/hcatalog/mapreduce/HCatUtils.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/hcatalog/mapreduce/HCatUtils.java Tue Nov 20 05:59:52 2012
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import org.apache.giraph.io.hcatalog.GiraphHCatInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Utility methods copied from HCatalog because of visibility restrictions.
+ */
+public class HCatUtils {
+  /**
+   * Don't instantiate.
+   */
+  private HCatUtils() { }
+
+  /**
+   * Returns the given InputJobInfo after populating it with data queried from
+   * the metadata service.
+   *
+   * @param conf Configuration
+   * @param inputJobInfo Input job info
+   * @return Populated input job info
+   * @throws IOException
+   */
+  public static InputJobInfo getInputJobInfo(
+      Configuration conf, InputJobInfo inputJobInfo)
+    throws IOException {
+    HiveMetaStoreClient client = null;
+    HiveConf hiveConf;
+    try {
+      if (conf != null) {
+        hiveConf = HCatUtil.getHiveConf(conf);
+      } else {
+        hiveConf = new HiveConf(GiraphHCatInputFormat.class);
+      }
+      client = HCatUtil.getHiveClient(hiveConf);
+      Table table = HCatUtil.getTable(client, inputJobInfo.getDatabaseName(),
+          inputJobInfo.getTableName());
+
+      List<PartInfo> partInfoList = new ArrayList<PartInfo>();
+
+      inputJobInfo.setTableInfo(HCatTableInfo.valueOf(table.getTTable()));
+      if (table.getPartitionKeys().size() != 0) {
+        // Partitioned table
+        List<Partition> parts = client.listPartitionsByFilter(
+            inputJobInfo.getDatabaseName(),
+            inputJobInfo.getTableName(),
+            inputJobInfo.getFilter(),
+            (short) -1);
+
+        if (parts != null) {
+          // Default to 100,000 partitions if hcat.metastore.maxpartitions is
+          // not defined
+          int maxPart = hiveConf.getInt("hcat.metastore.maxpartitions", 100000);
+          if (parts.size() > maxPart) {
+            throw new HCatException(ErrorType.ERROR_EXCEED_MAXPART,
+                "total number of partitions is " + parts.size());
+          }
+
+          // Populate partition info
+          for (Partition ptn : parts) {
+            HCatSchema schema = HCatUtil.extractSchema(
+                new org.apache.hadoop.hive.ql.metadata.Partition(table, ptn));
+            PartInfo partInfo = extractPartInfo(schema, ptn.getSd(),
+                ptn.getParameters(), conf, inputJobInfo);
+            partInfo.setPartitionValues(InternalUtil.createPtnKeyValueMap(table,
+                ptn));
+            partInfoList.add(partInfo);
+          }
+        }
+      } else {
+        // Non partitioned table
+        HCatSchema schema = HCatUtil.extractSchema(table);
+        PartInfo partInfo = extractPartInfo(schema, table.getTTable().getSd(),
+            table.getParameters(), conf, inputJobInfo);
+        partInfo.setPartitionValues(new HashMap<String, String>());
+        partInfoList.add(partInfo);
+      }
+      inputJobInfo.setPartitions(partInfoList);
+    } catch (MetaException e) {
+      throw new IOException("Got MetaException", e);
+    } catch (NoSuchObjectException e) {
+      throw new IOException("Got NoSuchObjectException", e);
+    } catch (TException e) {
+      throw new IOException("Got TException", e);
+    } catch (HiveException e) {
+      throw new IOException("Got HiveException", e);
+    } finally {
+      HCatUtil.closeHiveClientQuietly(client);
+    }
+    return inputJobInfo;
+  }
+
+  /**
+   * Extract partition info.
+   *
+   * @param schema Table schema
+   * @param sd Storage descriptor
+   * @param parameters Parameters
+   * @param conf Configuration
+   * @param inputJobInfo Input job info
+   * @return Partition info
+   * @throws IOException
+   */
+  private static PartInfo extractPartInfo(
+      HCatSchema schema, StorageDescriptor sd, Map<String, String> parameters,
+      Configuration conf, InputJobInfo inputJobInfo) throws IOException {
+    StorerInfo storerInfo = InternalUtil.extractStorerInfo(sd, parameters);
+
+    Properties hcatProperties = new Properties();
+    HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(conf,
+        storerInfo);
+
+    // Copy the properties from storageHandler to jobProperties
+    Map<String, String> jobProperties =
+        HCatUtil.getInputJobProperties(storageHandler, inputJobInfo);
+
+    for (Map.Entry<String, String> param : parameters.entrySet()) {
+      hcatProperties.put(param.getKey(), param.getValue());
+    }
+
+    return new PartInfo(schema, storageHandler, sd.getLocation(),
+        hcatProperties, jobProperties, inputJobInfo.getTableInfo());
+  }
+
+  /**
+   * Create a new {@link HCatRecordReader}.
+   *
+   * @param storageHandler Storage handler
+   * @param valuesNotInDataCols Values not in data columns
+   * @return Record reader
+   */
+  public static RecordReader newHCatReader(
+      HCatStorageHandler storageHandler,
+      Map<String, String> valuesNotInDataCols) {
+    return new HCatRecordReader(storageHandler, valuesNotInDataCols);
+  }
+
+  /**
+   * Cast an {@link InputSplit} to {@link HCatSplit}.
+   *
+   * @param split Input split
+   * @return {@link HCatSplit}
+   * @throws IOException
+   */
+  public static HCatSplit castToHCatSplit(InputSplit split)
+    throws IOException {
+    return InternalUtil.castToHCatSplit(split);
+  }
+}

Added: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/hcatalog/mapreduce/package-info.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/hcatalog/mapreduce/package-info.java?rev=1411554&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/hcatalog/mapreduce/package-info.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/hcatalog/mapreduce/package-info.java Tue Nov 20 05:59:52 2012
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package for HCatalog helper utilities.
+ */
+package org.apache.hcatalog.mapreduce;