You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/05/29 23:56:32 UTC

svn commit: r1344020 - in /giraph/trunk: ./ giraph-formats-contrib/ giraph-formats-contrib/src/ giraph-formats-contrib/src/main/ giraph-formats-contrib/src/main/assembly/ giraph-formats-contrib/src/main/java/ giraph-formats-contrib/src/main/java/org/ g...

Author: aching
Date: Tue May 29 21:56:31 2012
New Revision: 1344020

URL: http://svn.apache.org/viewvc?rev=1344020&view=rev
Log:
GIRAPH-153: HBase/Accumulo Input and Output formats. (bfem via aching)

Added:
    giraph/trunk/giraph-formats-contrib/
    giraph/trunk/giraph-formats-contrib/pom.xml
    giraph/trunk/giraph-formats-contrib/src/
    giraph/trunk/giraph-formats-contrib/src/main/
    giraph/trunk/giraph-formats-contrib/src/main/assembly/
    giraph/trunk/giraph-formats-contrib/src/main/assembly/compile.xml
    giraph/trunk/giraph-formats-contrib/src/main/java/
    giraph/trunk/giraph-formats-contrib/src/main/java/org/
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexInputFormat.java
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexOutputFormat.java
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/package-info.java
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexInputFormat.java
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexOutputFormat.java
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/package-info.java
    giraph/trunk/giraph-formats-contrib/src/test/
    giraph/trunk/giraph-formats-contrib/src/test/java/
    giraph/trunk/giraph-formats-contrib/src/test/java/org/
    giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/
    giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/
    giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/BspCase.java
    giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/
    giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/
    giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/TestAccumuloVertexFormat.java
    giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/edgemarker/
    giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/edgemarker/AccumuloEdgeInputFormat.java
    giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/edgemarker/AccumuloEdgeOutputFormat.java
    giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/
    giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/TestHBaseRootMarkerVertextFormat.java
    giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/
    giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeInputFormat.java
    giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeOutputFormat.java
Modified:
    giraph/trunk/CHANGELOG

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1344020&r1=1344019&r2=1344020&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Tue May 29 21:56:31 2012
@@ -2,6 +2,8 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-153: HBase/Accumulo Input and Output formats. (bfem via aching)
+
   GIRAPH-187: SequenceFileVertexInputFormat has WritableComparable<I>
   as a bounded type for I. (roman4asf via aching)
 

Added: giraph/trunk/giraph-formats-contrib/pom.xml
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/pom.xml?rev=1344020&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/pom.xml (added)
+++ giraph/trunk/giraph-formats-contrib/pom.xml Tue May 29 21:56:31 2012
@@ -0,0 +1,148 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>giraph</groupId>
+    <version>0.2-SNAPSHOT</version>
+    <artifactId>giraph-formats-contrib</artifactId>
+    <properties>
+        <compileSource>1.6</compileSource>
+        <hadoop.version>0.20.203.0</hadoop.version>
+        <hbase.version>0.90.5</hbase.version>
+        <accumulo.version>1.4.0</accumulo.version>
+        <maven-compiler-plugin.version>2.3.2</maven-compiler-plugin.version>
+        <maven-javadoc-plugin.version>2.6</maven-javadoc-plugin.version>
+        <giraph.trunk.base>..</giraph.trunk.base>
+        <buildtype>test</buildtype>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.6</version>
+                <configuration>
+                    <systemProperties>
+                        <property>
+                            <name>prop.jarLocation</name>
+                            <value>${giraph.trunk.base}/target/giraph-${project.version}-jar-with-dependencies.jar</value>
+                        </property>
+                    </systemProperties>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <version>2.9</version>
+                <configuration>
+                    <configLocation>../checkstyle.xml</configLocation>
+                    <enableRulesSummary>false</enableRulesSummary>
+                    <headerLocation>../license-header.txt</headerLocation>
+	                <failOnError>true</failOnError>
+	                <includeTestSourceDirectory>false</includeTestSourceDirectory>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>verify</phase>
+                        <goals>
+                             <goal>check</goal>
+                       </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+    <dependencies>
+        <dependency>
+            <artifactId>giraph</artifactId>
+            <groupId>org.apache.giraph</groupId>
+            <version>0.2-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>3.8.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-core</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.zookeeper</groupId>
+            <artifactId>zookeeper</artifactId>
+            <version>3.3.3</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.sun.jmx</groupId>
+                    <artifactId>jmxri</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jdmk</groupId>
+                    <artifactId>jmxtools</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.jms</groupId>
+                    <artifactId>jms</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.accumulo</groupId>
+            <artifactId>accumulo-core</artifactId>
+            <version>${accumulo.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-io</artifactId>
+            <version>1.3.2</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-cli</groupId>
+            <artifactId>commons-cli</artifactId>
+            <version>1.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase</artifactId>
+            <version>${hbase.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase</artifactId>
+            <type>test-jar</type>
+            <version>${hbase.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-test</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

Added: giraph/trunk/giraph-formats-contrib/src/main/assembly/compile.xml
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/assembly/compile.xml?rev=1344020&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/assembly/compile.xml (added)
+++ giraph/trunk/giraph-formats-contrib/src/main/assembly/compile.xml Tue May 29 21:56:31 2012
@@ -0,0 +1,120 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+  <id>jar-with-dependencies</id>
+   <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+
+  <dependencySets>
+    <dependencySet>
+      <useProjectArtifact>true</useProjectArtifact>
+      <outputDirectory>/</outputDirectory>
+      <unpackOptions>
+          <excludes>
+              <exclude>META-INF/LICENSE
+              </exclude>
+          </excludes>
+      </unpackOptions>
+      <unpack>false</unpack>
+      <scope>runtime</scope>
+    </dependencySet>
+  </dependencySets>
+</assembly>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+  <id>jar-with-dependencies</id>
+   <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+
+  <dependencySets>
+    <dependencySet>
+      <useProjectArtifact>true</useProjectArtifact>
+      <outputDirectory>/</outputDirectory>
+      <unpackOptions>
+          <excludes>
+              <exclude>META-INF/LICENSE
+              </exclude>
+          </excludes>
+      </unpackOptions>
+      <unpack>false</unpack>
+      <scope>runtime</scope>
+    </dependencySet>
+  </dependencySets>
+</assembly>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+  <id>jar-with-dependencies</id>
+   <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+
+  <dependencySets>
+    <dependencySet>
+      <useProjectArtifact>true</useProjectArtifact>
+      <outputDirectory>/</outputDirectory>
+      <unpackOptions>
+          <excludes>
+              <exclude>META-INF/LICENSE
+              </exclude>
+          </excludes>
+      </unpackOptions>
+      <unpack>false</unpack>
+      <scope>runtime</scope>
+    </dependencySet>
+  </dependencySets>
+</assembly>
\ No newline at end of file

Added: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexInputFormat.java?rev=1344020&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexInputFormat.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexInputFormat.java Tue May 29 21:56:31 2012
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.format.accumulo;
+
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.giraph.graph.VertexInputFormat;
+import org.apache.giraph.graph.VertexReader;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ *  Class which wraps the AccumuloInputFormat. It's designed
+ *  as an extension point for VertexInputFormat subclasses that wish
+ *  to read from Accumulo tables.
+ *
+ *  Works with
+ *  {@link org.apache.giraph.format.accumulo.AccumuloVertexOutputFormat}
+ *
+ * @param <I> vertex id type
+ * @param <V>  vertex value type
+ * @param <E>  edge type
+ * @param <M>  message type
+ */
+public abstract class AccumuloVertexInputFormat<
+        I extends WritableComparable,
+        V extends Writable,
+        E extends Writable,
+        M extends Writable>
+        extends VertexInputFormat<I, V, E, M> implements Configurable {
+  /**
+   * delegate input format for all accumulo operations.
+   */
+  protected AccumuloInputFormat accumuloInputFormat =
+      new AccumuloInputFormat();
+
+  /**
+   * Configured and injected by the job
+  */
+  private Configuration conf;
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+  * Abstract class which provides a template for instantiating vertices
+  * from Accumulo Key/Value pairs.
+  *
+  * @param <I>  vertex id type
+  * @param <V>  vertex value type
+  * @param <E>  edge type
+  * @param <M>  message type
+  */
+  public abstract static class AccumuloVertexReader<
+      I extends WritableComparable,
+      V extends Writable, E extends Writable, M extends Writable>
+      implements VertexReader<I, V, E, M> {
+
+    /**
+    * Used by subclasses to read key/value pairs.
+    */
+    private final RecordReader<Key, Value> reader;
+    /** Context passed to initialize */
+    private TaskAttemptContext context;
+
+    /**
+     * Constructor used to pass Record Reader instance
+     * @param reader  Accumulo record reader
+     */
+    public AccumuloVertexReader(RecordReader<Key, Value> reader) {
+      this.reader = reader;
+    }
+
+    @Override
+    public void initialize(InputSplit inputSplit,
+                           TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      reader.initialize(inputSplit, context);
+      this.context = context;
+    }
+
+    /**
+     * close
+     *
+     * @throws IOException
+     */
+    public void close() throws IOException {
+      reader.close();
+    }
+
+    /**
+     * getProgress
+     *
+     * @return progress
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public float getProgress() throws IOException, InterruptedException {
+      return reader.getProgress();
+    }
+
+    /**
+    * Get the result record reader
+    *
+    * @return Record reader to be used for reading.
+    */
+    protected RecordReader<Key, Value> getRecordReader() {
+      return reader;
+    }
+
+    /**
+     * getContext
+     *
+     * @return Context passed to initialize.
+     */
+    protected TaskAttemptContext getContext() {
+      return context;
+    }
+
+  }
+
+  /**
+   * Delegates split computation to the wrapped AccumuloInputFormat.
+   *
+   * @param context Context of the job
+   * @param numWorkers Number of workers used for this job (not used here)
+   * @return  tablet splits
+   * @throws IOException if the delegate fails
+   * @throws InterruptedException
+   */
+  public List<InputSplit> getSplits(
+    JobContext context, int numWorkers)
+    throws IOException, InterruptedException {
+    try {
+      return accumuloInputFormat.getSplits(context);
+    } catch (IOException e) {
+      String message = e.getMessage();
+      if (message != null && message.contains("Input info has not been set")) {
+        throw new IOException(message + " Make sure you initialized" +
+                " AccumuloInputFormat static setters " +
+                "before passing the config to GiraphJob.", e);
+      }
+      // Any other failure must surface instead of yielding null splits.
+      throw e;
+    }
+  }
+}

Added: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexOutputFormat.java?rev=1344020&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexOutputFormat.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexOutputFormat.java Tue May 29 21:56:31 2012
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.format.accumulo;
+
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.giraph.graph.VertexOutputFormat;
+import org.apache.giraph.graph.VertexWriter;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+/**
+ *
+ *  Class which wraps the AccumuloOutputFormat. It's designed
+ *  as an extension point for VertexOutputFormat subclasses that wish
+ *  to write vertices back to an Accumulo table.
+ *
+ *  Works with
+ *  {@link org.apache.giraph.format.accumulo.AccumuloVertexInputFormat}
+ *
+ *
+ * @param <I> vertex id type
+ * @param <V>  vertex value type
+ * @param <E>  edge type
+ */
+public abstract class AccumuloVertexOutputFormat<
+        I extends WritableComparable,
+        V extends Writable,
+        E extends Writable>
+        extends VertexOutputFormat<I, V, E> implements Configurable {
+
+
+  /**
+   * Output table parameter
+   */
+  protected static final String OUTPUT_TABLE = "OUTPUT_TABLE";
+
+  /**
+   * Accumulo delegate for table output
+   */
+  protected AccumuloOutputFormat accumuloOutputFormat =
+          new AccumuloOutputFormat();
+
+
+  /**
+   * Used by the Configurable interface
+   */
+  private Configuration conf;
+
+  /**
+   *
+   * Main abstraction point for vertex writers to persist back
+   * to Accumulo tables.
+   *
+   * @param <I> vertex id type
+   * @param <V> vertex value type
+   * @param <E>  edge type
+   */
+  public abstract static class AccumuloVertexWriter<
+          I extends WritableComparable,
+          V extends Writable,
+          E extends Writable>
+          implements VertexWriter<I, V, E> {
+
+    /**
+     * task attempt context.
+     */
+    private TaskAttemptContext context;
+
+    /**
+     * Accumulo record writer
+     */
+    private RecordWriter<Text, Mutation> recordWriter;
+
+    /**
+     * Constructor for use with subclasses
+     *
+     * @param recordWriter accumulo record writer
+     */
+    public AccumuloVertexWriter(RecordWriter<Text, Mutation> recordWriter) {
+      this.recordWriter = recordWriter;
+    }
+
+    /**
+     * initialize
+     *
+     * @param context Context used to write the vertices.
+     * @throws IOException
+     */
+    public void initialize(TaskAttemptContext context) throws IOException {
+      this.context = context;
+    }
+
+    /**
+     *  close
+     *
+     * @param context the context of the task
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public void close(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      recordWriter.close(context);
+    }
+
+    /**
+     * Get the table record writer;
+     *
+     * @return Record writer to be used for writing.
+     */
+    public RecordWriter<Text, Mutation> getRecordWriter() {
+      return recordWriter;
+    }
+
+    /**
+     * Get the context.
+     *
+     * @return Context passed to initialize.
+     */
+    public TaskAttemptContext getContext() {
+      return context;
+    }
+
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  /**
+   * Checks output setup via AccumuloOutputFormat; failures always propagate.
+   * @param context information about the job
+   * @throws IOException if the delegate rejects the output configuration
+   * @throws InterruptedException
+   */
+  public void checkOutputSpecs(JobContext context)
+    throws IOException, InterruptedException {
+    try {
+      accumuloOutputFormat.checkOutputSpecs(context);
+    } catch (IOException e) {
+      if (e.getMessage() != null &&
+          e.getMessage().contains("Output info has not been set")) {
+        throw new IOException(e.getMessage() + " Make sure you initialized" +
+                " AccumuloOutputFormat static setters " +
+                "before passing the config to GiraphJob.", e);
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * getOutputCommitter
+   *
+   * @param context the task context
+   * @return OutputCommitter
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    return accumuloOutputFormat.getOutputCommitter(context);
+  }
+}

Added: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/package-info.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/package-info.java?rev=1344020&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/package-info.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/package-info.java Tue May 29 21:56:31 2012
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of Accumulo vertex input and output formats.
+ */
+package org.apache.giraph.format.accumulo;

Added: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexInputFormat.java?rev=1344020&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexInputFormat.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexInputFormat.java Tue May 29 21:56:31 2012
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.format.hbase;
+
+import org.apache.giraph.graph.VertexInputFormat;
+import org.apache.giraph.graph.VertexReader;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ *
+ * Base class that wraps an HBase TableInputFormat and underlying Scan object
+ * to help instantiate vertices from an HBase table. All
+ * the static TableInputFormat properties necessary to configure
+ * an HBase job are available.
+ *
+ * For example, setting conf.set(TableInputFormat.INPUT_TABLE, "in_table");
+ * from the job setup routine will properly delegate to the
+ * TableInputFormat instance. The Configurable interface prevents specific
+ * wrapper methods from having to be called.
+ *
+ * Works with {@link HBaseVertexOutputFormat}
+ *
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public abstract  class HBaseVertexInputFormat<
+          I extends WritableComparable,
+        V extends Writable,
+        E extends Writable,
+        M extends Writable>
+        extends VertexInputFormat<I, V, E, M> implements Configurable {
+
+  /**
+   * delegate HBase table input format
+   */
+  protected TableInputFormat tableInputFormat =
+         new TableInputFormat();
+  /**
+   * Injected conf by Configurable interface
+   */
+  private Configuration conf;
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * setConf()
+   *
+   * We must initialize the table format since we manually instantiate it.
+   *
+   * @param conf Configuration object
+   */
+  public void setConf(Configuration conf) {
+    tableInputFormat.setConf(conf);
+    this.conf = conf;
+  }
+
+  /**
+   * Takes an instance of RecordReader that supports
+   * HBase row-key, result records.  Subclasses can focus on
+   * vertex instantiation details without worrying about connection
+   * semantics. Subclasses are expected to implement nextVertex() and
+   * getCurrentVertex()
+   *
+   *
+   *
+   * @param <I> Vertex index value
+   * @param <V> Vertex value
+   * @param <E> Edge value
+   * @param <M> Message data
+   */
+  public abstract static class HBaseVertexReader<
+          I extends WritableComparable,
+          V extends Writable,
+          E extends Writable, M extends Writable>
+          implements VertexReader<I, V, E, M> {
+
+    /**
+     * reader instance
+     */
+    private final RecordReader<ImmutableBytesWritable, Result> reader;
+    /** Context passed to initialize */
+    private TaskAttemptContext context;
+
+    /**
+     * constructor used for subclassing vertex record reader.
+     * @param reader HBase record reader instance
+     */
+    public HBaseVertexReader(RecordReader<ImmutableBytesWritable,
+                                              Result> reader) {
+      this.reader = reader;
+    }
+
+    /**
+     * initialize
+     *
+     * @param inputSplit Input split to be used for reading vertices.
+     * @param context Context from the task.
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public void initialize(InputSplit inputSplit,
+                           TaskAttemptContext context)
+      throws IOException,
+      InterruptedException {
+      reader.initialize(inputSplit, context);
+      this.context = context;
+    }
+
+    /**
+     * close
+     * @throws IOException
+     */
+    public void close() throws IOException {
+      reader.close();
+    }
+
+    /**
+     * getProgress
+     *
+     * @return progress
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public float getProgress() throws
+      IOException, InterruptedException {
+      return reader.getProgress();
+    }
+
+    /**
+     * getRecordReader
+     *
+     * @return Record reader to be used for reading.
+     */
+    protected RecordReader<ImmutableBytesWritable,
+      Result> getRecordReader() {
+      return reader;
+    }
+
+   /**
+    * getContext
+    *
+    * @return Context passed to initialize.
+    */
+    protected TaskAttemptContext getContext() {
+      return context;
+    }
+
+  }
+
+  /**
+   * getSplits
+   *
+   * @param context Context of the job
+   * @param numWorkers Number of workers used for this job
+   * @return HBase region splits
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public List<InputSplit> getSplits(
+  JobContext context, int numWorkers)
+    throws IOException, InterruptedException {
+    return tableInputFormat.getSplits(context);
+  }
+}

Added: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexOutputFormat.java?rev=1344020&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexOutputFormat.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexOutputFormat.java Tue May 29 21:56:31 2012
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.format.hbase;
+
+import org.apache.giraph.graph.VertexOutputFormat;
+import org.apache.giraph.graph.VertexWriter;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Base class for writing Vertex mutations back to specific
+ * rows in an HBase table. This class wraps an instance of TableOutputFormat
+ * for easy configuration with the existing properties.
+ *
+ * Setting conf.set(TableOutputFormat.OUTPUT_TABLE, "out_table");
+ * will properly delegate to the TableOutputFormat instance contained
+ * in this class through {@link #setConf(Configuration)}, so no dedicated
+ * wrapper methods have to be called.
+ *
+ * Works with {@link HBaseVertexInputFormat}
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class HBaseVertexOutputFormat<
+        I extends WritableComparable,
+        V extends Writable,
+        E extends Writable>
+        extends VertexOutputFormat<I, V, E> implements Configurable {
+
+  /**
+   * Delegate output format that writes to HBase
+   */
+  protected TableOutputFormat<ImmutableBytesWritable>
+  tableOutputFormat = new TableOutputFormat<ImmutableBytesWritable>();
+  /**
+   * Configuration injected through the Configurable interface
+   */
+  private Configuration conf;
+
+  /**
+   * Base vertex writer that holds a RecordWriter with
+   * ImmutableBytesWritable keys and Writable values. It keeps the task
+   * context and closes the record writer; subclasses implement writeVertex().
+   *
+   * @param <I> Vertex index value
+   * @param <V> Vertex value
+   * @param <E> Edge value
+   */
+  public abstract static class HBaseVertexWriter<
+          I extends WritableComparable,
+          V extends Writable,
+          E extends Writable>
+          implements VertexWriter<I, V, E> {
+
+    /** Context passed to initialize */
+    private TaskAttemptContext context;
+    /** Record writer instance supplied by the subclass */
+    private final RecordWriter<ImmutableBytesWritable, Writable> recordWriter;
+
+    /**
+     * Constructor for subclasses to supply the record writer.
+     *
+     * @param recordWriter Record writer instance used by this vertex writer
+     */
+    public HBaseVertexWriter(
+        RecordWriter<ImmutableBytesWritable, Writable> recordWriter) {
+      this.recordWriter = recordWriter;
+    }
+
+    /**
+     * Remember the task context; the record writer comes from the constructor.
+     *
+     * @param context Context used to write the vertices.
+     * @throws IOException
+     */
+    @Override
+    public void initialize(TaskAttemptContext context)
+      throws IOException {
+      this.context = context;
+    }
+
+    /**
+     * Close the record writer.
+     *
+     * @param context the context of the task
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    @Override
+    public void close(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      recordWriter.close(context);
+    }
+
+    /**
+     * Get the table record writer.
+     *
+     * @return Record writer to be used for writing.
+     */
+    public RecordWriter<ImmutableBytesWritable, Writable> getRecordWriter() {
+      return recordWriter;
+    }
+
+    /**
+     * Get the task context.
+     *
+     * @return Context passed to initialize.
+     */
+    public TaskAttemptContext getContext() {
+      return context;
+    }
+  }
+
+  /**
+   * Store the configuration and forward it to the delegate table output
+   * format.
+   *
+   * @param conf Injected configuration instance
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    tableOutputFormat.setConf(conf);
+    this.conf = conf;
+  }
+
+  /**
+   * Get the configuration injected by the last setConf() call.
+   *
+   * @return Injected configuration, null if setConf() was never called
+   */
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  /**
+   * Check the output specification through the delegate output format.
+   *
+   * @param context information about the job
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Override
+  public void checkOutputSpecs(JobContext context)
+    throws IOException, InterruptedException {
+    tableOutputFormat.checkOutputSpecs(context);
+  }
+
+  /**
+   * Get the output committer of the delegate output format.
+   *
+   * @param context the task context
+   * @return OutputCommitter returned by the delegate output format
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    return tableOutputFormat.getOutputCommitter(context);
+  }
+}

Added: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/package-info.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/package-info.java?rev=1344020&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/package-info.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/package-info.java Tue May 29 21:56:31 2012
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of HBase vertex input and output formats.
+ */
+package org.apache.giraph.format.hbase;

Added: giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/BspCase.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/BspCase.java?rev=1344020&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/BspCase.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/BspCase.java Tue May 29 21:56:31 2012
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+import org.apache.giraph.examples.GeneratedVertexReader;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.zk.ZooKeeperExt;
+
+import junit.framework.TestCase;
+
+/**
+ * Duplicate copy from main giraph trunk. At least until there
+ * is a maven test artifact for Giraph.
+ *
+ * Extended TestCase that makes setting up BSP tests easier.
+ */
+public class BspCase extends TestCase implements Watcher {
+  /** JobTracker system property */
+  private final String jobTracker =
+      System.getProperty("prop.mapred.job.tracker");
+  /** Jar location system property */
+  private final String jarLocation =
+      System.getProperty("prop.jarLocation", "");
+  /** Number of actual processes for the BSP application */
+  private int numWorkers = 1;
+  /** ZooKeeper list system property */
+  private final String zkList = System.getProperty("prop.zookeeper.list");
+
+  /**
+   * Adjust the configuration to the basic test case
+   *
+   * @param job Job whose configuration is adjusted
+   */
+  public final void setupConfiguration(GiraphJob job) {
+    Configuration conf = job.getConfiguration();
+    conf.set("mapred.jar", getJarLocation());
+    // Allow this test to be run on a real Hadoop setup
+    if (getJobTracker() != null) {
+      System.out.println("setup: Sending job to job tracker " +
+          getJobTracker() + " with jar path " + getJarLocation() +
+          " for " + getName());
+      conf.set("mapred.job.tracker", getJobTracker());
+      job.setWorkerConfiguration(getNumWorkers(), getNumWorkers(), 100.0f);
+    } else {
+      System.out.println("setup: Using local job runner with " +
+          "location " + getJarLocation() + " for " + getName());
+      job.setWorkerConfiguration(1, 1, 100.0f);
+      // Single node testing
+      conf.setBoolean(GiraphJob.SPLIT_MASTER_WORKER, false);
+    }
+    conf.setInt(GiraphJob.POLL_ATTEMPTS, 10);
+    conf.setInt(GiraphJob.POLL_MSECS, 3 * 1000);
+    conf.setInt(GiraphJob.ZOOKEEPER_SERVERLIST_POLL_MSECS, 500);
+    if (getZooKeeperList() != null) {
+      job.setZooKeeperConfiguration(getZooKeeperList());
+    }
+    // GeneratedInputSplit will generate 5 vertices
+    conf.setLong(GeneratedVertexReader.READER_VERTICES, 5);
+  }
+
+  /**
+   * Create the test case
+   *
+   * @param testName name of the test case
+   */
+  public BspCase(String testName) {
+    super(testName);
+  }
+
+  /**
+   * Get the number of workers used in the BSP application
+   *
+   * @return number of workers to use
+   */
+  public int getNumWorkers() {
+    return numWorkers;
+  }
+
+  /**
+   * Get the ZooKeeper list
+   * @return ZooKeeper list, null if the system property is not set
+   */
+  public String getZooKeeperList() {
+    return zkList;
+  }
+
+  /**
+   * Get the jar location
+   *
+   * @return location of the jar file
+   */
+  String getJarLocation() {
+    return jarLocation;
+  }
+
+  /**
+   * Get the job tracker location
+   *
+   * @return job tracker location as a string
+   */
+  String getJobTracker() {
+    return jobTracker;
+  }
+
+  /**
+   * Get the single part file status and make sure there is only one part
+   *
+   * @param job Job to get the file system from
+   * @param partDirPath Directory where the single part file should exist
+   * @return Single part file status
+   * @throws IOException
+   */
+  public static FileStatus getSinglePartFileStatus(GiraphJob job,
+      Path partDirPath) throws IOException {
+    FileSystem fs = FileSystem.get(job.getConfiguration());
+    FileStatus[] statusArray = fs.listStatus(partDirPath);
+    FileStatus singlePartFileStatus = null;
+    int partFiles = 0;
+    for (FileStatus fileStatus : statusArray) {
+      if (fileStatus.getPath().getName().equals("part-m-00000")) {
+        singlePartFileStatus = fileStatus;
+      }
+      if (fileStatus.getPath().getName().startsWith("part-m-")) {
+        ++partFiles;
+      }
+    }
+    if (partFiles != 1) {
+      throw new ArithmeticException(
+          "getSinglePartFile: Part file count should be 1, but is " +
+              partFiles);
+    }
+    return singlePartFileStatus;
+  }
+
+  @Override
+  public void setUp() {
+    if (jobTracker != null) {
+      System.out.println("Setting tasks to 3 for " + getName() +
+          " since JobTracker exists...");
+      numWorkers = 3;
+    }
+    try {
+      Configuration conf = new Configuration();
+      FileSystem hdfs = FileSystem.get(conf);
+      // Local jobs always reuse the same paths, so remove their stale dirs
+      Path oldLocalJobPaths = new Path(GiraphJob.ZOOKEEPER_MANAGER_DIR_DEFAULT);
+      try {
+        FileStatus[] fileStatusArr = hdfs.listStatus(oldLocalJobPaths);
+        // Older Hadoop file systems return null here for a missing path
+        if (fileStatusArr != null) {
+          for (FileStatus fileStatus : fileStatusArr) {
+            if (fileStatus.isDir() &&
+                fileStatus.getPath().getName().contains("job_local")) {
+              System.out.println("Cleaning up local job path " +
+                  fileStatus.getPath().getName());
+              hdfs.delete(fileStatus.getPath(), true);
+            }
+          }
+        }
+      } catch (FileNotFoundException e) {
+        // ignore this FileNotFound exception and continue.
+      }
+      if (zkList == null) {
+        return;
+      }
+      ZooKeeperExt zooKeeperExt = new ZooKeeperExt(zkList, 30 * 1000, this);
+      try {
+        for (String rootChild : zooKeeperExt.getChildren("/", false)) {
+          if (rootChild.startsWith("_hadoopBsp")) {
+            String rootPath = "/" + rootChild;
+            List<String> children = zooKeeperExt.getChildren(rootPath, false);
+            for (String child : children) {
+              if (child.contains("job_local_")) {
+                String jobPath = rootPath + "/" + child;
+                System.out.println("Cleaning up " + jobPath);
+                zooKeeperExt.deleteExt(jobPath, -1, true);
+              }
+            }
+          }
+        }
+      } finally {
+        zooKeeperExt.close();
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void process(WatchedEvent event) {
+    // Do nothing
+  }
+
+  /**
+   * Helper method to remove an old output directory if it exists,
+   * and set the output path for any VertexOutputFormat that uses
+   * FileOutputFormat.
+   *
+   * @param job Job to set the output path for
+   * @param outputPath Path to output
+   * @throws IOException
+   */
+  public static void removeAndSetOutput(GiraphJob job,
+      Path outputPath) throws IOException {
+    remove(job.getConfiguration(), outputPath);
+    FileOutputFormat.setOutputPath(job.getInternalJob(), outputPath);
+  }
+
+  /**
+   * Helper method to remove a path if it exists.
+   *
+   * @param conf Configuration to load FileSystem from
+   * @param path Path to remove
+   * @throws IOException
+   */
+  public static void remove(Configuration conf, Path path) throws IOException {
+    FileSystem.get(conf).delete(path, true);
+  }
+
+  /**
+   * Get the name of the method that called this helper.
+   * @return Name of the calling method
+   */
+  public static String getCallingMethodName() {
+    return Thread.currentThread().getStackTrace()[2].getMethodName();
+  }
+}

Added: giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/TestAccumuloVertexFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/TestAccumuloVertexFormat.java?rev=1344020&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/TestAccumuloVertexFormat.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/TestAccumuloVertexFormat.java Tue May 29 21:56:31 2012
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.format.accumulo;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.ByteBufferUtil;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.giraph.BspCase;
+import org.apache.giraph.format.accumulo.edgemarker.AccumuloEdgeInputFormat;
+import org.apache.giraph.format.accumulo.edgemarker.AccumuloEdgeOutputFormat;
+import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+
+/*
+    Test class for Accumulo vertex input/output formats.
+ */
+public class TestAccumuloVertexFormat extends BspCase{
+
+    private final String TABLE_NAME = "simple_graph";
+    private final String INSTANCE_NAME = "instance";
+    private final Text FAMILY = new Text("cf");
+    private final Text CHILDREN = new Text("children");
+    private final String USER = "root";
+    private final byte[] PASSWORD = new byte[] {};
+    private final Text OUTPUT_FIELD = new Text("parent");
+
+
+    private final Logger log = Logger.getLogger(TestAccumuloVertexFormat.class);
+    /**
+     * Create the test case
+     *
+     * @param testName name of the test case
+     */
+    public TestAccumuloVertexFormat(String testName) {
+        super(testName);
+    }
+    /**
+     * @return the suite of tests being tested
+     */
+    public static Test suite() {
+        return new TestSuite(TestAccumuloVertexFormat.class);
+
+    }
+    /*
+     Write a simple parent-child directed graph to Accumulo.
+     Run a job which reads the values
+     into subclasses that extend AccumuloVertex I/O formats.
+     Check the output after the job.
+     */
+    public void testAccumuloInputOutput() throws Exception {
+        if (System.getProperty("prop.mapred.job.tracker") != null) {
+            if(log.isInfoEnabled())
+                log.info("testAccumuloInputOutput: " +
+                        "Ignore this test if not local mode.");
+            return;
+        }
+
+        File jarTest = new File(System.getProperty("prop.jarLocation"));
+        if(!jarTest.exists()) {
+            fail("Could not find Giraph jar at " +
+                    "location specified by 'prop.jarLocation'. " +
+                    "Make sure you built the main Giraph artifact?.");
+        }
+
+        //Write out vertices and edges out to a mock instance.
+        MockInstance mockInstance = new MockInstance(INSTANCE_NAME);
+        Connector c = mockInstance.getConnector("root", new byte[] {});
+        c.tableOperations().create(TABLE_NAME);
+        BatchWriter bw = c.createBatchWriter(TABLE_NAME, 10000L, 1000L, 4);
+
+        Mutation m1 = new Mutation(new Text("0001"));
+        m1.put(FAMILY, CHILDREN, new Value("0002".getBytes()));
+        bw.addMutation(m1);
+
+        Mutation m2 = new Mutation(new Text("0002"));
+        m2.put(FAMILY, CHILDREN, new Value("0003".getBytes()));
+        bw.addMutation(m2);
+        if(log.isInfoEnabled())
+            log.info("Writing mutations to Accumulo table");
+        bw.close();
+
+        Configuration conf = new Configuration();
+        conf.set(AccumuloVertexOutputFormat.OUTPUT_TABLE, TABLE_NAME);
+
+        /*
+        Very important to initialize the formats before
+        sending configuration to the GiraphJob. Otherwise
+        the internally constructed Job in GiraphJob will
+        not have the proper context initialization.
+         */
+        AccumuloInputFormat.setInputInfo(conf, USER, "".getBytes(),
+                TABLE_NAME, new Authorizations());
+        AccumuloInputFormat.setMockInstance(conf, INSTANCE_NAME);
+
+        AccumuloOutputFormat.setOutputInfo(conf, USER, PASSWORD, true, null);
+        AccumuloOutputFormat.setMockInstance(conf, INSTANCE_NAME);
+
+        GiraphJob job = new GiraphJob(conf, getCallingMethodName());
+        setupConfiguration(job);
+        job.setVertexClass(EdgeNotification.class);
+        job.setVertexInputFormatClass(AccumuloEdgeInputFormat.class);
+        job.setVertexOutputFormatClass(AccumuloEdgeOutputFormat.class);
+
+        HashSet<Pair<Text, Text>> columnsToFetch = new HashSet<Pair<Text,Text>>();
+        columnsToFetch.add(new Pair<Text, Text>(FAMILY, CHILDREN));
+        AccumuloInputFormat.fetchColumns(job.getConfiguration(), columnsToFetch);
+
+        if(log.isInfoEnabled())
+            log.info("Running edge notification job using Accumulo input");
+        assertTrue(job.run(true));
+        Scanner scanner = c.createScanner(TABLE_NAME, new Authorizations());
+        scanner.setRange(new Range("0002", "0002"));
+        scanner.fetchColumn(FAMILY, OUTPUT_FIELD);
+        boolean foundColumn = false;
+
+        if(log.isInfoEnabled())
+            log.info("Verify job output persisted correctly.");
+        //make sure we found the qualifier.
+        assertTrue(scanner.iterator().hasNext());
+
+
+        //now we check to make sure the expected value from the job persisted correctly.
+        for(Map.Entry<Key,Value> entry : scanner) {
+            Text row = entry.getKey().getRow();
+            assertEquals("0002", row.toString());
+            Value value = entry.getValue();
+            assertEquals("0001", ByteBufferUtil.toString(
+                    ByteBuffer.wrap(value.get())));
+            foundColumn = true;
+        }
+    }
+    /*
+    Test compute method that sends each edge a notification of its parents.
+    The test set only has a 1-1 parent-to-child ratio for this unit test.
+     */
+    public static class EdgeNotification
+            extends EdgeListVertex<Text, Text, Text, Text> {
+        @Override
+        public void compute(Iterator<Text> msgIterator) throws IOException {
+            while (msgIterator.hasNext()) {
+                getVertexValue().set(msgIterator.next());
+            }
+            if(getSuperstep() == 0) {
+                sendMsgToAllEdges(getVertexId());
+            }
+            voteToHalt();
+        }
+    }
+}
\ No newline at end of file

Added: giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/edgemarker/AccumuloEdgeInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/edgemarker/AccumuloEdgeInputFormat.java?rev=1344020&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/edgemarker/AccumuloEdgeInputFormat.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/edgemarker/AccumuloEdgeInputFormat.java Tue May 29 21:56:31 2012
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.format.accumulo.edgemarker;
+
+import com.google.common.collect.Maps;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.giraph.format.accumulo.AccumuloVertexInputFormat;
+import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.VertexReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/*
+ Example subclass which reads in Key/Value pairs to construct vertex objects.
+ */
+public class AccumuloEdgeInputFormat
+        extends AccumuloVertexInputFormat<Text, Text, Text, Text> {
+
+    /** Placeholder edge value; every edge built by this format shares it. */
+    private static final Text uselessEdgeValue = new Text();
+
+    /**
+     * Creates a vertex reader that wraps the record reader obtained from
+     * {@code accumuloInputFormat}.
+     *
+     * @param split   input split to read
+     * @param context task attempt context
+     * @return reader that turns Accumulo Key/Value pairs into vertices
+     * @throws IOException if the record reader cannot be created, or if its
+     *                     creation is interrupted
+     */
+    @Override
+    public VertexReader<Text, Text, Text, Text>
+    createVertexReader(InputSplit split, TaskAttemptContext context)
+            throws IOException {
+        try {
+            return new AccumuloEdgeVertexReader(
+                    accumuloInputFormat.createRecordReader(split, context));
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so callers further up the stack
+            // can still observe the interruption after the translation.
+            Thread.currentThread().interrupt();
+            throw new IOException(e);
+        }
+    }
+
+    /*
+        Reader takes Key/Value pairs from the underlying input format.
+     */
+    public static class AccumuloEdgeVertexReader
+            extends AccumuloVertexReader<Text, Text, Text, Text> {
+
+        /** Pattern matching a single comma; not used by this reader. */
+        public static final Pattern commaPattern = Pattern.compile("[,]");
+
+        /**
+         * @param recordReader reader supplying the Accumulo Key/Value pairs
+         */
+        public AccumuloEdgeVertexReader(RecordReader<Key, Value> recordReader) {
+            super(recordReader);
+        }
+
+        /**
+         * Advances the wrapped record reader.
+         *
+         * @return whatever the record reader's {@code nextKeyValue()} returns
+         */
+        @Override
+        public boolean nextVertex() throws IOException, InterruptedException {
+            return getRecordReader().nextKeyValue();
+        }
+
+        /**
+         * Builds a vertex from the current Key/Value pair: the key's row is
+         * the vertex id and the value's bytes are the id of its single edge.
+         * The vertex value starts out as an empty {@code Text}.
+         *
+         * @return the vertex for the current Key/Value pair
+         */
+        @Override
+        public BasicVertex<Text, Text, Text, Text> getCurrentVertex()
+                throws IOException, InterruptedException {
+            Key key = getRecordReader().getCurrentKey();
+            Value value = getRecordReader().getCurrentValue();
+            BasicVertex<Text, Text, Text, Text> vertex =
+                    BspUtils.<Text, Text, Text, Text>createVertex(
+                            getContext().getConfiguration());
+            Text vertexId = key.getRow();
+            // Text holds UTF-8 bytes, so pass the cell bytes straight through
+            // instead of decoding them with the platform default charset.
+            Text edgeId = new Text(value.get());
+            Map<Text, Text> edges = Maps.newHashMap();
+            edges.put(edgeId, uselessEdgeValue);
+            vertex.initialize(vertexId, new Text(), edges, null);
+            return vertex;
+        }
+    }
+}
\ No newline at end of file

Added: giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/edgemarker/AccumuloEdgeOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/edgemarker/AccumuloEdgeOutputFormat.java?rev=1344020&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/edgemarker/AccumuloEdgeOutputFormat.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/edgemarker/AccumuloEdgeOutputFormat.java Tue May 29 21:56:31 2012
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.format.accumulo.edgemarker;
+
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.giraph.format.accumulo.AccumuloVertexOutputFormat;
+import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.VertexWriter;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/*
+ Example subclass for writing vertices back to Accumulo.
+ */
+public class AccumuloEdgeOutputFormat
+        extends AccumuloVertexOutputFormat<Text, Text, Text> {
+
+    public VertexWriter<Text, Text, Text>
+    createVertexWriter(TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        RecordWriter<Text, Mutation> writer =
+                accumuloOutputFormat.getRecordWriter(context);
+        String tableName = context.getConfiguration().get(OUTPUT_TABLE);
+        if(tableName == null)
+           throw new IOException("Forgot to set table name " +
+                   "using AccumuloVertexOutputFormat.OUTPUT_TABLE");
+        return new AccumuloEdgeVertexWriter(writer, tableName);
+    }
+
+    /*
+    Wraps RecordWriter for writing Mutations back to the configured Accumulo Table.
+     */
+    public static class AccumuloEdgeVertexWriter
+            extends AccumuloVertexWriter<Text, Text, Text> {
+
+        private final Text CF = new Text("cf");
+        private final Text PARENT =  new Text("parent");
+        private Text tableName;
+
+        public AccumuloEdgeVertexWriter(
+                RecordWriter<Text, Mutation> writer, String tableName) {
+            super(writer);
+            this.tableName = new Text(tableName);
+        }
+        /*
+         Write back a mutation that adds a qualifier for 'parent' containing the vertex value
+         as the cell value. Assume the vertex ID corresponds to a key.
+         */
+        public void writeVertex
+        (BasicVertex<Text, Text, Text, ?> vertex)
+                throws IOException, InterruptedException {
+              RecordWriter<Text, Mutation> writer = getRecordWriter();
+              Mutation mt = new Mutation(vertex.getVertexId());
+              mt.put(CF, PARENT, new Value(
+                      vertex.getVertexValue().toString().getBytes()));
+              writer.write(tableName, mt);
+        }
+    }
+}
\ No newline at end of file

Added: giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/TestHBaseRootMarkerVertextFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/TestHBaseRootMarkerVertextFormat.java?rev=1344020&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/TestHBaseRootMarkerVertextFormat.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/TestHBaseRootMarkerVertextFormat.java Tue May 29 21:56:31 2012
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.format.hbase;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
+import org.apache.giraph.BspCase;
+import org.apache.giraph.format.hbase.edgemarker.TableEdgeInputFormat;
+import org.apache.giraph.format.hbase.edgemarker.TableEdgeOutputFormat;
+import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+
+/*
+Test case for HBase reading/writing vertices from an HBase instance.
+*/
+public class TestHBaseRootMarkerVertextFormat extends BspCase {
+
+    private static final Logger log =
+            Logger.getLogger(TestHBaseRootMarkerVertextFormat.class);
+
+    private static final String TABLE_NAME = "simple_graph";
+    private static final String FAMILY = "cf";
+    private static final String QUALIFIER = "children";
+    private static final String OUTPUT_FIELD = "parent";
+
+    /** Utility used to start and stop the mini HBase cluster. */
+    private final HBaseTestingUtility testUtil = new HBaseTestingUtility();
+
+    /**
+     * Create the test case
+     *
+     * @param testName name of the test case
+     */
+    public TestHBaseRootMarkerVertextFormat(String testName) {
+        super(testName);
+    }
+
+    /**
+     * @return the suite of tests being tested
+     */
+    public static Test suite() {
+        return new TestSuite(TestHBaseRootMarkerVertextFormat.class);
+    }
+
+    /**
+     * Loads a small parent,child CSV into a mini HBase cluster with
+     * ImportTsv, runs a Giraph job using the table-backed vertex input and
+     * output formats, and checks that row 0002 ends up with a
+     * {@code cf:parent} cell containing 0001.
+     *
+     * The test returns immediately when the system property
+     * {@code prop.mapred.job.tracker} is set.
+     *
+     * @throws Exception on any cluster, job or I/O failure
+     */
+    public void testHBaseInputOutput() throws Exception {
+
+        if (System.getProperty("prop.mapred.job.tracker") != null) {
+            if (log.isInfoEnabled()) {
+                log.info("testHBaseInputOutput: Ignore this test if not local mode.");
+            }
+            return;
+        }
+
+        // A missing property must produce the explanatory failure below
+        // rather than a NullPointerException from new File(null).
+        String jarLocation = System.getProperty("prop.jarLocation");
+        if (jarLocation == null || !new File(jarLocation).exists()) {
+            fail("Could not find Giraph jar at " +
+                    "location specified by 'prop.jarLocation'. " +
+                    "Make sure you built the main Giraph artifact?.");
+        }
+
+        final String inputFile = "graph.csv";
+        //First let's load some data using ImportTsv into our mock table.
+        String[] args = new String[] {
+                "-Dimporttsv.columns=HBASE_ROW_KEY,cf:" + QUALIFIER,
+                "-Dimporttsv.separator=" + "\u002c",
+                TABLE_NAME,
+                inputFile
+        };
+
+        MiniHBaseCluster cluster = testUtil.startMiniCluster();
+
+        // Everything after cluster start-up runs under the finally block so
+        // the cluster is torn down even if option parsing fails.
+        try {
+            GenericOptionsParser opts =
+                    new GenericOptionsParser(cluster.getConfiguration(), args);
+            Configuration conf = opts.getConfiguration();
+            args = opts.getRemainingArgs();
+
+            String[] rows = {
+                "0001,0002\n",
+                "0002,0004\n",
+                "0003,0005\n",
+                "0004,-1\n",
+                "0005,-1\n"
+            };
+            FileSystem fs = FileSystem.get(conf);
+            FSDataOutputStream op = fs.create(new Path(inputFile), true);
+            try {
+                for (String row : rows) {
+                    op.write(Bytes.toBytes(row));
+                }
+            } finally {
+                op.close();
+            }
+
+            final byte[] FAM = Bytes.toBytes(FAMILY);
+            final byte[] TAB = Bytes.toBytes(TABLE_NAME);
+
+            HTableDescriptor desc = new HTableDescriptor(TAB);
+            desc.addFamily(new HColumnDescriptor(FAM));
+            new HBaseAdmin(conf).createTable(desc);
+
+            Job job = ImportTsv.createSubmittableJob(conf, args);
+            job.waitForCompletion(false);
+            assertTrue(job.isSuccessful());
+            if (log.isInfoEnabled()) {
+                log.info("ImportTsv successful. Running HBase Giraph job.");
+            }
+
+            //now operate over HBase using Vertex I/O formats
+            conf.set(TableInputFormat.INPUT_TABLE, TABLE_NAME);
+            conf.set(TableOutputFormat.OUTPUT_TABLE, TABLE_NAME);
+
+            GiraphJob giraphJob = new GiraphJob(conf, getCallingMethodName());
+            giraphJob.setZooKeeperConfiguration(
+                    cluster.getMaster().getZooKeeper().getQuorum());
+            setupConfiguration(giraphJob);
+            giraphJob.setVertexClass(EdgeNotification.class);
+            giraphJob.setVertexInputFormatClass(TableEdgeInputFormat.class);
+            giraphJob.setVertexOutputFormatClass(TableEdgeOutputFormat.class);
+
+            assertTrue(giraphJob.run(true));
+            if (log.isInfoEnabled()) {
+                log.info("Giraph job successful. Checking output qualifier.");
+            }
+
+            //Do a get on row 0002, it should have a parent of 0001
+            //if the outputFormat worked.
+            HTable table = new HTable(conf, TABLE_NAME);
+            try {
+                Result result = table.get(new Get(Bytes.toBytes("0002")));
+                byte[] parentBytes =
+                        result.getValue(FAM, Bytes.toBytes(OUTPUT_FIELD));
+                assertNotNull(parentBytes);
+                assertTrue(parentBytes.length > 0);
+                assertEquals("0001", Bytes.toString(parentBytes));
+            } finally {
+                table.close();
+            }
+
+        } finally {
+            // Shut down through the testing utility that started the
+            // cluster (presumably also stops the mini ZooKeeper/DFS
+            // clusters -- confirm against the HBase version in use).
+            testUtil.shutdownMiniCluster();
+        }
+    }
+
+
+    /*
+    Test compute method that sends each edge a notification of its parents.
+    The test set only has a 1-1 parent-to-child ratio for this unit test.
+     */
+    public static class EdgeNotification
+            extends EdgeListVertex<Text, Text, Text, Text> {
+        /**
+         * Copies every incoming message into this vertex's value, sends this
+         * vertex's id along all of its edges during superstep 0, and votes
+         * to halt.
+         *
+         * @param msgIterator messages delivered to this vertex
+         */
+        @Override
+        public void compute(Iterator<Text> msgIterator) throws IOException {
+            while (msgIterator.hasNext()) {
+                getVertexValue().set(msgIterator.next());
+            }
+            if (getSuperstep() == 0) {
+                sendMsgToAllEdges(getVertexId());
+            }
+            voteToHalt();
+        }
+    }
+}
\ No newline at end of file

Added: giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeInputFormat.java?rev=1344020&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeInputFormat.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeInputFormat.java Tue May 29 21:56:31 2012
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.format.hbase.edgemarker;
+
+import com.google.common.collect.Maps;
+import org.apache.giraph.format.hbase.HBaseVertexInputFormat;
+import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.VertexReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Map;
+
+/*
+  Test subclass for HBaseVertexInputFormat. Reads a simple
+  children qualifier to create an edge.
+ */
+public class TableEdgeInputFormat extends
+        HBaseVertexInputFormat<Text, Text, Text, Text> {
+
+    private static final Logger log =
+            Logger.getLogger(TableEdgeInputFormat.class);
+    /** Placeholder edge value; every edge built by this format shares it. */
+    private static final Text uselessEdgeValue = new Text();
+
+    /**
+     * Creates a vertex reader that wraps the record reader obtained from
+     * {@code tableInputFormat}.
+     *
+     * @param split   input split to read
+     * @param context task attempt context
+     * @return reader that turns HBase rows into vertices
+     * @throws IOException if the record reader cannot be created
+     */
+    @Override
+    public VertexReader<Text, Text, Text, Text>
+            createVertexReader(InputSplit split,
+                               TaskAttemptContext context) throws IOException {
+
+        return new TableEdgeVertexReader(
+                tableInputFormat.createRecordReader(split, context));
+
+    }
+
+    /*
+     Uses the RecordReader to return Hbase rows
+     */
+    public static class TableEdgeVertexReader
+            extends HBaseVertexReader<Text, Text, Text, Text> {
+
+        /** Column family holding the edge data. */
+        private static final byte[] CF = Bytes.toBytes("cf");
+        /** Qualifier whose cell value is the id of the row's child. */
+        private static final byte[] CHILDREN = Bytes.toBytes("children");
+
+        /**
+         * @param recordReader reader supplying the HBase rows
+         */
+        public TableEdgeVertexReader(
+                RecordReader<ImmutableBytesWritable, Result> recordReader) {
+            super(recordReader);
+        }
+
+        /**
+         * Advances the wrapped record reader.
+         *
+         * @return whatever the record reader's {@code nextKeyValue()} returns
+         */
+        @Override
+        public boolean nextVertex() throws IOException,
+                InterruptedException {
+            return getRecordReader().nextKeyValue();
+        }
+
+        /**
+         * For each row, creates a vertex with the row ID as a text, and its
+         * 'children' qualifier as a single edge. A row that has no
+         * {@code cf:children} cell produces a vertex without edges. The
+         * vertex value starts out as an empty {@code Text}.
+         *
+         * @return the vertex for the current row
+         */
+        @Override
+        public BasicVertex<Text, Text, Text, Text>
+                    getCurrentVertex()
+                throws IOException, InterruptedException {
+            Result row = getRecordReader().getCurrentValue();
+            BasicVertex<Text, Text, Text, Text> vertex =
+                    BspUtils.<Text, Text, Text, Text>
+                            createVertex(getContext().getConfiguration());
+            Text vertexId = new Text(Bytes.toString(row.getRow()));
+            Map<Text, Text> edges = Maps.newHashMap();
+            // getValue() yields null when the cell is absent; building a
+            // Text from a null String would fail with a NullPointerException.
+            byte[] childBytes = row.getValue(CF, CHILDREN);
+            if (childBytes != null) {
+                Text edgeId = new Text(Bytes.toString(childBytes));
+                edges.put(edgeId, uselessEdgeValue);
+            }
+            vertex.initialize(vertexId, new Text(), edges, null);
+
+            return vertex;
+        }
+    }
+}

Added: giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeOutputFormat.java?rev=1344020&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeOutputFormat.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeOutputFormat.java Tue May 29 21:56:31 2012
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.format.hbase.edgemarker;
+
+import org.apache.giraph.format.hbase.HBaseVertexOutputFormat;
+import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.VertexWriter;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+/*
+ Test subclass for HBaseVertexOutputFormat
+ */
+public class TableEdgeOutputFormat
+        extends HBaseVertexOutputFormat<Text, Text, Text> {
+
+
+    /**
+     * Creates a vertex writer on top of the record writer obtained from
+     * {@code tableOutputFormat}.
+     *
+     * @param context task attempt context
+     * @return writer that persists vertices as HBase puts
+     */
+    @Override
+    public VertexWriter<Text, Text, Text>
+    createVertexWriter(TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        RecordWriter<ImmutableBytesWritable, Writable> writer =
+                tableOutputFormat.getRecordWriter(context);
+        return new TableEdgeVertexWriter(writer);
+    }
+
+    /*
+     For each vertex, write back to the configured table using
+     the vertex id as the row key bytes.
+     */
+    public static class TableEdgeVertexWriter
+            extends HBaseVertexWriter<Text, Text, Text> {
+
+        /** Column family written by this writer. */
+        private static final byte[] CF = Bytes.toBytes("cf");
+        /** Qualifier that receives the vertex value. */
+        private static final byte[] PARENT = Bytes.toBytes("parent");
+
+        /**
+         * @param writer record writer that receives the puts
+         */
+        public TableEdgeVertexWriter(
+                RecordWriter<ImmutableBytesWritable, Writable> writer) {
+            super(writer);
+        }
+
+        /**
+         * Records the vertex value as the value for a new qualifier
+         * 'parent' on the row named by the vertex id. Vertices whose value
+         * is empty are not written.
+         *
+         * @param vertex vertex to persist
+         */
+        @Override
+        public void writeVertex(
+                BasicVertex<Text, Text, Text, ?> vertex)
+                throws IOException, InterruptedException {
+            Text value = vertex.getVertexValue();
+            if (value.getLength() == 0) {
+                return;
+            }
+            // Text.getBytes() exposes the backing buffer, which is only
+            // valid up to getLength(); convert through the String form so
+            // no stale trailing bytes end up in the row key or the cell.
+            byte[] rowBytes = Bytes.toBytes(vertex.getVertexId().toString());
+            Put put = new Put(rowBytes);
+            put.add(CF, PARENT, Bytes.toBytes(value.toString()));
+            getRecordWriter().write(new ImmutableBytesWritable(rowBytes), put);
+        }
+    }
+}