Posted to commits@giraph.apache.org by ac...@apache.org on 2012/09/21 09:53:00 UTC
svn commit: r1388358 - in /giraph/trunk: ./ giraph-formats-contrib/
giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/
Author: aching
Date: Fri Sep 21 07:53:00 2012
New Revision: 1388358
URL: http://svn.apache.org/viewvc?rev=1388358&view=rev
Log:
GIRAPH-93: Hive input / output format. (nitayj via aching)
Added:
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexInputFormat.java
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexOutputFormat.java
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HiveGiraphRunner.java
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/package-info.java
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/giraph-formats-contrib/pom.xml
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1388358&r1=1388357&r2=1388358&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Fri Sep 21 07:53:00 2012
@@ -2,6 +2,8 @@ Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-93: Hive input / output format. (nitayj via aching)
+
GIRAPH-277: Text Vertex Input/Output Format base classes overhaul.
(nitayj via aching)
Modified: giraph/trunk/giraph-formats-contrib/pom.xml
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/pom.xml?rev=1388358&r1=1388357&r2=1388358&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/pom.xml (original)
+++ giraph/trunk/giraph-formats-contrib/pom.xml Fri Sep 21 07:53:00 2012
@@ -28,6 +28,8 @@ under the License.
<compileSource>1.6</compileSource>
<hadoop.version>0.20.203.0</hadoop.version>
<hbase.version>0.90.5</hbase.version>
+ <hive.version>0.10.0-SNAPSHOT</hive.version>
+ <hcatalog.version>0.4.0-dev</hcatalog.version>
<accumulo.version>1.4.0</accumulo.version>
<maven-compiler-plugin.version>2.3.2</maven-compiler-plugin.version>
<maven-javadoc-plugin.version>2.6</maven-javadoc-plugin.version>
@@ -144,5 +146,15 @@ under the License.
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hcatalog</groupId>
+ <artifactId>hcatalog</artifactId>
+ <version>${hcatalog.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-common</artifactId>
+ <version>${hive.version}</version>
+ </dependency>
</dependencies>
</project>
Added: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexInputFormat.java?rev=1388358&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexInputFormat.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexInputFormat.java Fri Sep 21 07:53:00 2012
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.format.hcatalog;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexInputFormat;
+import org.apache.giraph.graph.VertexReader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Abstract class that users should subclass to load data from a Hive or Pig
+ * table. You can easily implement a {@link HCatalogVertexReader} by extending
+ * either {@link SingleRowHCatalogVertexReader} or
+ * {@link MultiRowHCatalogVertexReader} depending on how data for each vertex is
+ * stored in the input table.
+ * <p>
+ * The desired database and table name to load from can be specified via
+ * {@link HCatInputFormat#setInput(org.apache.hadoop.mapreduce.Job, org.apache.hcatalog.mapreduce.InputJobInfo)}
+ * as you set up your vertex input format with
+ * {@link GiraphJob#setVertexInputFormatClass(Class)}.
+ *
+ * @param <I>
+ * Vertex index value
+ * @param <V>
+ * Vertex value
+ * @param <E>
+ * Edge value
+ * @param <M>
+ * Message value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class HCatalogVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+ extends VertexInputFormat<I, V, E, M> {
+
+ protected HCatInputFormat hCatInputFormat = new HCatInputFormat();
+
+ @Override
+ public final List<InputSplit> getSplits(JobContext context, int numWorkers)
+ throws IOException, InterruptedException {
+ return hCatInputFormat.getSplits(context);
+ }
+
+ /**
+ * Abstract class that users should subclass based on their specific vertex
+ * input. HCatRecord can be parsed to get the required data for implementing
+ * getCurrentVertex(). If the vertex spans more than one HCatRecord,
+ * nextVertex() should be overridden to handle that logic as well.
+ *
+ * @param <I>
+ * Vertex index value
+ * @param <V>
+ * Vertex value
+ * @param <E>
+ * Edge value
+ * @param <M>
+ * Message value
+ */
+ protected abstract class HCatalogVertexReader implements
+ VertexReader<I, V, E, M> {
+
+ /** Internal HCatRecordReader */
+ private RecordReader<WritableComparable, HCatRecord> hCatRecordReader;
+
+ /** Context passed to initialize */
+ private TaskAttemptContext context;
+
+ /**
+ * Initialize with the HCatRecordReader.
+ *
+ * @param hCatRecordReader
+ * Internal reader
+ */
+ private void initialize(
+ RecordReader<WritableComparable, HCatRecord> hCatRecordReader) {
+ this.hCatRecordReader = hCatRecordReader;
+ }
+
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ hCatRecordReader.initialize(inputSplit, context);
+ this.context = context;
+ }
+
+ @Override
+ public boolean nextVertex() throws IOException, InterruptedException {
+ // Users can override this if desired, e.g., when a vertex spans more
+ // than a single row.
+ return hCatRecordReader.nextKeyValue();
+ }
+
+ @Override
+ public void close() throws IOException {
+ hCatRecordReader.close();
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return hCatRecordReader.getProgress();
+ }
+
+ /**
+ * Get the record reader.
+ *
+ * @return Record reader to be used for reading.
+ */
+ protected RecordReader<WritableComparable, HCatRecord> getRecordReader() {
+ return hCatRecordReader;
+ }
+
+ /**
+ * Get the context.
+ *
+ * @return Context passed to initialize.
+ */
+ protected TaskAttemptContext getContext() {
+ return context;
+ }
+ }
+
+ protected abstract HCatalogVertexReader createVertexReader();
+
+ @Override
+ public final VertexReader<I, V, E, M> createVertexReader(InputSplit split,
+ TaskAttemptContext context) throws IOException {
+ try {
+ HCatalogVertexReader reader = createVertexReader();
+ reader.initialize(hCatInputFormat
+ .createRecordReader(split, context));
+ return reader;
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "createVertexReader: Interrupted creating reader.", e);
+ }
+ }
+
+ /**
+ * HCatalogVertexReader for tables holding complete vertex info within each
+ * row.
+ */
+ protected abstract class SingleRowHCatalogVertexReader extends
+ HCatalogVertexReader {
+
+ protected abstract I getVertexId(HCatRecord record);
+
+ protected abstract V getVertexValue(HCatRecord record);
+
+ protected abstract Map<I, E> getEdges(HCatRecord record);
+
+ private int recordCount = 0;
+
+ @Override
+ public final Vertex<I, V, E, M> getCurrentVertex()
+ throws IOException, InterruptedException {
+ HCatRecord record = getRecordReader().getCurrentValue();
+ Vertex<I, V, E, M> vertex = BspUtils.createVertex(getContext()
+ .getConfiguration());
+ vertex.initialize(getVertexId(record), getVertexValue(record),
+ getEdges(record), null);
+ ++recordCount;
+ if ((recordCount % 1000) == 0) {
+ System.out.println("read " + recordCount + " records");
+ // memory usage
+ Runtime runtime = Runtime.getRuntime();
+ double gb = 1024 * 1024 * 1024;
+ System.out.println("Memory: " + (runtime.totalMemory() / gb)
+ + "GB total = "
+ + ((runtime.totalMemory() - runtime.freeMemory()) / gb)
+ + "GB used + " + (runtime.freeMemory() / gb)
+ + "GB free, " + (runtime.maxMemory() / gb) + "GB max");
+ }
+ return vertex;
+ }
+
+ }
+
+ /**
+ * HCatalogVertexReader for tables holding vertex info across multiple rows
+ * sorted by vertex id column, so that they appear consecutively to the
+ * RecordReader.
+ */
+ protected abstract class MultiRowHCatalogVertexReader extends
+ HCatalogVertexReader {
+
+ protected abstract I getVertexId(HCatRecord record);
+
+ protected abstract V getVertexValue(Iterable<HCatRecord> records);
+
+ protected abstract I getTargetVertexId(HCatRecord record);
+
+ protected abstract E getEdgeValue(HCatRecord record);
+
+ private Vertex<I, V, E, M> vertex = null;
+
+ @Override
+ public Vertex<I, V, E, M> getCurrentVertex() throws IOException,
+ InterruptedException {
+ return vertex;
+ }
+
+ private I currentVertexId = null;
+ private Map<I, E> destEdgeMap = Maps.newHashMap();
+ private List<HCatRecord> recordsForVertex = Lists.newArrayList();
+ private int recordCount = 0;
+
+ @Override
+ public final boolean nextVertex() throws IOException,
+ InterruptedException {
+ while (getRecordReader().nextKeyValue()) {
+ HCatRecord record = getRecordReader().getCurrentValue();
+ if (currentVertexId == null) {
+ currentVertexId = getVertexId(record);
+ }
+ if (currentVertexId.equals(getVertexId(record))) {
+ destEdgeMap.put(getTargetVertexId(record),
+ getEdgeValue(record));
+ recordsForVertex.add(record);
+ } else {
+ createCurrentVertex();
+ if ((recordCount % 1000) == 0) {
+ System.out.println("read " + recordCount);
+ }
+ // start accumulating the next vertex, including this record's edge
+ currentVertexId = getVertexId(record);
+ destEdgeMap.put(getTargetVertexId(record),
+ getEdgeValue(record));
+ recordsForVertex.add(record);
+ return true;
+ }
+ }
+
+ if (destEdgeMap.isEmpty()) {
+ return false;
+ } else {
+ createCurrentVertex();
+ return true;
+ }
+
+ }
+
+ private void createCurrentVertex() {
+ vertex = BspUtils.createVertex(getContext().getConfiguration());
+ vertex.initialize(currentVertexId,
+ getVertexValue(recordsForVertex), destEdgeMap, null);
+ destEdgeMap.clear();
+ recordsForVertex.clear();
+ ++recordCount;
+ }
+
+ }
+
+}
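For illustration, here is a minimal sketch (not part of this commit) of a concrete input format built on SingleRowHCatalogVertexReader. The class name and table layout are assumptions: column 0 is a BIGINT vertex id, column 1 a DOUBLE vertex value, and the table stores no edges.

import java.util.HashMap;
import java.util.Map;

import org.apache.giraph.format.hcatalog.HCatalogVertexInputFormat;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hcatalog.data.HCatRecord;

/** Reads one vertex per row of an assumed (id BIGINT, value DOUBLE) table. */
public class SimpleHCatalogVertexInputFormat extends
        HCatalogVertexInputFormat<LongWritable, DoubleWritable, NullWritable, DoubleWritable> {

    @Override
    protected HCatalogVertexReader createVertexReader() {
        return new SingleRowHCatalogVertexReader() {
            @Override
            protected LongWritable getVertexId(HCatRecord record) {
                // column 0 is assumed to hold the vertex id
                return new LongWritable((Long) record.get(0));
            }

            @Override
            protected DoubleWritable getVertexValue(HCatRecord record) {
                // column 1 is assumed to hold the vertex value
                return new DoubleWritable((Double) record.get(1));
            }

            @Override
            protected Map<LongWritable, NullWritable> getEdges(HCatRecord record) {
                // this sketch reads no edges from the table
                return new HashMap<LongWritable, NullWritable>();
            }
        };
    }
}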
Added: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexOutputFormat.java?rev=1388358&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexOutputFormat.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexOutputFormat.java Fri Sep 21 07:53:00 2012
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.format.hcatalog;
+
+import java.io.IOException;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.graph.VertexOutputFormat;
+import org.apache.giraph.graph.VertexWriter;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+
+/**
+ * Abstract class that users should subclass to store data to a Hive or Pig table.
+ * You can easily implement a {@link HCatalogVertexWriter} by extending
+ * {@link SingleRowHCatalogVertexWriter} or {@link MultiRowHCatalogVertexWriter}
+ * depending on how you want to fit your vertices into the output table.
+ * <p>
+ * The desired database and table name to store to can be specified via
+ * {@link HCatOutputFormat#setOutput(org.apache.hadoop.mapreduce.Job, org.apache.hcatalog.mapreduce.OutputJobInfo)}
+ * as you set up your vertex output format with
+ * {@link GiraphJob#setVertexOutputFormatClass(Class)}. You must create the
+ * output table.
+ *
+ * @param <I>
+ * Vertex index value
+ * @param <V>
+ * Vertex value
+ * @param <E>
+ * Edge value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class HCatalogVertexOutputFormat<I extends WritableComparable, V extends Writable, E extends Writable>
+ extends VertexOutputFormat<I, V, E> {
+
+ protected HCatOutputFormat hCatOutputFormat = new HCatOutputFormat();
+
+ @Override
+ public final void checkOutputSpecs(JobContext context) throws IOException,
+ InterruptedException {
+ hCatOutputFormat.checkOutputSpecs(context);
+ }
+
+ @Override
+ public final OutputCommitter getOutputCommitter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return hCatOutputFormat.getOutputCommitter(context);
+ }
+
+ /**
+ * Abstract class that users should subclass based on their specific vertex
+ * output. Users should implement writeVertex to create an HCatRecord that is
+ * valid for writing by the HCatalog record writer.
+ *
+ * @param <I>
+ * Vertex index value
+ * @param <V>
+ * Vertex value
+ * @param <E>
+ * Edge value
+ */
+ protected abstract class HCatalogVertexWriter implements
+ VertexWriter<I, V, E> {
+
+ /** Internal HCatRecordWriter */
+ private RecordWriter<WritableComparable<?>, HCatRecord> hCatRecordWriter;
+ /** Context passed to initialize */
+ private TaskAttemptContext context;
+
+ /**
+ * Initialize with the HCatRecordWriter
+ *
+ * @param hCatRecordWriter
+ * Internal writer
+ */
+ private void initialize(
+ RecordWriter<WritableComparable<?>, HCatRecord> hCatRecordWriter) {
+ this.hCatRecordWriter = hCatRecordWriter;
+ }
+
+ /**
+ * Get the record writer.
+ *
+ * @return Record writer to be used for writing.
+ */
+ protected RecordWriter<WritableComparable<?>, HCatRecord> getRecordWriter() {
+ return hCatRecordWriter;
+ }
+
+ /**
+ * Get the context.
+ *
+ * @return Context passed to initialize.
+ */
+ protected TaskAttemptContext getContext() {
+ return context;
+ }
+
+ @Override
+ public void initialize(TaskAttemptContext context) throws IOException {
+ this.context = context;
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ hCatRecordWriter.close(context);
+ }
+
+ }
+
+ protected abstract HCatalogVertexWriter createVertexWriter();
+
+ @Override
+ public final VertexWriter<I, V, E> createVertexWriter(
+ TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ HCatalogVertexWriter writer = createVertexWriter();
+ writer.initialize(hCatOutputFormat.getRecordWriter(context));
+ return writer;
+ }
+
+ /**
+ * HCatalogVertexWriter that writes each vertex to a single row.
+ */
+ protected abstract class SingleRowHCatalogVertexWriter extends
+ HCatalogVertexWriter {
+
+ protected abstract int getNumColumns();
+
+ protected abstract void fillRecord(HCatRecord record,
+ Vertex<I, V, E, ?> vertex);
+
+ protected HCatRecord createRecord(Vertex<I, V, E, ?> vertex) {
+ HCatRecord record = new DefaultHCatRecord(getNumColumns());
+ fillRecord(record, vertex);
+ return record;
+ }
+
+ @Override
+ // XXX It is important not to put the generic type signature <I, V, E, ?> after
+ // Vertex. Otherwise, any class that extends this will not compile,
+ // because it will not be seen as implementing VertexWriter#writeVertex.
+ // Mystery of Java Generics :(
+ @SuppressWarnings("unchecked")
+ public final void writeVertex(Vertex vertex) throws IOException,
+ InterruptedException {
+ getRecordWriter().write(null, createRecord(vertex));
+ }
+
+ }
+
+ /**
+ * HCatalogVertexWriter that writes each vertex across multiple rows.
+ */
+ public abstract class MultiRowHCatalogVertexWriter extends
+ HCatalogVertexWriter {
+
+ protected abstract Iterable<HCatRecord> createRecords(
+ Vertex<I, V, E, ?> vertex);
+
+ @Override
+ // XXX Same thing here. No Generics for Vertex here.
+ @SuppressWarnings("unchecked")
+ public final void writeVertex(Vertex vertex) throws IOException,
+ InterruptedException {
+ Iterable<HCatRecord> records = createRecords(vertex);
+ for (HCatRecord record : records) {
+ getRecordWriter().write(null, record);
+ }
+ }
+
+ }
+
+}
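A matching sketch (again not part of this commit) of an output format built on SingleRowHCatalogVertexWriter, assuming a pre-created output table with schema (id BIGINT, value DOUBLE). The vertex accessor names getId()/getValue() are also an assumption; adjust them if this tree still uses the older getVertexId()/getVertexValue() names.

import org.apache.giraph.format.hcatalog.HCatalogVertexOutputFormat;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hcatalog.data.HCatRecord;

/** Writes one row per vertex into an assumed (id BIGINT, value DOUBLE) table. */
public class SimpleHCatalogVertexOutputFormat extends
        HCatalogVertexOutputFormat<LongWritable, DoubleWritable, NullWritable> {

    @Override
    protected HCatalogVertexWriter createVertexWriter() {
        return new SingleRowHCatalogVertexWriter() {
            @Override
            protected int getNumColumns() {
                return 2; // matches the assumed two-column output schema
            }

            @Override
            protected void fillRecord(HCatRecord record,
                    Vertex<LongWritable, DoubleWritable, NullWritable, ?> vertex) {
                // accessor names assumed, see note above
                record.set(0, vertex.getId().get());
                record.set(1, vertex.getValue().get());
            }
        };
    }
}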
Added: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HiveGiraphRunner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HiveGiraphRunner.java?rev=1388358&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HiveGiraphRunner.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HiveGiraphRunner.java Fri Sep 21 07:53:00 2012
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.format.hcatalog;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
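+/**
+ * Hadoop Tool that configures and runs a Giraph job whose vertices are read from
+ * and written back to Hive/Pig tables through HCatalog.
+ */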
+public class HiveGiraphRunner implements Tool {
+
+ @SuppressWarnings("rawtypes")
+ private Class<? extends Vertex> vertexClass;
+ @SuppressWarnings("rawtypes")
+ private Class<? extends HCatalogVertexInputFormat> vertexInputFormatClass;
+ @SuppressWarnings("rawtypes")
+ private Class<? extends HCatalogVertexOutputFormat> vertexOutputFormatClass;
+
+ protected HiveGiraphRunner(
+ @SuppressWarnings("rawtypes") Class<? extends Vertex> vertexClass,
+ @SuppressWarnings("rawtypes") Class<? extends HCatalogVertexInputFormat> vertexInputFormatClass,
+ @SuppressWarnings("rawtypes") Class<? extends HCatalogVertexOutputFormat> vertexOutputFormatClass) {
+ this.vertexClass = vertexClass;
+ this.vertexInputFormatClass = vertexInputFormatClass;
+ this.vertexOutputFormatClass = vertexOutputFormatClass;
+ this.conf = new HiveConf(getClass());
+ }
+
+ protected String dbName;
+ protected String inputTableName;
+ protected String inputTableFilterExpr;
+ protected String outputTableName;
+ protected Map<String, String> outputTablePartitionValues;
+
+ protected int workers;
+ protected boolean isVerbose;
+
+ public static void main(String[] args) throws Exception {
+ System.exit(ToolRunner
+ .run(new HiveGiraphRunner(null, null, null), args));
+ }
+
+ @Override
+ public final int run(String[] args) throws Exception {
+ // process args
+ try {
+ processArguments(args);
+ } catch (InterruptedException e) {
+ return 0;
+ } catch (IllegalArgumentException e) {
+ System.err.println(e.getMessage());
+ return -1;
+ }
+
+ // additional configuration for Hive
+ adjustConfigurationForHive(getConf());
+
+ // setup GiraphJob
+ GiraphJob job = new GiraphJob(getConf(), getClass().getName());
+ job.setVertexClass(vertexClass);
+
+ // setup input from Hive
+ InputJobInfo inputJobInfo = InputJobInfo.create(dbName, inputTableName,
+ inputTableFilterExpr);
+ HCatInputFormat.setInput(job.getInternalJob(), inputJobInfo);
+ job.setVertexInputFormatClass(vertexInputFormatClass);
+
+ // setup output to Hive
+ HCatOutputFormat.setOutput(job.getInternalJob(), OutputJobInfo.create(
+ dbName, outputTableName, outputTablePartitionValues));
+ HCatOutputFormat.setSchema(job.getInternalJob(),
+ HCatOutputFormat.getTableSchema(job.getInternalJob()));
+ job.setVertexOutputFormatClass(vertexOutputFormatClass);
+
+ job.setWorkerConfiguration(workers, workers, 100.0f);
+ initGiraphJob(job);
+
+ return job.run(isVerbose) ? 0 : -1;
+ }
+
+ private static void adjustConfigurationForHive(Configuration conf) {
+ // When output partitions are used, workers register them with the
+ // metastore at the cleanup stage, and HiveConf looks for hive-site.xml
+ // on its initialization, so ship hive-site.xml to the workers.
+ addToStringCollection(conf, "tmpfiles", conf.getClassLoader()
+ .getResource("hive-site.xml").toString());
+
+ // Also, you need hive.aux.jars as well
+ // addToStringCollection(conf, "tmpjars",
+ // conf.getStringCollection("hive.aux.jars.path"));
+
+ // Or, more effectively, we can provide all the jars the client needs
+ // to the workers as well
+ String hadoopClasspath = System.getenv("HADOOP_CLASSPATH");
+ if (hadoopClasspath != null) {
+ String[] hadoopJars = hadoopClasspath.split(File.pathSeparator);
+ List<String> hadoopJarURLs = Lists.newArrayList();
+ for (String jarPath : hadoopJars) {
+ File file = new File(jarPath);
+ if (file.exists() && file.isFile()) {
+ String jarURL = file.toURI().toString();
+ hadoopJarURLs.add(jarURL);
+ }
+ }
+ addToStringCollection(conf, "tmpjars", hadoopJarURLs);
+ }
+ }
+
+ private CommandLine processArguments(String[] args) throws ParseException,
+ InterruptedException {
+ Options options = new Options();
+ options.addOption("h", "help", false, "Help");
+ options.addOption("v", "verbose", false, "Verbose");
+ options.addOption("D", "hiveconf", true,
+ "property=value for Hive/Hadoop configuration");
+ options.addOption("w", "workers", true, "Number of workers");
+ if (vertexClass == null)
+ options.addOption(null, "vertexClass", true,
+ "Giraph Vertex class to use");
+ if (vertexInputFormatClass == null)
+ options.addOption(null, "vertexInputFormatClass", true,
+ "Giraph HCatalogVertexInputFormat class to use");
+ if (vertexOutputFormatClass == null)
+ options.addOption(null, "vertexOutputFormatClass", true,
+ "Giraph HCatalogVertexOutputFormat class to use");
+ options.addOption("db", "database", true, "Hive database name");
+ options.addOption("i", "inputTable", true, "Input table name");
+ options.addOption("I", "inputPartition", true,
+ "Input table partition expression (e.g., \"a<2 AND b='two'\"");
+ options.addOption("o", "outputTable", true, "Output table name");
+ options.addOption("O", "outputPartition", true,
+ "Output table partition values (e.g., \"a=1,b=two\")");
+ addMoreOptions(options);
+
+ CommandLineParser parser = new GnuParser();
+ final CommandLine cmdln = parser.parse(options, args);
+ // for (Option opt : cmd.getOptions()) {
+ // System.out.println(" opt -" + opt.getOpt() + " " + opt.getValue());
+ // }
+ if (args.length == 0 || cmdln.hasOption("help")) {
+ new HelpFormatter().printHelp(getClass().getName(), options, true);
+ throw new InterruptedException();
+ }
+
+ // Giraph classes
+ if (cmdln.hasOption("vertexClass"))
+ vertexClass = findClass(cmdln.getOptionValue("vertexClass"),
+ Vertex.class);
+ if (cmdln.hasOption("vertexInputFormatClass"))
+ vertexInputFormatClass = findClass(
+ cmdln.getOptionValue("vertexInputFormatClass"),
+ HCatalogVertexInputFormat.class);
+ if (cmdln.hasOption("vertexOutputFormatClass"))
+ vertexOutputFormatClass = findClass(
+ cmdln.getOptionValue("vertexOutputFormatClass"),
+ HCatalogVertexOutputFormat.class);
+
+ if (vertexClass == null)
+ throw new IllegalArgumentException(
+ "Need the Giraph Vertex class name (-vertexClass) to use");
+ if (vertexInputFormatClass == null)
+ throw new IllegalArgumentException(
+ "Need the Giraph VertexInputFormat class name (-vertexInputFormatClass) to use");
+ if (vertexOutputFormatClass == null)
+ throw new IllegalArgumentException(
+ "Need the Giraph VertexOutputFormat class name (-vertexOutputFormatClass) to use");
+
+ if (!cmdln.hasOption("workers"))
+ throw new IllegalArgumentException(
+ "Need to choose the number of workers (-w)");
+ if (!cmdln.hasOption("inputTable"))
+ throw new IllegalArgumentException(
+ "Need to set the input table name (-i). One example is 'dim_friendlist'");
+ if (!cmdln.hasOption("outputTable"))
+ throw new IllegalArgumentException(
+ "Need to set the output table name (-o).");
+
+ dbName = cmdln.getOptionValue("database", "default");
+ inputTableName = cmdln.getOptionValue("inputTable");
+ inputTableFilterExpr = cmdln.getOptionValue("inputPartition");
+ outputTableName = cmdln.getOptionValue("outputTable");
+ outputTablePartitionValues = parsePartitionValues(cmdln
+ .getOptionValue("outputPartition"));
+ workers = Integer.parseInt(cmdln.getOptionValue("workers"));
+ isVerbose = cmdln.hasOption("verbose");
+
+ // pick up -hiveconf arguments
+ Configuration conf = getConf();
+ String[] hiveconfArgs = cmdln.getOptionValues("hiveconf");
+ if (hiveconfArgs != null) {
+ for (String hiveconf : hiveconfArgs) {
+ String[] keyval = hiveconf.split("=", 2);
+ if (keyval.length == 2) {
+ String name = keyval[0];
+ String value = keyval[1];
+ if (name.equals("tmpjars") || name.equals("tmpfiles"))
+ addToStringCollection(conf, name, value);
+ else
+ conf.set(name, value);
+ }
+ }
+ }
+
+ processMoreArguments(cmdln);
+
+ return cmdln;
+ }
+
+ private static void addToStringCollection(Configuration conf, String name,
+ String... values) {
+ addToStringCollection(conf, name, Arrays.asList(values));
+ }
+
+ private static void addToStringCollection(Configuration conf, String name,
+ Collection<? extends String> values) {
+ Collection<String> tmpfiles = conf.getStringCollection(name);
+ tmpfiles.addAll(values);
+ conf.setStrings(name, tmpfiles.toArray(new String[tmpfiles.size()]));
+ // System.out.println(name + "=" + conf.get(name));
+ }
+
+ private <T> Class<? extends T> findClass(String className, Class<T> base) {
+ try {
+ Class<?> cls = Class.forName(className);
+ if (base.isAssignableFrom(cls))
+ return cls.asSubclass(base);
+ return null;
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException(className
+ + ": Invalid class name");
+ }
+ }
+
+ // TODO use Hive util class if this is already provided by it
+ public static Map<String, String> parsePartitionValues(
+ String outputTablePartitionString) {
+ if (outputTablePartitionString != null) {
+ Map<String, String> partitionValues = Maps.newHashMap();
+ for (String partkeyval : outputTablePartitionString.split(" *, *")) {
+ String[] keyval = partkeyval.split(" *= *", 2);
+ if (keyval.length < 2)
+ throw new IllegalArgumentException(
+ "Unrecognized partition value format: "
+ + outputTablePartitionString);
+ partitionValues.put(keyval[0], keyval[1]);
+ }
+ return partitionValues;
+ } else
+ return null;
+ }
+
+ private static String serializePartitionValues(
+ Map<String, String> outputTablePartitionValues) {
+ StringBuilder outputTablePartitionValuesString = new StringBuilder();
+ for (Entry<String, String> partitionValueEntry : outputTablePartitionValues
+ .entrySet()) {
+ if (outputTablePartitionValuesString.length() != 0)
+ outputTablePartitionValuesString.append(",");
+ outputTablePartitionValuesString
+ .append(partitionValueEntry.getKey()).append("=")
+ .append(partitionValueEntry.getValue());
+ }
+ return outputTablePartitionValuesString.toString();
+ }
+
+ /** Configuration */
+ private Configuration conf;
+
+ @Override
+ public final Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public final void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Override this method to add more command-line options. You can process
+ * them by also overriding {@link #processMoreArguments(CommandLine)}.
+ *
+ * @param options
+ */
+ protected void addMoreOptions(Options options) {
+ }
+
+ /**
+ * Override this method to process additional command-line arguments. You
+ * may want to declare additional options by also overriding
+ * {@link #addMoreOptions(Options)}.
+ *
+ * @param cmd
+ */
+ protected void processMoreArguments(CommandLine cmd) {
+ }
+
+ /**
+ * Override this method to do additional setup with the GiraphJob that will
+ * run.
+ *
+ * @param job
+ * GiraphJob that is going to run
+ */
+ protected void initGiraphJob(GiraphJob job) {
+ System.out.println(getClass().getSimpleName() + " with");
+ String prefix = "\t";
+ System.out.println(prefix + "-vertexClass="
+ + vertexClass.getCanonicalName());
+ System.out.println(prefix + "-vertexInputFormatClass="
+ + vertexInputFormatClass.getCanonicalName());
+ System.out.println(prefix + "-vertexOutputFormatClass="
+ + vertexOutputFormatClass.getCanonicalName());
+ System.out.println(prefix + "-inputTable=" + inputTableName);
+ if (inputTableFilterExpr != null)
+ System.out.println(prefix + "-inputPartition=\""
+ + inputTableFilterExpr + "\"");
+ System.out.println(prefix + "-outputTable=" + outputTableName);
+ if (outputTablePartitionValues != null)
+ System.out.println(prefix + "-outputPartition=\""
+ + serializePartitionValues(outputTablePartitionValues)
+ + "\"");
+ System.out.println(prefix + "-workers=" + workers);
+ }
+
+}
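Finally, a sketch (not part of this commit) of how the runner is meant to be used: subclass it, wire in the input and output formats sketched above, leave the vertex class to be supplied with --vertexClass on the command line, and launch it through ToolRunner.

import org.apache.giraph.format.hcatalog.HiveGiraphRunner;
import org.apache.hadoop.util.ToolRunner;

/** Launches a Giraph job that reads from and writes to Hive tables via HCatalog. */
public class MyHiveGiraphRunner extends HiveGiraphRunner {

    public MyHiveGiraphRunner() {
        // vertexClass is left null so it must be passed as --vertexClass;
        // the two format classes are the sketches above
        super(null,
                SimpleHCatalogVertexInputFormat.class,
                SimpleHCatalogVertexOutputFormat.class);
    }

    public static void main(String[] args) throws Exception {
        // e.g. hadoop jar my-job.jar MyHiveGiraphRunner \
        //        --vertexClass <your Vertex implementation> \
        //        -db default -i input_table -o output_table -w 10
        System.exit(ToolRunner.run(new MyHiveGiraphRunner(), args));
    }
}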
Added: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/package-info.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/package-info.java?rev=1388358&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/package-info.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/package-info.java Fri Sep 21 07:53:00 2012
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of input and output format classes for loading and storing Hive/Pig data using HCatalog.
+ */
+package org.apache.giraph.format.hcatalog;
+