You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/10/05 23:59:41 UTC

svn commit: r1394835 - in /giraph/trunk: ./ giraph-formats-contrib/ giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/ giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/ giraph-formats-contrib/src/main/java/org/ap...

Author: aching
Date: Fri Oct  5 21:59:40 2012
New Revision: 1394835

URL: http://svn.apache.org/viewvc?rev=1394835&view=rev
Log:
GIRAPH-350: HBaseVertex i/o formats are not being injected with
Configuration via Configurable interface. (bfem via aching)

Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/giraph-formats-contrib/pom.xml
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexInputFormat.java
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexOutputFormat.java
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexInputFormat.java
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexOutputFormat.java
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexInputFormat.java
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexOutputFormat.java
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HiveGiraphRunner.java
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/package-info.java
    giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/TestHBaseRootMarkerVertextFormat.java
    giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeInputFormat.java
    giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeOutputFormat.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1394835&r1=1394834&r2=1394835&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Fri Oct  5 21:59:40 2012
@@ -2,6 +2,9 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-350: HBaseVertex i/o formats are not being injected with
+  Configuration via Configurable interface. (bfem via aching)
+
   GIRAPH-356: Improve ZooKeeper issues. (aching)
 
   GIRAPH-342: Recursive ZooKeeper calls should call progress, dynamic 

Modified: giraph/trunk/giraph-formats-contrib/pom.xml
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/pom.xml?rev=1394835&r1=1394834&r2=1394835&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/pom.xml (original)
+++ giraph/trunk/giraph-formats-contrib/pom.xml Fri Oct  5 21:59:40 2012
@@ -18,7 +18,7 @@ under the License.
 -->
 
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 
     <modelVersion>4.0.0</modelVersion>
     <groupId>org.apache.giraph</groupId>
@@ -27,6 +27,7 @@ under the License.
     <properties>
         <compileSource>1.6</compileSource>
         <hadoop.version>0.20.203.0</hadoop.version>
+        <forHadoop>for-hadoop-${hadoop.version}</forHadoop>
         <hbase.version>0.90.5</hbase.version>
         <hcatalog.version>0.5.0-SNAPSHOT</hcatalog.version>
         <hive.version>0.9.0</hive.version>
@@ -47,7 +48,7 @@ under the License.
                     <systemProperties>
                         <property>
                             <name>prop.jarLocation</name>
-                            <value>${giraph.trunk.base}/target/giraph-${project.version}-jar-with-dependencies.jar</value>
+                            <value>${giraph.trunk.base}/target/giraph-${project.version}-${forHadoop}-jar-with-dependencies.jar</value>
                         </property>
                     </systemProperties>
                 </configuration>
@@ -60,15 +61,34 @@ under the License.
                     <configLocation>../checkstyle.xml</configLocation>
                     <enableRulesSummary>false</enableRulesSummary>
                     <headerLocation>../license-header.txt</headerLocation>
-	                <failOnError>true</failOnError>
-	                <includeTestSourceDirectory>false</includeTestSourceDirectory>
+                    <failOnError>true</failOnError>
+                    <includeTestSourceDirectory>false</includeTestSourceDirectory>
                 </configuration>
                 <executions>
                     <execution>
                         <phase>verify</phase>
                         <goals>
-                             <goal>check</goal>
-                       </goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <version>1.7</version>
+                <executions>
+                    <execution>
+                        <id>add-sources</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>${basedir}/src/main/java/org/apache/giraph/format/hcatalog</source>
+                            </sources>
+                        </configuration>
                     </execution>
                 </executions>
             </plugin>
@@ -193,9 +213,9 @@ under the License.
             <scope>test</scope>
         </dependency>
         <dependency>
-          <groupId>org.apache.hive</groupId>
-          <artifactId>hive-common</artifactId>
-          <version>${hive.version}</version>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-common</artifactId>
+            <version>${hive.version}</version>
         </dependency>
     </dependencies>
 </project>

Modified: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexInputFormat.java?rev=1394835&r1=1394834&r2=1394835&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexInputFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexInputFormat.java Fri Oct  5 21:59:40 2012
@@ -22,8 +22,6 @@ import org.apache.accumulo.core.data.Key
 import org.apache.accumulo.core.data.Value;
 import org.apache.giraph.graph.VertexInputFormat;
 import org.apache.giraph.graph.VertexReader;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -52,7 +50,7 @@ public abstract class AccumuloVertexInpu
         V extends Writable,
         E extends Writable,
         M extends Writable>
-        extends VertexInputFormat<I, V, E, M> implements Configurable {
+        extends VertexInputFormat<I, V, E, M> {
   /**
    * delegate input format for all accumulo operations.
    */
@@ -60,21 +58,6 @@ public abstract class AccumuloVertexInpu
       new AccumuloInputFormat();
 
   /**
-   * Configured and injected by the job
-  */
-  private Configuration conf;
-
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-  }
-
-  /**
   * Abstract class which provides a template for instantiating vertices
   * from Accumulo Key/Value pairs.
   *
@@ -103,7 +86,15 @@ public abstract class AccumuloVertexInpu
       this.reader = reader;
     }
 
-    @Override
+      /**
+       * initialize the reader.
+       *
+       * @param inputSplit Input split to be used for reading vertices.
+       * @param context Context from the task.
+       * @throws IOException
+       * @throws InterruptedException
+       */
+
     public void initialize(InputSplit inputSplit,
                            TaskAttemptContext context)
       throws IOException, InterruptedException {
@@ -160,6 +151,7 @@ public abstract class AccumuloVertexInpu
    * @throws IOException
    * @throws InterruptedException
    */
+  @Override
   public List<InputSplit> getSplits(
     JobContext context, int numWorkers)
     throws IOException, InterruptedException {

Modified: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexOutputFormat.java?rev=1394835&r1=1394834&r2=1394835&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexOutputFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexOutputFormat.java Fri Oct  5 21:59:40 2012
@@ -21,8 +21,6 @@ import org.apache.accumulo.core.client.m
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.giraph.graph.VertexOutputFormat;
 import org.apache.giraph.graph.VertexWriter;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -50,7 +48,7 @@ public abstract class AccumuloVertexOutp
         I extends WritableComparable,
         V extends Writable,
         E extends Writable>
-        extends VertexOutputFormat<I, V, E> implements Configurable {
+        extends VertexOutputFormat<I, V, E> {
 
 
   /**
@@ -64,12 +62,6 @@ public abstract class AccumuloVertexOutp
   protected AccumuloOutputFormat accumuloOutputFormat =
           new AccumuloOutputFormat();
 
-
-  /**
-   * Used by configured interface
-   */
-  private Configuration conf;
-
   /**
    *
    * Main abstraction point for vertex writers to persist back
@@ -145,17 +137,6 @@ public abstract class AccumuloVertexOutp
     }
 
   }
-
-  @Override
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-  }
-
-  @Override
-  public Configuration getConf() {
-    return this.conf;
-  }
-
   /**
    *
    * checkOutputSpecs
@@ -164,6 +145,7 @@ public abstract class AccumuloVertexOutp
    * @throws IOException
    * @throws InterruptedException
    */
+  @Override
   public void checkOutputSpecs(JobContext context)
     throws IOException, InterruptedException {
     try {
@@ -185,6 +167,7 @@ public abstract class AccumuloVertexOutp
    * @throws IOException
    * @throws InterruptedException
    */
+  @Override
   public OutputCommitter getOutputCommitter(TaskAttemptContext context)
     throws IOException, InterruptedException {
     return accumuloOutputFormat.getOutputCommitter(context);

Modified: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexInputFormat.java?rev=1394835&r1=1394834&r2=1394835&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexInputFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexInputFormat.java Fri Oct  5 21:59:40 2012
@@ -19,7 +19,6 @@ package org.apache.giraph.format.hbase;
 
 import org.apache.giraph.graph.VertexInputFormat;
 import org.apache.giraph.graph.VertexReader;
-import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -60,32 +59,22 @@ public abstract  class HBaseVertexInputF
         V extends Writable,
         E extends Writable,
         M extends Writable>
-        extends VertexInputFormat<I, V, E, M> implements Configurable {
+        extends VertexInputFormat<I, V, E, M>  {
 
   /**
    * delegate HBase table input format
    */
-  protected TableInputFormat tableInputFormat =
-         new TableInputFormat();
-  /**
-   * Injected conf by Configurable interface
-   */
-  private Configuration conf;
+  protected static TableInputFormat BASE_FORMAT =
+          new TableInputFormat();
 
-  public Configuration getConf() {
-    return conf;
-  }
-
-  /**
-   * setConf()
-   *
-   * We must initialize the table format since we manually instantiate it.
-   *
-   * @param conf Configuration object
-   */
-  public void setConf(Configuration conf) {
-    tableInputFormat.setConf(conf);
-    this.conf = conf;
+    /**
+     * static method to initialize
+     * base table input format
+     * with Configuration.
+     * @param conf job configuration
+     */
+  public static void setConf(Configuration conf) {
+    BASE_FORMAT.setConf(conf);
   }
 
   /**
@@ -193,6 +182,6 @@ public abstract  class HBaseVertexInputF
   public List<InputSplit> getSplits(
   JobContext context, int numWorkers)
     throws IOException, InterruptedException {
-    return tableInputFormat.getSplits(context);
+    return BASE_FORMAT.getSplits(context);
   }
 }

Modified: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexOutputFormat.java?rev=1394835&r1=1394834&r2=1394835&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexOutputFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexOutputFormat.java Fri Oct  5 21:59:40 2012
@@ -20,7 +20,6 @@ package org.apache.giraph.format.hbase;
 
 import org.apache.giraph.graph.VertexOutputFormat;
 import org.apache.giraph.graph.VertexWriter;
-import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
@@ -56,17 +55,13 @@ public abstract class HBaseVertexOutputF
         V extends Writable,
         E extends Writable>
         extends VertexOutputFormat
-                <I, V, E> implements Configurable {
+                <I, V, E> {
 
   /**
    * delegate output format that writes to HBase
    */
-  protected TableOutputFormat<ImmutableBytesWritable>
-  tableOutputFormat = new TableOutputFormat<ImmutableBytesWritable>();
-  /**
-   * Injected conf by Configurable
-   */
-  private Configuration conf;
+  protected static TableOutputFormat<ImmutableBytesWritable>
+  BASE_FORMAT = new TableOutputFormat<ImmutableBytesWritable>();
 
   /**
    *   Constructor
@@ -154,13 +149,8 @@ public abstract class HBaseVertexOutputF
    *
    * @param conf Injected configuration instance
    */
-  public void setConf(Configuration conf) {
-    tableOutputFormat.setConf(conf);
-    this.conf = conf;
-  }
-
-  public Configuration getConf() {
-    return this.conf;
+  public static void setConf(Configuration conf) {
+    BASE_FORMAT.setConf(conf);
   }
 
   /**
@@ -172,7 +162,7 @@ public abstract class HBaseVertexOutputF
    */
   public void checkOutputSpecs(JobContext context)
     throws IOException, InterruptedException {
-    tableOutputFormat.checkOutputSpecs(context);
+    BASE_FORMAT.checkOutputSpecs(context);
   }
 
   /**
@@ -186,6 +176,6 @@ public abstract class HBaseVertexOutputF
   public OutputCommitter getOutputCommitter(
     TaskAttemptContext context)
     throws IOException, InterruptedException {
-    return tableOutputFormat.getOutputCommitter(context);
+    return BASE_FORMAT.getOutputCommitter(context);
   }
 }

Modified: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexInputFormat.java?rev=1394835&r1=1394834&r2=1394835&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexInputFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexInputFormat.java Fri Oct  5 21:59:40 2012
@@ -23,7 +23,6 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.giraph.graph.BspUtils;
-import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexInputFormat;
 import org.apache.giraph.graph.VertexReader;
@@ -38,6 +37,7 @@ import org.apache.hcatalog.mapreduce.HCa
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.log4j.Logger;
 
 /**
  * Abstract class that users should subclass to load data from a Hive or Pig
@@ -47,235 +47,338 @@ import com.google.common.collect.Maps;
  * stored in the input table.
  * <p>
  * The desired database and table name to load from can be specified via
- * {@link HCatInputFormat#setInput(org.apache.hadoop.mapreduce.Job, org.apache.hcatalog.mapreduce.InputJobInfo)}
+ * {@link HCatInputFormat#setInput(org.apache.hadoop.mapreduce.Job,
+ * org.apache.hcatalog.mapreduce.InputJobInfo)}
  * as you setup your vertex input format with
  * {@link GiraphJob#setVertexInputFormatClass(Class)}.
- * 
- * @param <I>
- *            Vertex index value
- * @param <V>
- *            Vertex value
- * @param <E>
- *            Edge value
- * @param <M>
- *            Message value
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
  */
-@SuppressWarnings("rawtypes")
-public abstract class HCatalogVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
-		extends VertexInputFormat<I, V, E, M> {
-
-	protected HCatInputFormat hCatInputFormat = new HCatInputFormat();
-
-	@Override
-	public final List<InputSplit> getSplits(JobContext context, int numWorkers)
-			throws IOException, InterruptedException {
-		return hCatInputFormat.getSplits(context);
-	}
-
-	/**
-	 * Abstract class that users should subclass based on their specific vertex
-	 * input. HCatRecord can be parsed to get the required data for implementing
-	 * getCurrentVertex(). If the vertex spans more than one HCatRecord,
-	 * nextVertex() should be overwritten to handle that logic as well.
-	 * 
-	 * @param <I>
-	 *            Vertex index value
-	 * @param <V>
-	 *            Vertex value
-	 * @param <E>
-	 *            Edge value
-	 * @param <M>
-	 *            Message value
-	 */
-	protected abstract class HCatalogVertexReader implements
-			VertexReader<I, V, E, M> {
-
-		/** Internal HCatRecordReader */
-		private RecordReader<WritableComparable, HCatRecord> hCatRecordReader;
-
-		/** Context passed to initialize */
-		private TaskAttemptContext context;
-
-		/**
-		 * Initialize with the HCatRecordReader.
-		 * 
-		 * @param hCatRecordReader
-		 *            Internal reader
-		 */
-		private void initialize(
-				RecordReader<WritableComparable, HCatRecord> hCatRecordReader) {
-			this.hCatRecordReader = hCatRecordReader;
-		}
-
-		@Override
-		public void initialize(InputSplit inputSplit, TaskAttemptContext context)
-				throws IOException, InterruptedException {
-			hCatRecordReader.initialize(inputSplit, context);
-			this.context = context;
-		}
-
-		@Override
-		public boolean nextVertex() throws IOException, InterruptedException {
-			// Users can override this if desired, and a vertex is bigger than
-			// a single row.
-			return hCatRecordReader.nextKeyValue();
-		}
-
-		@Override
-		public void close() throws IOException {
-			hCatRecordReader.close();
-		}
-
-		@Override
-		public float getProgress() throws IOException, InterruptedException {
-			return hCatRecordReader.getProgress();
-		}
-
-		/**
-		 * Get the record reader.
-		 * 
-		 * @return Record reader to be used for reading.
-		 */
-		protected RecordReader<WritableComparable, HCatRecord> getRecordReader() {
-			return hCatRecordReader;
-		}
-
-		/**
-		 * Get the context.
-		 * 
-		 * @return Context passed to initialize.
-		 */
-		protected TaskAttemptContext getContext() {
-			return context;
-		}
-	}
-
-	protected abstract HCatalogVertexReader createVertexReader();
-
-	@Override
-	public final VertexReader<I, V, E, M> createVertexReader(InputSplit split,
-			TaskAttemptContext context) throws IOException {
-		try {
-			HCatalogVertexReader reader = createVertexReader();
-			reader.initialize(hCatInputFormat
-					.createRecordReader(split, context));
-			return reader;
-		} catch (InterruptedException e) {
-			throw new IllegalStateException(
-					"createVertexReader: Interrupted creating reader.", e);
-		}
-	}
-
-	/**
-	 * HCatalogVertexReader for tables holding complete vertex info within each
-	 * row.
-	 */
-	protected abstract class SingleRowHCatalogVertexReader extends
-			HCatalogVertexReader {
-
-		protected abstract I getVertexId(HCatRecord record);
-
-		protected abstract V getVertexValue(HCatRecord record);
-
-		protected abstract Map<I, E> getEdges(HCatRecord record);
-
-		private int recordCount = 0;
-
-		@Override
-		public final Vertex<I, V, E, M> getCurrentVertex()
-				throws IOException, InterruptedException {
-			HCatRecord record = getRecordReader().getCurrentValue();
-			Vertex<I, V, E, M> vertex = BspUtils.createVertex(getContext()
-					.getConfiguration());
-			vertex.initialize(getVertexId(record), getVertexValue(record),
-					getEdges(record), null);
-			++recordCount;
-			if ((recordCount % 1000) == 0) {
-				System.out.println("read " + recordCount + " records");
-				// memory usage
-				Runtime runtime = Runtime.getRuntime();
-				double gb = 1024 * 1024 * 1024;
-				System.out.println("Memory: " + (runtime.totalMemory() / gb)
-						+ "GB total = "
-						+ ((runtime.totalMemory() - runtime.freeMemory()) / gb)
-						+ "GB used + " + (runtime.freeMemory() / gb)
-						+ "GB free, " + (runtime.maxMemory() / gb) + "GB max");
-			}
-			return vertex;
-		}
-
-	}
-
-	/**
-	 * HCatalogVertexReader for tables holding vertex info across multiple rows
-	 * sorted by vertex id column, so that they appear consecutively to the
-	 * RecordReader.
-	 */
-	protected abstract class MultiRowHCatalogVertexReader extends
-			HCatalogVertexReader {
-
-		protected abstract I getVertexId(HCatRecord record);
-
-		protected abstract V getVertexValue(Iterable<HCatRecord> records);
-
-		protected abstract I getTargetVertexId(HCatRecord record);
-
-		protected abstract E getEdgeValue(HCatRecord record);
-
-		private Vertex<I, V, E, M> vertex = null;
-
-		@Override
-		public Vertex<I, V, E, M> getCurrentVertex() throws IOException,
-				InterruptedException {
-			return vertex;
-		}
-
-		private I currentVertexId = null;
-		private Map<I, E> destEdgeMap = Maps.newHashMap();
-		private List<HCatRecord> recordsForVertex = Lists.newArrayList();
-		private int recordCount = 0;
-
-		@Override
-		public final boolean nextVertex() throws IOException,
-				InterruptedException {
-			while (getRecordReader().nextKeyValue()) {
-				HCatRecord record = getRecordReader().getCurrentValue();
-				if (currentVertexId == null) {
-					currentVertexId = getVertexId(record);
-				}
-				if (currentVertexId.equals(getVertexId(record))) {
-					destEdgeMap.put(getTargetVertexId(record),
-							getEdgeValue(record));
-					recordsForVertex.add(record);
-				} else {
-					createCurrentVertex();
-					if ((recordCount % 1000) == 0) {
-						System.out.println("read " + recordCount);
-					}
-					currentVertexId = getVertexId(record);
-					recordsForVertex.add(record);
-					return true;
-				}
-			}
-
-			if (destEdgeMap.isEmpty()) {
-				return false;
-			} else {
-				createCurrentVertex();
-				return true;
-			}
-
-		}
-
-		private void createCurrentVertex() {
-			vertex = BspUtils.createVertex(getContext().getConfiguration());
-			vertex.initialize(currentVertexId,
-					getVertexValue(recordsForVertex), destEdgeMap, null);
-			destEdgeMap.clear();
-			recordsForVertex.clear();
-			++recordCount;
-		}
-
-	}
 
+@SuppressWarnings("rawtypes")
+public abstract class HCatalogVertexInputFormat<
+        I extends WritableComparable,
+        V extends Writable,
+        E extends Writable,
+        M extends Writable>
+        extends VertexInputFormat<I, V, E, M> {
+  /**
+  * H catalog input format.
+  */
+  private HCatInputFormat hCatInputFormat = new HCatInputFormat();
+
+  @Override
+  public final List<InputSplit> getSplits(
+            final JobContext context, final int numWorkers)
+    throws IOException, InterruptedException {
+    return hCatInputFormat.getSplits(context);
+  }
+
+ /**
+  * Abstract class that users should subclass
+  * based on their specific vertex
+  * input. HCatRecord can be parsed to get the
+  * required data for implementing
+  * getCurrentVertex(). If the vertex spans more
+  * than one HCatRecord,
+  * nextVertex() should be overwritten to handle that logic as well.
+  */
+  protected abstract class HCatalogVertexReader implements
+        VertexReader<I, V, E, M> {
+
+    /** Internal HCatRecordReader. */
+    private RecordReader<WritableComparable,
+                HCatRecord> hCatRecordReader;
+
+    /** Context passed to initialize. */
+    private TaskAttemptContext context;
+
+    /**
+     * Initialize with the HCatRecordReader.
+     *
+     * @param recordReader internal reader
+     */
+    private void initialize(
+       final RecordReader<
+                   WritableComparable, HCatRecord>
+                   recordReader) {
+      this.hCatRecordReader = recordReader;
+    }
+
+    @Override
+    public final void initialize(
+           final InputSplit inputSplit,
+           final TaskAttemptContext ctxt)
+      throws IOException, InterruptedException {
+      hCatRecordReader.initialize(inputSplit, ctxt);
+      this.context = ctxt;
+    }
+
+    @Override
+    public boolean nextVertex()
+      throws IOException, InterruptedException {
+       // Users can override this if desired,
+       // and a vertex is bigger than
+       // a single row.
+      return hCatRecordReader.nextKeyValue();
+    }
+
+    @Override
+    public final void close() throws IOException {
+      hCatRecordReader.close();
+    }
+
+    @Override
+    public final float getProgress()
+      throws IOException, InterruptedException {
+      return hCatRecordReader.getProgress();
+    }
+
+    /**
+    * Get the record reader.
+    * @return Record reader to be used for reading.
+    */
+    protected final RecordReader<WritableComparable, HCatRecord>
+    getRecordReader() {
+      return hCatRecordReader;
+    }
+
+   /**
+    * Get the context.
+    *
+    *
+    *
+    * @return Context passed to initialize.
+    */
+    protected final TaskAttemptContext getContext() {
+      return context;
+    }
+  }
+
+  /**
+  * create vertex writer instance.
+  * @return HCatalogVertexReader
+  */
+  protected abstract HCatalogVertexReader createVertexReader();
+
+  @Override
+  public final VertexReader<I, V, E, M>
+  createVertexReader(final InputSplit split,
+    final TaskAttemptContext context)
+    throws IOException {
+    try {
+      HCatalogVertexReader reader = createVertexReader();
+      reader.initialize(hCatInputFormat.
+              createRecordReader(split, context));
+      return reader;
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+               "createVertexReader: " +
+                      "Interrupted creating reader.", e);
+    }
+  }
+
+ /**
+  * HCatalogVertexReader for tables holding
+  * complete vertex info within each
+  * row.
+  */
+  protected abstract class SingleRowHCatalogVertexReader
+        extends HCatalogVertexReader {
+
+    /**
+    * 1024 const.
+    */
+    private static final int BYTE_CONST = 1024;
+
+    /**
+    *  logger
+    */
+    private final Logger log =
+            Logger.getLogger(SingleRowHCatalogVertexReader.class);
+    /**
+    * record count.
+    */
+    private int recordCount = 0;
+    /**
+    * modulus check counter.
+    */
+    private final int recordModLimit = 1000;
+
+
+    /**
+    * get vertex id.
+    * @param record hcat record
+    * @return I id
+         */
+    protected abstract I getVertexId(HCatRecord record);
+
+    /**
+    * get vertext value.
+    * @param record hcat record
+    * @return V value
+    */
+    protected abstract V getVertexValue(HCatRecord record);
+
+    /**
+    * get edges.
+    * @param record hcat record
+    * @return Map edges
+    */
+    protected abstract Map<I, E> getEdges(HCatRecord record);
+
+    @Override
+    public final Vertex<I, V, E, M> getCurrentVertex()
+      throws IOException, InterruptedException {
+      HCatRecord record = getRecordReader().getCurrentValue();
+      Vertex<I, V, E, M> vertex =
+                    BspUtils.createVertex(getContext()
+                            .getConfiguration());
+      vertex.initialize(getVertexId(record), getVertexValue(record),
+                    getEdges(record), null);
+      ++recordCount;
+      if ((recordCount % recordModLimit) == 0) {
+        log.info("read " + recordCount + " records");
+                // memory usage
+        Runtime runtime = Runtime.getRuntime();
+        double gb = BYTE_CONST *
+                    BYTE_CONST *
+                    BYTE_CONST;
+        log.info("Memory: " + (runtime.totalMemory() / gb) +
+                "GB total = " +
+                ((runtime.totalMemory() - runtime.freeMemory()) / gb) +
+                "GB used + " + (runtime.freeMemory() / gb) +
+                "GB free, " + (runtime.maxMemory() / gb) + "GB max");
+      }
+      return vertex;
+    }
+  }
+  /**
+  * HCatalogVertexReader for tables
+  * holding vertex info across multiple rows
+  * sorted by vertex id column,
+  * so that they appear consecutively to the
+  * RecordReader.
+  */
+  protected abstract class MultiRowHCatalogVertexReader extends
+            HCatalogVertexReader {
+    /**
+    * modulus check counter.
+    */
+    private static final int RECORD_MOD_LIMIT = 1000;
+
+    /**
+    *  logger
+    */
+    private final Logger log =
+            Logger.getLogger(MultiRowHCatalogVertexReader.class);
+    /**
+    * current vertex id.
+    */
+    private I currentVertexId = null;
+    /**
+    * destination edge map.
+    */
+    private Map<I, E> destEdgeMap = Maps.newHashMap();
+    /**
+    * record for vertex.
+    */
+    private List<HCatRecord> recordsForVertex = Lists.newArrayList();
+    /**
+    * record count.
+    */
+    private int recordCount = 0;
+    /**
+    * vertex.
+    *
+    */
+    private Vertex<I, V, E, M> vertex = null;
+    /**
+    * get vertex id from record.
+    *
+    * @param record hcat
+    * @return I vertex id
+    */
+    protected abstract I getVertexId(HCatRecord record);
+
+    /**
+    * get vertex value from record.
+    * @param records all vertex values
+    * @return V iterable of record values
+    */
+    protected abstract V getVertexValue(
+                Iterable<HCatRecord> records);
+
+    /**
+    * get target vertex id from record.
+    *
+    * @param record hcat
+    * @return I vertex id of target.
+    */
+    protected abstract I getTargetVertexId(HCatRecord record);
+
+    /**
+    * get edge value from record.
+    *
+    * @param record hcat.
+    * @return E edge value.
+    */
+    protected abstract E getEdgeValue(HCatRecord record);
+
+    @Override
+    public final Vertex<I, V, E, M>
+    getCurrentVertex()
+      throws IOException, InterruptedException {
+      return vertex;
+    }
+
+    @Override
+    public boolean nextVertex()
+      throws IOException, InterruptedException {
+      while (getRecordReader().nextKeyValue()) {
+        HCatRecord record =
+                getRecordReader().getCurrentValue();
+        if (currentVertexId == null) {
+          currentVertexId = getVertexId(record);
+        }
+        if (currentVertexId.equals(getVertexId(record))) {
+          destEdgeMap.put(
+            getTargetVertexId(record),
+            getEdgeValue(record));
+          recordsForVertex.add(record);
+        } else {
+          createCurrentVertex();
+          if ((recordCount % RECORD_MOD_LIMIT) == 0) {
+            log.info("read " + recordCount);
+          }
+          currentVertexId = getVertexId(record);
+          recordsForVertex.add(record);
+          return true;
+        }
+      }
+
+      if (destEdgeMap.isEmpty()) {
+        return false;
+      } else {
+        createCurrentVertex();
+        return true;
+      }
+    }
+
+    /**
+    * create current vertex.
+    */
+    private void createCurrentVertex() {
+      vertex = BspUtils.
+              createVertex(getContext().getConfiguration());
+      vertex.initialize(currentVertexId,
+              getVertexValue(
+                      recordsForVertex), destEdgeMap, null);
+      destEdgeMap.clear();
+      recordsForVertex.clear();
+      ++recordCount;
+    }
+  }
 }

Modified: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexOutputFormat.java?rev=1394835&r1=1394834&r2=1394835&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexOutputFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexOutputFormat.java Fri Oct  5 21:59:40 2012
@@ -21,7 +21,6 @@ package org.apache.giraph.format.hcatalo
 import java.io.IOException;
 
 import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.graph.VertexOutputFormat;
 import org.apache.giraph.graph.VertexWriter;
 import org.apache.hadoop.io.Writable;
@@ -41,159 +40,175 @@ import org.apache.hcatalog.mapreduce.HCa
  * depending on how you want to fit your vertices into the output table.
  * <p>
  * The desired database and table name to store to can be specified via
- * {@link HCatOutputFormat#setOutput(org.apache.hadoop.mapreduce.Job, org.apache.hcatalog.mapreduce.OutputJobInfo)}
+ * {@link HCatOutputFormat#setOutput(org.apache.hadoop.mapreduce.Job,
+ * org.apache.hcatalog.mapreduce.OutputJobInfo)}
  * as you setup your vertex output format with
  * {@link GiraphJob#setVertexOutputFormatClass(Class)}. You must create the
  * output table.
- * 
- * @param <I>
- *            Vertex index value
- * @param <V>
- *            Vertex value
- * @param <E>
- *            Edge value
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
  */
 @SuppressWarnings("rawtypes")
-public abstract class HCatalogVertexOutputFormat<I extends WritableComparable, V extends Writable, E extends Writable>
-		extends VertexOutputFormat<I, V, E> {
-
-	protected HCatOutputFormat hCatOutputFormat = new HCatOutputFormat();
-
-	@Override
-	public final void checkOutputSpecs(JobContext context) throws IOException,
-			InterruptedException {
-		hCatOutputFormat.checkOutputSpecs(context);
-	}
-
-	@Override
-	public final OutputCommitter getOutputCommitter(TaskAttemptContext context)
-			throws IOException, InterruptedException {
-		return hCatOutputFormat.getOutputCommitter(context);
-	}
-
-	/**
-	 * Abstract class that users should subclass based on their specific vertex
-	 * output. Users should implement writeVertex to create a HCatRecord that is
-	 * valid to for writing by HCatalogRecordWriter.
-	 * 
-	 * @param <I>
-	 *            Vertex index value
-	 * @param <V>
-	 *            Vertex value
-	 * @param <E>
-	 *            Edge value
-	 */
-	protected abstract class HCatalogVertexWriter implements
-			VertexWriter<I, V, E> {
-
-		/** Internal HCatRecordWriter */
-		private RecordWriter<WritableComparable<?>, HCatRecord> hCatRecordWriter;
-		/** Context passed to initialize */
-		private TaskAttemptContext context;
-
-		/**
-		 * Initialize with the HCatRecordWriter
-		 * 
-		 * @param hCatRecordWriter
-		 *            Internal writer
-		 */
-		private void initialize(
-				RecordWriter<WritableComparable<?>, HCatRecord> hCatRecordWriter) {
-			this.hCatRecordWriter = hCatRecordWriter;
-		}
-
-		/**
-		 * Get the record reader.
-		 * 
-		 * @return Record reader to be used for reading.
-		 */
-		protected RecordWriter<WritableComparable<?>, HCatRecord> getRecordWriter() {
-			return hCatRecordWriter;
-		}
-
-		/**
-		 * Get the context.
-		 * 
-		 * @return Context passed to initialize.
-		 */
-		protected TaskAttemptContext getContext() {
-			return context;
-		}
-
-		@Override
-		public void initialize(TaskAttemptContext context) throws IOException {
-			this.context = context;
-		}
-
-		@Override
-		public void close(TaskAttemptContext context) throws IOException,
-				InterruptedException {
-			hCatRecordWriter.close(context);
-		}
-
-	}
-
-	protected abstract HCatalogVertexWriter createVertexWriter();
-
-	@Override
-	public final VertexWriter<I, V, E> createVertexWriter(
-			TaskAttemptContext context) throws IOException,
-			InterruptedException {
-		HCatalogVertexWriter writer = createVertexWriter();
-		writer.initialize(hCatOutputFormat.getRecordWriter(context));
-		return writer;
-	}
-
-	/**
-	 * HCatalogVertexWriter to write each vertex in each row.
-	 */
-	protected abstract class SingleRowHCatalogVertexWriter extends
-			HCatalogVertexWriter {
-
-		protected abstract int getNumColumns();
-
-		protected abstract void fillRecord(HCatRecord record,
-				Vertex<I, V, E, ?> vertex);
-
-		protected HCatRecord createRecord(Vertex<I, V, E, ?> vertex) {
-			HCatRecord record = new DefaultHCatRecord(getNumColumns());
-			fillRecord(record, vertex);
-			return record;
-		}
-
-		@Override
-		// XXX It is important not to put generic type signature <I,V,E,?> after
-		// Vertex. Otherwise, any class that extends this will not compile
-		// because of not implementing the VertexWriter#writeVertex. Mystery of
-		// Java Generics :(
-		@SuppressWarnings("unchecked")
-		public final void writeVertex(Vertex vertex) throws IOException,
-				InterruptedException {
-			getRecordWriter().write(null, createRecord(vertex));
-		}
-
-	}
-
-	/**
-	 * HCatalogVertexWriter to write each vertex in multiple rows.
-	 */
-	public abstract class MultiRowHCatalogVertexWriter extends
-			HCatalogVertexWriter {
-
-		protected abstract Iterable<HCatRecord> createRecords(
-				Vertex<I, V, E, ?> vertex);
-
-		@Override
-		// XXX Same thing here. No Generics for Vertex here.
-		@SuppressWarnings("unchecked")
-		public final void writeVertex(Vertex vertex) throws IOException,
-				InterruptedException {
-			Iterable<HCatRecord> records = createRecords(vertex);
-			for (HCatRecord record : records) {
-				getRecordWriter().write(null, record);
-			}
-		}
-
-	}
-
+public abstract class HCatalogVertexOutputFormat<
+        I extends WritableComparable,
+        V extends Writable,
+        E extends Writable>
+        extends VertexOutputFormat<I, V, E> {
+  /**
+  * hcat output format
+  */
+  protected HCatOutputFormat hCatOutputFormat = new HCatOutputFormat();
+
+  @Override
+  public final void checkOutputSpecs(JobContext context) throws IOException,
+      InterruptedException {
+    hCatOutputFormat.checkOutputSpecs(context);
+  }
+
+  @Override
+  public final OutputCommitter getOutputCommitter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    return hCatOutputFormat.getOutputCommitter(context);
+  }
+
+  /**
+  * Abstract class that users should
+  * subclass based on their specific vertex
+  * output. Users should implement
+  * writeVertex to create a HCatRecord that is
+  * valid to for writing by HCatalogRecordWriter.
+  */
+  protected abstract class HCatalogVertexWriter implements
+            VertexWriter<I, V, E> {
+
+    /** Internal HCatRecordWriter */
+    private RecordWriter<WritableComparable<?>, HCatRecord> hCatRecordWriter;
+    /** Context passed to initialize */
+    private TaskAttemptContext context;
+
+    /**
+    * Initialize with the HCatRecordWriter
+    * @param hCatRecordWriter
+    *            Internal writer
+    */
+    private void initialize(
+                    RecordWriter<WritableComparable<?>,
+                    HCatRecord> hCatRecordWriter) {
+      this.hCatRecordWriter = hCatRecordWriter;
+    }
+
+    /**
+    * Get the record reader.
+    * @return Record reader to be used for reading.
+    */
+    protected RecordWriter<WritableComparable<?>,
+            HCatRecord> getRecordWriter() {
+      return hCatRecordWriter;
+    }
+
+    /**
+    * Get the context.
+    *
+    * @return Context passed to initialize.
+    */
+    protected TaskAttemptContext getContext() {
+      return context;
+    }
+
+    @Override
+    public void initialize(TaskAttemptContext context) throws IOException {
+      this.context = context;
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException,
+        InterruptedException {
+      hCatRecordWriter.close(context);
+    }
+
+  }
+
+  /**
+  * create vertex writer.
+  * @return HCatalogVertexWriter
+  */
+  protected abstract HCatalogVertexWriter createVertexWriter();
+
+  @Override
+  public final VertexWriter<I, V, E> createVertexWriter(
+    TaskAttemptContext context) throws IOException,
+    InterruptedException {
+    HCatalogVertexWriter writer = createVertexWriter();
+    writer.initialize(hCatOutputFormat.getRecordWriter(context));
+    return writer;
+  }
+
+  /**
+  * HCatalogVertexWriter to write each vertex in each row.
+  */
+  protected abstract class SingleRowHCatalogVertexWriter extends
+            HCatalogVertexWriter {
+    /**
+    * get num columns
+    * @return intcolumns
+    */
+    protected abstract int getNumColumns();
+
+    /**
+    * fill record
+    * @param record to fill
+    * @param vertex to populate record
+    */
+    protected abstract void fillRecord(HCatRecord record,
+                                    Vertex<I, V, E, ?> vertex);
+
+    /**
+    * create record
+    * @param vertex to populate record
+    * @return HCatRecord newly created
+    */
+    protected HCatRecord createRecord(Vertex<I, V, E, ?> vertex) {
+      HCatRecord record = new DefaultHCatRecord(getNumColumns());
+      fillRecord(record, vertex);
+      return record;
+    }
+
+    @Override
+    // XXX It is important not to put generic type signature <I,V,E,?> after
+    // Vertex. Otherwise, any class that extends this will not compile
+    // because of not implementing the VertexWriter#writeVertex. Mystery of
+    // Java Generics :(
+    @SuppressWarnings("unchecked")
+    public final void writeVertex(Vertex vertex) throws IOException,
+        InterruptedException {
+      getRecordWriter().write(null, createRecord(vertex));
+    }
+
+  }
+
+  /**
+  * HCatalogVertexWriter to write each vertex in multiple rows.
+  */
+  public abstract class MultiRowHCatalogVertexWriter extends
+    HCatalogVertexWriter {
+    /**
+    * create records
+    * @param vertex to populate records
+    * @return Iterable of records
+    */
+    protected abstract Iterable<HCatRecord> createRecords(
+        Vertex<I, V, E, ?> vertex);
+
+    @Override
+    // XXX Same thing here. No Generics for Vertex here.
+    @SuppressWarnings("unchecked")
+    public final void writeVertex(Vertex vertex) throws IOException,
+        InterruptedException {
+      Iterable<HCatRecord> records = createRecords(vertex);
+      for (HCatRecord record : records) {
+        getRecordWriter().write(null, record);
+      }
+    }
+  }
 }

Modified: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HiveGiraphRunner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HiveGiraphRunner.java?rev=1394835&r1=1394834&r2=1394835&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HiveGiraphRunner.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HiveGiraphRunner.java Fri Oct  5 21:59:40 2012
@@ -31,7 +31,6 @@ import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
-import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.GiraphJob;
 import org.apache.hadoop.conf.Configuration;
@@ -45,317 +44,425 @@ import org.apache.hcatalog.mapreduce.Out
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.log4j.Logger;
 
+/**
+ * Hive Giraph Runner
+ *
+ */
 public class HiveGiraphRunner implements Tool {
-
-	@SuppressWarnings("rawtypes")
-	private Class<? extends Vertex> vertexClass;
-	@SuppressWarnings("rawtypes")
-	private Class<? extends HCatalogVertexInputFormat> vertexInputFormatClass;
-	@SuppressWarnings("rawtypes")
-	private Class<? extends HCatalogVertexOutputFormat> vertexOutputFormatClass;
-
-	protected HiveGiraphRunner(
-			@SuppressWarnings("rawtypes") Class<? extends Vertex> vertexClass,
-			@SuppressWarnings("rawtypes") Class<? extends HCatalogVertexInputFormat> vertexInputFormatClass,
-			@SuppressWarnings("rawtypes") Class<? extends HCatalogVertexOutputFormat> vertexOutputFormatClass) {
-		this.vertexClass = vertexClass;
-		this.vertexInputFormatClass = vertexInputFormatClass;
-		this.vertexOutputFormatClass = vertexOutputFormatClass;
-		this.conf = new HiveConf(getClass());
-	}
-
-	protected String dbName;
-	protected String inputTableName;
-	protected String inputTableFilterExpr;
-	protected String outputTableName;
-	protected Map<String, String> outputTablePartitionValues;
-
-	protected int workers;
-	protected boolean isVerbose;
-
-	public static void main(String[] args) throws Exception {
-		System.exit(ToolRunner
-				.run(new HiveGiraphRunner(null, null, null), args));
-	}
-
-	@Override
-	public final int run(String[] args) throws Exception {
-		// process args
-		try {
-			processArguments(args);
-		} catch (InterruptedException e) {
-			return 0;
-		} catch (IllegalArgumentException e) {
-			System.err.println(e.getMessage());
-			return -1;
-		}
-
-		// additional configuration for Hive
-		adjustConfigurationForHive(getConf());
-
-		// setup GiraphJob
-		GiraphJob job = new GiraphJob(getConf(), getClass().getName());
-		GiraphConfiguration conf = job.getConfiguration();
-		conf.setVertexClass(vertexClass);
-
-		// setup input from Hive
-		InputJobInfo inputJobInfo = InputJobInfo.create(dbName, inputTableName,
-				inputTableFilterExpr);
-		HCatInputFormat.setInput(job.getInternalJob(), inputJobInfo);
-		conf.setVertexInputFormatClass(vertexInputFormatClass);
-
-		// setup output to Hive
-		HCatOutputFormat.setOutput(job.getInternalJob(), OutputJobInfo.create(
-				dbName, outputTableName, outputTablePartitionValues));
-		HCatOutputFormat.setSchema(job.getInternalJob(),
-				HCatOutputFormat.getTableSchema(job.getInternalJob()));
-		conf.setVertexOutputFormatClass(vertexOutputFormatClass);
-
-		conf.setWorkerConfiguration(workers, workers, 100.0f);
-		initGiraphJob(job);
-
-		return job.run(isVerbose) ? 0 : -1;
-	}
-
-	private static void adjustConfigurationForHive(Configuration conf) {
-		// when output partitions are used, workers register them to the
-		// metastore at cleanup stage, and on HiveConf's initialization, it
-		// looks for hive-site.xml from.
-		addToStringCollection(conf, "tmpfiles", conf.getClassLoader()
-				.getResource("hive-site.xml").toString());
-
-		// Also, you need hive.aux.jars as well
-		// addToStringCollection(conf, "tmpjars",
-		// conf.getStringCollection("hive.aux.jars.path"));
-
-		// Or, more effectively, we can provide all the jars client needed to
-		// the workers as well
-		String[] hadoopJars = System.getenv("HADOOP_CLASSPATH").split(
-				File.pathSeparator);
-		List<String> hadoopJarURLs = Lists.newArrayList();
-		for (String jarPath : hadoopJars) {
-			File file = new File(jarPath);
-			if (file.exists() && file.isFile()) {
-				String jarURL = file.toURI().toString();
-				hadoopJarURLs.add(jarURL);
-			}
-		}
-		addToStringCollection(conf, "tmpjars", hadoopJarURLs);
-	}
-
-	private CommandLine processArguments(String[] args) throws ParseException,
-			InterruptedException {
-		Options options = new Options();
-		options.addOption("h", "help", false, "Help");
-		options.addOption("v", "verbose", false, "Verbose");
-		options.addOption("D", "hiveconf", true,
-				"property=value for Hive/Hadoop configuration");
-		options.addOption("w", "workers", true, "Number of workers");
-		if (vertexClass == null)
-			options.addOption(null, "vertexClass", true,
-					"Giraph Vertex class to use");
-		if (vertexInputFormatClass == null)
-			options.addOption(null, "vertexInputFormatClass", true,
-					"Giraph HCatalogVertexInputFormat class to use");
-		if (vertexOutputFormatClass == null)
-			options.addOption(null, "vertexOutputFormatClass", true,
-					"Giraph HCatalogVertexOutputFormat class to use");
-		options.addOption("db", "database", true, "Hive database name");
-		options.addOption("i", "inputTable", true, "Input table name");
-		options.addOption("I", "inputFilter", true,
-				"Input table filter expression (e.g., \"a<2 AND b='two'\"");
-		options.addOption("o", "outputTable", true, "Output table name");
-		options.addOption("O", "outputPartition", true,
-				"Output table partition values (e.g., \"a=1,b=two\")");
-		addMoreOptions(options);
-
-		CommandLineParser parser = new GnuParser();
-		final CommandLine cmdln = parser.parse(options, args);
-		// for (Option opt : cmd.getOptions()) {
-		// System.out.println(" opt -" + opt.getOpt() + " " + opt.getValue());
-		// }
-		if (args.length == 0 || cmdln.hasOption("help")) {
-			new HelpFormatter().printHelp(getClass().getName(), options, true);
-			throw new InterruptedException();
-		}
-
-		// Giraph classes
-		if (cmdln.hasOption("vertexClass"))
-			vertexClass = findClass(cmdln.getOptionValue("vertexClass"),
-					Vertex.class);
-		if (cmdln.hasOption("vertexInputFormatClass"))
-			vertexInputFormatClass = findClass(
-					cmdln.getOptionValue("vertexInputFormatClass"),
-					HCatalogVertexInputFormat.class);
-		if (cmdln.hasOption("vertexOutputFormatClass"))
-			vertexOutputFormatClass = findClass(
-					cmdln.getOptionValue("vertexOutputFormatClass"),
-					HCatalogVertexOutputFormat.class);
-
-		if (vertexClass == null)
-			throw new IllegalArgumentException(
-					"Need the Giraph Vertex class name (-vertexClass) to use");
-		if (vertexInputFormatClass == null)
-			throw new IllegalArgumentException(
-					"Need the Giraph VertexInputFormat class name (-vertexInputFormatClass) to use");
-		if (vertexOutputFormatClass == null)
-			throw new IllegalArgumentException(
-					"Need the Giraph VertexOutputFormat class name (-vertexOutputFormatClass) to use");
-
-		if (!cmdln.hasOption("workers"))
-			throw new IllegalArgumentException(
-					"Need to choose the number of workers (-w)");
-		if (!cmdln.hasOption("inputTable"))
-			throw new IllegalArgumentException(
-					"Need to set the input table name (-i).  One example is 'dim_friendlist'");
-		if (!cmdln.hasOption("outputTable"))
-			throw new IllegalArgumentException(
-					"Need to set the output table name (-o).");
-
-		dbName = cmdln.getOptionValue("dbName", "default");
-		inputTableName = cmdln.getOptionValue("inputTable");
-		inputTableFilterExpr = cmdln.getOptionValue("inputFilter");
-		outputTableName = cmdln.getOptionValue("outputTable");
-		outputTablePartitionValues = parsePartitionValues(cmdln
-				.getOptionValue("outputPartition"));
-		workers = Integer.parseInt(cmdln.getOptionValue("workers"));
-		isVerbose = cmdln.hasOption("verbose");
-
-		// pick up -hiveconf arguments
-		Configuration conf = getConf();
-		for (String hiveconf : cmdln.getOptionValues("hiveconf")) {
-			String[] keyval = hiveconf.split("=", 2);
-			if (keyval.length == 2) {
-				String name = keyval[0];
-				String value = keyval[1];
-				if (name.equals("tmpjars") || name.equals("tmpfiles"))
-					addToStringCollection(conf, name, value);
-				else
-					conf.set(name, value);
-			}
-		}
-
-		processMoreArguments(cmdln);
-
-		return cmdln;
-	}
-
-	private static void addToStringCollection(Configuration conf, String name,
-			String... values) {
-		addToStringCollection(conf, name, Arrays.asList(values));
-	}
-
-	private static void addToStringCollection(Configuration conf, String name,
-			Collection<? extends String> values) {
-		Collection<String> tmpfiles = conf.getStringCollection(name);
-		tmpfiles.addAll(values);
-		conf.setStrings(name, tmpfiles.toArray(new String[tmpfiles.size()]));
-		// System.out.println(name + "=" + conf.get(name));
-	}
-
-	private <T> Class<? extends T> findClass(String className, Class<T> base) {
-		try {
-			Class<?> cls = Class.forName(className);
-			if (base.isAssignableFrom(cls))
-				return cls.asSubclass(base);
-			return null;
-		} catch (ClassNotFoundException e) {
-			throw new IllegalArgumentException(className
-					+ ": Invalid class name");
-		}
-	}
-
-	// TODO use Hive util class if this is already provided by it
-	public static Map<String, String> parsePartitionValues(
-			String outputTablePartitionString) {
-		if (outputTablePartitionString != null) {
-			Map<String, String> partitionValues = Maps.newHashMap();
-			for (String partkeyval : outputTablePartitionString.split(" *, *")) {
-				String[] keyval = partkeyval.split(" *= *", 2);
-				if (keyval.length < 2)
-					throw new IllegalArgumentException(
-							"Unrecognized partition value format: "
-									+ outputTablePartitionString);
-				partitionValues.put(keyval[0], keyval[1]);
-			}
-			return partitionValues;
-		} else
-			return null;
-	}
-
-	private static String serializePartitionValues(
-			Map<String, String> outputTablePartitionValues) {
-		StringBuilder outputTablePartitionValuesString = new StringBuilder();
-		for (Entry<String, String> partitionValueEntry : outputTablePartitionValues
-				.entrySet()) {
-			if (outputTablePartitionValuesString.length() != 0)
-				outputTablePartitionValuesString.append(",");
-			outputTablePartitionValuesString
-					.append(partitionValueEntry.getKey()).append("=")
-					.append(partitionValueEntry.getValue());
-		}
-		return outputTablePartitionValuesString.toString();
-	}
-
-	/** Configuration */
-	private Configuration conf;
-
-	@Override
-	public final Configuration getConf() {
-		return conf;
-	}
-
-	@Override
-	public final void setConf(Configuration conf) {
-		this.conf = conf;
-	}
-
-	/**
-	 * Override this method to add more command-line options. You can process
-	 * them by also overriding {@link #processMoreArguments(CommandLine)}.
-	 *
-	 * @param options
-	 */
-	protected void addMoreOptions(Options options) {
-	}
-
-	/**
-	 * Override this method to process additional command-line arguments. You
-	 * may want to declare additional options by also overriding
-	 * {@link #addMoreOptions(Options)}.
-	 *
-	 * @param cmd
-	 */
-	protected void processMoreArguments(CommandLine cmd) {
-	}
-
-	/**
-	 * Override this method to do additional setup with the GiraphJob that will
-	 * run.
-	 *
-	 * @param job
-	 *            GiraphJob that is going to run
-	 */
-	protected void initGiraphJob(GiraphJob job) {
-		System.out.println(getClass().getSimpleName() + " with");
-		String prefix = "\t";
-		System.out.println(prefix + "-vertexClass="
-				+ vertexClass.getCanonicalName());
-		System.out.println(prefix + "-vertexInputFormatClass="
-				+ vertexInputFormatClass.getCanonicalName());
-		System.out.println(prefix + "-vertexOutputFormatClass="
-				+ vertexOutputFormatClass.getCanonicalName());
-		System.out.println(prefix + "-inputTable=" + inputTableName);
-		if (inputTableFilterExpr != null)
-			System.out.println(prefix + "-inputFilter=\""
-					+ inputTableFilterExpr + "\"");
-		System.out.println(prefix + "-outputTable=" + outputTableName);
-		if (outputTablePartitionValues != null)
-			System.out.println(prefix + "-outputPartition=\""
-					+ serializePartitionValues(outputTablePartitionValues)
-					+ "\"");
-		System.out.println(prefix + "-workers=" + workers);
-	}
+  /**
+  * logger
+  */
+  private static final Logger LOG = Logger.getLogger(HiveGiraphRunner.class);
+  /**
+  * workers
+  */
+  protected int workers;
+  /**
+  * is verbose
+  */
+  protected boolean isVerbose;
+  /**
+  * output table partitions
+  */
+  protected Map<String, String> outputTablePartitionValues;
+  /**
+  * dbName
+  */
+  protected String dbName;
+  /**
+  * input table name
+  */
+  protected String inputTableName;
+  /**
+  * input table filter
+  */
+  protected String inputTableFilterExpr;
+  /**
+  * output table name
+  */
+  protected String outputTableName;
+
+  /** Configuration */
+  private Configuration conf;
+
+  /**
+  * vertex class.
+  */
+  @SuppressWarnings("rawtypes")
+  private Class<? extends Vertex> vertexClass;
+  /**
+  * vertex input format internal.
+  */
+  @SuppressWarnings("rawtypes")
+  private Class<? extends HCatalogVertexInputFormat> vertexInputFormatClass;
+  /**
+  * vertex output format internal.
+  */
+  @SuppressWarnings("rawtypes")
+  private Class<? extends HCatalogVertexOutputFormat> vertexOutputFormatClass;
+
+  /**
+  * giraph runner class.
+  * @param vertexClass vertec class
+  * @param vertexInputFormatClass input format
+  * @param vertexOutputFormatClass output format
+  */
+  protected HiveGiraphRunner(
+          @SuppressWarnings("rawtypes")
+          Class<? extends Vertex> vertexClass,
+          @SuppressWarnings("rawtypes")
+          Class<? extends HCatalogVertexInputFormat> vertexInputFormatClass,
+          @SuppressWarnings("rawtypes")
+          Class<? extends HCatalogVertexOutputFormat> vertexOutputFormatClass) {
+    this.vertexClass = vertexClass;
+    this.vertexInputFormatClass = vertexInputFormatClass;
+    this.vertexOutputFormatClass = vertexOutputFormatClass;
+    this.conf = new HiveConf(getClass());
+  }
+
+  /**
+  * main method
+  * @param args system arguments
+  * @throws Exception any errors from Hive Giraph Runner
+  */
+  public static void main(String[] args) throws Exception {
+    System.exit(ToolRunner
+                .run(new HiveGiraphRunner(null, null, null), args));
+  }
+
+  @Override
+  public final int run(String[] args) throws Exception {
+        // process args
+    try {
+      processArguments(args);
+    } catch (InterruptedException e) {
+      return 0;
+    } catch (IllegalArgumentException e) {
+      System.err.println(e.getMessage());
+      return -1;
+    }
+
+    // additional configuration for Hive
+    adjustConfigurationForHive(getConf());
+
+    // setup GiraphJob
+    GiraphJob job = new GiraphJob(getConf(), getClass().getName());
+    job.getConfiguration().setVertexClass(vertexClass);
+
+    // setup input from Hive
+    InputJobInfo inputJobInfo = InputJobInfo.create(dbName, inputTableName,
+                inputTableFilterExpr);
+    HCatInputFormat.setInput(job.getInternalJob(), inputJobInfo);
+    job.getConfiguration().setVertexInputFormatClass(vertexInputFormatClass);
+
+    // setup output to Hive
+    HCatOutputFormat.setOutput(job.getInternalJob(), OutputJobInfo.create(
+                dbName, outputTableName, outputTablePartitionValues));
+    HCatOutputFormat.setSchema(job.getInternalJob(),
+                HCatOutputFormat.getTableSchema(job.getInternalJob()));
+    job.getConfiguration().setVertexOutputFormatClass(vertexOutputFormatClass);
+
+    job.getConfiguration().setWorkerConfiguration(workers, workers, 100.0f);
+    initGiraphJob(job);
+
+    return job.run(isVerbose) ? 0 : -1;
+  }
+
+  /**
+  * set hive configuration
+  * @param conf Configuration argument
+  */
+  private static void adjustConfigurationForHive(Configuration conf) {
+        // when output partitions are used, workers register them to the
+        // metastore at cleanup stage, and on HiveConf's initialization, it
+        // looks for hive-site.xml from.
+    addToStringCollection(conf, "tmpfiles", conf.getClassLoader()
+                .getResource("hive-site.xml").toString());
+
+        // Also, you need hive.aux.jars as well
+        // addToStringCollection(conf, "tmpjars",
+        // conf.getStringCollection("hive.aux.jars.path"));
+
+        // Or, more effectively, we can provide all the jars client needed to
+        // the workers as well
+    String[] hadoopJars = System.getenv("HADOOP_CLASSPATH").split(
+                File.pathSeparator);
+    List<String> hadoopJarURLs = Lists.newArrayList();
+    for (String jarPath : hadoopJars) {
+      File file = new File(jarPath);
+      if (file.exists() && file.isFile()) {
+        String jarURL = file.toURI().toString();
+        hadoopJarURLs.add(jarURL);
+      }
+    }
+    addToStringCollection(conf, "tmpjars", hadoopJarURLs);
+  }
+
+  /**
+  * process arguments
+  * @param args to process
+  * @return CommandLine instance
+  * @throws ParseException error parsing arguments
+  * @throws InterruptedException interrupted
+  */
+  private CommandLine processArguments(String[] args) throws ParseException,
+            InterruptedException {
+    Options options = new Options();
+    options.addOption("h", "help", false, "Help");
+    options.addOption("v", "verbose", false, "Verbose");
+    options.addOption("D", "hiveconf", true,
+                "property=value for Hive/Hadoop configuration");
+    options.addOption("w", "workers", true, "Number of workers");
+    if (vertexClass == null) {
+      options.addOption(null, "vertexClass", true,
+                    "Giraph Vertex class to use");
+    }
+    if (vertexInputFormatClass == null) {
+      options.addOption(null, "vertexInputFormatClass", true,
+                    "Giraph HCatalogVertexInputFormat class to use");
+    }
+
+    if (vertexOutputFormatClass == null) {
+      options.addOption(null, "vertexOutputFormatClass", true,
+                    "Giraph HCatalogVertexOutputFormat class to use");
+    }
+
+    options.addOption("db", "database", true, "Hive database name");
+    options.addOption("i", "inputTable", true, "Input table name");
+    options.addOption("I", "inputFilter", true,
+                "Input table filter expression (e.g., \"a<2 AND b='two'\"");
+    options.addOption("o", "outputTable", true, "Output table name");
+    options.addOption("O", "outputPartition", true,
+                "Output table partition values (e.g., \"a=1,b=two\")");
+    addMoreOptions(options);
+
+    CommandLineParser parser = new GnuParser();
+    final CommandLine cmdln = parser.parse(options, args);
+    if (args.length == 0 || cmdln.hasOption("help")) {
+      new HelpFormatter().printHelp(getClass().getName(), options, true);
+      throw new InterruptedException();
+    }
+
+    // Giraph classes
+    if (cmdln.hasOption("vertexClass")) {
+      vertexClass = findClass(cmdln.getOptionValue("vertexClass"),
+              Vertex.class);
+    }
+    if (cmdln.hasOption("vertexInputFormatClass")) {
+      vertexInputFormatClass = findClass(
+              cmdln.getOptionValue("vertexInputFormatClass"),
+              HCatalogVertexInputFormat.class);
+    }
+    if (cmdln.hasOption("vertexOutputFormatClass")) {
+      vertexOutputFormatClass = findClass(
+              cmdln.getOptionValue("vertexOutputFormatClass"),
+              HCatalogVertexOutputFormat.class);
+    }
+
+    if (vertexClass == null) {
+      throw new IllegalArgumentException(
+                "Need the Giraph Vertex class name (-vertexClass) to use");
+    }
+    if (vertexInputFormatClass == null) {
+      throw new IllegalArgumentException(
+                    "Need the Giraph VertexInputFormat " +
+                            "class name (-vertexInputFormatClass) to use");
+    }
+    if (vertexOutputFormatClass == null) {
+      throw new IllegalArgumentException(
+                    "Need the Giraph VertexOutputFormat " +
+                            "class name (-vertexOutputFormatClass) to use");
+    }
+    if (!cmdln.hasOption("workers")) {
+      throw new IllegalArgumentException(
+                    "Need to choose the number of workers (-w)");
+    }
+    if (!cmdln.hasOption("inputTable")) {
+      throw new IllegalArgumentException(
+                    "Need to set the input table name (-i).  " +
+                            "One example is 'dim_friendlist'");
+    }
+    if (!cmdln.hasOption("outputTable")) {
+      throw new IllegalArgumentException(
+                    "Need to set the output table name (-o).");
+    }
+    dbName = cmdln.getOptionValue("dbName", "default");
+    inputTableName = cmdln.getOptionValue("inputTable");
+    inputTableFilterExpr = cmdln.getOptionValue("inputFilter");
+    outputTableName = cmdln.getOptionValue("outputTable");
+    outputTablePartitionValues = parsePartitionValues(cmdln
+                .getOptionValue("outputPartition"));
+    workers = Integer.parseInt(cmdln.getOptionValue("workers"));
+    isVerbose = cmdln.hasOption("verbose");
+
+    // pick up -hiveconf arguments
+    for (String hiveconf : cmdln.getOptionValues("hiveconf")) {
+      String[] keyval = hiveconf.split("=", 2);
+      if (keyval.length == 2) {
+        String name = keyval[0];
+        String value = keyval[1];
+        if (name.equals("tmpjars") || name.equals("tmpfiles")) {
+          addToStringCollection(
+                  conf, name, value);
+        } else {
+          conf.set(name, value);
+        }
+      }
+    }
+
+    processMoreArguments(cmdln);
+
+    return cmdln;
+  }
+
+  /**
+  * add string to collection
+  * @param conf Configuration
+  * @param name name to add
+  * @param values values for collection
+  */
+  private static void addToStringCollection(Configuration conf, String name,
+                                              String... values) {
+    addToStringCollection(conf, name, Arrays.asList(values));
+  }
+
+  /**
+  * add string to collection
+  * @param conf Configuration
+  * @param name to add
+  * @param values values for collection
+  */
+  private static void addToStringCollection(
+          Configuration conf, String name, Collection
+          <? extends String> values) {
+    Collection<String> tmpfiles = conf.getStringCollection(name);
+    tmpfiles.addAll(values);
+    conf.setStrings(name, tmpfiles.toArray(new String[tmpfiles.size()]));
+  }
+
+  /**
+  *
+  * @param className to find
+  * @param base  base class
+  * @param <T> class type found
+  * @return type found
+  */
+  private <T> Class<? extends T> findClass(String className, Class<T> base) {
+    try {
+      Class<?> cls = Class.forName(className);
+      if (base.isAssignableFrom(cls)) {
+        return cls.asSubclass(base);
+      }
+      return null;
+    } catch (ClassNotFoundException e) {
+      throw new IllegalArgumentException(className +
+              ": Invalid class name");
+    }
+  }
+
+  // TODO use Hive util class if this is already provided by it
+
+  /**
+  * @param outputTablePartitionString table partition string
+  * @return Map
+  */
+  public static Map<String, String> parsePartitionValues(
+            String outputTablePartitionString) {
+    if (outputTablePartitionString != null) {
+      Map<String, String> partitionValues = Maps.newHashMap();
+      for (String partkeyval : outputTablePartitionString.split(" *, *")) {
+        String[] keyval = partkeyval.split(" *= *", 2);
+        if (keyval.length < 2) {
+          throw new IllegalArgumentException(
+              "Unrecognized partition value format: " +
+               outputTablePartitionString);
+        }
+        partitionValues.put(keyval[0], keyval[1]);
+        return partitionValues;
+      }
+    }
+    return null;
+  }
+
+  /**
+  *
+  * @param outputTablePartitionValues  output table partitions
+  * @return  String partition values
+  */
+  private static String serializePartitionValues(
+            Map<String, String> outputTablePartitionValues) {
+    StringBuilder outputTablePartitionValuesString = new StringBuilder();
+    for (Entry<String, String> partitionValueEntry : outputTablePartitionValues
+                .entrySet()) {
+      if (outputTablePartitionValuesString.length() != 0) {
+        outputTablePartitionValuesString.append(",");
+        outputTablePartitionValuesString
+                    .append(partitionValueEntry.getKey()).append("=")
+                    .append(partitionValueEntry.getValue());
+      }
+    }
+    return outputTablePartitionValuesString.toString();
+  }
+
+  @Override
+  public final Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public final void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+  * Override this method to add more command-line options. You can process
+  * them by also overriding {@link #processMoreArguments(CommandLine)}.
+  *
+  * @param options Options
+  */
+  protected void addMoreOptions(Options options) {
+  }
+
+  /**
+  * Override this method to process additional command-line arguments. You
+  * may want to declare additional options by also overriding
+  * {@link #addMoreOptions(Options)}.
+  *
+  * @param cmd Command
+  */
+  protected void processMoreArguments(CommandLine cmd) {
+  }
+
+  /**
+  * Override this method to do additional setup with the GiraphJob that will
+  * run.
+  *
+  * @param job
+  *            GiraphJob that is going to run
+  */
+  protected void initGiraphJob(GiraphJob job) {
+    LOG.info(getClass().getSimpleName() + " with");
+    String prefix = "\t";
+    LOG.info(prefix + "-vertexClass=" +
+         vertexClass.getCanonicalName());
+    LOG.info(prefix + "-vertexInputFormatClass=" +
+        vertexInputFormatClass.getCanonicalName());
+    LOG.info(prefix + "-vertexOutputFormatClass=" +
+        vertexOutputFormatClass.getCanonicalName());
+    LOG.info(prefix + "-inputTable=" + inputTableName);
+    if (inputTableFilterExpr != null) {
+      LOG.info(prefix + "-inputFilter=\"" +
+        inputTableFilterExpr + "\"");
+    }
+    LOG.info(prefix + "-outputTable=" + outputTableName);
+    if (outputTablePartitionValues != null) {
+      LOG.info(prefix + "-outputPartition=\""  +
+        serializePartitionValues(outputTablePartitionValues) +
+        "\"");
+    }
+    LOG.info(prefix + "-workers=" + workers);
+  }
 
 }

Modified: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/package-info.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/package-info.java?rev=1394835&r1=1394834&r2=1394835&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/package-info.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/package-info.java Fri Oct  5 21:59:40 2012
@@ -16,7 +16,8 @@
  * limitations under the License.
  */
 /**
- * Package of input and output format classes for loading and storing Hive/Pig data using HCatalog.
+ * Package of input and output format classes
+ * for loading and storing Hive/Pig data using HCatalog.
  */
 package org.apache.giraph.format.hcatalog;
 

Modified: giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/TestHBaseRootMarkerVertextFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/TestHBaseRootMarkerVertextFormat.java?rev=1394835&r1=1394834&r2=1394835&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/TestHBaseRootMarkerVertextFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/TestHBaseRootMarkerVertextFormat.java Fri Oct  5 21:59:40 2012
@@ -141,6 +141,8 @@ public class TestHBaseRootMarkerVertextF
             //now operate over HBase using Vertex I/O formats
             conf.set(TableInputFormat.INPUT_TABLE, TABLE_NAME);
             conf.set(TableOutputFormat.OUTPUT_TABLE, TABLE_NAME);
+            HBaseVertexInputFormat.setConf(conf);
+            HBaseVertexOutputFormat.setConf(conf);
 
             GiraphJob giraphJob = new GiraphJob(conf, getCallingMethodName());
             GiraphConfiguration giraphConf = giraphJob.getConfiguration();

Modified: giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeInputFormat.java?rev=1394835&r1=1394834&r2=1394835&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeInputFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeInputFormat.java Fri Oct  5 21:59:40 2012
@@ -52,7 +52,7 @@ public class TableEdgeInputFormat extend
                                TaskAttemptContext context) throws IOException {
 
         return new TableEdgeVertexReader(
-                tableInputFormat.createRecordReader(split, context));
+                BASE_FORMAT.createRecordReader(split, context));
 
     }
 

Modified: giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeOutputFormat.java?rev=1394835&r1=1394834&r2=1394835&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeOutputFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeOutputFormat.java Fri Oct  5 21:59:40 2012
@@ -40,7 +40,7 @@ public class TableEdgeOutputFormat
     createVertexWriter(TaskAttemptContext context)
             throws IOException, InterruptedException {
         RecordWriter<ImmutableBytesWritable, Writable> writer =
-                tableOutputFormat.getRecordWriter(context);
+                BASE_FORMAT.getRecordWriter(context);
         return new TableEdgeVertexWriter(writer);
     }