Posted to commits@giraph.apache.org by ac...@apache.org on 2012/10/05 23:59:41 UTC
svn commit: r1394835 - in /giraph/trunk: ./ giraph-formats-contrib/
giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/
giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/
giraph-formats-contrib/src/main/java/org/ap...
Author: aching
Date: Fri Oct 5 21:59:40 2012
New Revision: 1394835
URL: http://svn.apache.org/viewvc?rev=1394835&view=rev
Log:
GIRAPH-350: HBaseVertex i/o formats are not being injected with
Configuration via Configurable interface. (bfem via aching)
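In practical terms, the fix replaces framework-driven injection (which, per this bug, never happened for these formats) with an explicit static call that user code makes before running the job. A minimal sketch of the new wiring; everything here beyond the two setConf calls is an assumption for illustration:

    import org.apache.giraph.format.hbase.HBaseVertexInputFormat;
    import org.apache.giraph.format.hbase.HBaseVertexOutputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class HBaseFormatSetup {
      public static void main(String[] args) {
        // The formats' delegate Table{Input,Output}Formats are now static,
        // so the caller initializes them once, up front, instead of waiting
        // for the framework to inject the Configuration via Configurable.
        Configuration conf = HBaseConfiguration.create();
        HBaseVertexInputFormat.setConf(conf);
        HBaseVertexOutputFormat.setConf(conf);
      }
    }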
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/giraph-formats-contrib/pom.xml
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexInputFormat.java
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexOutputFormat.java
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexInputFormat.java
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexOutputFormat.java
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexInputFormat.java
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexOutputFormat.java
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HiveGiraphRunner.java
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/package-info.java
giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/TestHBaseRootMarkerVertextFormat.java
giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeInputFormat.java
giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeOutputFormat.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1394835&r1=1394834&r2=1394835&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Fri Oct 5 21:59:40 2012
@@ -2,6 +2,9 @@ Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-350: HBaseVertex i/o formats are not being injected with
+ Configuration via Configurable interface. (bfem via aching)
+
GIRAPH-356: Improve ZooKeeper issues. (aching)
GIRAPH-342: Recursive ZooKeeper calls should call progress, dynamic
Modified: giraph/trunk/giraph-formats-contrib/pom.xml
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/pom.xml?rev=1394835&r1=1394834&r2=1394835&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/pom.xml (original)
+++ giraph/trunk/giraph-formats-contrib/pom.xml Fri Oct 5 21:59:40 2012
@@ -18,7 +18,7 @@ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.giraph</groupId>
@@ -27,6 +27,7 @@ under the License.
<properties>
<compileSource>1.6</compileSource>
<hadoop.version>0.20.203.0</hadoop.version>
+ <forHadoop>for-hadoop-${hadoop.version}</forHadoop>
<hbase.version>0.90.5</hbase.version>
<hcatalog.version>0.5.0-SNAPSHOT</hcatalog.version>
<hive.version>0.9.0</hive.version>
@@ -47,7 +48,7 @@ under the License.
<systemProperties>
<property>
<name>prop.jarLocation</name>
- <value>${giraph.trunk.base}/target/giraph-${project.version}-jar-with-dependencies.jar</value>
+ <value>${giraph.trunk.base}/target/giraph-${project.version}-${forHadoop}-jar-with-dependencies.jar</value>
</property>
</systemProperties>
</configuration>
@@ -60,15 +61,34 @@ under the License.
<configLocation>../checkstyle.xml</configLocation>
<enableRulesSummary>false</enableRulesSummary>
<headerLocation>../license-header.txt</headerLocation>
- <failOnError>true</failOnError>
- <includeTestSourceDirectory>false</includeTestSourceDirectory>
+ <failOnError>true</failOnError>
+ <includeTestSourceDirectory>false</includeTestSourceDirectory>
</configuration>
<executions>
<execution>
<phase>verify</phase>
<goals>
- <goal>check</goal>
- </goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.7</version>
+ <executions>
+ <execution>
+ <id>add-sources</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>${basedir}/src/main/java/org/apache/giraph/format/hcatalog</source>
+ </sources>
+ </configuration>
</execution>
</executions>
</plugin>
@@ -193,9 +213,9 @@ under the License.
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-common</artifactId>
- <version>${hive.version}</version>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-common</artifactId>
+ <version>${hive.version}</version>
</dependency>
</dependencies>
</project>
Modified: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexInputFormat.java?rev=1394835&r1=1394834&r2=1394835&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexInputFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexInputFormat.java Fri Oct 5 21:59:40 2012
@@ -22,8 +22,6 @@ import org.apache.accumulo.core.data.Key
import org.apache.accumulo.core.data.Value;
import org.apache.giraph.graph.VertexInputFormat;
import org.apache.giraph.graph.VertexReader;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -52,7 +50,7 @@ public abstract class AccumuloVertexInpu
V extends Writable,
E extends Writable,
M extends Writable>
- extends VertexInputFormat<I, V, E, M> implements Configurable {
+ extends VertexInputFormat<I, V, E, M> {
/**
* delegate input format for all accumulo operations.
*/
@@ -60,21 +58,6 @@ public abstract class AccumuloVertexInpu
new AccumuloInputFormat();
/**
- * Configured and injected by the job
- */
- private Configuration conf;
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-
- /**
* Abstract class which provides a template for instantiating vertices
* from Accumulo Key/Value pairs.
*
@@ -103,7 +86,15 @@ public abstract class AccumuloVertexInpu
this.reader = reader;
}
- @Override
+ /**
+ * initialize the reader.
+ *
+ * @param inputSplit Input split to be used for reading vertices.
+ * @param context Context from the task.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+
public void initialize(InputSplit inputSplit,
TaskAttemptContext context)
throws IOException, InterruptedException {
@@ -160,6 +151,7 @@ public abstract class AccumuloVertexInpu
* @throws IOException
* @throws InterruptedException
*/
+ @Override
public List<InputSplit> getSplits(
JobContext context, int numWorkers)
throws IOException, InterruptedException {
Modified: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexOutputFormat.java?rev=1394835&r1=1394834&r2=1394835&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexOutputFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexOutputFormat.java Fri Oct 5 21:59:40 2012
@@ -21,8 +21,6 @@ import org.apache.accumulo.core.client.m
import org.apache.accumulo.core.data.Mutation;
import org.apache.giraph.graph.VertexOutputFormat;
import org.apache.giraph.graph.VertexWriter;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -50,7 +48,7 @@ public abstract class AccumuloVertexOutp
I extends WritableComparable,
V extends Writable,
E extends Writable>
- extends VertexOutputFormat<I, V, E> implements Configurable {
+ extends VertexOutputFormat<I, V, E> {
/**
@@ -64,12 +62,6 @@ public abstract class AccumuloVertexOutp
protected AccumuloOutputFormat accumuloOutputFormat =
new AccumuloOutputFormat();
-
- /**
- * Used by configured interface
- */
- private Configuration conf;
-
/**
*
* Main abstraction point for vertex writers to persist back
@@ -145,17 +137,6 @@ public abstract class AccumuloVertexOutp
}
}
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-
- @Override
- public Configuration getConf() {
- return this.conf;
- }
-
/**
*
* checkOutputSpecs
@@ -164,6 +145,7 @@ public abstract class AccumuloVertexOutp
* @throws IOException
* @throws InterruptedException
*/
+ @Override
public void checkOutputSpecs(JobContext context)
throws IOException, InterruptedException {
try {
@@ -185,6 +167,7 @@ public abstract class AccumuloVertexOutp
* @throws IOException
* @throws InterruptedException
*/
+ @Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context)
throws IOException, InterruptedException {
return accumuloOutputFormat.getOutputCommitter(context);
Modified: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexInputFormat.java?rev=1394835&r1=1394834&r2=1394835&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexInputFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexInputFormat.java Fri Oct 5 21:59:40 2012
@@ -19,7 +19,6 @@ package org.apache.giraph.format.hbase;
import org.apache.giraph.graph.VertexInputFormat;
import org.apache.giraph.graph.VertexReader;
-import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -60,32 +59,22 @@ public abstract class HBaseVertexInputF
V extends Writable,
E extends Writable,
M extends Writable>
- extends VertexInputFormat<I, V, E, M> implements Configurable {
+ extends VertexInputFormat<I, V, E, M> {
/**
* delegate HBase table input format
*/
- protected TableInputFormat tableInputFormat =
- new TableInputFormat();
- /**
- * Injected conf by Configurable interface
- */
- private Configuration conf;
+ protected static TableInputFormat BASE_FORMAT =
+ new TableInputFormat();
- public Configuration getConf() {
- return conf;
- }
-
- /**
- * setConf()
- *
- * We must initialize the table format since we manually instantiate it.
- *
- * @param conf Configuration object
- */
- public void setConf(Configuration conf) {
- tableInputFormat.setConf(conf);
- this.conf = conf;
+ /**
+ * static method to initialize
+ * base table input format
+ * with Configuration.
+ * @param conf job configuration
+ */
+ public static void setConf(Configuration conf) {
+ BASE_FORMAT.setConf(conf);
}
/**
@@ -193,6 +182,6 @@ public abstract class HBaseVertexInputF
public List<InputSplit> getSplits(
JobContext context, int numWorkers)
throws IOException, InterruptedException {
- return tableInputFormat.getSplits(context);
+ return BASE_FORMAT.getSplits(context);
}
}
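Since the delegate TableInputFormat is now a static singleton, a job names the input table once before getSplits() runs. A short sketch, assuming a table called "vertices" (the table name is hypothetical; TableInputFormat.INPUT_TABLE is the standard HBase key):

    import org.apache.giraph.format.hbase.HBaseVertexInputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat;

    public class HBaseInputSetup {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Name the table the shared delegate should scan for splits.
        conf.set(TableInputFormat.INPUT_TABLE, "vertices");
        HBaseVertexInputFormat.setConf(conf);
      }
    }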
Modified: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexOutputFormat.java?rev=1394835&r1=1394834&r2=1394835&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexOutputFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexOutputFormat.java Fri Oct 5 21:59:40 2012
@@ -20,7 +20,6 @@ package org.apache.giraph.format.hbase;
import org.apache.giraph.graph.VertexOutputFormat;
import org.apache.giraph.graph.VertexWriter;
-import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
@@ -56,17 +55,13 @@ public abstract class HBaseVertexOutputF
V extends Writable,
E extends Writable>
extends VertexOutputFormat
- <I, V, E> implements Configurable {
+ <I, V, E> {
/**
* delegate output format that writes to HBase
*/
- protected TableOutputFormat<ImmutableBytesWritable>
- tableOutputFormat = new TableOutputFormat<ImmutableBytesWritable>();
- /**
- * Injected conf by Configurable
- */
- private Configuration conf;
+ protected static TableOutputFormat<ImmutableBytesWritable>
+ BASE_FORMAT = new TableOutputFormat<ImmutableBytesWritable>();
/**
* Constructor
@@ -154,13 +149,8 @@ public abstract class HBaseVertexOutputF
*
* @param conf Injected configuration instance
*/
- public void setConf(Configuration conf) {
- tableOutputFormat.setConf(conf);
- this.conf = conf;
- }
-
- public Configuration getConf() {
- return this.conf;
+ public static void setConf(Configuration conf) {
+ BASE_FORMAT.setConf(conf);
}
/**
@@ -172,7 +162,7 @@ public abstract class HBaseVertexOutputF
*/
public void checkOutputSpecs(JobContext context)
throws IOException, InterruptedException {
- tableOutputFormat.checkOutputSpecs(context);
+ BASE_FORMAT.checkOutputSpecs(context);
}
/**
@@ -186,6 +176,6 @@ public abstract class HBaseVertexOutputF
public OutputCommitter getOutputCommitter(
TaskAttemptContext context)
throws IOException, InterruptedException {
- return tableOutputFormat.getOutputCommitter(context);
+ return BASE_FORMAT.getOutputCommitter(context);
}
}
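The output side mirrors this: the shared TableOutputFormat must know its target table before checkOutputSpecs() is called. A sketch under the same assumptions (hypothetical table name; TableOutputFormat.OUTPUT_TABLE is the standard HBase key):

    import org.apache.giraph.format.hbase.HBaseVertexOutputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;

    public class HBaseOutputSetup {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Name the table the shared delegate should write mutations to.
        conf.set(TableOutputFormat.OUTPUT_TABLE, "vertices_out");
        HBaseVertexOutputFormat.setConf(conf);
      }
    }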
Modified: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexInputFormat.java?rev=1394835&r1=1394834&r2=1394835&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexInputFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexInputFormat.java Fri Oct 5 21:59:40 2012
@@ -23,7 +23,6 @@ import java.util.List;
import java.util.Map;
import org.apache.giraph.graph.BspUtils;
-import org.apache.giraph.graph.GiraphJob;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexInputFormat;
import org.apache.giraph.graph.VertexReader;
@@ -38,6 +37,7 @@ import org.apache.hcatalog.mapreduce.HCa
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.log4j.Logger;
/**
* Abstract class that users should subclass to load data from a Hive or Pig
@@ -47,235 +47,338 @@ import com.google.common.collect.Maps;
* stored in the input table.
* <p>
* The desired database and table name to load from can be specified via
- * {@link HCatInputFormat#setInput(org.apache.hadoop.mapreduce.Job, org.apache.hcatalog.mapreduce.InputJobInfo)}
+ * {@link HCatInputFormat#setInput(org.apache.hadoop.mapreduce.Job,
+ * org.apache.hcatalog.mapreduce.InputJobInfo)}
* as you setup your vertex input format with
* {@link GiraphJob#setVertexInputFormatClass(Class)}.
- *
- * @param <I>
- * Vertex index value
- * @param <V>
- * Vertex value
- * @param <E>
- * Edge value
- * @param <M>
- * Message value
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
*/
-@SuppressWarnings("rawtypes")
-public abstract class HCatalogVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
- extends VertexInputFormat<I, V, E, M> {
-
- protected HCatInputFormat hCatInputFormat = new HCatInputFormat();
-
- @Override
- public final List<InputSplit> getSplits(JobContext context, int numWorkers)
- throws IOException, InterruptedException {
- return hCatInputFormat.getSplits(context);
- }
-
- /**
- * Abstract class that users should subclass based on their specific vertex
- * input. HCatRecord can be parsed to get the required data for implementing
- * getCurrentVertex(). If the vertex spans more than one HCatRecord,
- * nextVertex() should be overwritten to handle that logic as well.
- *
- * @param <I>
- * Vertex index value
- * @param <V>
- * Vertex value
- * @param <E>
- * Edge value
- * @param <M>
- * Message value
- */
- protected abstract class HCatalogVertexReader implements
- VertexReader<I, V, E, M> {
-
- /** Internal HCatRecordReader */
- private RecordReader<WritableComparable, HCatRecord> hCatRecordReader;
-
- /** Context passed to initialize */
- private TaskAttemptContext context;
-
- /**
- * Initialize with the HCatRecordReader.
- *
- * @param hCatRecordReader
- * Internal reader
- */
- private void initialize(
- RecordReader<WritableComparable, HCatRecord> hCatRecordReader) {
- this.hCatRecordReader = hCatRecordReader;
- }
-
- @Override
- public void initialize(InputSplit inputSplit, TaskAttemptContext context)
- throws IOException, InterruptedException {
- hCatRecordReader.initialize(inputSplit, context);
- this.context = context;
- }
-
- @Override
- public boolean nextVertex() throws IOException, InterruptedException {
- // Users can override this if desired, and a vertex is bigger than
- // a single row.
- return hCatRecordReader.nextKeyValue();
- }
-
- @Override
- public void close() throws IOException {
- hCatRecordReader.close();
- }
-
- @Override
- public float getProgress() throws IOException, InterruptedException {
- return hCatRecordReader.getProgress();
- }
-
- /**
- * Get the record reader.
- *
- * @return Record reader to be used for reading.
- */
- protected RecordReader<WritableComparable, HCatRecord> getRecordReader() {
- return hCatRecordReader;
- }
-
- /**
- * Get the context.
- *
- * @return Context passed to initialize.
- */
- protected TaskAttemptContext getContext() {
- return context;
- }
- }
-
- protected abstract HCatalogVertexReader createVertexReader();
-
- @Override
- public final VertexReader<I, V, E, M> createVertexReader(InputSplit split,
- TaskAttemptContext context) throws IOException {
- try {
- HCatalogVertexReader reader = createVertexReader();
- reader.initialize(hCatInputFormat
- .createRecordReader(split, context));
- return reader;
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "createVertexReader: Interrupted creating reader.", e);
- }
- }
-
- /**
- * HCatalogVertexReader for tables holding complete vertex info within each
- * row.
- */
- protected abstract class SingleRowHCatalogVertexReader extends
- HCatalogVertexReader {
-
- protected abstract I getVertexId(HCatRecord record);
-
- protected abstract V getVertexValue(HCatRecord record);
-
- protected abstract Map<I, E> getEdges(HCatRecord record);
-
- private int recordCount = 0;
-
- @Override
- public final Vertex<I, V, E, M> getCurrentVertex()
- throws IOException, InterruptedException {
- HCatRecord record = getRecordReader().getCurrentValue();
- Vertex<I, V, E, M> vertex = BspUtils.createVertex(getContext()
- .getConfiguration());
- vertex.initialize(getVertexId(record), getVertexValue(record),
- getEdges(record), null);
- ++recordCount;
- if ((recordCount % 1000) == 0) {
- System.out.println("read " + recordCount + " records");
- // memory usage
- Runtime runtime = Runtime.getRuntime();
- double gb = 1024 * 1024 * 1024;
- System.out.println("Memory: " + (runtime.totalMemory() / gb)
- + "GB total = "
- + ((runtime.totalMemory() - runtime.freeMemory()) / gb)
- + "GB used + " + (runtime.freeMemory() / gb)
- + "GB free, " + (runtime.maxMemory() / gb) + "GB max");
- }
- return vertex;
- }
-
- }
-
- /**
- * HCatalogVertexReader for tables holding vertex info across multiple rows
- * sorted by vertex id column, so that they appear consecutively to the
- * RecordReader.
- */
- protected abstract class MultiRowHCatalogVertexReader extends
- HCatalogVertexReader {
-
- protected abstract I getVertexId(HCatRecord record);
-
- protected abstract V getVertexValue(Iterable<HCatRecord> records);
-
- protected abstract I getTargetVertexId(HCatRecord record);
-
- protected abstract E getEdgeValue(HCatRecord record);
-
- private Vertex<I, V, E, M> vertex = null;
-
- @Override
- public Vertex<I, V, E, M> getCurrentVertex() throws IOException,
- InterruptedException {
- return vertex;
- }
-
- private I currentVertexId = null;
- private Map<I, E> destEdgeMap = Maps.newHashMap();
- private List<HCatRecord> recordsForVertex = Lists.newArrayList();
- private int recordCount = 0;
-
- @Override
- public final boolean nextVertex() throws IOException,
- InterruptedException {
- while (getRecordReader().nextKeyValue()) {
- HCatRecord record = getRecordReader().getCurrentValue();
- if (currentVertexId == null) {
- currentVertexId = getVertexId(record);
- }
- if (currentVertexId.equals(getVertexId(record))) {
- destEdgeMap.put(getTargetVertexId(record),
- getEdgeValue(record));
- recordsForVertex.add(record);
- } else {
- createCurrentVertex();
- if ((recordCount % 1000) == 0) {
- System.out.println("read " + recordCount);
- }
- currentVertexId = getVertexId(record);
- recordsForVertex.add(record);
- return true;
- }
- }
-
- if (destEdgeMap.isEmpty()) {
- return false;
- } else {
- createCurrentVertex();
- return true;
- }
-
- }
-
- private void createCurrentVertex() {
- vertex = BspUtils.createVertex(getContext().getConfiguration());
- vertex.initialize(currentVertexId,
- getVertexValue(recordsForVertex), destEdgeMap, null);
- destEdgeMap.clear();
- recordsForVertex.clear();
- ++recordCount;
- }
-
- }
+@SuppressWarnings("rawtypes")
+public abstract class HCatalogVertexInputFormat<
+ I extends WritableComparable,
+ V extends Writable,
+ E extends Writable,
+ M extends Writable>
+ extends VertexInputFormat<I, V, E, M> {
+ /**
+ * HCatalog input format.
+ */
+ private HCatInputFormat hCatInputFormat = new HCatInputFormat();
+
+ @Override
+ public final List<InputSplit> getSplits(
+ final JobContext context, final int numWorkers)
+ throws IOException, InterruptedException {
+ return hCatInputFormat.getSplits(context);
+ }
+
+ /**
+ * Abstract class that users should subclass
+ * based on their specific vertex
+ * input. HCatRecord can be parsed to get the
+ * required data for implementing
+ * getCurrentVertex(). If the vertex spans more
+ * than one HCatRecord,
+ * nextVertex() should be overwritten to handle that logic as well.
+ */
+ protected abstract class HCatalogVertexReader implements
+ VertexReader<I, V, E, M> {
+
+ /** Internal HCatRecordReader. */
+ private RecordReader<WritableComparable,
+ HCatRecord> hCatRecordReader;
+
+ /** Context passed to initialize. */
+ private TaskAttemptContext context;
+
+ /**
+ * Initialize with the HCatRecordReader.
+ *
+ * @param recordReader internal reader
+ */
+ private void initialize(
+ final RecordReader<
+ WritableComparable, HCatRecord>
+ recordReader) {
+ this.hCatRecordReader = recordReader;
+ }
+
+ @Override
+ public final void initialize(
+ final InputSplit inputSplit,
+ final TaskAttemptContext ctxt)
+ throws IOException, InterruptedException {
+ hCatRecordReader.initialize(inputSplit, ctxt);
+ this.context = ctxt;
+ }
+
+ @Override
+ public boolean nextVertex()
+ throws IOException, InterruptedException {
+ // Users can override this if desired,
+ // and a vertex is bigger than
+ // a single row.
+ return hCatRecordReader.nextKeyValue();
+ }
+
+ @Override
+ public final void close() throws IOException {
+ hCatRecordReader.close();
+ }
+
+ @Override
+ public final float getProgress()
+ throws IOException, InterruptedException {
+ return hCatRecordReader.getProgress();
+ }
+
+ /**
+ * Get the record reader.
+ * @return Record reader to be used for reading.
+ */
+ protected final RecordReader<WritableComparable, HCatRecord>
+ getRecordReader() {
+ return hCatRecordReader;
+ }
+
+ /**
+ * Get the context.
+ *
+ * @return Context passed to initialize.
+ */
+ protected final TaskAttemptContext getContext() {
+ return context;
+ }
+ }
+
+ /**
+ * Create vertex reader instance.
+ * @return HCatalogVertexReader
+ */
+ protected abstract HCatalogVertexReader createVertexReader();
+
+ @Override
+ public final VertexReader<I, V, E, M>
+ createVertexReader(final InputSplit split,
+ final TaskAttemptContext context)
+ throws IOException {
+ try {
+ HCatalogVertexReader reader = createVertexReader();
+ reader.initialize(hCatInputFormat.
+ createRecordReader(split, context));
+ return reader;
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "createVertexReader: " +
+ "Interrupted creating reader.", e);
+ }
+ }
+
+ /**
+ * HCatalogVertexReader for tables holding
+ * complete vertex info within each
+ * row.
+ */
+ protected abstract class SingleRowHCatalogVertexReader
+ extends HCatalogVertexReader {
+
+ /**
+ * 1024 const.
+ */
+ private static final int BYTE_CONST = 1024;
+
+ /**
+ * logger
+ */
+ private final Logger log =
+ Logger.getLogger(SingleRowHCatalogVertexReader.class);
+ /**
+ * record count.
+ */
+ private int recordCount = 0;
+ /**
+ * interval, in records, between progress log messages.
+ */
+ private final int recordModLimit = 1000;
+
+
+ /**
+ * get vertex id.
+ * @param record hcat record
+ * @return I id
+ */
+ protected abstract I getVertexId(HCatRecord record);
+
+ /**
+ * get vertex value.
+ * @param record hcat record
+ * @return V value
+ */
+ protected abstract V getVertexValue(HCatRecord record);
+
+ /**
+ * get edges.
+ * @param record hcat record
+ * @return Map edges
+ */
+ protected abstract Map<I, E> getEdges(HCatRecord record);
+
+ @Override
+ public final Vertex<I, V, E, M> getCurrentVertex()
+ throws IOException, InterruptedException {
+ HCatRecord record = getRecordReader().getCurrentValue();
+ Vertex<I, V, E, M> vertex =
+ BspUtils.createVertex(getContext()
+ .getConfiguration());
+ vertex.initialize(getVertexId(record), getVertexValue(record),
+ getEdges(record), null);
+ ++recordCount;
+ if ((recordCount % recordModLimit) == 0) {
+ log.info("read " + recordCount + " records");
+ // memory usage
+ Runtime runtime = Runtime.getRuntime();
+ double gb = BYTE_CONST *
+ BYTE_CONST *
+ BYTE_CONST;
+ log.info("Memory: " + (runtime.totalMemory() / gb) +
+ "GB total = " +
+ ((runtime.totalMemory() - runtime.freeMemory()) / gb) +
+ "GB used + " + (runtime.freeMemory() / gb) +
+ "GB free, " + (runtime.maxMemory() / gb) + "GB max");
+ }
+ return vertex;
+ }
+ }
+ /**
+ * HCatalogVertexReader for tables
+ * holding vertex info across multiple rows
+ * sorted by vertex id column,
+ * so that they appear consecutively to the
+ * RecordReader.
+ */
+ protected abstract class MultiRowHCatalogVertexReader extends
+ HCatalogVertexReader {
+ /**
+ * interval, in records, between progress log messages.
+ */
+ private static final int RECORD_MOD_LIMIT = 1000;
+
+ /**
+ * logger
+ */
+ private final Logger log =
+ Logger.getLogger(MultiRowHCatalogVertexReader.class);
+ /**
+ * current vertex id.
+ */
+ private I currentVertexId = null;
+ /**
+ * destination edge map.
+ */
+ private Map<I, E> destEdgeMap = Maps.newHashMap();
+ /**
+ * records for the current vertex.
+ */
+ private List<HCatRecord> recordsForVertex = Lists.newArrayList();
+ /**
+ * record count.
+ */
+ private int recordCount = 0;
+ /**
+ * current vertex.
+ */
+ private Vertex<I, V, E, M> vertex = null;
+ /**
+ * get vertex id from record.
+ *
+ * @param record hcat
+ * @return I vertex id
+ */
+ protected abstract I getVertexId(HCatRecord record);
+
+ /**
+ * get vertex value from record.
+ * @param records all records for the vertex
+ * @return V vertex value
+ */
+ protected abstract V getVertexValue(
+ Iterable<HCatRecord> records);
+
+ /**
+ * get target vertex id from record.
+ *
+ * @param record hcat
+ * @return I vertex id of target.
+ */
+ protected abstract I getTargetVertexId(HCatRecord record);
+
+ /**
+ * get edge value from record.
+ *
+ * @param record hcat.
+ * @return E edge value.
+ */
+ protected abstract E getEdgeValue(HCatRecord record);
+
+ @Override
+ public final Vertex<I, V, E, M>
+ getCurrentVertex()
+ throws IOException, InterruptedException {
+ return vertex;
+ }
+
+ @Override
+ public boolean nextVertex()
+ throws IOException, InterruptedException {
+ while (getRecordReader().nextKeyValue()) {
+ HCatRecord record =
+ getRecordReader().getCurrentValue();
+ if (currentVertexId == null) {
+ currentVertexId = getVertexId(record);
+ }
+ if (currentVertexId.equals(getVertexId(record))) {
+ destEdgeMap.put(
+ getTargetVertexId(record),
+ getEdgeValue(record));
+ recordsForVertex.add(record);
+ } else {
+ createCurrentVertex();
+ if ((recordCount % RECORD_MOD_LIMIT) == 0) {
+ log.info("read " + recordCount);
+ }
+ currentVertexId = getVertexId(record);
+ recordsForVertex.add(record);
+ return true;
+ }
+ }
+
+ if (destEdgeMap.isEmpty()) {
+ return false;
+ } else {
+ createCurrentVertex();
+ return true;
+ }
+ }
+
+ /**
+ * create current vertex.
+ */
+ private void createCurrentVertex() {
+ vertex = BspUtils.
+ createVertex(getContext().getConfiguration());
+ vertex.initialize(currentVertexId,
+ getVertexValue(
+ recordsForVertex), destEdgeMap, null);
+ destEdgeMap.clear();
+ recordsForVertex.clear();
+ ++recordCount;
+ }
+ }
}
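To make the reader contract concrete, here is a minimal sketch of a SingleRowHCatalogVertexReader for an assumed two-column table (id BIGINT, value DOUBLE, no edges); the class name and column positions are hypothetical:

    import java.util.Map;

    import com.google.common.collect.Maps;
    import org.apache.giraph.format.hcatalog.HCatalogVertexInputFormat;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hcatalog.data.HCatRecord;

    public class LongDoubleHCatVertexInputFormat extends
        HCatalogVertexInputFormat<LongWritable, DoubleWritable,
            NullWritable, DoubleWritable> {
      @Override
      protected HCatalogVertexReader createVertexReader() {
        return new SingleRowHCatalogVertexReader() {
          @Override
          protected LongWritable getVertexId(HCatRecord record) {
            return new LongWritable((Long) record.get(0)); // column 0: id
          }

          @Override
          protected DoubleWritable getVertexValue(HCatRecord record) {
            return new DoubleWritable((Double) record.get(1)); // column 1: value
          }

          @Override
          protected Map<LongWritable, NullWritable> getEdges(HCatRecord record) {
            return Maps.newHashMap(); // no edges in this assumed schema
          }
        };
      }
    }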
Modified: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexOutputFormat.java?rev=1394835&r1=1394834&r2=1394835&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexOutputFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexOutputFormat.java Fri Oct 5 21:59:40 2012
@@ -21,7 +21,6 @@ package org.apache.giraph.format.hcatalo
import java.io.IOException;
import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.GiraphJob;
import org.apache.giraph.graph.VertexOutputFormat;
import org.apache.giraph.graph.VertexWriter;
import org.apache.hadoop.io.Writable;
@@ -41,159 +40,175 @@ import org.apache.hcatalog.mapreduce.HCa
* depending on how you want to fit your vertices into the output table.
* <p>
* The desired database and table name to store to can be specified via
- * {@link HCatOutputFormat#setOutput(org.apache.hadoop.mapreduce.Job, org.apache.hcatalog.mapreduce.OutputJobInfo)}
+ * {@link HCatOutputFormat#setOutput(org.apache.hadoop.mapreduce.Job,
+ * org.apache.hcatalog.mapreduce.OutputJobInfo)}
* as you setup your vertex output format with
* {@link GiraphJob#setVertexOutputFormatClass(Class)}. You must create the
* output table.
- *
- * @param <I>
- * Vertex index value
- * @param <V>
- * Vertex value
- * @param <E>
- * Edge value
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
*/
@SuppressWarnings("rawtypes")
-public abstract class HCatalogVertexOutputFormat<I extends WritableComparable, V extends Writable, E extends Writable>
- extends VertexOutputFormat<I, V, E> {
-
- protected HCatOutputFormat hCatOutputFormat = new HCatOutputFormat();
-
- @Override
- public final void checkOutputSpecs(JobContext context) throws IOException,
- InterruptedException {
- hCatOutputFormat.checkOutputSpecs(context);
- }
-
- @Override
- public final OutputCommitter getOutputCommitter(TaskAttemptContext context)
- throws IOException, InterruptedException {
- return hCatOutputFormat.getOutputCommitter(context);
- }
-
- /**
- * Abstract class that users should subclass based on their specific vertex
- * output. Users should implement writeVertex to create a HCatRecord that is
- * valid to for writing by HCatalogRecordWriter.
- *
- * @param <I>
- * Vertex index value
- * @param <V>
- * Vertex value
- * @param <E>
- * Edge value
- */
- protected abstract class HCatalogVertexWriter implements
- VertexWriter<I, V, E> {
-
- /** Internal HCatRecordWriter */
- private RecordWriter<WritableComparable<?>, HCatRecord> hCatRecordWriter;
- /** Context passed to initialize */
- private TaskAttemptContext context;
-
- /**
- * Initialize with the HCatRecordWriter
- *
- * @param hCatRecordWriter
- * Internal writer
- */
- private void initialize(
- RecordWriter<WritableComparable<?>, HCatRecord> hCatRecordWriter) {
- this.hCatRecordWriter = hCatRecordWriter;
- }
-
- /**
- * Get the record reader.
- *
- * @return Record reader to be used for reading.
- */
- protected RecordWriter<WritableComparable<?>, HCatRecord> getRecordWriter() {
- return hCatRecordWriter;
- }
-
- /**
- * Get the context.
- *
- * @return Context passed to initialize.
- */
- protected TaskAttemptContext getContext() {
- return context;
- }
-
- @Override
- public void initialize(TaskAttemptContext context) throws IOException {
- this.context = context;
- }
-
- @Override
- public void close(TaskAttemptContext context) throws IOException,
- InterruptedException {
- hCatRecordWriter.close(context);
- }
-
- }
-
- protected abstract HCatalogVertexWriter createVertexWriter();
-
- @Override
- public final VertexWriter<I, V, E> createVertexWriter(
- TaskAttemptContext context) throws IOException,
- InterruptedException {
- HCatalogVertexWriter writer = createVertexWriter();
- writer.initialize(hCatOutputFormat.getRecordWriter(context));
- return writer;
- }
-
- /**
- * HCatalogVertexWriter to write each vertex in each row.
- */
- protected abstract class SingleRowHCatalogVertexWriter extends
- HCatalogVertexWriter {
-
- protected abstract int getNumColumns();
-
- protected abstract void fillRecord(HCatRecord record,
- Vertex<I, V, E, ?> vertex);
-
- protected HCatRecord createRecord(Vertex<I, V, E, ?> vertex) {
- HCatRecord record = new DefaultHCatRecord(getNumColumns());
- fillRecord(record, vertex);
- return record;
- }
-
- @Override
- // XXX It is important not to put generic type signature <I,V,E,?> after
- // Vertex. Otherwise, any class that extends this will not compile
- // because of not implementing the VertexWriter#writeVertex. Mystery of
- // Java Generics :(
- @SuppressWarnings("unchecked")
- public final void writeVertex(Vertex vertex) throws IOException,
- InterruptedException {
- getRecordWriter().write(null, createRecord(vertex));
- }
-
- }
-
- /**
- * HCatalogVertexWriter to write each vertex in multiple rows.
- */
- public abstract class MultiRowHCatalogVertexWriter extends
- HCatalogVertexWriter {
-
- protected abstract Iterable<HCatRecord> createRecords(
- Vertex<I, V, E, ?> vertex);
-
- @Override
- // XXX Same thing here. No Generics for Vertex here.
- @SuppressWarnings("unchecked")
- public final void writeVertex(Vertex vertex) throws IOException,
- InterruptedException {
- Iterable<HCatRecord> records = createRecords(vertex);
- for (HCatRecord record : records) {
- getRecordWriter().write(null, record);
- }
- }
-
- }
-
+public abstract class HCatalogVertexOutputFormat<
+ I extends WritableComparable,
+ V extends Writable,
+ E extends Writable>
+ extends VertexOutputFormat<I, V, E> {
+ /**
+ * HCatalog output format.
+ */
+ protected HCatOutputFormat hCatOutputFormat = new HCatOutputFormat();
+
+ @Override
+ public final void checkOutputSpecs(JobContext context) throws IOException,
+ InterruptedException {
+ hCatOutputFormat.checkOutputSpecs(context);
+ }
+
+ @Override
+ public final OutputCommitter getOutputCommitter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return hCatOutputFormat.getOutputCommitter(context);
+ }
+
+ /**
+ * Abstract class that users should
+ * subclass based on their specific vertex
+ * output. Users should implement
+ * writeVertex to create an HCatRecord that is
+ * valid for writing by HCatalogRecordWriter.
+ */
+ protected abstract class HCatalogVertexWriter implements
+ VertexWriter<I, V, E> {
+
+ /** Internal HCatRecordWriter */
+ private RecordWriter<WritableComparable<?>, HCatRecord> hCatRecordWriter;
+ /** Context passed to initialize */
+ private TaskAttemptContext context;
+
+ /**
+ * Initialize with the HCatRecordWriter
+ * @param hCatRecordWriter
+ * Internal writer
+ */
+ private void initialize(
+ RecordWriter<WritableComparable<?>,
+ HCatRecord> hCatRecordWriter) {
+ this.hCatRecordWriter = hCatRecordWriter;
+ }
+
+ /**
+ * Get the record reader.
+ * @return Record reader to be used for reading.
+ */
+ protected RecordWriter<WritableComparable<?>,
+ HCatRecord> getRecordWriter() {
+ return hCatRecordWriter;
+ }
+
+ /**
+ * Get the context.
+ *
+ * @return Context passed to initialize.
+ */
+ protected TaskAttemptContext getContext() {
+ return context;
+ }
+
+ @Override
+ public void initialize(TaskAttemptContext context) throws IOException {
+ this.context = context;
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ hCatRecordWriter.close(context);
+ }
+
+ }
+
+ /**
+ * create vertex writer.
+ * @return HCatalogVertexWriter
+ */
+ protected abstract HCatalogVertexWriter createVertexWriter();
+
+ @Override
+ public final VertexWriter<I, V, E> createVertexWriter(
+ TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ HCatalogVertexWriter writer = createVertexWriter();
+ writer.initialize(hCatOutputFormat.getRecordWriter(context));
+ return writer;
+ }
+
+ /**
+ * HCatalogVertexWriter to write each vertex in each row.
+ */
+ protected abstract class SingleRowHCatalogVertexWriter extends
+ HCatalogVertexWriter {
+ /**
+ * get number of columns.
+ * @return int number of columns
+ */
+ protected abstract int getNumColumns();
+
+ /**
+ * fill record
+ * @param record to fill
+ * @param vertex to populate record
+ */
+ protected abstract void fillRecord(HCatRecord record,
+ Vertex<I, V, E, ?> vertex);
+
+ /**
+ * create record
+ * @param vertex to populate record
+ * @return HCatRecord newly created
+ */
+ protected HCatRecord createRecord(Vertex<I, V, E, ?> vertex) {
+ HCatRecord record = new DefaultHCatRecord(getNumColumns());
+ fillRecord(record, vertex);
+ return record;
+ }
+
+ @Override
+ // XXX It is important not to put generic type signature <I,V,E,?> after
+ // Vertex. Otherwise, any class that extends this will not compile
+ // because of not implementing the VertexWriter#writeVertex. Mystery of
+ // Java Generics :(
+ @SuppressWarnings("unchecked")
+ public final void writeVertex(Vertex vertex) throws IOException,
+ InterruptedException {
+ getRecordWriter().write(null, createRecord(vertex));
+ }
+
+ }
+
+ /**
+ * HCatalogVertexWriter to write each vertex in multiple rows.
+ */
+ public abstract class MultiRowHCatalogVertexWriter extends
+ HCatalogVertexWriter {
+ /**
+ * create records
+ * @param vertex to populate records
+ * @return Iterable of records
+ */
+ protected abstract Iterable<HCatRecord> createRecords(
+ Vertex<I, V, E, ?> vertex);
+
+ @Override
+ // XXX Same thing here. No Generics for Vertex here.
+ @SuppressWarnings("unchecked")
+ public final void writeVertex(Vertex vertex) throws IOException,
+ InterruptedException {
+ Iterable<HCatRecord> records = createRecords(vertex);
+ for (HCatRecord record : records) {
+ getRecordWriter().write(null, record);
+ }
+ }
+ }
}
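And the matching writer sketch for the same assumed two-column layout (class name and columns hypothetical; the vertex accessors getId()/getValue() are assumed from the trunk Vertex API, and the output table must already exist):

    import org.apache.giraph.format.hcatalog.HCatalogVertexOutputFormat;
    import org.apache.giraph.graph.Vertex;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hcatalog.data.HCatRecord;

    public class LongDoubleHCatVertexOutputFormat extends
        HCatalogVertexOutputFormat<LongWritable, DoubleWritable, NullWritable> {
      @Override
      protected HCatalogVertexWriter createVertexWriter() {
        return new SingleRowHCatalogVertexWriter() {
          @Override
          protected int getNumColumns() {
            return 2; // assumed layout: (id, value)
          }

          @Override
          protected void fillRecord(HCatRecord record,
              Vertex<LongWritable, DoubleWritable, NullWritable, ?> vertex) {
            record.set(0, vertex.getId().get());
            record.set(1, vertex.getValue().get());
          }
        };
      }
    }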
Modified: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HiveGiraphRunner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HiveGiraphRunner.java?rev=1394835&r1=1394834&r2=1394835&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HiveGiraphRunner.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HiveGiraphRunner.java Fri Oct 5 21:59:40 2012
@@ -31,7 +31,6 @@ import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
-import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.GiraphJob;
import org.apache.hadoop.conf.Configuration;
@@ -45,317 +44,425 @@ import org.apache.hcatalog.mapreduce.Out
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.log4j.Logger;
+/**
+ * Hive Giraph Runner
+ *
+ */
public class HiveGiraphRunner implements Tool {
-
- @SuppressWarnings("rawtypes")
- private Class<? extends Vertex> vertexClass;
- @SuppressWarnings("rawtypes")
- private Class<? extends HCatalogVertexInputFormat> vertexInputFormatClass;
- @SuppressWarnings("rawtypes")
- private Class<? extends HCatalogVertexOutputFormat> vertexOutputFormatClass;
-
- protected HiveGiraphRunner(
- @SuppressWarnings("rawtypes") Class<? extends Vertex> vertexClass,
- @SuppressWarnings("rawtypes") Class<? extends HCatalogVertexInputFormat> vertexInputFormatClass,
- @SuppressWarnings("rawtypes") Class<? extends HCatalogVertexOutputFormat> vertexOutputFormatClass) {
- this.vertexClass = vertexClass;
- this.vertexInputFormatClass = vertexInputFormatClass;
- this.vertexOutputFormatClass = vertexOutputFormatClass;
- this.conf = new HiveConf(getClass());
- }
-
- protected String dbName;
- protected String inputTableName;
- protected String inputTableFilterExpr;
- protected String outputTableName;
- protected Map<String, String> outputTablePartitionValues;
-
- protected int workers;
- protected boolean isVerbose;
-
- public static void main(String[] args) throws Exception {
- System.exit(ToolRunner
- .run(new HiveGiraphRunner(null, null, null), args));
- }
-
- @Override
- public final int run(String[] args) throws Exception {
- // process args
- try {
- processArguments(args);
- } catch (InterruptedException e) {
- return 0;
- } catch (IllegalArgumentException e) {
- System.err.println(e.getMessage());
- return -1;
- }
-
- // additional configuration for Hive
- adjustConfigurationForHive(getConf());
-
- // setup GiraphJob
- GiraphJob job = new GiraphJob(getConf(), getClass().getName());
- GiraphConfiguration conf = job.getConfiguration();
- conf.setVertexClass(vertexClass);
-
- // setup input from Hive
- InputJobInfo inputJobInfo = InputJobInfo.create(dbName, inputTableName,
- inputTableFilterExpr);
- HCatInputFormat.setInput(job.getInternalJob(), inputJobInfo);
- conf.setVertexInputFormatClass(vertexInputFormatClass);
-
- // setup output to Hive
- HCatOutputFormat.setOutput(job.getInternalJob(), OutputJobInfo.create(
- dbName, outputTableName, outputTablePartitionValues));
- HCatOutputFormat.setSchema(job.getInternalJob(),
- HCatOutputFormat.getTableSchema(job.getInternalJob()));
- conf.setVertexOutputFormatClass(vertexOutputFormatClass);
-
- conf.setWorkerConfiguration(workers, workers, 100.0f);
- initGiraphJob(job);
-
- return job.run(isVerbose) ? 0 : -1;
- }
-
- private static void adjustConfigurationForHive(Configuration conf) {
- // when output partitions are used, workers register them to the
- // metastore at cleanup stage, and on HiveConf's initialization, it
- // looks for hive-site.xml from.
- addToStringCollection(conf, "tmpfiles", conf.getClassLoader()
- .getResource("hive-site.xml").toString());
-
- // Also, you need hive.aux.jars as well
- // addToStringCollection(conf, "tmpjars",
- // conf.getStringCollection("hive.aux.jars.path"));
-
- // Or, more effectively, we can provide all the jars client needed to
- // the workers as well
- String[] hadoopJars = System.getenv("HADOOP_CLASSPATH").split(
- File.pathSeparator);
- List<String> hadoopJarURLs = Lists.newArrayList();
- for (String jarPath : hadoopJars) {
- File file = new File(jarPath);
- if (file.exists() && file.isFile()) {
- String jarURL = file.toURI().toString();
- hadoopJarURLs.add(jarURL);
- }
- }
- addToStringCollection(conf, "tmpjars", hadoopJarURLs);
- }
-
- private CommandLine processArguments(String[] args) throws ParseException,
- InterruptedException {
- Options options = new Options();
- options.addOption("h", "help", false, "Help");
- options.addOption("v", "verbose", false, "Verbose");
- options.addOption("D", "hiveconf", true,
- "property=value for Hive/Hadoop configuration");
- options.addOption("w", "workers", true, "Number of workers");
- if (vertexClass == null)
- options.addOption(null, "vertexClass", true,
- "Giraph Vertex class to use");
- if (vertexInputFormatClass == null)
- options.addOption(null, "vertexInputFormatClass", true,
- "Giraph HCatalogVertexInputFormat class to use");
- if (vertexOutputFormatClass == null)
- options.addOption(null, "vertexOutputFormatClass", true,
- "Giraph HCatalogVertexOutputFormat class to use");
- options.addOption("db", "database", true, "Hive database name");
- options.addOption("i", "inputTable", true, "Input table name");
- options.addOption("I", "inputFilter", true,
- "Input table filter expression (e.g., \"a<2 AND b='two'\"");
- options.addOption("o", "outputTable", true, "Output table name");
- options.addOption("O", "outputPartition", true,
- "Output table partition values (e.g., \"a=1,b=two\")");
- addMoreOptions(options);
-
- CommandLineParser parser = new GnuParser();
- final CommandLine cmdln = parser.parse(options, args);
- // for (Option opt : cmd.getOptions()) {
- // System.out.println(" opt -" + opt.getOpt() + " " + opt.getValue());
- // }
- if (args.length == 0 || cmdln.hasOption("help")) {
- new HelpFormatter().printHelp(getClass().getName(), options, true);
- throw new InterruptedException();
- }
-
- // Giraph classes
- if (cmdln.hasOption("vertexClass"))
- vertexClass = findClass(cmdln.getOptionValue("vertexClass"),
- Vertex.class);
- if (cmdln.hasOption("vertexInputFormatClass"))
- vertexInputFormatClass = findClass(
- cmdln.getOptionValue("vertexInputFormatClass"),
- HCatalogVertexInputFormat.class);
- if (cmdln.hasOption("vertexOutputFormatClass"))
- vertexOutputFormatClass = findClass(
- cmdln.getOptionValue("vertexOutputFormatClass"),
- HCatalogVertexOutputFormat.class);
-
- if (vertexClass == null)
- throw new IllegalArgumentException(
- "Need the Giraph Vertex class name (-vertexClass) to use");
- if (vertexInputFormatClass == null)
- throw new IllegalArgumentException(
- "Need the Giraph VertexInputFormat class name (-vertexInputFormatClass) to use");
- if (vertexOutputFormatClass == null)
- throw new IllegalArgumentException(
- "Need the Giraph VertexOutputFormat class name (-vertexOutputFormatClass) to use");
-
- if (!cmdln.hasOption("workers"))
- throw new IllegalArgumentException(
- "Need to choose the number of workers (-w)");
- if (!cmdln.hasOption("inputTable"))
- throw new IllegalArgumentException(
- "Need to set the input table name (-i). One example is 'dim_friendlist'");
- if (!cmdln.hasOption("outputTable"))
- throw new IllegalArgumentException(
- "Need to set the output table name (-o).");
-
- dbName = cmdln.getOptionValue("dbName", "default");
- inputTableName = cmdln.getOptionValue("inputTable");
- inputTableFilterExpr = cmdln.getOptionValue("inputFilter");
- outputTableName = cmdln.getOptionValue("outputTable");
- outputTablePartitionValues = parsePartitionValues(cmdln
- .getOptionValue("outputPartition"));
- workers = Integer.parseInt(cmdln.getOptionValue("workers"));
- isVerbose = cmdln.hasOption("verbose");
-
- // pick up -hiveconf arguments
- Configuration conf = getConf();
- for (String hiveconf : cmdln.getOptionValues("hiveconf")) {
- String[] keyval = hiveconf.split("=", 2);
- if (keyval.length == 2) {
- String name = keyval[0];
- String value = keyval[1];
- if (name.equals("tmpjars") || name.equals("tmpfiles"))
- addToStringCollection(conf, name, value);
- else
- conf.set(name, value);
- }
- }
-
- processMoreArguments(cmdln);
-
- return cmdln;
- }
-
- private static void addToStringCollection(Configuration conf, String name,
- String... values) {
- addToStringCollection(conf, name, Arrays.asList(values));
- }
-
- private static void addToStringCollection(Configuration conf, String name,
- Collection<? extends String> values) {
- Collection<String> tmpfiles = conf.getStringCollection(name);
- tmpfiles.addAll(values);
- conf.setStrings(name, tmpfiles.toArray(new String[tmpfiles.size()]));
- // System.out.println(name + "=" + conf.get(name));
- }
-
- private <T> Class<? extends T> findClass(String className, Class<T> base) {
- try {
- Class<?> cls = Class.forName(className);
- if (base.isAssignableFrom(cls))
- return cls.asSubclass(base);
- return null;
- } catch (ClassNotFoundException e) {
- throw new IllegalArgumentException(className
- + ": Invalid class name");
- }
- }
-
- // TODO use Hive util class if this is already provided by it
- public static Map<String, String> parsePartitionValues(
- String outputTablePartitionString) {
- if (outputTablePartitionString != null) {
- Map<String, String> partitionValues = Maps.newHashMap();
- for (String partkeyval : outputTablePartitionString.split(" *, *")) {
- String[] keyval = partkeyval.split(" *= *", 2);
- if (keyval.length < 2)
- throw new IllegalArgumentException(
- "Unrecognized partition value format: "
- + outputTablePartitionString);
- partitionValues.put(keyval[0], keyval[1]);
- }
- return partitionValues;
- } else
- return null;
- }
-
- private static String serializePartitionValues(
- Map<String, String> outputTablePartitionValues) {
- StringBuilder outputTablePartitionValuesString = new StringBuilder();
- for (Entry<String, String> partitionValueEntry : outputTablePartitionValues
- .entrySet()) {
- if (outputTablePartitionValuesString.length() != 0)
- outputTablePartitionValuesString.append(",");
- outputTablePartitionValuesString
- .append(partitionValueEntry.getKey()).append("=")
- .append(partitionValueEntry.getValue());
- }
- return outputTablePartitionValuesString.toString();
- }
-
- /** Configuration */
- private Configuration conf;
-
- @Override
- public final Configuration getConf() {
- return conf;
- }
-
- @Override
- public final void setConf(Configuration conf) {
- this.conf = conf;
- }
-
- /**
- * Override this method to add more command-line options. You can process
- * them by also overriding {@link #processMoreArguments(CommandLine)}.
- *
- * @param options
- */
- protected void addMoreOptions(Options options) {
- }
-
- /**
- * Override this method to process additional command-line arguments. You
- * may want to declare additional options by also overriding
- * {@link #addMoreOptions(Options)}.
- *
- * @param cmd
- */
- protected void processMoreArguments(CommandLine cmd) {
- }
-
- /**
- * Override this method to do additional setup with the GiraphJob that will
- * run.
- *
- * @param job
- * GiraphJob that is going to run
- */
- protected void initGiraphJob(GiraphJob job) {
- System.out.println(getClass().getSimpleName() + " with");
- String prefix = "\t";
- System.out.println(prefix + "-vertexClass="
- + vertexClass.getCanonicalName());
- System.out.println(prefix + "-vertexInputFormatClass="
- + vertexInputFormatClass.getCanonicalName());
- System.out.println(prefix + "-vertexOutputFormatClass="
- + vertexOutputFormatClass.getCanonicalName());
- System.out.println(prefix + "-inputTable=" + inputTableName);
- if (inputTableFilterExpr != null)
- System.out.println(prefix + "-inputFilter=\""
- + inputTableFilterExpr + "\"");
- System.out.println(prefix + "-outputTable=" + outputTableName);
- if (outputTablePartitionValues != null)
- System.out.println(prefix + "-outputPartition=\""
- + serializePartitionValues(outputTablePartitionValues)
- + "\"");
- System.out.println(prefix + "-workers=" + workers);
- }
+ /**
+ * logger
+ */
+ private static final Logger LOG = Logger.getLogger(HiveGiraphRunner.class);
+ /**
+ * workers
+ */
+ protected int workers;
+ /**
+ * is verbose
+ */
+ protected boolean isVerbose;
+ /**
+ * output table partitions
+ */
+ protected Map<String, String> outputTablePartitionValues;
+ /**
+ * dbName
+ */
+ protected String dbName;
+ /**
+ * input table name
+ */
+ protected String inputTableName;
+ /**
+ * input table filter
+ */
+ protected String inputTableFilterExpr;
+ /**
+ * output table name
+ */
+ protected String outputTableName;
+
+ /** Configuration */
+ private Configuration conf;
+
+ /**
+ * vertex class.
+ */
+ @SuppressWarnings("rawtypes")
+ private Class<? extends Vertex> vertexClass;
+ /**
+ * vertex input format internal.
+ */
+ @SuppressWarnings("rawtypes")
+ private Class<? extends HCatalogVertexInputFormat> vertexInputFormatClass;
+ /**
+ * vertex output format internal.
+ */
+ @SuppressWarnings("rawtypes")
+ private Class<? extends HCatalogVertexOutputFormat> vertexOutputFormatClass;
+
+ /**
+ * HiveGiraphRunner constructor.
+ * @param vertexClass vertex class
+ * @param vertexInputFormatClass input format
+ * @param vertexOutputFormatClass output format
+ */
+ protected HiveGiraphRunner(
+ @SuppressWarnings("rawtypes")
+ Class<? extends Vertex> vertexClass,
+ @SuppressWarnings("rawtypes")
+ Class<? extends HCatalogVertexInputFormat> vertexInputFormatClass,
+ @SuppressWarnings("rawtypes")
+ Class<? extends HCatalogVertexOutputFormat> vertexOutputFormatClass) {
+ this.vertexClass = vertexClass;
+ this.vertexInputFormatClass = vertexInputFormatClass;
+ this.vertexOutputFormatClass = vertexOutputFormatClass;
+ this.conf = new HiveConf(getClass());
+ }
+
+ /**
+ * main method
+ * @param args system arguments
+ * @throws Exception any errors from Hive Giraph Runner
+ */
+ public static void main(String[] args) throws Exception {
+ System.exit(ToolRunner
+ .run(new HiveGiraphRunner(null, null, null), args));
+ }
+
+ @Override
+ public final int run(String[] args) throws Exception {
+ // process args
+ try {
+ processArguments(args);
+ } catch (InterruptedException e) {
+ return 0;
+ } catch (IllegalArgumentException e) {
+ System.err.println(e.getMessage());
+ return -1;
+ }
+
+ // additional configuration for Hive
+ adjustConfigurationForHive(getConf());
+
+ // setup GiraphJob
+ GiraphJob job = new GiraphJob(getConf(), getClass().getName());
+ job.getConfiguration().setVertexClass(vertexClass);
+
+ // setup input from Hive
+ InputJobInfo inputJobInfo = InputJobInfo.create(dbName, inputTableName,
+ inputTableFilterExpr);
+ HCatInputFormat.setInput(job.getInternalJob(), inputJobInfo);
+ job.getConfiguration().setVertexInputFormatClass(vertexInputFormatClass);
+
+ // setup output to Hive
+ HCatOutputFormat.setOutput(job.getInternalJob(), OutputJobInfo.create(
+ dbName, outputTableName, outputTablePartitionValues));
+ HCatOutputFormat.setSchema(job.getInternalJob(),
+ HCatOutputFormat.getTableSchema(job.getInternalJob()));
+ job.getConfiguration().setVertexOutputFormatClass(vertexOutputFormatClass);
+
+ job.getConfiguration().setWorkerConfiguration(workers, workers, 100.0f);
+ initGiraphJob(job);
+
+ return job.run(isVerbose) ? 0 : -1;
+ }
+
+ /**
+ * set hive configuration
+ * @param conf Configuration argument
+ */
+ private static void adjustConfigurationForHive(Configuration conf) {
+ // when output partitions are used, workers register them to the
+ // metastore at cleanup stage, and on HiveConf's initialization, it
+ // looks for hive-site.xml from.
+ addToStringCollection(conf, "tmpfiles", conf.getClassLoader()
+ .getResource("hive-site.xml").toString());
+
+ // The workers also need the jars from hive.aux.jars.path:
+ // addToStringCollection(conf, "tmpjars",
+ // conf.getStringCollection("hive.aux.jars.path"));
+
+ // Or, more simply, we can ship every jar the client needed to the
+ // workers as well:
+ String hadoopClasspath = System.getenv("HADOOP_CLASSPATH");
+ if (hadoopClasspath == null) {
+ return;
+ }
+ String[] hadoopJars = hadoopClasspath.split(File.pathSeparator);
+ List<String> hadoopJarURLs = Lists.newArrayList();
+ for (String jarPath : hadoopJars) {
+ File file = new File(jarPath);
+ if (file.exists() && file.isFile()) {
+ String jarURL = file.toURI().toString();
+ hadoopJarURLs.add(jarURL);
+ }
+ }
+ addToStringCollection(conf, "tmpjars", hadoopJarURLs);
+ }
+
+ /**
+ * process arguments
+ * @param args to process
+ * @return CommandLine instance
+ * @throws ParseException error parsing arguments
+ * @throws InterruptedException when only help output was requested
+ */
+ private CommandLine processArguments(String[] args) throws ParseException,
+ InterruptedException {
+ Options options = new Options();
+ options.addOption("h", "help", false, "Help");
+ options.addOption("v", "verbose", false, "Verbose");
+ options.addOption("D", "hiveconf", true,
+ "property=value for Hive/Hadoop configuration");
+ options.addOption("w", "workers", true, "Number of workers");
+ if (vertexClass == null) {
+ options.addOption(null, "vertexClass", true,
+ "Giraph Vertex class to use");
+ }
+ if (vertexInputFormatClass == null) {
+ options.addOption(null, "vertexInputFormatClass", true,
+ "Giraph HCatalogVertexInputFormat class to use");
+ }
+
+ if (vertexOutputFormatClass == null) {
+ options.addOption(null, "vertexOutputFormatClass", true,
+ "Giraph HCatalogVertexOutputFormat class to use");
+ }
+
+ options.addOption("db", "database", true, "Hive database name");
+ options.addOption("i", "inputTable", true, "Input table name");
+ options.addOption("I", "inputFilter", true,
+ "Input table filter expression (e.g., \"a<2 AND b='two'\"");
+ options.addOption("o", "outputTable", true, "Output table name");
+ options.addOption("O", "outputPartition", true,
+ "Output table partition values (e.g., \"a=1,b=two\")");
+ addMoreOptions(options);
+
+ CommandLineParser parser = new GnuParser();
+ final CommandLine cmdln = parser.parse(options, args);
+ if (args.length == 0 || cmdln.hasOption("help")) {
+ new HelpFormatter().printHelp(getClass().getName(), options, true);
+ throw new InterruptedException();
+ }
+
+ // Giraph classes
+ if (cmdln.hasOption("vertexClass")) {
+ vertexClass = findClass(cmdln.getOptionValue("vertexClass"),
+ Vertex.class);
+ }
+ if (cmdln.hasOption("vertexInputFormatClass")) {
+ vertexInputFormatClass = findClass(
+ cmdln.getOptionValue("vertexInputFormatClass"),
+ HCatalogVertexInputFormat.class);
+ }
+ if (cmdln.hasOption("vertexOutputFormatClass")) {
+ vertexOutputFormatClass = findClass(
+ cmdln.getOptionValue("vertexOutputFormatClass"),
+ HCatalogVertexOutputFormat.class);
+ }
+
+ if (vertexClass == null) {
+ throw new IllegalArgumentException(
+ "Need the Giraph Vertex class name (-vertexClass) to use");
+ }
+ if (vertexInputFormatClass == null) {
+ throw new IllegalArgumentException(
+ "Need the Giraph VertexInputFormat " +
+ "class name (-vertexInputFormatClass) to use");
+ }
+ if (vertexOutputFormatClass == null) {
+ throw new IllegalArgumentException(
+ "Need the Giraph VertexOutputFormat " +
+ "class name (-vertexOutputFormatClass) to use");
+ }
+ if (!cmdln.hasOption("workers")) {
+ throw new IllegalArgumentException(
+ "Need to choose the number of workers (-w)");
+ }
+ if (!cmdln.hasOption("inputTable")) {
+ throw new IllegalArgumentException(
+ "Need to set the input table name (-i). " +
+ "One example is 'dim_friendlist'");
+ }
+ if (!cmdln.hasOption("outputTable")) {
+ throw new IllegalArgumentException(
+ "Need to set the output table name (-o).");
+ }
+ dbName = cmdln.getOptionValue("database", "default");
+ inputTableName = cmdln.getOptionValue("inputTable");
+ inputTableFilterExpr = cmdln.getOptionValue("inputFilter");
+ outputTableName = cmdln.getOptionValue("outputTable");
+ outputTablePartitionValues = parsePartitionValues(cmdln
+ .getOptionValue("outputPartition"));
+ workers = Integer.parseInt(cmdln.getOptionValue("workers"));
+ isVerbose = cmdln.hasOption("verbose");
+
+ // pick up -hiveconf arguments, if any were given
+ String[] hiveconfs = cmdln.getOptionValues("hiveconf");
+ if (hiveconfs != null) {
+ for (String hiveconf : hiveconfs) {
+ String[] keyval = hiveconf.split("=", 2);
+ if (keyval.length == 2) {
+ String name = keyval[0];
+ String value = keyval[1];
+ if (name.equals("tmpjars") || name.equals("tmpfiles")) {
+ addToStringCollection(conf, name, value);
+ } else {
+ conf.set(name, value);
+ }
+ }
+ }
+ }
+
+ processMoreArguments(cmdln);
+
+ return cmdln;
+ }
+
+ /**
+ * Append values to a comma-separated configuration property.
+ * @param conf Configuration
+ * @param name property name
+ * @param values values to append
+ */
+ private static void addToStringCollection(Configuration conf, String name,
+ String... values) {
+ addToStringCollection(conf, name, Arrays.asList(values));
+ }
+
+ /**
+ * Append values to a comma-separated configuration property.
+ * @param conf Configuration
+ * @param name property name
+ * @param values values to append
+ */
+ private static void addToStringCollection(
+ Configuration conf, String name,
+ Collection<? extends String> values) {
+ Collection<String> current = conf.getStringCollection(name);
+ current.addAll(values);
+ conf.setStrings(name, current.toArray(new String[current.size()]));
+ }
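+
+ // For example, if conf currently maps "tmpjars" to "a.jar,b.jar",
+ // appending "c.jar" stores "a.jar,b.jar,c.jar"; Configuration keeps
+ // string collections as a single comma-separated value.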
+
+ /**
+ * Resolve a class by name and verify it extends the given base class.
+ * @param className name of the class to find
+ * @param base base class
+ * @param <T> class type found
+ * @return type found
+ */
+ private <T> Class<? extends T> findClass(String className, Class<T> base) {
+ try {
+ Class<?> cls = Class.forName(className);
+ if (base.isAssignableFrom(cls)) {
+ return cls.asSubclass(base);
+ }
+ return null;
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException(className +
+ ": Invalid class name");
+ }
+ }
+
+ // TODO use a Hive util class if one already provides this
+
+ /**
+ * Parse "key=value" pairs separated by commas, e.g. "a=1,b=two".
+ * @param outputTablePartitionString table partition string
+ * @return Map of partition keys to values, or null for null input
+ */
+ public static Map<String, String> parsePartitionValues(
+ String outputTablePartitionString) {
+ if (outputTablePartitionString != null) {
+ Map<String, String> partitionValues = Maps.newHashMap();
+ for (String partkeyval : outputTablePartitionString.split(" *, *")) {
+ String[] keyval = partkeyval.split(" *= *", 2);
+ if (keyval.length < 2) {
+ throw new IllegalArgumentException(
+ "Unrecognized partition value format: " +
+ outputTablePartitionString);
+ }
+ partitionValues.put(keyval[0], keyval[1]);
+ }
+ return partitionValues;
+ }
+ return null;
+ }
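+
+ // For example (illustrative values): parsePartitionValues("a=1, b=two")
+ // returns a map with a -> 1 and b -> two; whitespace around the
+ // ',' and '=' separators is ignored.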
+
+ /**
+ * Serialize partition values into comma-separated "key=value" pairs.
+ * @param outputTablePartitionValues output table partition values
+ * @return comma-separated "key=value" partition string
+ */
+ private static String serializePartitionValues(
+ Map<String, String> outputTablePartitionValues) {
+ StringBuilder outputTablePartitionValuesString = new StringBuilder();
+ for (Entry<String, String> partitionValueEntry : outputTablePartitionValues
+ .entrySet()) {
+ if (outputTablePartitionValuesString.length() != 0) {
+ outputTablePartitionValuesString.append(",");
+ }
+ outputTablePartitionValuesString
+ .append(partitionValueEntry.getKey()).append("=")
+ .append(partitionValueEntry.getValue());
+ }
+ return outputTablePartitionValuesString.toString();
+ }
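+
+ // For example (illustrative values): a map with a -> 1 and b -> two
+ // serializes to "a=1,b=two", the inverse of parsePartitionValues
+ // up to the map's iteration order.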
+
+ @Override
+ public final Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public final void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Override this method to add more command-line options. You can process
+ * them by also overriding {@link #processMoreArguments(CommandLine)}.
+ *
+ * @param options Options
+ */
+ protected void addMoreOptions(Options options) {
+ }
+
+ /**
+ * Override this method to process additional command-line arguments. You
+ * may want to declare additional options by also overriding
+ * {@link #addMoreOptions(Options)}.
+ *
+ * @param cmd Command
+ */
+ protected void processMoreArguments(CommandLine cmd) {
+ }
+
+ /**
+ * Override this method to do additional setup with the GiraphJob that will
+ * run.
+ *
+ * @param job GiraphJob that is going to run
+ */
+ protected void initGiraphJob(GiraphJob job) {
+ LOG.info(getClass().getSimpleName() + " with");
+ String prefix = "\t";
+ LOG.info(prefix + "-vertexClass=" +
+ vertexClass.getCanonicalName());
+ LOG.info(prefix + "-vertexInputFormatClass=" +
+ vertexInputFormatClass.getCanonicalName());
+ LOG.info(prefix + "-vertexOutputFormatClass=" +
+ vertexOutputFormatClass.getCanonicalName());
+ LOG.info(prefix + "-inputTable=" + inputTableName);
+ if (inputTableFilterExpr != null) {
+ LOG.info(prefix + "-inputFilter=\"" +
+ inputTableFilterExpr + "\"");
+ }
+ LOG.info(prefix + "-outputTable=" + outputTableName);
+ if (outputTablePartitionValues != null) {
+ LOG.info(prefix + "-outputPartition=\"" +
+ serializePartitionValues(outputTablePartitionValues) +
+ "\"");
+ }
+ LOG.info(prefix + "-workers=" + workers);
+ }
}
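
A concrete runner would typically subclass HiveGiraphRunner along these
lines; this is a minimal sketch assuming hypothetical com.example vertex
and format classes (none of these names are part of this commit):

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.hadoop.util.ToolRunner;

public class MyHiveRunner extends HiveGiraphRunner {
  /** Wire in the concrete vertex and HCatalog format classes. */
  public MyHiveRunner() {
    super(MyVertex.class,
        MyVertexInputFormat.class,
        MyVertexOutputFormat.class);
  }

  /** Declare an extra command-line option via the documented hook. */
  @Override
  protected void addMoreOptions(Options options) {
    options.addOption("s", "source", true, "Source vertex id");
  }

  /** Pick the extra option up into the job configuration. */
  @Override
  protected void processMoreArguments(CommandLine cmd) {
    getConf().set("my.source.vertex", cmd.getOptionValue("source", "0"));
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new MyHiveRunner(), args));
  }
}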
Modified: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/package-info.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/package-info.java?rev=1394835&r1=1394834&r2=1394835&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/package-info.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/package-info.java Fri Oct 5 21:59:40 2012
@@ -16,7 +16,8 @@
* limitations under the License.
*/
/**
- * Package of input and output format classes for loading and storing Hive/Pig data using HCatalog.
+ * Package of input and output format classes
+ * for loading and storing Hive/Pig data using HCatalog.
*/
package org.apache.giraph.format.hcatalog;
Modified: giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/TestHBaseRootMarkerVertextFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/TestHBaseRootMarkerVertextFormat.java?rev=1394835&r1=1394834&r2=1394835&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/TestHBaseRootMarkerVertextFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/TestHBaseRootMarkerVertextFormat.java Fri Oct 5 21:59:40 2012
@@ -141,6 +141,8 @@ public class TestHBaseRootMarkerVertextF
//now operate over HBase using Vertex I/O formats
conf.set(TableInputFormat.INPUT_TABLE, TABLE_NAME);
conf.set(TableOutputFormat.OUTPUT_TABLE, TABLE_NAME);
+ HBaseVertexInputFormat.setConf(conf);
+ HBaseVertexOutputFormat.setConf(conf);
GiraphJob giraphJob = new GiraphJob(conf, getCallingMethodName());
GiraphConfiguration giraphConf = giraphJob.getConfiguration();
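
For reference, the injection contract behind GIRAPH-350 is Hadoop's
Configurable interface: ReflectionUtils.newInstance() calls setConf()
on any Configurable object it creates, so a format that stores the
injected Configuration can hand it on to the table format it wraps.
A minimal sketch of that pattern (illustrative only, not the actual
Giraph classes):

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;

public abstract class ConfigurableFormatBase implements Configurable {
  /** Configuration injected by ReflectionUtils.newInstance(). */
  private Configuration conf;

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public Configuration getConf() {
    return conf;
  }
}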
Modified: giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeInputFormat.java?rev=1394835&r1=1394834&r2=1394835&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeInputFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeInputFormat.java Fri Oct 5 21:59:40 2012
@@ -52,7 +52,7 @@ public class TableEdgeInputFormat extend
TaskAttemptContext context) throws IOException {
return new TableEdgeVertexReader(
- tableInputFormat.createRecordReader(split, context));
+ BASE_FORMAT.createRecordReader(split, context));
}
Modified: giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeOutputFormat.java?rev=1394835&r1=1394834&r2=1394835&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeOutputFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeOutputFormat.java Fri Oct 5 21:59:40 2012
@@ -40,7 +40,7 @@ public class TableEdgeOutputFormat
createVertexWriter(TaskAttemptContext context)
throws IOException, InterruptedException {
RecordWriter<ImmutableBytesWritable, Writable> writer =
- tableOutputFormat.getRecordWriter(context);
+ BASE_FORMAT.getRecordWriter(context);
return new TableEdgeVertexWriter(writer);
}