You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ap...@apache.org on 2012/11/20 06:59:53 UTC
svn commit: r1411554 - in
/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache:
giraph/io/hcatalog/ hcatalog/ hcatalog/mapreduce/
Author: apresta
Date: Tue Nov 20 05:59:52 2012
New Revision: 1411554
URL: http://svn.apache.org/viewvc?rev=1411554&view=rev
Log:
Adding missing files from GIRAPH-405 (apresta)
Added:
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexValueInputFormat.java
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/hcatalog/
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/hcatalog/mapreduce/
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/hcatalog/mapreduce/HCatUtils.java
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/hcatalog/mapreduce/package-info.java
Added: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java?rev=1411554&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java Tue Nov 20 05:59:52 2012
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.hcatalog;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatBaseInputFormat;
+import org.apache.hcatalog.mapreduce.HCatSplit;
+import org.apache.hcatalog.mapreduce.HCatStorageHandler;
+import org.apache.hcatalog.mapreduce.HCatUtils;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.PartInfo;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Provides functionality similar to
+ * {@link org.apache.hcatalog.mapreduce.HCatInputFormat},
+ * but allows for different data sources (vertex and edge data).
+ */
+public class GiraphHCatInputFormat extends HCatBaseInputFormat {
+ /**
+ * Configuration key under which the serialized vertex {@link InputJobInfo}
+ * is stored by {@link #setVertexInput(Job, InputJobInfo)}.
+ */
+ public static final String VERTEX_INPUT_JOB_INFO =
+ "giraph.hcat.vertex.input.job.info";
+ /**
+ * Configuration key under which the serialized edge {@link InputJobInfo}
+ * is stored by {@link #setEdgeInput(Job, InputJobInfo)}.
+ */
+ public static final String EDGE_INPUT_JOB_INFO =
+ "giraph.hcat.edge.input.job.info";
+
+ /**
+  * Register the vertex input table for a job.
+  *
+  * A new {@link InputJobInfo} is created from the database name, table name
+  * and filter of the supplied one, the supplied properties are copied into
+  * it, and the result of {@link HCatUtils#getInputJobInfo} is stored in
+  * serialized form under {@link #VERTEX_INPUT_JOB_INFO}.
+  *
+  * @param job The job
+  * @param inputJobInfo Vertex input job info
+  * @throws IOException If populating or serializing the job info fails
+  */
+ public static void setVertexInput(Job job,
+                                   InputJobInfo inputJobInfo)
+   throws IOException {
+   String databaseName = inputJobInfo.getDatabaseName();
+   String tableName = inputJobInfo.getTableName();
+   String filter = inputJobInfo.getFilter();
+   InputJobInfo jobInfoCopy =
+       InputJobInfo.create(databaseName, tableName, filter);
+   jobInfoCopy.getProperties().putAll(inputJobInfo.getProperties());
+
+   Configuration jobConfiguration = job.getConfiguration();
+   InputJobInfo populated =
+       HCatUtils.getInputJobInfo(jobConfiguration, jobInfoCopy);
+   String serialized = HCatUtil.serialize(populated);
+   jobConfiguration.set(VERTEX_INPUT_JOB_INFO, serialized);
+ }
+
+ /**
+  * Register the edge input table for a job.
+  *
+  * A new {@link InputJobInfo} is created from the database name, table name
+  * and filter of the supplied one, the supplied properties are copied into
+  * it, and the result of {@link HCatUtils#getInputJobInfo} is stored in
+  * serialized form under {@link #EDGE_INPUT_JOB_INFO}.
+  *
+  * @param job The job
+  * @param inputJobInfo Edge input job info
+  * @throws IOException If populating or serializing the job info fails
+  */
+ public static void setEdgeInput(Job job,
+                                 InputJobInfo inputJobInfo)
+   throws IOException {
+   String databaseName = inputJobInfo.getDatabaseName();
+   String tableName = inputJobInfo.getTableName();
+   String filter = inputJobInfo.getFilter();
+   InputJobInfo jobInfoCopy =
+       InputJobInfo.create(databaseName, tableName, filter);
+   jobInfoCopy.getProperties().putAll(inputJobInfo.getProperties());
+
+   Configuration jobConfiguration = job.getConfiguration();
+   InputJobInfo populated =
+       HCatUtils.getInputJobInfo(jobConfiguration, jobInfoCopy);
+   String serialized = HCatUtil.serialize(populated);
+   jobConfiguration.set(EDGE_INPUT_JOB_INFO, serialized);
+ }
+
+ /**
+  * Build the full table schema for an input job info: the data columns
+  * followed by the partition columns.
+  *
+  * @param inputJobInfo Input job info
+  * @return Input table schema
+  * @throws IOException If a field cannot be appended to the schema
+  */
+ private static HCatSchema getTableSchema(InputJobInfo inputJobInfo)
+   throws IOException {
+   HCatSchema tableSchema =
+       new HCatSchema(new LinkedList<HCatFieldSchema>());
+
+   // Data columns come first.
+   List<HCatFieldSchema> dataFields =
+       inputJobInfo.getTableInfo().getDataColumns().getFields();
+   for (HCatFieldSchema dataField : dataFields) {
+     tableSchema.append(dataField);
+   }
+
+   // Partition columns are appended after the data columns.
+   List<HCatFieldSchema> partitionFields =
+       inputJobInfo.getTableInfo().getPartitionColumns().getFields();
+   for (HCatFieldSchema partitionField : partitionFields) {
+     tableSchema.append(partitionField);
+   }
+
+   return tableSchema;
+ }
+
+ /**
+  * Get the schema of the vertex input table, as described by the vertex
+  * job info stored in the configuration.
+  *
+  * @param conf Job configuration
+  * @return Vertex input table schema
+  * @throws IOException If the vertex job info is missing from the
+  *                     configuration or the schema cannot be built
+  */
+ public static HCatSchema getVertexTableSchema(Configuration conf)
+   throws IOException {
+   InputJobInfo vertexJobInfo = getVertexJobInfo(conf);
+   return getTableSchema(vertexJobInfo);
+ }
+
+ /**
+  * Get the schema of the edge input table, as described by the edge job
+  * info stored in the configuration.
+  *
+  * @param conf Job configuration
+  * @return Edge input table schema
+  * @throws IOException If the edge job info is missing from the
+  *                     configuration or the schema cannot be built
+  */
+ public static HCatSchema getEdgeTableSchema(Configuration conf)
+   throws IOException {
+   InputJobInfo edgeJobInfo = getEdgeJobInfo(conf);
+   return getTableSchema(edgeJobInfo);
+ }
+
+ /**
+ * Set input path for job.
+ *
+ * @param jobConf Job configuration
+ * @param location Location of input files
+ * @throws IOException
+ */
+ private void setInputPath(JobConf jobConf, String location)
+ throws IOException {
+ int length = location.length();
+ int curlyOpen = 0;
+ int pathStart = 0;
+ boolean globPattern = false;
+ List<String> pathStrings = new ArrayList<String>();
+
+ for (int i = 0; i < length; i++) {
+ char ch = location.charAt(i);
+ switch (ch) {
+ case '{':
+ curlyOpen++;
+ if (!globPattern) {
+ globPattern = true;
+ }
+ break;
+ case '}':
+ curlyOpen--;
+ if (curlyOpen == 0 && globPattern) {
+ globPattern = false;
+ }
+ break;
+ case ',':
+ if (!globPattern) {
+ pathStrings.add(location.substring(pathStart, i));
+ pathStart = i + 1;
+ }
+ break;
+ default:
+ }
+ }
+ pathStrings.add(location.substring(pathStart, length));
+
+ Path[] paths = StringUtils.stringToPath(pathStrings.toArray(new String[0]));
+
+ FileSystem fs = FileSystem.get(jobConf);
+ Path path = paths[0].makeQualified(fs);
+ StringBuilder str = new StringBuilder(StringUtils.escapeString(
+ path.toString()));
+ for (int i = 1; i < paths.length; i++) {
+ str.append(StringUtils.COMMA_STR);
+ path = paths[i].makeQualified(fs);
+ str.append(StringUtils.escapeString(path.toString()));
+ }
+
+ jobConf.set("mapred.input.dir", str.toString());
+ }
+
+ /**
+  * Get input splits for job.
+  *
+  * Each partition listed in the input job info is handed to the input
+  * format of its storage handler, and every split that input format returns
+  * is wrapped in an {@link HCatSplit} together with the full table schema
+  * (data columns followed by partition columns).
+  *
+  * @param jobContext Job context
+  * @param inputJobInfo Input job info
+  * @return List of input splits; empty when the job info has no partition
+  *         list
+  * @throws IOException If the schema, storage handler or underlying splits
+  *                     cannot be obtained
+  * @throws InterruptedException
+  */
+ private List<InputSplit> getSplits(JobContext jobContext,
+                                    InputJobInfo inputJobInfo)
+   throws IOException, InterruptedException {
+   Configuration conf = jobContext.getConfiguration();
+
+   List<InputSplit> splits = new ArrayList<InputSplit>();
+   List<PartInfo> partitionInfoList = inputJobInfo.getPartitions();
+   if (partitionInfoList == null) {
+     // No partitions match the specified partition filter
+     return splits;
+   }
+
+   // For each matching partition, call getSplits on the underlying
+   // InputFormat
+   for (PartInfo partitionInfo : partitionInfoList) {
+     // A job conf is obtained per partition so that the input path and
+     // job properties of one partition are set independently of the others.
+     JobConf jobConf = HCatUtil.getJobConfFromContext(jobContext);
+     setInputPath(jobConf, partitionInfo.getLocation());
+     Map<String, String> jobProperties = partitionInfo.getJobProperties();
+
+     // Reuse the shared helper instead of repeating the column loops here.
+     HCatSchema allCols = getTableSchema(inputJobInfo);
+
+     HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
+
+     HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(
+         jobConf, partitionInfo);
+
+     // Get the input format
+     Class inputFormatClass = storageHandler.getInputFormatClass();
+     org.apache.hadoop.mapred.InputFormat inputFormat =
+         getMapRedInputFormat(jobConf, inputFormatClass);
+
+     // Call getSplit on the InputFormat, create an HCatSplit for each
+     // underlying split. When the desired number of input splits is
+     // missing, use a default number (denoted by zero).
+     // TODO: Currently each partition is split independently into
+     // a desired number. However, we want the union of all partitions to be
+     // split into a desired number while maintaining balanced sizes of
+     // input splits.
+     int desiredNumSplits =
+         conf.getInt(HCatConstants.HCAT_DESIRED_PARTITION_NUM_SPLITS, 0);
+     org.apache.hadoop.mapred.InputSplit[] baseSplits =
+         inputFormat.getSplits(jobConf, desiredNumSplits);
+
+     for (org.apache.hadoop.mapred.InputSplit split : baseSplits) {
+       splits.add(new HCatSplit(partitionInfo, split, allCols));
+     }
+   }
+
+   return splits;
+ }
+
+ /**
+  * Read back the vertex {@link InputJobInfo} stored under
+  * {@link #VERTEX_INPUT_JOB_INFO}.
+  *
+  * @param conf Configuration
+  * @return Vertex input job info
+  * @throws IOException If no vertex job info is present in the
+  *                     configuration, or it cannot be deserialized
+  */
+ private static InputJobInfo getVertexJobInfo(Configuration conf)
+   throws IOException {
+   String serialized = conf.get(VERTEX_INPUT_JOB_INFO);
+   if (serialized != null) {
+     return (InputJobInfo) HCatUtil.deserialize(serialized);
+   }
+   throw new IOException("Vertex job information not found in JobContext." +
+       " GiraphHCatInputFormat.setVertexInput() not called?");
+ }
+
+ /**
+  * Read back the edge {@link InputJobInfo} stored under
+  * {@link #EDGE_INPUT_JOB_INFO}.
+  *
+  * @param conf Configuration
+  * @return Edge input job info
+  * @throws IOException If no edge job info is present in the
+  *                     configuration, or it cannot be deserialized
+  */
+ private static InputJobInfo getEdgeJobInfo(Configuration conf)
+   throws IOException {
+   String serialized = conf.get(EDGE_INPUT_JOB_INFO);
+   if (serialized != null) {
+     return (InputJobInfo) HCatUtil.deserialize(serialized);
+   }
+   throw new IOException("Edge job information not found in JobContext." +
+       " GiraphHCatInputFormat.setEdgeInput() not called?");
+ }
+
+ /**
+  * Get vertex input splits, computed from the vertex job info stored in
+  * the job configuration.
+  *
+  * @param jobContext Job context
+  * @return List of vertex {@link InputSplit}s
+  * @throws IOException
+  * @throws InterruptedException
+  */
+ public List<InputSplit> getVertexSplits(JobContext jobContext)
+   throws IOException, InterruptedException {
+   Configuration conf = jobContext.getConfiguration();
+   InputJobInfo vertexJobInfo = getVertexJobInfo(conf);
+   return getSplits(jobContext, vertexJobInfo);
+ }
+
+ /**
+  * Get edge input splits, computed from the edge job info stored in the
+  * job configuration.
+  *
+  * @param jobContext Job context
+  * @return List of edge {@link InputSplit}s
+  * @throws IOException
+  * @throws InterruptedException
+  */
+ public List<InputSplit> getEdgeSplits(JobContext jobContext)
+   throws IOException, InterruptedException {
+   Configuration conf = jobContext.getConfiguration();
+   InputJobInfo edgeJobInfo = getEdgeJobInfo(conf);
+   return getSplits(jobContext, edgeJobInfo);
+ }
+
+ /**
+  * Create an {@link org.apache.hcatalog.mapreduce.HCatRecordReader} for a
+  * split, using the storage handler of the split's partition.
+  *
+  * @param split Input split
+  * @param schema Table schema
+  * @param taskContext Context
+  * @return Record reader
+  * @throws IOException
+  * @throws InterruptedException
+  */
+ private RecordReader<WritableComparable, HCatRecord>
+ createRecordReader(InputSplit split,
+                    HCatSchema schema,
+                    TaskAttemptContext taskContext)
+   throws IOException, InterruptedException {
+   PartInfo partInfo = HCatUtils.castToHCatSplit(split).getPartitionInfo();
+   JobContext jobContext = taskContext;
+
+   HCatStorageHandler handler = HCatUtil.getStorageHandler(
+       jobContext.getConfiguration(), partInfo);
+
+   // NOTE(review): this job conf receives the partition's job properties
+   // but is not referenced again in this method -- confirm whether the
+   // calls are needed for a side effect.
+   JobConf jobConf = HCatUtil.getJobConfFromContext(jobContext);
+   HCatUtil.copyJobPropertiesToJobConf(partInfo.getJobProperties(), jobConf);
+
+   // Values for schema fields that the partition's data does not contain.
+   Map<String, String> missingValues =
+       getColValsNotInDataColumns(schema, partInfo);
+
+   return HCatUtils.newHCatReader(handler, missingValues);
+ }
+
+ /**
+  * Create a {@link RecordReader} for vertices, using the vertex table
+  * schema taken from the task configuration.
+  *
+  * @param split Input split
+  * @param taskContext Context
+  * @return Record reader
+  * @throws IOException
+  * @throws InterruptedException
+  */
+ public RecordReader<WritableComparable, HCatRecord>
+ createVertexRecordReader(InputSplit split, TaskAttemptContext taskContext)
+   throws IOException, InterruptedException {
+   Configuration conf = taskContext.getConfiguration();
+   HCatSchema vertexSchema = getVertexTableSchema(conf);
+   return createRecordReader(split, vertexSchema, taskContext);
+ }
+
+ /**
+  * Create a {@link RecordReader} for edges, using the edge table schema
+  * taken from the task configuration.
+  *
+  * @param split Input split
+  * @param taskContext Context
+  * @return Record reader
+  * @throws IOException
+  * @throws InterruptedException
+  */
+ public RecordReader<WritableComparable, HCatRecord>
+ createEdgeRecordReader(InputSplit split, TaskAttemptContext taskContext)
+   throws IOException, InterruptedException {
+   Configuration conf = taskContext.getConfiguration();
+   HCatSchema edgeSchema = getEdgeTableSchema(conf);
+   return createRecordReader(split, edgeSchema, taskContext);
+ }
+
+ /**
+  * Get values for fields requested by output schema which will not be in the
+  * data.
+  *
+  * A field of the output schema that has no position in the partition's
+  * schema is mapped to the partition value of the same name, or to
+  * {@code null} when the partition has no such value.
+  *
+  * @param outputSchema Output schema
+  * @param partInfo Partition info
+  * @return Values not in data columns
+  */
+ private static Map<String, String> getColValsNotInDataColumns(
+     HCatSchema outputSchema,
+     PartInfo partInfo) {
+   Map<String, String> missingValues = new HashMap<String, String>();
+   HCatSchema dataSchema = partInfo.getPartitionSchema();
+   for (String fieldName : outputSchema.getFieldNames()) {
+     if (dataSchema.getPosition(fieldName) != null) {
+       // The field is present in the data schema; nothing to fill in.
+       continue;
+     }
+     // The field is not present in the data schema, so use the partition
+     // value when it is a partition column and null otherwise.
+     Map<String, String> partitionValues = partInfo.getPartitionValues();
+     String value = partitionValues.containsKey(fieldName) ?
+         partitionValues.get(fieldName) : null;
+     missingValues.put(fieldName, value);
+   }
+   return missingValues;
+ }
+}
Added: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java?rev=1411554&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java Tue Nov 20 05:59:52 2012
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.hcatalog;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.EdgeInputFormat;
+import org.apache.giraph.graph.EdgeReader;
+import org.apache.giraph.graph.EdgeWithSource;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.data.HCatRecord;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * HCatalog {@link EdgeInputFormat} for reading edges from Hive/Pig.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public abstract class HCatalogEdgeInputFormat<
+ I extends WritableComparable,
+ E extends Writable>
+ extends EdgeInputFormat<I, E> {
+ /**
+  * HCatalog input format used to compute edge splits and to create the
+  * record readers for edges.
+  */
+ private GiraphHCatInputFormat hCatInputFormat = new GiraphHCatInputFormat();
+
+ /**
+  * Get the edge input splits from the HCatalog input format. The
+  * {@code numWorkers} argument is not used.
+  */
+ @Override
+ public final List<InputSplit> getSplits(JobContext context, int numWorkers)
+   throws IOException, InterruptedException {
+   List<InputSplit> edgeSplits = hCatInputFormat.getEdgeSplits(context);
+   return edgeSplits;
+ }
+
+ /**
+ * {@link EdgeReader} for {@link HCatalogEdgeInputFormat}.
+ */
+ protected abstract class HCatalogEdgeReader implements EdgeReader<I, E> {
+ /** Internal {@link RecordReader}. */
+ private RecordReader<WritableComparable, HCatRecord> hCatRecordReader;
+ /** Context passed to initialize. */
+ private TaskAttemptContext context;
+
+ @Override
+ public final void initialize(InputSplit inputSplit,
+ TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ hCatRecordReader =
+ hCatInputFormat.createEdgeRecordReader(inputSplit, context);
+ hCatRecordReader.initialize(inputSplit, context);
+ this.context = context;
+ }
+
+ @Override
+ public boolean nextEdge() throws IOException, InterruptedException {
+ return hCatRecordReader.nextKeyValue();
+ }
+
+ @Override
+ public final void close() throws IOException {
+ hCatRecordReader.close();
+ }
+
+ @Override
+ public final float getProgress() throws IOException, InterruptedException {
+ return hCatRecordReader.getProgress();
+ }
+
+ /**
+ * Get the record reader.
+ *
+ * @return Record reader to be used for reading.
+ */
+ protected final RecordReader<WritableComparable, HCatRecord>
+ getRecordReader() {
+ return hCatRecordReader;
+ }
+
+ /**
+ * Get the context.
+ *
+ * @return Context passed to initialize.
+ */
+ protected final TaskAttemptContext getContext() {
+ return context;
+ }
+ }
+
+ /**
+ * Create {@link EdgeReader}.
+ *
+ * The returned reader is not yet initialized.
+ *
+ * @return {@link HCatalogEdgeReader} instance.
+ */
+ protected abstract HCatalogEdgeReader createEdgeReader();
+
+ /**
+  * Create an edge reader via the no-argument factory method and initialize
+  * it with the given split and context.
+  *
+  * @param split Input split
+  * @param context Task attempt context
+  * @return Initialized edge reader
+  * @throws IOException If initializing the reader fails
+  * @throws IllegalStateException If the thread is interrupted while the
+  *                               reader is being initialized
+  */
+ @Override
+ public final EdgeReader<I, E>
+ createEdgeReader(InputSplit split, TaskAttemptContext context)
+   throws IOException {
+   try {
+     HCatalogEdgeReader reader = createEdgeReader();
+     reader.initialize(split, context);
+     return reader;
+   } catch (InterruptedException e) {
+     // The checked exception is converted to an unchecked one, so restore
+     // the interrupt status for code further up the stack.
+     Thread.currentThread().interrupt();
+     throw new IllegalStateException(
+         "createEdgeReader: Interrupted creating reader.", e);
+   }
+ }
+
+ /**
+  * {@link HCatalogEdgeReader} for tables holding a complete edge
+  * in each row. Subclasses say how to extract the source id, target id and
+  * edge value from a record.
+  */
+ protected abstract class SingleRowHCatalogEdgeReader
+     extends HCatalogEdgeReader {
+   /**
+    * Get source vertex id from a record.
+    *
+    * @param record Input record
+    * @return I Source vertex id
+    */
+   protected abstract I getSourceVertexId(HCatRecord record);
+
+   /**
+    * Get target vertex id from a record.
+    *
+    * @param record Input record
+    * @return I Target vertex id
+    */
+   protected abstract I getTargetVertexId(HCatRecord record);
+
+   /**
+    * Get edge value from a record.
+    *
+    * @param record Input record
+    * @return E Edge value
+    */
+   protected abstract E getEdgeValue(HCatRecord record);
+
+   @Override
+   public EdgeWithSource<I, E> getCurrentEdge() throws IOException,
+       InterruptedException {
+     HCatRecord row = getRecordReader().getCurrentValue();
+     I sourceId = getSourceVertexId(row);
+     I targetId = getTargetVertexId(row);
+     E edgeValue = getEdgeValue(row);
+     Edge<I, E> edge = new Edge<I, E>(targetId, edgeValue);
+     return new EdgeWithSource<I, E>(sourceId, edge);
+   }
+ }
+}
Added: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexValueInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexValueInputFormat.java?rev=1411554&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexValueInputFormat.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexValueInputFormat.java Tue Nov 20 05:59:52 2012
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.hcatalog;
+
+import org.apache.giraph.graph.VertexValueInputFormat;
+import org.apache.giraph.graph.VertexValueReader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.data.HCatRecord;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * HCatalog {@link VertexValueInputFormat} for reading vertex values from
+ * Hive/Pig.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public abstract class HCatalogVertexValueInputFormat<I extends
+ WritableComparable,
+ V extends Writable,
+ E extends Writable,
+ M extends Writable>
+ extends VertexValueInputFormat<I, V, E, M> {
+ /**
+  * HCatalog input format used to compute vertex splits and to create the
+  * record readers for vertices.
+  */
+ private GiraphHCatInputFormat hCatInputFormat = new GiraphHCatInputFormat();
+
+ /**
+  * Get the vertex input splits from the HCatalog input format. The
+  * {@code numWorkers} argument is not used.
+  */
+ @Override
+ public List<InputSplit> getSplits(JobContext context, int numWorkers)
+   throws IOException, InterruptedException {
+   List<InputSplit> vertexSplits = hCatInputFormat.getVertexSplits(context);
+   return vertexSplits;
+ }
+
+ /**
+ * {@link VertexValueReader} for {@link HCatalogVertexValueInputFormat}.
+ */
+ protected abstract class HCatalogVertexValueReader
+ extends VertexValueReader<I, V, E, M> {
+ /** Internal {@link RecordReader}. */
+ private RecordReader<WritableComparable, HCatRecord> hCatRecordReader;
+ /** Context passed to initialize. */
+ private TaskAttemptContext context;
+
+ @Override
+ public final void initialize(InputSplit inputSplit,
+ TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ super.initialize(inputSplit, context);
+ hCatRecordReader =
+ hCatInputFormat.createVertexRecordReader(inputSplit, context);
+ hCatRecordReader.initialize(inputSplit, context);
+ this.context = context;
+ }
+
+ @Override
+ public boolean nextVertex() throws IOException, InterruptedException {
+ return hCatRecordReader.nextKeyValue();
+ }
+
+ @Override
+ public final void close() throws IOException {
+ hCatRecordReader.close();
+ }
+
+ @Override
+ public final float getProgress() throws IOException, InterruptedException {
+ return hCatRecordReader.getProgress();
+ }
+
+ /**
+ * Get the record reader.
+ *
+ * @return Record reader to be used for reading.
+ */
+ protected final RecordReader<WritableComparable, HCatRecord>
+ getRecordReader() {
+ return hCatRecordReader;
+ }
+
+ /**
+ * Get the context.
+ *
+ * @return Context passed to initialize.
+ */
+ protected final TaskAttemptContext getContext() {
+ return context;
+ }
+ }
+
+ /**
+ * Create {@link VertexValueReader}.
+ *
+ * The returned reader is not yet initialized.
+ *
+ * @return {@link HCatalogVertexValueReader} instance.
+ */
+ protected abstract HCatalogVertexValueReader createVertexValueReader();
+
+ /**
+  * Create a vertex value reader via the no-argument factory method and
+  * initialize it with the given split and context.
+  *
+  * @param split Input split
+  * @param context Task attempt context
+  * @return Initialized vertex value reader
+  * @throws IOException If initializing the reader fails
+  * @throws IllegalStateException If the thread is interrupted while the
+  *                               reader is being initialized
+  */
+ @Override
+ public final VertexValueReader<I, V, E, M>
+ createVertexValueReader(InputSplit split, TaskAttemptContext context)
+   throws IOException {
+   try {
+     HCatalogVertexValueReader reader = createVertexValueReader();
+     reader.initialize(split, context);
+     return reader;
+   } catch (InterruptedException e) {
+     // The checked exception is converted to an unchecked one, so restore
+     // the interrupt status for code further up the stack.
+     Thread.currentThread().interrupt();
+     throw new IllegalStateException(
+         "createVertexValueReader: Interrupted creating reader.", e);
+   }
+ }
+
+ /**
+ * {@link HCatalogVertexValueReader} for tables holding a complete vertex
+ * value in each row.
+ */
+ protected abstract class SingleRowHCatalogVertexValueReader
+ extends HCatalogVertexValueReader {
+ /**
+ * Get vertex id from a record.
+ *
+ * @param record Input record
+ * @return I Vertex id
+ */
+ protected abstract I getVertexId(HCatRecord record);
+
+ /**
+ * Get vertex value from a record.
+ *
+ * @param record Input record
+ * @return V Vertex value
+ */
+ protected abstract V getVertexValue(HCatRecord record);
+
+ @Override
+ public final I getCurrentVertexId() throws IOException,
+ InterruptedException {
+ return getVertexId(getRecordReader().getCurrentValue());
+ }
+
+ @Override
+ public final V getCurrentVertexValue() throws IOException,
+ InterruptedException {
+ return getVertexValue(getRecordReader().getCurrentValue());
+ }
+ }
+}
Added: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/hcatalog/mapreduce/HCatUtils.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/hcatalog/mapreduce/HCatUtils.java?rev=1411554&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/hcatalog/mapreduce/HCatUtils.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/hcatalog/mapreduce/HCatUtils.java Tue Nov 20 05:59:52 2012
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import org.apache.giraph.io.hcatalog.GiraphHCatInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Utility methods copied from HCatalog because of visibility restrictions.
+ */
+public class HCatUtils {
+ /**
+  * Private constructor: this class only offers static methods and is not
+  * meant to be instantiated.
+  */
+ private HCatUtils() {
+   // Nothing to initialize.
+ }
+
+ /**
+  * Returns the given InputJobInfo after populating with data queried from the
+  * metadata service.
+  *
+  * The table info is always set. For a partitioned table one
+  * {@link PartInfo} is added per partition matching the job info's filter;
+  * for a non-partitioned table a single {@link PartInfo} with no partition
+  * values is added.
+  *
+  * @param conf Configuration; when null a new {@link HiveConf} is used to
+  *             reach the metastore
+  * @param inputJobInfo Input job info
+  * @return Populated input job info (the same instance that was passed in)
+  * @throws IOException If the metastore reports an error, or the number of
+  *                     matching partitions exceeds the configured maximum
+  */
+ public static InputJobInfo getInputJobInfo(
+     Configuration conf, InputJobInfo inputJobInfo)
+   throws IOException {
+   HiveMetaStoreClient metaStoreClient = null;
+   try {
+     HiveConf hiveConf = (conf == null) ?
+         new HiveConf(GiraphHCatInputFormat.class) :
+         HCatUtil.getHiveConf(conf);
+     metaStoreClient = HCatUtil.getHiveClient(hiveConf);
+     Table hiveTable = HCatUtil.getTable(metaStoreClient,
+         inputJobInfo.getDatabaseName(), inputJobInfo.getTableName());
+
+     List<PartInfo> partInfos = new ArrayList<PartInfo>();
+
+     inputJobInfo.setTableInfo(HCatTableInfo.valueOf(hiveTable.getTTable()));
+     if (hiveTable.getPartitionKeys().isEmpty()) {
+       // Non partitioned table
+       HCatSchema tableSchema = HCatUtil.extractSchema(hiveTable);
+       PartInfo tablePartInfo = extractPartInfo(tableSchema,
+           hiveTable.getTTable().getSd(), hiveTable.getParameters(), conf,
+           inputJobInfo);
+       tablePartInfo.setPartitionValues(new HashMap<String, String>());
+       partInfos.add(tablePartInfo);
+     } else {
+       // Partitioned table
+       List<Partition> matching = metaStoreClient.listPartitionsByFilter(
+           inputJobInfo.getDatabaseName(),
+           inputJobInfo.getTableName(),
+           inputJobInfo.getFilter(),
+           (short) -1);
+
+       if (matching != null) {
+         // Default to 100,000 partitions if hcat.metastore.maxpartitions is
+         // not defined
+         int partitionLimit =
+             hiveConf.getInt("hcat.metastore.maxpartitions", 100000);
+         if (matching.size() > partitionLimit) {
+           throw new HCatException(ErrorType.ERROR_EXCEED_MAXPART,
+               "total number of partitions is " + matching.size());
+         }
+
+         // Populate partition info
+         for (Partition partition : matching) {
+           HCatSchema partitionSchema = HCatUtil.extractSchema(
+               new org.apache.hadoop.hive.ql.metadata.Partition(
+                   hiveTable, partition));
+           PartInfo partInfo = extractPartInfo(partitionSchema,
+               partition.getSd(), partition.getParameters(), conf,
+               inputJobInfo);
+           partInfo.setPartitionValues(
+               InternalUtil.createPtnKeyValueMap(hiveTable, partition));
+           partInfos.add(partInfo);
+         }
+       }
+     }
+     inputJobInfo.setPartitions(partInfos);
+   } catch (MetaException e) {
+     throw new IOException("Got MetaException", e);
+   } catch (NoSuchObjectException e) {
+     throw new IOException("Got NoSuchObjectException", e);
+   } catch (TException e) {
+     throw new IOException("Got TException", e);
+   } catch (HiveException e) {
+     throw new IOException("Got HiveException", e);
+   } finally {
+     // Release the metastore connection on every path.
+     HCatUtil.closeHiveClientQuietly(metaStoreClient);
+   }
+   return inputJobInfo;
+ }
+
+ /**
+  * Extract partition info.
+  *
+  * The storage handler is resolved from the storer info of the storage
+  * descriptor, its input job properties are collected, and the given
+  * parameters are copied into a {@link Properties} object for the
+  * resulting {@link PartInfo}.
+  *
+  * @param schema Table schema
+  * @param sd Storage descriptor
+  * @param parameters Parameters
+  * @param conf Configuration
+  * @param inputJobInfo Input job info
+  * @return Partition info
+  * @throws IOException
+  */
+ private static PartInfo extractPartInfo(
+     HCatSchema schema, StorageDescriptor sd, Map<String, String> parameters,
+     Configuration conf, InputJobInfo inputJobInfo) throws IOException {
+   StorerInfo storer = InternalUtil.extractStorerInfo(sd, parameters);
+   HCatStorageHandler handler = HCatUtil.getStorageHandler(conf, storer);
+
+   // Copy the properties from storageHandler to jobProperties
+   Map<String, String> jobProperties =
+       HCatUtil.getInputJobProperties(handler, inputJobInfo);
+
+   // Table/partition parameters become the HCatalog properties.
+   Properties hcatProperties = new Properties();
+   hcatProperties.putAll(parameters);
+
+   String location = sd.getLocation();
+   return new PartInfo(schema, handler, location,
+       hcatProperties, jobProperties, inputJobInfo.getTableInfo());
+ }
+
+ /**
+  * Create a new {@link HCatRecordReader} from a storage handler and the
+  * values that are not found in the data columns.
+  *
+  * @param storageHandler Storage handler
+  * @param valuesNotInDataCols Values not in data columns
+  * @return Record reader
+  */
+ public static RecordReader newHCatReader(
+     HCatStorageHandler storageHandler,
+     Map<String, String> valuesNotInDataCols) {
+   RecordReader hcatReader =
+       new HCatRecordReader(storageHandler, valuesNotInDataCols);
+   return hcatReader;
+ }
+
+ /**
+  * Cast an {@link InputSplit} to {@link HCatSplit} by delegating to
+  * {@code InternalUtil.castToHCatSplit}.
+  *
+  * @param split Input split
+  * @return {@link HCatSplit}
+  * @throws IOException
+  */
+ public static HCatSplit castToHCatSplit(InputSplit split)
+   throws IOException {
+   HCatSplit hcatSplit = InternalUtil.castToHCatSplit(split);
+   return hcatSplit;
+ }
+}
Added: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/hcatalog/mapreduce/package-info.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/hcatalog/mapreduce/package-info.java?rev=1411554&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/hcatalog/mapreduce/package-info.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/hcatalog/mapreduce/package-info.java Tue Nov 20 05:59:52 2012
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package for HCatalog helper utilities.
+ */
+package org.apache.hcatalog.mapreduce;