Posted to commits@giraph.apache.org by ac...@apache.org on 2012/09/21 09:53:00 UTC

svn commit: r1388358 - in /giraph/trunk: ./ giraph-formats-contrib/ giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/

Author: aching
Date: Fri Sep 21 07:53:00 2012
New Revision: 1388358

URL: http://svn.apache.org/viewvc?rev=1388358&view=rev
Log:
GIRAPH-93: Hive input / output format. (nitayj via aching)


Added:
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexInputFormat.java
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexOutputFormat.java
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HiveGiraphRunner.java
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/package-info.java
Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/giraph-formats-contrib/pom.xml

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1388358&r1=1388357&r2=1388358&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Fri Sep 21 07:53:00 2012
@@ -2,6 +2,8 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-93: Hive input / output format. (nitayj via aching)
+
   GIRAPH-277: Text Vertex Input/Output Format base classes overhaul.
   (nitayj via aching)
 

Modified: giraph/trunk/giraph-formats-contrib/pom.xml
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/pom.xml?rev=1388358&r1=1388357&r2=1388358&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/pom.xml (original)
+++ giraph/trunk/giraph-formats-contrib/pom.xml Fri Sep 21 07:53:00 2012
@@ -28,6 +28,8 @@ under the License.
         <compileSource>1.6</compileSource>
         <hadoop.version>0.20.203.0</hadoop.version>
         <hbase.version>0.90.5</hbase.version>
+        <hive.version>0.10.0-SNAPSHOT</hive.version>
+        <hcatalog.version>0.4.0-dev</hcatalog.version>
         <accumulo.version>1.4.0</accumulo.version>
         <maven-compiler-plugin.version>2.3.2</maven-compiler-plugin.version>
         <maven-javadoc-plugin.version>2.6</maven-javadoc-plugin.version>
@@ -144,5 +146,15 @@ under the License.
             <version>${hadoop.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+          <groupId>org.apache.hcatalog</groupId>
+          <artifactId>hcatalog</artifactId>
+          <version>${hcatalog.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-common</artifactId>
+          <version>${hive.version}</version>
+        </dependency>
     </dependencies>
 </project>

Added: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexInputFormat.java?rev=1388358&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexInputFormat.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexInputFormat.java Fri Sep 21 07:53:00 2012
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.format.hcatalog;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexInputFormat;
+import org.apache.giraph.graph.VertexReader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Abstract class that users should subclass to load data from a Hive or Pig
+ * table. You can easily implement a {@link HCatalogVertexReader} by extending
+ * either {@link SingleRowHCatalogVertexReader} or
+ * {@link MultiRowHCatalogVertexReader} depending on how data for each vertex is
+ * stored in the input table.
+ * <p>
+ * The desired database and table name to load from can be specified via
+ * {@link HCatInputFormat#setInput(org.apache.hadoop.mapreduce.Job, org.apache.hcatalog.mapreduce.InputJobInfo)}
+ * as you set up your vertex input format with
+ * {@link GiraphJob#setVertexInputFormatClass(Class)}.
+ * 
+ * @param <I>
+ *            Vertex index value
+ * @param <V>
+ *            Vertex value
+ * @param <E>
+ *            Edge value
+ * @param <M>
+ *            Message value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class HCatalogVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+		extends VertexInputFormat<I, V, E, M> {
+
+	protected HCatInputFormat hCatInputFormat = new HCatInputFormat();
+
+	@Override
+	public final List<InputSplit> getSplits(JobContext context, int numWorkers)
+			throws IOException, InterruptedException {
+		return hCatInputFormat.getSplits(context);
+	}
+
+	/**
+	 * Abstract class that users should subclass based on their specific vertex
+	 * input. The HCatRecord can be parsed to obtain the data required by
+	 * getCurrentVertex(). If a vertex spans more than one HCatRecord,
+	 * nextVertex() should be overridden to handle that logic as well.
+	 * 
+	 * @param <I>
+	 *            Vertex index value
+	 * @param <V>
+	 *            Vertex value
+	 * @param <E>
+	 *            Edge value
+	 * @param <M>
+	 *            Message value
+	 */
+	protected abstract class HCatalogVertexReader implements
+			VertexReader<I, V, E, M> {
+
+		/** Internal HCatRecordReader */
+		private RecordReader<WritableComparable, HCatRecord> hCatRecordReader;
+
+		/** Context passed to initialize */
+		private TaskAttemptContext context;
+
+		/**
+		 * Initialize with the HCatRecordReader.
+		 * 
+		 * @param hCatRecordReader
+		 *            Internal reader
+		 */
+		private void initialize(
+				RecordReader<WritableComparable, HCatRecord> hCatRecordReader) {
+			this.hCatRecordReader = hCatRecordReader;
+		}
+
+		@Override
+		public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+				throws IOException, InterruptedException {
+			hCatRecordReader.initialize(inputSplit, context);
+			this.context = context;
+		}
+
+		@Override
+		public boolean nextVertex() throws IOException, InterruptedException {
+			// Users can override this if desired, e.g. when a single vertex
+			// spans more than one row.
+			return hCatRecordReader.nextKeyValue();
+		}
+
+		@Override
+		public void close() throws IOException {
+			hCatRecordReader.close();
+		}
+
+		@Override
+		public float getProgress() throws IOException, InterruptedException {
+			return hCatRecordReader.getProgress();
+		}
+
+		/**
+		 * Get the record reader.
+		 * 
+		 * @return Record reader to be used for reading.
+		 */
+		protected RecordReader<WritableComparable, HCatRecord> getRecordReader() {
+			return hCatRecordReader;
+		}
+
+		/**
+		 * Get the context.
+		 * 
+		 * @return Context passed to initialize.
+		 */
+		protected TaskAttemptContext getContext() {
+			return context;
+		}
+	}
+
+	protected abstract HCatalogVertexReader createVertexReader();
+
+	@Override
+	public final VertexReader<I, V, E, M> createVertexReader(InputSplit split,
+			TaskAttemptContext context) throws IOException {
+		try {
+			HCatalogVertexReader reader = createVertexReader();
+			reader.initialize(hCatInputFormat
+					.createRecordReader(split, context));
+			return reader;
+		} catch (InterruptedException e) {
+			throw new IllegalStateException(
+					"createVertexReader: Interrupted creating reader.", e);
+		}
+	}
+
+	/**
+	 * HCatalogVertexReader for tables holding complete vertex info within each
+	 * row.
+	 */
+	protected abstract class SingleRowHCatalogVertexReader extends
+			HCatalogVertexReader {
+
+		protected abstract I getVertexId(HCatRecord record);
+
+		protected abstract V getVertexValue(HCatRecord record);
+
+		protected abstract Map<I, E> getEdges(HCatRecord record);
+
+		private int recordCount = 0;
+
+		@Override
+		public final Vertex<I, V, E, M> getCurrentVertex()
+				throws IOException, InterruptedException {
+			HCatRecord record = getRecordReader().getCurrentValue();
+			Vertex<I, V, E, M> vertex = BspUtils.createVertex(getContext()
+					.getConfiguration());
+			vertex.initialize(getVertexId(record), getVertexValue(record),
+					getEdges(record), null);
+			++recordCount;
+			if ((recordCount % 1000) == 0) {
+				System.out.println("read " + recordCount + " records");
+				// memory usage
+				Runtime runtime = Runtime.getRuntime();
+				double gb = 1024 * 1024 * 1024;
+				System.out.println("Memory: " + (runtime.totalMemory() / gb)
+						+ "GB total = "
+						+ ((runtime.totalMemory() - runtime.freeMemory()) / gb)
+						+ "GB used + " + (runtime.freeMemory() / gb)
+						+ "GB free, " + (runtime.maxMemory() / gb) + "GB max");
+			}
+			return vertex;
+		}
+
+	}
+
+	/**
+	 * HCatalogVertexReader for tables holding vertex info across multiple rows
+	 * sorted by the vertex id column, so that rows of the same vertex appear
+	 * consecutively to the RecordReader.
+	 */
+	protected abstract class MultiRowHCatalogVertexReader extends
+			HCatalogVertexReader {
+
+		protected abstract I getVertexId(HCatRecord record);
+
+		protected abstract V getVertexValue(Iterable<HCatRecord> records);
+
+		protected abstract I getTargetVertexId(HCatRecord record);
+
+		protected abstract E getEdgeValue(HCatRecord record);
+
+		private Vertex<I, V, E, M> vertex = null;
+
+		@Override
+		public Vertex<I, V, E, M> getCurrentVertex() throws IOException,
+				InterruptedException {
+			return vertex;
+		}
+
+		private I currentVertexId = null;
+		private Map<I, E> destEdgeMap = Maps.newHashMap();
+		private List<HCatRecord> recordsForVertex = Lists.newArrayList();
+		private int recordCount = 0;
+
+		@Override
+		public final boolean nextVertex() throws IOException,
+				InterruptedException {
+			while (getRecordReader().nextKeyValue()) {
+				HCatRecord record = getRecordReader().getCurrentValue();
+				if (currentVertexId == null) {
+					currentVertexId = getVertexId(record);
+				}
+				if (currentVertexId.equals(getVertexId(record))) {
+					destEdgeMap.put(getTargetVertexId(record),
+							getEdgeValue(record));
+					recordsForVertex.add(record);
+				} else {
+					createCurrentVertex();
+					if ((recordCount % 1000) == 0) {
+						System.out.println("read " + recordCount);
+					}
+					currentVertexId = getVertexId(record);
+					// Keep the first record of the next vertex: store its
+					// edge too, so it is not lost at the vertex boundary.
+					destEdgeMap.put(getTargetVertexId(record),
+							getEdgeValue(record));
+					recordsForVertex.add(record);
+					return true;
+				}
+			}
+
+			if (destEdgeMap.isEmpty()) {
+				return false;
+			} else {
+				createCurrentVertex();
+				return true;
+			}
+
+		}
+
+		private void createCurrentVertex() {
+			vertex = BspUtils.createVertex(getContext().getConfiguration());
+			vertex.initialize(currentVertexId,
+					getVertexValue(recordsForVertex), destEdgeMap, null);
+			destEdgeMap.clear();
+			recordsForVertex.clear();
+			++recordCount;
+		}
+
+	}
+
+}
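
For illustration, a minimal input format built on these classes might look
like the sketch below. The table layout (column 0: vertex id as bigint,
column 1: vertex value as double, column 2: edges as map<bigint, double>)
and all class names here are assumptions for the example, not part of this
patch.

    import java.util.Map;

    import org.apache.giraph.format.hcatalog.HCatalogVertexInputFormat;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hcatalog.data.HCatRecord;

    import com.google.common.collect.Maps;

    public class ExampleHCatalogVertexInputFormat extends
            HCatalogVertexInputFormat<LongWritable, DoubleWritable,
            DoubleWritable, DoubleWritable> {

        @Override
        protected HCatalogVertexReader createVertexReader() {
            return new SingleRowHCatalogVertexReader() {
                @Override
                protected LongWritable getVertexId(HCatRecord record) {
                    // Assumed: column 0 holds the vertex id as a bigint.
                    return new LongWritable((Long) record.get(0));
                }

                @Override
                protected DoubleWritable getVertexValue(HCatRecord record) {
                    // Assumed: column 1 holds the vertex value as a double.
                    return new DoubleWritable((Double) record.get(1));
                }

                @Override
                protected Map<LongWritable, DoubleWritable> getEdges(
                        HCatRecord record) {
                    // Assumed: column 2 holds a map of target vertex id to
                    // edge weight.
                    @SuppressWarnings("unchecked")
                    Map<Long, Double> column =
                            (Map<Long, Double>) record.get(2);
                    Map<LongWritable, DoubleWritable> edges = Maps.newHashMap();
                    for (Map.Entry<Long, Double> entry : column.entrySet()) {
                        edges.put(new LongWritable(entry.getKey()),
                                new DoubleWritable(entry.getValue()));
                    }
                    return edges;
                }
            };
        }
    }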

Added: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexOutputFormat.java?rev=1388358&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexOutputFormat.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexOutputFormat.java Fri Sep 21 07:53:00 2012
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.format.hcatalog;
+
+import java.io.IOException;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.graph.VertexOutputFormat;
+import org.apache.giraph.graph.VertexWriter;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+
+/**
+ * Abstract class that users should subclass to store data to a Hive or Pig
+ * table.
+ * You can easily implement a {@link HCatalogVertexWriter} by extending
+ * {@link SingleRowHCatalogVertexWriter} or {@link MultiRowHCatalogVertexWriter}
+ * depending on how you want to fit your vertices into the output table.
+ * <p>
+ * The desired database and table name to store to can be specified via
+ * {@link HCatOutputFormat#setOutput(org.apache.hadoop.mapreduce.Job, org.apache.hcatalog.mapreduce.OutputJobInfo)}
+ * as you set up your vertex output format with
+ * {@link GiraphJob#setVertexOutputFormatClass(Class)}. The output table must
+ * be created in advance.
+ * 
+ * @param <I>
+ *            Vertex index value
+ * @param <V>
+ *            Vertex value
+ * @param <E>
+ *            Edge value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class HCatalogVertexOutputFormat<I extends WritableComparable, V extends Writable, E extends Writable>
+		extends VertexOutputFormat<I, V, E> {
+
+	protected HCatOutputFormat hCatOutputFormat = new HCatOutputFormat();
+
+	@Override
+	public final void checkOutputSpecs(JobContext context) throws IOException,
+			InterruptedException {
+		hCatOutputFormat.checkOutputSpecs(context);
+	}
+
+	@Override
+	public final OutputCommitter getOutputCommitter(TaskAttemptContext context)
+			throws IOException, InterruptedException {
+		return hCatOutputFormat.getOutputCommitter(context);
+	}
+
+	/**
+	 * Abstract class that users should subclass based on their specific vertex
+	 * output. Users should implement writeVertex to create an HCatRecord that
+	 * is valid for writing by the HCatalog record writer.
+	 * 
+	 * @param <I>
+	 *            Vertex index value
+	 * @param <V>
+	 *            Vertex value
+	 * @param <E>
+	 *            Edge value
+	 */
+	protected abstract class HCatalogVertexWriter implements
+			VertexWriter<I, V, E> {
+
+		/** Internal HCatRecordWriter */
+		private RecordWriter<WritableComparable<?>, HCatRecord> hCatRecordWriter;
+		/** Context passed to initialize */
+		private TaskAttemptContext context;
+
+		/**
+		 * Initialize with the HCatRecordWriter
+		 * 
+		 * @param hCatRecordWriter
+		 *            Internal writer
+		 */
+		private void initialize(
+				RecordWriter<WritableComparable<?>, HCatRecord> hCatRecordWriter) {
+			this.hCatRecordWriter = hCatRecordWriter;
+		}
+
+		/**
+		 * Get the record writer.
+		 *
+		 * @return Record writer to be used for writing.
+		 */
+		protected RecordWriter<WritableComparable<?>, HCatRecord> getRecordWriter() {
+			return hCatRecordWriter;
+		}
+
+		/**
+		 * Get the context.
+		 * 
+		 * @return Context passed to initialize.
+		 */
+		protected TaskAttemptContext getContext() {
+			return context;
+		}
+
+		@Override
+		public void initialize(TaskAttemptContext context) throws IOException {
+			this.context = context;
+		}
+
+		@Override
+		public void close(TaskAttemptContext context) throws IOException,
+				InterruptedException {
+			hCatRecordWriter.close(context);
+		}
+
+	}
+
+	protected abstract HCatalogVertexWriter createVertexWriter();
+
+	@Override
+	public final VertexWriter<I, V, E> createVertexWriter(
+			TaskAttemptContext context) throws IOException,
+			InterruptedException {
+		HCatalogVertexWriter writer = createVertexWriter();
+		writer.initialize(hCatOutputFormat.getRecordWriter(context));
+		return writer;
+	}
+
+	/**
+	 * HCatalogVertexWriter that writes each vertex to a single row.
+	 */
+	protected abstract class SingleRowHCatalogVertexWriter extends
+			HCatalogVertexWriter {
+
+		protected abstract int getNumColumns();
+
+		protected abstract void fillRecord(HCatRecord record,
+				Vertex<I, V, E, ?> vertex);
+
+		protected HCatRecord createRecord(Vertex<I, V, E, ?> vertex) {
+			HCatRecord record = new DefaultHCatRecord(getNumColumns());
+			fillRecord(record, vertex);
+			return record;
+		}
+
+		@Override
+	// XXX It is important not to put the generic type signature <I, V, E, ?>
+	// after Vertex. Otherwise, any class that extends this will not compile
+	// because it would not implement VertexWriter#writeVertex (an erasure
+	// quirk of Java generics).
+		@SuppressWarnings("unchecked")
+		public final void writeVertex(Vertex vertex) throws IOException,
+				InterruptedException {
+			getRecordWriter().write(null, createRecord(vertex));
+		}
+
+	}
+
+	/**
+	 * HCatalogVertexWriter that writes each vertex across multiple rows.
+	 */
+	public abstract class MultiRowHCatalogVertexWriter extends
+			HCatalogVertexWriter {
+
+		protected abstract Iterable<HCatRecord> createRecords(
+				Vertex<I, V, E, ?> vertex);
+
+		@Override
+	// XXX Same issue as above: no generic signature for Vertex here either.
+		@SuppressWarnings("unchecked")
+		public final void writeVertex(Vertex vertex) throws IOException,
+				InterruptedException {
+			Iterable<HCatRecord> records = createRecords(vertex);
+			for (HCatRecord record : records) {
+				getRecordWriter().write(null, record);
+			}
+		}
+
+	}
+
+}
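
A matching output format is a similarly small sketch. It assumes a
pre-created output table with two columns (id bigint, value double); the
class name and the Vertex accessor names used below are assumptions.

    import org.apache.giraph.format.hcatalog.HCatalogVertexOutputFormat;
    import org.apache.giraph.graph.Vertex;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hcatalog.data.HCatRecord;

    public class ExampleHCatalogVertexOutputFormat extends
            HCatalogVertexOutputFormat<LongWritable, DoubleWritable,
            DoubleWritable> {

        @Override
        protected HCatalogVertexWriter createVertexWriter() {
            return new SingleRowHCatalogVertexWriter() {
                @Override
                protected int getNumColumns() {
                    // Assumed output schema: (id bigint, value double).
                    return 2;
                }

                @Override
                protected void fillRecord(HCatRecord record,
                        Vertex<LongWritable, DoubleWritable,
                                DoubleWritable, ?> vertex) {
                    // Accessor names are assumptions about the Vertex API.
                    record.set(0, vertex.getId().get());
                    record.set(1, vertex.getValue().get());
                }
            };
        }
    }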

Added: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HiveGiraphRunner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HiveGiraphRunner.java?rev=1388358&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HiveGiraphRunner.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HiveGiraphRunner.java Fri Sep 21 07:53:00 2012
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.format.hcatalog;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
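+/**
+ * Tool to run a Giraph job that reads its input from and writes its output
+ * to Hive/HCatalog tables.
+ */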
+public class HiveGiraphRunner implements Tool {
+
+	@SuppressWarnings("rawtypes")
+	private Class<? extends Vertex> vertexClass;
+	@SuppressWarnings("rawtypes")
+	private Class<? extends HCatalogVertexInputFormat> vertexInputFormatClass;
+	@SuppressWarnings("rawtypes")
+	private Class<? extends HCatalogVertexOutputFormat> vertexOutputFormatClass;
+
+	protected HiveGiraphRunner(
+			@SuppressWarnings("rawtypes") Class<? extends Vertex> vertexClass,
+			@SuppressWarnings("rawtypes") Class<? extends HCatalogVertexInputFormat> vertexInputFormatClass,
+			@SuppressWarnings("rawtypes") Class<? extends HCatalogVertexOutputFormat> vertexOutputFormatClass) {
+		this.vertexClass = vertexClass;
+		this.vertexInputFormatClass = vertexInputFormatClass;
+		this.vertexOutputFormatClass = vertexOutputFormatClass;
+		this.conf = new HiveConf(getClass());
+	}
+
+	protected String dbName;
+	protected String inputTableName;
+	protected String inputTableFilterExpr;
+	protected String outputTableName;
+	protected Map<String, String> outputTablePartitionValues;
+
+	protected int workers;
+	protected boolean isVerbose;
+
+	public static void main(String[] args) throws Exception {
+		System.exit(ToolRunner
+				.run(new HiveGiraphRunner(null, null, null), args));
+	}
+
+	@Override
+	public final int run(String[] args) throws Exception {
+		// process args
+		try {
+			processArguments(args);
+		} catch (InterruptedException e) {
+			return 0;
+		} catch (IllegalArgumentException e) {
+			System.err.println(e.getMessage());
+			return -1;
+		}
+
+		// additional configuration for Hive
+		adjustConfigurationForHive(getConf());
+
+		// setup GiraphJob
+		GiraphJob job = new GiraphJob(getConf(), getClass().getName());
+		job.setVertexClass(vertexClass);
+
+		// setup input from Hive
+		InputJobInfo inputJobInfo = InputJobInfo.create(dbName, inputTableName,
+				inputTableFilterExpr);
+		HCatInputFormat.setInput(job.getInternalJob(), inputJobInfo);
+		job.setVertexInputFormatClass(vertexInputFormatClass);
+
+		// setup output to Hive
+		HCatOutputFormat.setOutput(job.getInternalJob(), OutputJobInfo.create(
+				dbName, outputTableName, outputTablePartitionValues));
+		HCatOutputFormat.setSchema(job.getInternalJob(),
+				HCatOutputFormat.getTableSchema(job.getInternalJob()));
+		job.setVertexOutputFormatClass(vertexOutputFormatClass);
+
+		job.setWorkerConfiguration(workers, workers, 100.0f);
+		initGiraphJob(job);
+
+		return job.run(isVerbose) ? 0 : -1;
+	}
+
+	private static void adjustConfigurationForHive(Configuration conf) {
+		// When output partitions are used, workers register them with the
+		// metastore at the cleanup stage, and HiveConf looks for
+		// hive-site.xml on the classpath during its initialization.
+		addToStringCollection(conf, "tmpfiles", conf.getClassLoader()
+				.getResource("hive-site.xml").toString());
+
+		// Also, you need hive.aux.jars as well
+		// addToStringCollection(conf, "tmpjars",
+		// conf.getStringCollection("hive.aux.jars.path"));
+
+		// Or, more effectively, we can provide all the jars the client needs
+		// to the workers as well.
+		String hadoopClasspath = System.getenv("HADOOP_CLASSPATH");
+		if (hadoopClasspath == null) {
+			return;
+		}
+		String[] hadoopJars = hadoopClasspath.split(File.pathSeparator);
+		List<String> hadoopJarURLs = Lists.newArrayList();
+		for (String jarPath : hadoopJars) {
+			File file = new File(jarPath);
+			if (file.exists() && file.isFile()) {
+				String jarURL = file.toURI().toString();
+				hadoopJarURLs.add(jarURL);
+			}
+		}
+		addToStringCollection(conf, "tmpjars", hadoopJarURLs);
+	}
+
+	private CommandLine processArguments(String[] args) throws ParseException,
+			InterruptedException {
+		Options options = new Options();
+		options.addOption("h", "help", false, "Help");
+		options.addOption("v", "verbose", false, "Verbose");
+		options.addOption("D", "hiveconf", true,
+				"property=value for Hive/Hadoop configuration");
+		options.addOption("w", "workers", true, "Number of workers");
+		if (vertexClass == null)
+			options.addOption(null, "vertexClass", true,
+					"Giraph Vertex class to use");
+		if (vertexInputFormatClass == null)
+			options.addOption(null, "vertexInputFormatClass", true,
+					"Giraph HCatalogVertexInputFormat class to use");
+		if (vertexOutputFormatClass == null)
+			options.addOption(null, "vertexOutputFormatClass", true,
+					"Giraph HCatalogVertexOutputFormat class to use");
+		options.addOption("db", "database", true, "Hive database name");
+		options.addOption("i", "inputTable", true, "Input table name");
+		options.addOption("I", "inputPartition", true,
+				"Input table partition expression (e.g., \"a<2 AND b='two'\")");
+		options.addOption("o", "outputTable", true, "Output table name");
+		options.addOption("O", "outputPartition", true,
+				"Output table partition values (e.g., \"a=1,b=two\")");
+		addMoreOptions(options);
+
+		CommandLineParser parser = new GnuParser();
+		final CommandLine cmdln = parser.parse(options, args);
+		// for (Option opt : cmd.getOptions()) {
+		// System.out.println(" opt -" + opt.getOpt() + " " + opt.getValue());
+		// }
+		if (args.length == 0 || cmdln.hasOption("help")) {
+			new HelpFormatter().printHelp(getClass().getName(), options, true);
+			throw new InterruptedException();
+		}
+
+		// Giraph classes
+		if (cmdln.hasOption("vertexClass"))
+			vertexClass = findClass(cmdln.getOptionValue("vertexClass"),
+					Vertex.class);
+		if (cmdln.hasOption("vertexInputFormatClass"))
+			vertexInputFormatClass = findClass(
+					cmdln.getOptionValue("vertexInputFormatClass"),
+					HCatalogVertexInputFormat.class);
+		if (cmdln.hasOption("vertexOutputFormatClass"))
+			vertexOutputFormatClass = findClass(
+					cmdln.getOptionValue("vertexOutputFormatClass"),
+					HCatalogVertexOutputFormat.class);
+
+		if (vertexClass == null)
+			throw new IllegalArgumentException(
+					"Need the Giraph Vertex class name (-vertexClass) to use");
+		if (vertexInputFormatClass == null)
+			throw new IllegalArgumentException(
+					"Need the Giraph VertexInputFormat class name (-vertexInputFormatClass) to use");
+		if (vertexOutputFormatClass == null)
+			throw new IllegalArgumentException(
+					"Need the Giraph VertexOutputFormat class name (-vertexOutputFormatClass) to use");
+
+		if (!cmdln.hasOption("workers"))
+			throw new IllegalArgumentException(
+					"Need to choose the number of workers (-w)");
+		if (!cmdln.hasOption("inputTable"))
+			throw new IllegalArgumentException(
+					"Need to set the input table name (-i).  One example is 'dim_friendlist'");
+		if (!cmdln.hasOption("outputTable"))
+			throw new IllegalArgumentException(
+					"Need to set the output table name (-o).");
+
+		dbName = cmdln.getOptionValue("database", "default");
+		inputTableName = cmdln.getOptionValue("inputTable");
+		inputTableFilterExpr = cmdln.getOptionValue("inputPartition");
+		outputTableName = cmdln.getOptionValue("outputTable");
+		outputTablePartitionValues = parsePartitionValues(cmdln
+				.getOptionValue("outputPartition"));
+		workers = Integer.parseInt(cmdln.getOptionValue("workers"));
+		isVerbose = cmdln.hasOption("verbose");
+
+		// pick up -hiveconf arguments
+		Configuration conf = getConf();
+		String[] hiveconfs = cmdln.getOptionValues("hiveconf");
+		for (String hiveconf : hiveconfs == null ? new String[0] : hiveconfs) {
+			String[] keyval = hiveconf.split("=", 2);
+			if (keyval.length == 2) {
+				String name = keyval[0];
+				String value = keyval[1];
+				if (name.equals("tmpjars") || name.equals("tmpfiles"))
+					addToStringCollection(conf, name, value);
+				else
+					conf.set(name, value);
+			}
+		}
+
+		processMoreArguments(cmdln);
+
+		return cmdln;
+	}
+
+	private static void addToStringCollection(Configuration conf, String name,
+			String... values) {
+		addToStringCollection(conf, name, Arrays.asList(values));
+	}
+
+	private static void addToStringCollection(Configuration conf, String name,
+			Collection<? extends String> values) {
+		Collection<String> tmpfiles = conf.getStringCollection(name);
+		tmpfiles.addAll(values);
+		conf.setStrings(name, tmpfiles.toArray(new String[tmpfiles.size()]));
+		// System.out.println(name + "=" + conf.get(name));
+	}
+
+	private <T> Class<? extends T> findClass(String className, Class<T> base) {
+		try {
+			Class<?> cls = Class.forName(className);
+			if (base.isAssignableFrom(cls))
+				return cls.asSubclass(base);
+			return null;
+		} catch (ClassNotFoundException e) {
+			throw new IllegalArgumentException(className
+					+ ": Invalid class name");
+		}
+	}
+
+	// TODO use a Hive util class if one already provides this
+	public static Map<String, String> parsePartitionValues(
+			String outputTablePartitionString) {
+		if (outputTablePartitionString != null) {
+			Map<String, String> partitionValues = Maps.newHashMap();
+			for (String partkeyval : outputTablePartitionString.split(" *, *")) {
+				String[] keyval = partkeyval.split(" *= *", 2);
+				if (keyval.length < 2)
+					throw new IllegalArgumentException(
+							"Unrecognized partition value format: "
+									+ outputTablePartitionString);
+				partitionValues.put(keyval[0], keyval[1]);
+			}
+			return partitionValues;
+		} else
+			return null;
+	}
+
+	private static String serializePartitionValues(
+			Map<String, String> outputTablePartitionValues) {
+		StringBuilder outputTablePartitionValuesString = new StringBuilder();
+		for (Entry<String, String> partitionValueEntry : outputTablePartitionValues
+				.entrySet()) {
+			if (outputTablePartitionValuesString.length() != 0)
+				outputTablePartitionValuesString.append(",");
+			outputTablePartitionValuesString
+					.append(partitionValueEntry.getKey()).append("=")
+					.append(partitionValueEntry.getValue());
+		}
+		return outputTablePartitionValuesString.toString();
+	}
+
+	/** Configuration */
+	private Configuration conf;
+
+	@Override
+	public final Configuration getConf() {
+		return conf;
+	}
+
+	@Override
+	public final void setConf(Configuration conf) {
+		this.conf = conf;
+	}
+
+	/**
+	 * Override this method to add more command-line options. You can process
+	 * them by also overriding {@link #processMoreArguments(CommandLine)}.
+	 * 
+	 * @param options Options to add the extra command-line options to
+	 */
+	protected void addMoreOptions(Options options) {
+	}
+
+	/**
+	 * Override this method to process additional command-line arguments. You
+	 * may want to declare additional options by also overriding
+	 * {@link #addMoreOptions(Options)}.
+	 * 
+	 * @param cmd Parsed command line with any additional arguments
+	 */
+	protected void processMoreArguments(CommandLine cmd) {
+	}
+
+	/**
+	 * Override this method to do additional setup with the GiraphJob that will
+	 * run.
+	 * 
+	 * @param job
+	 *            GiraphJob that is going to run
+	 */
+	protected void initGiraphJob(GiraphJob job) {
+		System.out.println(getClass().getSimpleName() + " with");
+		String prefix = "\t";
+		System.out.println(prefix + "-vertexClass="
+				+ vertexClass.getCanonicalName());
+		System.out.println(prefix + "-vertexInputFormatClass="
+				+ vertexInputFormatClass.getCanonicalName());
+		System.out.println(prefix + "-vertexOutputFormatClass="
+				+ vertexOutputFormatClass.getCanonicalName());
+		System.out.println(prefix + "-inputTable=" + inputTableName);
+		if (inputTableFilterExpr != null)
+			System.out.println(prefix + "-inputPartition=\""
+					+ inputTableFilterExpr + "\"");
+		System.out.println(prefix + "-outputTable=" + outputTableName);
+		if (outputTablePartitionValues != null)
+			System.out.println(prefix + "-outputPartition=\""
+					+ serializePartitionValues(outputTablePartitionValues)
+					+ "\"");
+		System.out.println(prefix + "-workers=" + workers);
+	}
+
+}
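
As a usage sketch (the jar name, class names, and table names below are
hypothetical), a job built on these formats could then be launched via
ToolRunner like so:

    hadoop jar giraph-formats-contrib-jar-with-dependencies.jar \
        org.apache.giraph.format.hcatalog.HiveGiraphRunner \
        -vertexClass com.example.PageRankVertex \
        -vertexInputFormatClass com.example.ExampleHCatalogVertexInputFormat \
        -vertexOutputFormatClass com.example.ExampleHCatalogVertexOutputFormat \
        -db default -i input_graph -o page_ranks -w 10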

Added: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/package-info.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/package-info.java?rev=1388358&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/package-info.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/package-info.java Fri Sep 21 07:53:00 2012
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of input and output format classes for loading and storing Hive/Pig data using HCatalog.
+ */
+package org.apache.giraph.format.hcatalog;
+