You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/05/29 23:56:32 UTC
svn commit: r1344020 - in /giraph/trunk: ./ giraph-formats-contrib/
giraph-formats-contrib/src/ giraph-formats-contrib/src/main/
giraph-formats-contrib/src/main/assembly/
giraph-formats-contrib/src/main/java/
giraph-formats-contrib/src/main/java/org/ g...
Author: aching
Date: Tue May 29 21:56:31 2012
New Revision: 1344020
URL: http://svn.apache.org/viewvc?rev=1344020&view=rev
Log:
GIRAPH-153: HBase/Accumulo Input and Output formats. (bfem via aching)
Added:
giraph/trunk/giraph-formats-contrib/
giraph/trunk/giraph-formats-contrib/pom.xml
giraph/trunk/giraph-formats-contrib/src/
giraph/trunk/giraph-formats-contrib/src/main/
giraph/trunk/giraph-formats-contrib/src/main/assembly/
giraph/trunk/giraph-formats-contrib/src/main/assembly/compile.xml
giraph/trunk/giraph-formats-contrib/src/main/java/
giraph/trunk/giraph-formats-contrib/src/main/java/org/
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexInputFormat.java
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexOutputFormat.java
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/package-info.java
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexInputFormat.java
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexOutputFormat.java
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/package-info.java
giraph/trunk/giraph-formats-contrib/src/test/
giraph/trunk/giraph-formats-contrib/src/test/java/
giraph/trunk/giraph-formats-contrib/src/test/java/org/
giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/
giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/
giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/BspCase.java
giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/
giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/
giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/TestAccumuloVertexFormat.java
giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/edgemarker/
giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/edgemarker/AccumuloEdgeInputFormat.java
giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/edgemarker/AccumuloEdgeOutputFormat.java
giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/
giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/TestHBaseRootMarkerVertextFormat.java
giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/
giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeInputFormat.java
giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeOutputFormat.java
Modified:
giraph/trunk/CHANGELOG
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1344020&r1=1344019&r2=1344020&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Tue May 29 21:56:31 2012
@@ -2,6 +2,8 @@ Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-153: HBase/Accumulo Input and Output formats. (bfem via aching)
+
GIRAPH-187: SequenceFileVertexInputFormat has WritableComparable<I>
as a bounded type for I. (roman4asf via aching)
Added: giraph/trunk/giraph-formats-contrib/pom.xml
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/pom.xml?rev=1344020&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/pom.xml (added)
+++ giraph/trunk/giraph-formats-contrib/pom.xml Tue May 29 21:56:31 2012
@@ -0,0 +1,148 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>giraph</groupId>
+ <version>0.2-SNAPSHOT</version>
+ <artifactId>giraph-formats-contrib</artifactId>
+ <properties>
+ <compileSource>1.6</compileSource>
+ <hadoop.version>0.20.203.0</hadoop.version>
+ <hbase.version>0.90.5</hbase.version>
+ <accumulo.version>1.4.0</accumulo.version>
+ <maven-compiler-plugin.version>2.3.2</maven-compiler-plugin.version>
+ <maven-javadoc-plugin.version>2.6</maven-javadoc-plugin.version>
+ <giraph.trunk.base>..</giraph.trunk.base>
+ <buildtype>test</buildtype>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.6</version>
+ <configuration>
+ <systemProperties>
+ <property>
+ <name>prop.jarLocation</name>
+ <value>${giraph.trunk.base}/target/giraph-${project.version}-jar-with-dependencies.jar</value>
+ </property>
+ </systemProperties>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>2.9</version>
+ <configuration>
+ <configLocation>../checkstyle.xml</configLocation>
+ <enableRulesSummary>false</enableRulesSummary>
+ <headerLocation>../license-header.txt</headerLocation>
+ <failOnError>true</failOnError>
+ <includeTestSourceDirectory>false</includeTestSourceDirectory>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>verify</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <artifactId>giraph</artifactId>
+ <groupId>org.apache.giraph</groupId>
+ <version>0.2-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>3.8.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.3.3</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.jms</groupId>
+ <artifactId>jms</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-core</artifactId>
+ <version>${accumulo.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>1.3.2</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ <version>1.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase</artifactId>
+ <version>${hbase.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase</artifactId>
+ <type>test-jar</type>
+ <version>${hbase.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-test</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
Added: giraph/trunk/giraph-formats-contrib/src/main/assembly/compile.xml
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/assembly/compile.xml?rev=1344020&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/assembly/compile.xml (added)
+++ giraph/trunk/giraph-formats-contrib/src/main/assembly/compile.xml Tue May 29 21:56:31 2012
@@ -0,0 +1,120 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+ <id>jar-with-dependencies</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+
+ <dependencySets>
+ <dependencySet>
+ <useProjectArtifact>true</useProjectArtifact>
+ <outputDirectory>/</outputDirectory>
+ <unpackOptions>
+ <excludes>
+ <exclude>META-INF/LICENSE
+ </exclude>
+ </excludes>
+ </unpackOptions>
+ <unpack>false</unpack>
+ <scope>runtime</scope>
+ </dependencySet>
+ </dependencySets>
+</assembly>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+ <id>jar-with-dependencies</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+
+ <dependencySets>
+ <dependencySet>
+ <useProjectArtifact>true</useProjectArtifact>
+ <outputDirectory>/</outputDirectory>
+ <unpackOptions>
+ <excludes>
+ <exclude>META-INF/LICENSE
+ </exclude>
+ </excludes>
+ </unpackOptions>
+ <unpack>false</unpack>
+ <scope>runtime</scope>
+ </dependencySet>
+ </dependencySets>
+</assembly>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+ <id>jar-with-dependencies</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+
+ <dependencySets>
+ <dependencySet>
+ <useProjectArtifact>true</useProjectArtifact>
+ <outputDirectory>/</outputDirectory>
+ <unpackOptions>
+ <excludes>
+ <exclude>META-INF/LICENSE
+ </exclude>
+ </excludes>
+ </unpackOptions>
+ <unpack>false</unpack>
+ <scope>runtime</scope>
+ </dependencySet>
+ </dependencySets>
+</assembly>
\ No newline at end of file
Added: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexInputFormat.java?rev=1344020&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexInputFormat.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexInputFormat.java Tue May 29 21:56:31 2012
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.format.accumulo;
+
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.giraph.graph.VertexInputFormat;
+import org.apache.giraph.graph.VertexReader;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Base class that wraps an {@link AccumuloInputFormat}. It is designed
+ * as an extension point for VertexInputFormat subclasses that wish
+ * to read from Accumulo tables.
+ *
+ * Works with
+ * {@link org.apache.giraph.format.accumulo.AccumuloVertexOutputFormat}
+ *
+ * @param <I> vertex id type
+ * @param <V> vertex value type
+ * @param <E> edge type
+ * @param <M> message type
+ */
+public abstract class AccumuloVertexInputFormat<
+    I extends WritableComparable,
+    V extends Writable,
+    E extends Writable,
+    M extends Writable>
+    extends VertexInputFormat<I, V, E, M> implements Configurable {
+  /**
+   * Delegate input format used for all Accumulo operations.
+   */
+  protected AccumuloInputFormat accumuloInputFormat =
+      new AccumuloInputFormat();
+
+  /**
+   * Configuration stored by {@link #setConf(Configuration)}.
+   */
+  private Configuration conf;
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Abstract class which provides a template for instantiating vertices
+   * from Accumulo Key/Value pairs. It owns the wrapped record reader and
+   * forwards initialize/close/getProgress to it.
+   *
+   * @param <I> vertex id type
+   * @param <V> vertex value type
+   * @param <E> edge type
+   * @param <M> message type
+   */
+  public abstract static class AccumuloVertexReader<
+      I extends WritableComparable,
+      V extends Writable, E extends Writable, M extends Writable>
+      implements VertexReader<I, V, E, M> {
+
+    /**
+     * Used by subclasses to read key/value pairs.
+     */
+    private final RecordReader<Key, Value> reader;
+    /** Context passed to initialize */
+    private TaskAttemptContext context;
+
+    /**
+     * Constructor used to pass the record reader instance.
+     *
+     * @param reader Accumulo record reader
+     */
+    public AccumuloVertexReader(RecordReader<Key, Value> reader) {
+      this.reader = reader;
+    }
+
+    @Override
+    public void initialize(InputSplit inputSplit,
+                           TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      // Initialize the wrapped reader first, then remember the context
+      // so subclasses can retrieve it through getContext().
+      reader.initialize(inputSplit, context);
+      this.context = context;
+    }
+
+    /**
+     * Closes the wrapped record reader.
+     *
+     * @throws IOException if the wrapped reader fails to close
+     */
+    public void close() throws IOException {
+      reader.close();
+    }
+
+    /**
+     * Reports the progress of the wrapped record reader.
+     *
+     * @return progress as reported by the wrapped reader
+     * @throws IOException if the wrapped reader fails
+     * @throws InterruptedException if the wrapped reader is interrupted
+     */
+    public float getProgress() throws IOException, InterruptedException {
+      return reader.getProgress();
+    }
+
+    /**
+     * Get the wrapped record reader.
+     *
+     * @return Record reader to be used for reading.
+     */
+    protected RecordReader<Key, Value> getRecordReader() {
+      return reader;
+    }
+
+    /**
+     * Get the task context.
+     *
+     * @return Context passed to initialize, or null before initialize
+     *         has been called.
+     */
+    protected TaskAttemptContext getContext() {
+      return context;
+    }
+
+  }
+
+  /**
+   * Computes the input splits by delegating to the wrapped
+   * AccumuloInputFormat.
+   *
+   * @param context Context of the job
+   * @param numWorkers Number of workers used for this job (not used here;
+   *                   the delegate decides the splits)
+   * @return splits returned by the delegate
+   * @throws IOException if the delegate fails; when the failure says the
+   *         input info has not been set, the message is extended with a
+   *         configuration hint and the original exception is the cause
+   * @throws InterruptedException if the delegate is interrupted
+   */
+  public List<InputSplit> getSplits(
+      JobContext context, int numWorkers)
+    throws IOException, InterruptedException {
+    try {
+      return accumuloInputFormat.getSplits(context);
+    } catch (IOException e) {
+      String message = e.getMessage();
+      if (message != null &&
+          message.contains("Input info has not been set")) {
+        throw new IOException(message +
+            " Make sure you initialized" +
+            " AccumuloInputFormat static setters " +
+            "before passing the config to GiraphJob.", e);
+      }
+      // Unrelated failure: propagate rather than returning null splits.
+      throw e;
+    }
+  }
+}
Added: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexOutputFormat.java?rev=1344020&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexOutputFormat.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/AccumuloVertexOutputFormat.java Tue May 29 21:56:31 2012
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.format.accumulo;
+
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.giraph.graph.VertexOutputFormat;
+import org.apache.giraph.graph.VertexWriter;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+/**
+ *
+ * Base class that wraps an {@link AccumuloOutputFormat}. It is designed
+ * as an extension point for VertexOutputFormat subclasses that wish
+ * to write vertices back to an Accumulo table.
+ *
+ * Works with
+ * {@link org.apache.giraph.format.accumulo.AccumuloVertexInputFormat}
+ *
+ *
+ * @param <I> vertex id type
+ * @param <V> vertex value type
+ * @param <E> edge type
+ */
+public abstract class AccumuloVertexOutputFormat<
+    I extends WritableComparable,
+    V extends Writable,
+    E extends Writable>
+    extends VertexOutputFormat<I, V, E> implements Configurable {
+
+
+  /**
+   * Output table parameter
+   */
+  protected static final String OUTPUT_TABLE = "OUTPUT_TABLE";
+
+  /**
+   * Accumulo delegate for table output
+   */
+  protected AccumuloOutputFormat accumuloOutputFormat =
+      new AccumuloOutputFormat();
+
+
+  /**
+   * Configuration stored by {@link #setConf(Configuration)}.
+   */
+  private Configuration conf;
+
+  /**
+   *
+   * Main abstraction point for vertex writers to persist back
+   * to Accumulo tables. It holds the Accumulo record writer and the
+   * task context handed to initialize.
+   *
+   * @param <I> vertex id type
+   * @param <V> vertex value type
+   * @param <E> edge type
+   */
+  public abstract static class AccumuloVertexWriter<
+      I extends WritableComparable,
+      V extends Writable,
+      E extends Writable>
+      implements VertexWriter<I, V, E> {
+
+    /**
+     * Task attempt context passed to initialize.
+     */
+    private TaskAttemptContext context;
+
+    /**
+     * Accumulo record writer; assigned once in the constructor.
+     */
+    private final RecordWriter<Text, Mutation> recordWriter;
+
+    /**
+     * Constructor for use with subclasses
+     *
+     * @param recordWriter accumulo record writer
+     */
+    public AccumuloVertexWriter(RecordWriter<Text, Mutation> recordWriter) {
+      this.recordWriter = recordWriter;
+    }
+
+    /**
+     * Remembers the task context so that subclasses can retrieve it
+     * through getContext().
+     *
+     * @param context Context used to write the vertices.
+     * @throws IOException declared by the signature; not thrown here
+     */
+    public void initialize(TaskAttemptContext context) throws IOException {
+      this.context = context;
+    }
+
+    /**
+     * Closes the wrapped record writer.
+     *
+     * @param context the context of the task
+     * @throws IOException if the wrapped writer fails to close
+     * @throws InterruptedException if the wrapped writer is interrupted
+     */
+    public void close(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      recordWriter.close(context);
+    }
+
+    /**
+     * Get the table record writer.
+     *
+     * @return Record writer to be used for writing.
+     */
+    public RecordWriter<Text, Mutation> getRecordWriter() {
+      return recordWriter;
+    }
+
+    /**
+     * Get the context.
+     *
+     * @return Context passed to initialize, or null before initialize
+     *         has been called.
+     */
+    public TaskAttemptContext getContext() {
+      return context;
+    }
+
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  /**
+   *
+   * Validates the output specification by delegating to the wrapped
+   * AccumuloOutputFormat.
+   *
+   * @param context information about the job
+   * @throws IOException if the delegate rejects the specification; when
+   *         the failure says the output info has not been set, the
+   *         message is extended with a configuration hint and the
+   *         original exception is the cause
+   * @throws InterruptedException declared by the signature
+   */
+  public void checkOutputSpecs(JobContext context)
+    throws IOException, InterruptedException {
+    try {
+      accumuloOutputFormat.checkOutputSpecs(context);
+    } catch (IOException e) {
+      String message = e.getMessage();
+      if (message != null &&
+          message.contains("Output info has not been set")) {
+        throw new IOException(message + " Make sure you initialized" +
+            " AccumuloOutputFormat static setters " +
+            "before passing the config to GiraphJob.", e);
+      }
+      // Unrelated failure: propagate it instead of silently accepting
+      // an output specification the delegate rejected.
+      throw e;
+    }
+  }
+
+  /**
+   * Obtains the output committer from the wrapped AccumuloOutputFormat.
+   *
+   * @param context the task context
+   * @return OutputCommitter returned by the delegate
+   * @throws IOException declared by the signature
+   * @throws InterruptedException declared by the signature
+   */
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    return accumuloOutputFormat.getOutputCommitter(context);
+  }
+}
Added: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/package-info.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/package-info.java?rev=1344020&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/package-info.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/accumulo/package-info.java Tue May 29 21:56:31 2012
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of vertex input and output formats backed by Accumulo tables.
+ */
+package org.apache.giraph.format.accumulo;
Added: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexInputFormat.java?rev=1344020&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexInputFormat.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexInputFormat.java Tue May 29 21:56:31 2012
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.format.hbase;
+
+import org.apache.giraph.graph.VertexInputFormat;
+import org.apache.giraph.graph.VertexReader;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ *
+ * Base class that wraps an HBase TableInputFormat and underlying Scan object
+ * to help instantiate vertices from an HBase table. All
+ * the static TableInputFormat properties necessary to configure
+ * an HBase job are available.
+ *
+ * For example, setting conf.set(TableInputFormat.INPUT_TABLE, "in_table");
+ * from the job setup routine will properly delegate to the
+ * TableInputFormat instance. The Configurable interface prevents specific
+ * wrapper methods from having to be called.
+ *
+ * Works with {@link HBaseVertexOutputFormat}
+ *
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public abstract class HBaseVertexInputFormat<
+    I extends WritableComparable,
+    V extends Writable,
+    E extends Writable,
+    M extends Writable>
+    extends VertexInputFormat<I, V, E, M> implements Configurable {
+
+  /**
+   * Delegate HBase table input format
+   */
+  protected TableInputFormat tableInputFormat =
+      new TableInputFormat();
+  /**
+   * Configuration stored by {@link #setConf(Configuration)}.
+   */
+  private Configuration conf;
+
+  /**
+   * getConf()
+   *
+   * @return Configuration last passed to setConf, or null if none was set
+   */
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * setConf()
+   *
+   * We must initialize the table format since we manually instantiate it.
+   *
+   * @param conf Configuration object
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    // Forward to the delegate first; it is created with "new" above and
+    // therefore never receives the configuration on its own.
+    tableInputFormat.setConf(conf);
+    this.conf = conf;
+  }
+
+  /**
+   * Takes an instance of RecordReader that supports
+   * HBase row-key, result records. Subclasses can focus on
+   * vertex instantiation details without worrying about connection
+   * semantics. Subclasses are expected to implement nextVertex() and
+   * getCurrentVertex()
+   *
+   * @param <I> Vertex index value
+   * @param <V> Vertex value
+   * @param <E> Edge value
+   * @param <M> Message data
+   */
+  public abstract static class HBaseVertexReader<
+      I extends WritableComparable,
+      V extends Writable,
+      E extends Writable, M extends Writable>
+      implements VertexReader<I, V, E, M> {
+
+    /**
+     * Wrapped HBase record reader instance
+     */
+    private final RecordReader<ImmutableBytesWritable, Result> reader;
+    /** Context passed to initialize */
+    private TaskAttemptContext context;
+
+    /**
+     * Constructor used for subclassing vertex record reader.
+     *
+     * @param reader HBase record reader instance
+     */
+    public HBaseVertexReader(RecordReader<ImmutableBytesWritable,
+        Result> reader) {
+      this.reader = reader;
+    }
+
+    /**
+     * Initializes the wrapped record reader and remembers the context.
+     *
+     * @param inputSplit Input split to be used for reading vertices.
+     * @param context Context from the task.
+     * @throws IOException if the wrapped reader fails to initialize
+     * @throws InterruptedException if the wrapped reader is interrupted
+     */
+    @Override
+    public void initialize(InputSplit inputSplit,
+                           TaskAttemptContext context)
+      throws IOException,
+      InterruptedException {
+      reader.initialize(inputSplit, context);
+      this.context = context;
+    }
+
+    /**
+     * Closes the wrapped record reader.
+     *
+     * @throws IOException if the wrapped reader fails to close
+     */
+    public void close() throws IOException {
+      reader.close();
+    }
+
+    /**
+     * Reports the progress of the wrapped record reader.
+     *
+     * @return progress as reported by the wrapped reader
+     * @throws IOException if the wrapped reader fails
+     * @throws InterruptedException if the wrapped reader is interrupted
+     */
+    public float getProgress() throws
+      IOException, InterruptedException {
+      return reader.getProgress();
+    }
+
+    /**
+     * getRecordReader
+     *
+     * @return Record reader to be used for reading.
+     */
+    protected RecordReader<ImmutableBytesWritable,
+        Result> getRecordReader() {
+      return reader;
+    }
+
+    /**
+     * getContext
+     *
+     * @return Context passed to initialize, or null before initialize
+     *         has been called.
+     */
+    protected TaskAttemptContext getContext() {
+      return context;
+    }
+
+  }
+
+  /**
+   * Computes the input splits by delegating to the wrapped
+   * TableInputFormat.
+   *
+   * @param context Context of the job
+   * @param numWorkers Number of workers used for this job (not used here;
+   *                   the delegate decides the splits)
+   * @return splits returned by the delegate
+   * @throws IOException if the delegate fails
+   * @throws InterruptedException declared by the signature
+   */
+  public List<InputSplit> getSplits(
+      JobContext context, int numWorkers)
+    throws IOException, InterruptedException {
+    return tableInputFormat.getSplits(context);
+  }
+}
Added: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexOutputFormat.java?rev=1344020&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexOutputFormat.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/HBaseVertexOutputFormat.java Tue May 29 21:56:31 2012
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.format.hbase;
+
+import org.apache.giraph.graph.VertexOutputFormat;
+import org.apache.giraph.graph.VertexWriter;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Base class for writing vertex mutations back to specific rows in an
+ * HBase table. This class wraps an instance of TableOutputFormat so that
+ * it can be configured with the existing TableOutputFormat properties.
+ *
+ * Setting conf.set(TableOutputFormat.OUTPUT_TABLE, "out_table");
+ * reaches the wrapped TableOutputFormat instance, because
+ * {@link #setConf(Configuration)} hands the configuration on to it. The
+ * Configurable interface therefore removes the need for specific wrapper
+ * methods.
+ *
+ * Works with {@link HBaseVertexInputFormat}
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class HBaseVertexOutputFormat<
+    I extends WritableComparable,
+    V extends Writable,
+    E extends Writable>
+    extends VertexOutputFormat<I, V, E>
+    implements Configurable {
+
+  /**
+   * Delegate output format that writes to HBase.
+   */
+  protected TableOutputFormat<ImmutableBytesWritable>
+  tableOutputFormat = new TableOutputFormat<ImmutableBytesWritable>();
+  /**
+   * Configuration injected through {@link #setConf(Configuration)}.
+   */
+  private Configuration conf;
+
+  /**
+   * Base vertex writer that holds a RecordWriter over Writable objects
+   * together with the task context. Subclasses are expected to implement
+   * writeVertex().
+   *
+   * @param <I> Vertex index value
+   * @param <V> Vertex value
+   * @param <E> Edge value
+   */
+  public abstract static class HBaseVertexWriter<
+      I extends WritableComparable,
+      V extends Writable,
+      E extends Writable>
+      implements VertexWriter<I, V, E> {
+
+    /**
+     * Context stored by {@link #initialize(TaskAttemptContext)}.
+     */
+    private TaskAttemptContext context;
+
+    /**
+     * Record writer instance; assigned once in the constructor.
+     */
+    private final RecordWriter<ImmutableBytesWritable,
+        Writable> recordWriter;
+
+    /**
+     * Constructor for subclasses to supply the record writer.
+     *
+     * @param recordWriter Record writer that this vertex writer wraps
+     */
+    public HBaseVertexWriter(RecordWriter<ImmutableBytesWritable,
+        Writable> recordWriter) {
+      this.recordWriter = recordWriter;
+    }
+
+    /**
+     * Stores the context so that {@link #getContext()} can return it.
+     *
+     * @param context Context used to write the vertices.
+     * @throws IOException
+     */
+    public void initialize(TaskAttemptContext context)
+      throws IOException {
+      this.context = context;
+    }
+
+    /**
+     * Closes the wrapped record writer.
+     *
+     * @param context the context of the task
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public void close(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      recordWriter.close(context);
+    }
+
+    /**
+     * Get the table record writer.
+     *
+     * @return Record writer to be used for writing.
+     */
+    public RecordWriter<ImmutableBytesWritable,
+        Writable> getRecordWriter() {
+      return recordWriter;
+    }
+
+    /**
+     * Get the task context.
+     *
+     * @return Context passed to initialize.
+     */
+    public TaskAttemptContext getContext() {
+      return context;
+    }
+
+  }
+
+  /**
+   * Passes the configuration on to the wrapped TableOutputFormat and then
+   * keeps a reference to it for {@link #getConf()}.
+   *
+   * @param conf Injected configuration instance
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    tableOutputFormat.setConf(conf);
+    this.conf = conf;
+  }
+
+  /**
+   * Get the injected configuration.
+   *
+   * @return Configuration last passed to {@link #setConf(Configuration)},
+   *         or null if setConf has not been called
+   */
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  /**
+   * Checks the output specification by delegating to the wrapped
+   * TableOutputFormat.
+   *
+   * @param context information about the job
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public void checkOutputSpecs(JobContext context)
+    throws IOException, InterruptedException {
+    tableOutputFormat.checkOutputSpecs(context);
+  }
+
+  /**
+   * Obtains the output committer from the wrapped TableOutputFormat.
+   *
+   * @param context the task context
+   * @return OutputCommitter returned by the wrapped TableOutputFormat
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public OutputCommitter getOutputCommitter(
+      TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    return tableOutputFormat.getOutputCommitter(context);
+  }
+}
Added: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/package-info.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/package-info.java?rev=1344020&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/package-info.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hbase/package-info.java Tue May 29 21:56:31 2012
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of HBase vertex input and output formats.
+ */
+package org.apache.giraph.format.hbase;
Added: giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/BspCase.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/BspCase.java?rev=1344020&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/BspCase.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/BspCase.java Tue May 29 21:56:31 2012
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+import org.apache.giraph.examples.GeneratedVertexReader;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.zk.ZooKeeperExt;
+
+import junit.framework.TestCase;
+
+/**
+ * Duplicate copy from main giraph trunk. At least until there
+ * is a maven test artifact for Giraph.
+ *
+ * Extended TestCase that simplifies setting up BSP tests.
+ */
+public class BspCase extends TestCase implements Watcher {
+  /** JobTracker system property */
+  private final String jobTracker =
+      System.getProperty("prop.mapred.job.tracker");
+  /** Jar location system property */
+  private final String jarLocation =
+      System.getProperty("prop.jarLocation", "");
+  /** Number of actual processes for the BSP application */
+  private int numWorkers = 1;
+  /** ZooKeeper list system property */
+  private final String zkList = System.getProperty("prop.zookeeper.list");
+
+  /**
+   * Adjust the configuration to the basic test case
+   *
+   * @param job Job whose configuration is adjusted in place
+   */
+  public final void setupConfiguration(GiraphJob job) {
+    Configuration conf = job.getConfiguration();
+    conf.set("mapred.jar", getJarLocation());
+
+    // Allow this test to be run on a real Hadoop setup
+    if (getJobTracker() != null) {
+      System.out.println("setup: Sending job to job tracker " +
+          getJobTracker() + " with jar path " + getJarLocation()
+          + " for " + getName());
+      conf.set("mapred.job.tracker", getJobTracker());
+      job.setWorkerConfiguration(getNumWorkers(),
+          getNumWorkers(),
+          100.0f);
+    } else {
+      System.out.println("setup: Using local job runner with " +
+          "location " + getJarLocation() + " for "
+          + getName());
+      job.setWorkerConfiguration(1, 1, 100.0f);
+      // Single node testing
+      conf.setBoolean(GiraphJob.SPLIT_MASTER_WORKER, false);
+    }
+    conf.setInt(GiraphJob.POLL_ATTEMPTS, 10);
+    conf.setInt(GiraphJob.POLL_MSECS, 3 * 1000);
+    conf.setInt(GiraphJob.ZOOKEEPER_SERVERLIST_POLL_MSECS, 500);
+    if (getZooKeeperList() != null) {
+      job.setZooKeeperConfiguration(getZooKeeperList());
+    }
+    // GeneratedInputSplit will generate 5 vertices
+    conf.setLong(GeneratedVertexReader.READER_VERTICES, 5);
+  }
+
+  /**
+   * Create the test case
+   *
+   * @param testName name of the test case
+   */
+  public BspCase(String testName) {
+    super(testName);
+  }
+
+  /**
+   * Get the number of workers used in the BSP application
+   *
+   * @return number of workers; 1 unless setUp() raised it because a
+   *         job tracker is configured
+   */
+  public int getNumWorkers() {
+    return numWorkers;
+  }
+
+  /**
+   * Get the ZooKeeper list
+   *
+   * @return value of the "prop.zookeeper.list" system property, or null
+   *         if it is not set
+   */
+  public String getZooKeeperList() {
+    return zkList;
+  }
+
+  /**
+   * Get the jar location
+   *
+   * @return location of the jar file
+   */
+  String getJarLocation() {
+    return jarLocation;
+  }
+
+  /**
+   * Get the job tracker location
+   *
+   * @return job tracker location as a string
+   */
+  String getJobTracker() {
+    return jobTracker;
+  }
+
+  /**
+   * Get the single part file status and make sure there is only one part
+   *
+   * @param job Job to get the file system from
+   * @param partDirPath Directory where the single part file should exist
+   * @return Status of "part-m-00000"; null if the only part file found
+   *         has a different name
+   * @throws IOException
+   */
+  public static FileStatus getSinglePartFileStatus(GiraphJob job,
+      Path partDirPath) throws IOException {
+    FileSystem fs = FileSystem.get(job.getConfiguration());
+    FileStatus[] statusArray = fs.listStatus(partDirPath);
+    FileStatus singlePartFileStatus = null;
+    int partFiles = 0;
+    for (FileStatus fileStatus : statusArray) {
+      String fileName = fileStatus.getPath().getName();
+      if (fileName.equals("part-m-00000")) {
+        singlePartFileStatus = fileStatus;
+      }
+      if (fileName.startsWith("part-m-")) {
+        ++partFiles;
+      }
+    }
+    if (partFiles != 1) {
+      throw new ArithmeticException(
+          "getSinglePartFile: Part file count should be 1, but is " +
+          partFiles);
+    }
+    return singlePartFileStatus;
+  }
+
+  /**
+   * Raises the worker count when a job tracker is configured, then removes
+   * leftovers of earlier local jobs from the file system and, if a
+   * ZooKeeper list is configured, from ZooKeeper. Any failure is rethrown
+   * as a RuntimeException.
+   */
+  @Override
+  public void setUp() {
+    if (jobTracker != null) {
+      System.out.println("Setting tasks to 3 for " + getName() +
+          " since JobTracker exists...");
+      numWorkers = 3;
+    }
+    try {
+      Configuration conf = new Configuration();
+      FileSystem hdfs = FileSystem.get(conf);
+      // Since local jobs always use the same paths, remove them
+      Path oldLocalJobPaths = new Path(
+          GiraphJob.ZOOKEEPER_MANAGER_DIR_DEFAULT);
+      try {
+        FileStatus[] fileStatusArr = hdfs.listStatus(oldLocalJobPaths);
+        // NOTE(review): some Hadoop releases appear to return null for a
+        // missing directory instead of throwing FileNotFoundException;
+        // treat both as "nothing to clean up".
+        if (fileStatusArr != null) {
+          for (FileStatus fileStatus : fileStatusArr) {
+            Path candidate = fileStatus.getPath();
+            if (fileStatus.isDir() &&
+                candidate.getName().contains("job_local")) {
+              System.out.println("Cleaning up local job path " +
+                  candidate.getName());
+              // Delete the reported job directory itself rather than
+              // the whole parent directory once per match.
+              hdfs.delete(candidate, true);
+            }
+          }
+        }
+      } catch (FileNotFoundException e) {
+        // ignore this FileNotFound exception and continue.
+      }
+      if (zkList == null) {
+        return;
+      }
+      ZooKeeperExt zooKeeperExt =
+          new ZooKeeperExt(zkList, 30 * 1000, this);
+      try {
+        List<String> rootChildren = zooKeeperExt.getChildren("/", false);
+        for (String rootChild : rootChildren) {
+          if (!rootChild.startsWith("_hadoopBsp")) {
+            continue;
+          }
+          String rootPath = "/" + rootChild;
+          List<String> children =
+              zooKeeperExt.getChildren(rootPath, false);
+          for (String child : children) {
+            if (child.contains("job_local_")) {
+              // Build the path from the node that was actually listed,
+              // so other "_hadoopBsp*" roots are cleaned correctly too.
+              String childPath = rootPath + "/" + child;
+              System.out.println("Cleaning up " + childPath);
+              zooKeeperExt.deleteExt(childPath, -1, true);
+            }
+          }
+        }
+      } finally {
+        // Release the ZooKeeper connection even if the cleanup failed.
+        zooKeeperExt.close();
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Watcher callback; events are deliberately ignored.
+   *
+   * @param event ZooKeeper event (unused)
+   */
+  @Override
+  public void process(WatchedEvent event) {
+    // Do nothing
+  }
+
+  /**
+   * Helper method to remove an old output directory if it exists,
+   * and set the output path for any VertexOutputFormat that uses
+   * FileOutputFormat.
+   *
+   * @param job Job to set the output path for
+   * @param outputPath Path to output
+   * @throws IOException
+   */
+  public static void removeAndSetOutput(GiraphJob job,
+      Path outputPath) throws IOException {
+    remove(job.getConfiguration(), outputPath);
+    FileOutputFormat.setOutputPath(job.getInternalJob(), outputPath);
+  }
+
+  /**
+   * Helper method to remove a path if it exists.
+   *
+   * @param conf Configuration to load FileSystem from
+   * @param path Path to remove
+   * @throws IOException
+   */
+  public static void remove(Configuration conf, Path path)
+    throws IOException {
+    FileSystem hdfs = FileSystem.get(conf);
+    hdfs.delete(path, true);
+  }
+
+  /**
+   * Get the name of the method that called this helper, read from
+   * element 2 of the current thread's stack trace.
+   *
+   * @return method name found at stack trace index 2
+   */
+  public static String getCallingMethodName() {
+    return Thread.currentThread().getStackTrace()[2].getMethodName();
+  }
+}
Added: giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/TestAccumuloVertexFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/TestAccumuloVertexFormat.java?rev=1344020&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/TestAccumuloVertexFormat.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/TestAccumuloVertexFormat.java Tue May 29 21:56:31 2012
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.format.accumulo;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.ByteBufferUtil;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.giraph.BspCase;
+import org.apache.giraph.format.accumulo.edgemarker.AccumuloEdgeInputFormat;
+import org.apache.giraph.format.accumulo.edgemarker.AccumuloEdgeOutputFormat;
+import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+
+/*
+ Test class for Accumulo vertex input/output formats.
+ */
+public class TestAccumuloVertexFormat extends BspCase{
+
+ private final String TABLE_NAME = "simple_graph";
+ private final String INSTANCE_NAME = "instance";
+ private final Text FAMILY = new Text("cf");
+ private final Text CHILDREN = new Text("children");
+ private final String USER = "root";
+ private final byte[] PASSWORD = new byte[] {};
+ private final Text OUTPUT_FIELD = new Text("parent");
+
+
+ private final Logger log = Logger.getLogger(TestAccumuloVertexFormat.class);
+ /**
+ * Create the test case
+ *
+ * @param testName name of the test case
+ */
+ public TestAccumuloVertexFormat(String testName) {
+ super(testName);
+ }
+ /**
+ * @return the suite of tests being tested
+ */
+ public static Test suite() {
+ return new TestSuite(TestAccumuloVertexFormat.class);
+
+ }
+ /*
+ Write a simple parent-child directed graph to Accumulo.
+ Run a job which reads the values
+ into subclasses that extend AccumuloVertex I/O formats.
+ Check the output after the job.
+ */
+ public void testAccumuloInputOutput() throws Exception {
+ if (System.getProperty("prop.mapred.job.tracker") != null) {
+ if(log.isInfoEnabled())
+ log.info("testAccumuloInputOutput: " +
+ "Ignore this test if not local mode.");
+ return;
+ }
+
+ File jarTest = new File(System.getProperty("prop.jarLocation"));
+ if(!jarTest.exists()) {
+ fail("Could not find Giraph jar at " +
+ "location specified by 'prop.jarLocation'. " +
+ "Make sure you built the main Giraph artifact?.");
+ }
+
+ //Write out vertices and edges out to a mock instance.
+ MockInstance mockInstance = new MockInstance(INSTANCE_NAME);
+ Connector c = mockInstance.getConnector("root", new byte[] {});
+ c.tableOperations().create(TABLE_NAME);
+ BatchWriter bw = c.createBatchWriter(TABLE_NAME, 10000L, 1000L, 4);
+
+ Mutation m1 = new Mutation(new Text("0001"));
+ m1.put(FAMILY, CHILDREN, new Value("0002".getBytes()));
+ bw.addMutation(m1);
+
+ Mutation m2 = new Mutation(new Text("0002"));
+ m2.put(FAMILY, CHILDREN, new Value("0003".getBytes()));
+ bw.addMutation(m2);
+ if(log.isInfoEnabled())
+ log.info("Writing mutations to Accumulo table");
+ bw.close();
+
+ Configuration conf = new Configuration();
+ conf.set(AccumuloVertexOutputFormat.OUTPUT_TABLE, TABLE_NAME);
+
+ /*
+ Very important to initialize the formats before
+ sending configuration to the GiraphJob. Otherwise
+ the internally constructed Job in GiraphJob will
+ not have the proper context initialization.
+ */
+ AccumuloInputFormat.setInputInfo(conf, USER, "".getBytes(),
+ TABLE_NAME, new Authorizations());
+ AccumuloInputFormat.setMockInstance(conf, INSTANCE_NAME);
+
+ AccumuloOutputFormat.setOutputInfo(conf, USER, PASSWORD, true, null);
+ AccumuloOutputFormat.setMockInstance(conf, INSTANCE_NAME);
+
+ GiraphJob job = new GiraphJob(conf, getCallingMethodName());
+ setupConfiguration(job);
+ job.setVertexClass(EdgeNotification.class);
+ job.setVertexInputFormatClass(AccumuloEdgeInputFormat.class);
+ job.setVertexOutputFormatClass(AccumuloEdgeOutputFormat.class);
+
+ HashSet<Pair<Text, Text>> columnsToFetch = new HashSet<Pair<Text,Text>>();
+ columnsToFetch.add(new Pair<Text, Text>(FAMILY, CHILDREN));
+ AccumuloInputFormat.fetchColumns(job.getConfiguration(), columnsToFetch);
+
+ if(log.isInfoEnabled())
+ log.info("Running edge notification job using Accumulo input");
+ assertTrue(job.run(true));
+ Scanner scanner = c.createScanner(TABLE_NAME, new Authorizations());
+ scanner.setRange(new Range("0002", "0002"));
+ scanner.fetchColumn(FAMILY, OUTPUT_FIELD);
+ boolean foundColumn = false;
+
+ if(log.isInfoEnabled())
+ log.info("Verify job output persisted correctly.");
+ //make sure we found the qualifier.
+ assertTrue(scanner.iterator().hasNext());
+
+
+ //now we check to make sure the expected value from the job persisted correctly.
+ for(Map.Entry<Key,Value> entry : scanner) {
+ Text row = entry.getKey().getRow();
+ assertEquals("0002", row.toString());
+ Value value = entry.getValue();
+ assertEquals("0001", ByteBufferUtil.toString(
+ ByteBuffer.wrap(value.get())));
+ foundColumn = true;
+ }
+ }
+ /*
+ Test compute method that sends each edge a notification of its parents.
+ The test set only has a 1-1 parent-to-child ratio for this unit test.
+ */
+ public static class EdgeNotification
+ extends EdgeListVertex<Text, Text, Text, Text> {
+ @Override
+ public void compute(Iterator<Text> msgIterator) throws IOException {
+ while (msgIterator.hasNext()) {
+ getVertexValue().set(msgIterator.next());
+ }
+ if(getSuperstep() == 0) {
+ sendMsgToAllEdges(getVertexId());
+ }
+ voteToHalt();
+ }
+ }
+}
\ No newline at end of file
Added: giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/edgemarker/AccumuloEdgeInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/edgemarker/AccumuloEdgeInputFormat.java?rev=1344020&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/edgemarker/AccumuloEdgeInputFormat.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/edgemarker/AccumuloEdgeInputFormat.java Tue May 29 21:56:31 2012
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.format.accumulo.edgemarker;
+
+import com.google.common.collect.Maps;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.giraph.format.accumulo.AccumuloVertexInputFormat;
+import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.VertexReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * Example subclass which reads in Key/Value pairs to construct vertex objects.
+ */
+public class AccumuloEdgeInputFormat
+    extends AccumuloVertexInputFormat<Text, Text, Text, Text> {
+
+  /** Empty edge value; the same instance is used for every edge. */
+  private static final Text USELESS_EDGE_VALUE = new Text();
+
+  /**
+   * Creates a vertex reader on top of the record reader obtained from
+   * accumuloInputFormat.
+   *
+   * @param split Input split to read
+   * @param context Context of the task
+   * @return Vertex reader for the split
+   * @throws IOException if creating the record reader fails or the thread
+   *         is interrupted while doing so
+   */
+  public VertexReader<Text, Text, Text, Text>
+  createVertexReader(InputSplit split, TaskAttemptContext context)
+    throws IOException {
+    try {
+      return new AccumuloEdgeVertexReader(
+          accumuloInputFormat.createRecordReader(split, context));
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so callers further up can still
+      // observe the interruption after it is wrapped.
+      Thread.currentThread().interrupt();
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Reader takes Key/Value pairs from the underlying input format.
+   */
+  public static class AccumuloEdgeVertexReader
+      extends AccumuloVertexReader<Text, Text, Text, Text> {
+
+    /** Pattern matching a single comma; not used within this class. */
+    public static final Pattern commaPattern = Pattern.compile("[,]");
+
+    /**
+     * @param recordReader Record reader that supplies the Key/Value pairs
+     */
+    public AccumuloEdgeVertexReader(RecordReader<Key, Value> recordReader) {
+      super(recordReader);
+    }
+
+    /**
+     * Advances the underlying record reader.
+     *
+     * @return result of the record reader's nextKeyValue()
+     */
+    public boolean nextVertex() throws IOException, InterruptedException {
+      return getRecordReader().nextKeyValue();
+    }
+
+    /**
+     * Each Key/Value contains the information needed to construct the
+     * vertices: the key's row becomes the vertex id and the value becomes
+     * the id of its single outgoing edge.
+     *
+     * @return Vertex built from the current Key/Value pair
+     */
+    public BasicVertex<Text, Text, Text, Text> getCurrentVertex()
+      throws IOException, InterruptedException {
+      Key key = getRecordReader().getCurrentKey();
+      Value value = getRecordReader().getCurrentValue();
+      BasicVertex<Text, Text, Text, Text> vertex =
+          BspUtils.<Text, Text, Text, Text>createVertex(
+              getContext().getConfiguration());
+      Text vertexId = key.getRow();
+      Map<Text, Text> edges = Maps.newHashMap();
+      // NOTE(review): decodes with the platform default charset; confirm
+      // this matches how the values were written.
+      Text edgeId = new Text(new String(value.get()));
+      edges.put(edgeId, USELESS_EDGE_VALUE);
+      vertex.initialize(vertexId, new Text(), edges, null);
+
+      return vertex;
+    }
+  }
+}
\ No newline at end of file
Added: giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/edgemarker/AccumuloEdgeOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/edgemarker/AccumuloEdgeOutputFormat.java?rev=1344020&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/edgemarker/AccumuloEdgeOutputFormat.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/edgemarker/AccumuloEdgeOutputFormat.java Tue May 29 21:56:31 2012
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.format.accumulo.edgemarker;
+
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.giraph.format.accumulo.AccumuloVertexOutputFormat;
+import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.VertexWriter;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/*
+ Example subclass for writing vertices back to Accumulo.
+ */
+public class AccumuloEdgeOutputFormat
+ extends AccumuloVertexOutputFormat<Text, Text, Text> {
+
+ public VertexWriter<Text, Text, Text>
+ createVertexWriter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ RecordWriter<Text, Mutation> writer =
+ accumuloOutputFormat.getRecordWriter(context);
+ String tableName = context.getConfiguration().get(OUTPUT_TABLE);
+ if(tableName == null)
+ throw new IOException("Forgot to set table name " +
+ "using AccumuloVertexOutputFormat.OUTPUT_TABLE");
+ return new AccumuloEdgeVertexWriter(writer, tableName);
+ }
+
+ /*
+ Wraps RecordWriter for writing Mutations back to the configured Accumulo Table.
+ */
+ public static class AccumuloEdgeVertexWriter
+ extends AccumuloVertexWriter<Text, Text, Text> {
+
+ private final Text CF = new Text("cf");
+ private final Text PARENT = new Text("parent");
+ private Text tableName;
+
+ public AccumuloEdgeVertexWriter(
+ RecordWriter<Text, Mutation> writer, String tableName) {
+ super(writer);
+ this.tableName = new Text(tableName);
+ }
+ /*
+ Write back a mutation that adds a qualifier for 'parent' containing the vertex value
+ as the cell value. Assume the vertex ID corresponds to a key.
+ */
+ public void writeVertex
+ (BasicVertex<Text, Text, Text, ?> vertex)
+ throws IOException, InterruptedException {
+ RecordWriter<Text, Mutation> writer = getRecordWriter();
+ Mutation mt = new Mutation(vertex.getVertexId());
+ mt.put(CF, PARENT, new Value(
+ vertex.getVertexValue().toString().getBytes()));
+ writer.write(tableName, mt);
+ }
+ }
+}
\ No newline at end of file
Added: giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/TestHBaseRootMarkerVertextFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/TestHBaseRootMarkerVertextFormat.java?rev=1344020&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/TestHBaseRootMarkerVertextFormat.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/TestHBaseRootMarkerVertextFormat.java Tue May 29 21:56:31 2012
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.format.hbase;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
+import org.apache.giraph.BspCase;
+import org.apache.giraph.format.hbase.edgemarker.TableEdgeInputFormat;
+import org.apache.giraph.format.hbase.edgemarker.TableEdgeOutputFormat;
+import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Test case for reading vertices from, and writing vertices to, an HBase
+ * instance. A mini HBase cluster is started, a small graph is loaded with
+ * ImportTsv, a Giraph job is run over the table and the written 'parent'
+ * qualifier is verified.
+ */
+public class TestHBaseRootMarkerVertextFormat extends BspCase {
+
+  /** Class logger. */
+  private static final Logger log =
+      Logger.getLogger(TestHBaseRootMarkerVertextFormat.class);
+
+  /** Name of the table holding the test graph. */
+  private static final String TABLE_NAME = "simple_graph";
+  /** Column family used for both input and output cells. */
+  private static final String FAMILY = "cf";
+  /** Qualifier loaded by ImportTsv; holds the child of each row. */
+  private static final String QUALIFIER = "children";
+  /** Qualifier expected to be written by the vertex output format. */
+  private static final String OUTPUT_FIELD = "parent";
+
+  /** Utility that starts and stops the mini HBase cluster. */
+  private final HBaseTestingUtility testUtil = new HBaseTestingUtility();
+
+  /**
+   * Create the test case
+   *
+   * @param testName name of the test case
+   */
+  public TestHBaseRootMarkerVertextFormat(String testName) {
+    super(testName);
+  }
+
+  /**
+   * @return the suite of tests being tested
+   */
+  public static Test suite() {
+    return new TestSuite(TestHBaseRootMarkerVertextFormat.class);
+  }
+
+  /**
+   * Loads a five-row graph into HBase, runs {@link EdgeNotification} over it
+   * and checks that row 0002 received 0001 as its 'parent'.
+   *
+   * @throws Exception on any failure of the cluster, the jobs or HBase
+   */
+  public void testHBaseInputOutput() throws Exception {
+
+    if (System.getProperty("prop.mapred.job.tracker") != null) {
+      if (log.isInfoEnabled()) {
+        log.info("testHBaseInputOutput: Ignore this test if not local mode.");
+      }
+      return;
+    }
+
+    // Guard against an unset property: new File(null) would throw a
+    // NullPointerException instead of reporting the real problem.
+    String jarLocation = System.getProperty("prop.jarLocation");
+    if (jarLocation == null || !new File(jarLocation).exists()) {
+      fail("Could not find Giraph jar at " +
+          "location specified by 'prop.jarLocation'. " +
+          "Make sure you built the main Giraph artifact?.");
+    }
+
+    String inputFile = "graph.csv";
+    // First let's load some data using ImportTsv into our mock table.
+    String[] args = new String[] {
+      "-Dimporttsv.columns=HBASE_ROW_KEY," + FAMILY + ":" + QUALIFIER,
+      "-Dimporttsv.separator=" + ",",
+      TABLE_NAME,
+      inputFile
+    };
+
+    MiniHBaseCluster cluster = testUtil.startMiniCluster();
+    HTable table = null;
+
+    // Everything after the cluster start runs inside the try block so the
+    // cluster is always torn down, even if option parsing fails.
+    try {
+      GenericOptionsParser opts =
+          new GenericOptionsParser(cluster.getConfiguration(), args);
+      Configuration conf = opts.getConfiguration();
+      args = opts.getRemainingArgs();
+
+      String[] lines = new String[] {
+        "0001,0002\n",
+        "0002,0004\n",
+        "0003,0005\n",
+        "0004,-1\n",
+        "0005,-1\n"
+      };
+      FileSystem fs = FileSystem.get(conf);
+      FSDataOutputStream op = fs.create(new Path(inputFile), true);
+      try {
+        for (String line : lines) {
+          op.write(Bytes.toBytes(line));
+        }
+      } finally {
+        op.close();
+      }
+
+      HTableDescriptor desc =
+          new HTableDescriptor(Bytes.toBytes(TABLE_NAME));
+      desc.addFamily(new HColumnDescriptor(Bytes.toBytes(FAMILY)));
+      new HBaseAdmin(conf).createTable(desc);
+
+      Job job = ImportTsv.createSubmittableJob(conf, args);
+      job.waitForCompletion(false);
+      assertTrue(job.isSuccessful());
+      if (log.isInfoEnabled()) {
+        log.info("ImportTsv successful. Running HBase Giraph job.");
+      }
+
+      // Now operate over HBase using Vertex I/O formats.
+      conf.set(TableInputFormat.INPUT_TABLE, TABLE_NAME);
+      conf.set(TableOutputFormat.OUTPUT_TABLE, TABLE_NAME);
+
+      GiraphJob giraphJob = new GiraphJob(conf, getCallingMethodName());
+      giraphJob.setZooKeeperConfiguration(
+          cluster.getMaster().getZooKeeper().getQuorum());
+      setupConfiguration(giraphJob);
+      giraphJob.setVertexClass(EdgeNotification.class);
+      giraphJob.setVertexInputFormatClass(TableEdgeInputFormat.class);
+      giraphJob.setVertexOutputFormatClass(TableEdgeOutputFormat.class);
+
+      assertTrue(giraphJob.run(true));
+      if (log.isInfoEnabled()) {
+        log.info("Giraph job successful. Checking output qualifier.");
+      }
+
+      // Do a get on row 0002, it should have a parent of 0001
+      // if the outputFormat worked.
+      table = new HTable(conf, TABLE_NAME);
+      Result result = table.get(new Get(Bytes.toBytes("0002")));
+      byte[] parentBytes = result.getValue(Bytes.toBytes(FAMILY),
+          Bytes.toBytes(OUTPUT_FIELD));
+      assertNotNull(parentBytes);
+      assertTrue(parentBytes.length > 0);
+      assertEquals("0001", Bytes.toString(parentBytes));
+
+    } finally {
+      try {
+        if (table != null) {
+          table.close();
+        }
+      } finally {
+        // Shut down through the testing utility that started the cluster,
+        // rather than only calling shutdown() on the MiniHBaseCluster.
+        testUtil.shutdownMiniCluster();
+      }
+    }
+  }
+
+  /**
+   * Test vertex that notifies each of its edges of its parent.
+   * The test set only has a 1-1 parent-to-child ratio for this unit test.
+   */
+  public static class EdgeNotification
+      extends EdgeListVertex<Text, Text, Text, Text> {
+    /**
+     * Stores each received message as the vertex value (the last message
+     * wins), sends this vertex's ID to all edges during superstep 0, and
+     * then votes to halt.
+     *
+     * @param msgIterator messages received in the previous superstep
+     * @throws IOException declared by the overridden method
+     */
+    @Override
+    public void compute(Iterator<Text> msgIterator) throws IOException {
+      while (msgIterator.hasNext()) {
+        getVertexValue().set(msgIterator.next());
+      }
+      if (getSuperstep() == 0) {
+        sendMsgToAllEdges(getVertexId());
+      }
+      voteToHalt();
+    }
+  }
+}
\ No newline at end of file
Added: giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeInputFormat.java?rev=1344020&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeInputFormat.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeInputFormat.java Tue May 29 21:56:31 2012
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.format.hbase.edgemarker;
+
+import com.google.common.collect.Maps;
+import org.apache.giraph.format.hbase.HBaseVertexInputFormat;
+import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.VertexReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Test subclass for HBaseVertexInputFormat. Reads a simple
+ * 'children' qualifier to create an edge.
+ */
+public class TableEdgeInputFormat extends
+    HBaseVertexInputFormat<Text, Text, Text, Text> {
+
+  /** Class logger. */
+  private static final Logger log =
+      Logger.getLogger(TableEdgeInputFormat.class);
+  /** Empty edge value shared by every edge that is created. */
+  private static final Text uselessEdgeValue = new Text();
+
+  /**
+   * Creates a vertex reader on top of the record reader produced by the
+   * underlying table input format.
+   *
+   * @param split input split to read
+   * @param context task attempt context
+   * @return reader that turns HBase rows into vertices
+   * @throws IOException if the record reader cannot be created
+   */
+  public VertexReader<Text, Text, Text, Text>
+  createVertexReader(InputSplit split,
+                     TaskAttemptContext context) throws IOException {
+    return new TableEdgeVertexReader(
+        tableInputFormat.createRecordReader(split, context));
+  }
+
+  /**
+   * Uses the RecordReader to return HBase rows as vertices.
+   */
+  public static class TableEdgeVertexReader
+      extends HBaseVertexReader<Text, Text, Text, Text> {
+
+    /** Column family holding the edge information. */
+    private static final byte[] CF = Bytes.toBytes("cf");
+    /** Qualifier whose cell value names the single child of a row. */
+    private static final byte[] CHILDREN = Bytes.toBytes("children");
+
+    /**
+     * @param recordReader record reader handed to the superclass
+     */
+    public TableEdgeVertexReader(
+        RecordReader<ImmutableBytesWritable, Result> recordReader) {
+      super(recordReader);
+    }
+
+    /**
+     * Advances the underlying record reader.
+     *
+     * @return the result of the record reader's nextKeyValue()
+     */
+    public boolean nextVertex() throws IOException,
+        InterruptedException {
+      return getRecordReader().nextKeyValue();
+    }
+
+    /**
+     * For each row, creates a vertex with the row ID as a text and its
+     * 'children' qualifier as a single edge. A row that has no
+     * 'children' cell yields a vertex without edges.
+     *
+     * @return vertex built from the current row
+     */
+    public BasicVertex<Text, Text, Text, Text>
+    getCurrentVertex()
+      throws IOException, InterruptedException {
+      Result row = getRecordReader().getCurrentValue();
+      BasicVertex<Text, Text, Text, Text> vertex =
+          BspUtils.<Text, Text, Text, Text>
+              createVertex(getContext().getConfiguration());
+      Text vertexId = new Text(Bytes.toString(row.getRow()));
+      Text vertexValue = new Text();
+      Map<Text, Text> edges = Maps.newHashMap();
+      // The cell may be absent; building a Text from the resulting null
+      // string would throw a NullPointerException.
+      byte[] children = row.getValue(CF, CHILDREN);
+      if (children != null) {
+        edges.put(new Text(Bytes.toString(children)), uselessEdgeValue);
+      } else if (log.isDebugEnabled()) {
+        log.debug("getCurrentVertex: no 'children' cell for row " +
+            vertexId + ", creating vertex without edges.");
+      }
+      vertex.initialize(vertexId, vertexValue, edges, null);
+
+      return vertex;
+    }
+  }
+}
Added: giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeOutputFormat.java?rev=1344020&view=auto
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeOutputFormat.java (added)
+++ giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/format/hbase/edgemarker/TableEdgeOutputFormat.java Tue May 29 21:56:31 2012
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.format.hbase.edgemarker;
+
+import org.apache.giraph.format.hbase.HBaseVertexOutputFormat;
+import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.VertexWriter;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+/**
+ * Test subclass for HBaseVertexOutputFormat.
+ */
+public class TableEdgeOutputFormat
+    extends HBaseVertexOutputFormat<Text, Text, Text> {
+
+  /**
+   * Creates a vertex writer on top of the record writer produced by the
+   * underlying table output format.
+   *
+   * @param context task attempt context
+   * @return writer that stores vertex values in the configured table
+   * @throws IOException if the record writer cannot be created
+   * @throws InterruptedException if creating the record writer is interrupted
+   */
+  public VertexWriter<Text, Text, Text>
+  createVertexWriter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    RecordWriter<ImmutableBytesWritable, Writable> writer =
+        tableOutputFormat.getRecordWriter(context);
+    return new TableEdgeVertexWriter(writer);
+  }
+
+  /**
+   * For each vertex, writes back to the configured table using
+   * the vertex id as the row key bytes.
+   */
+  public static class TableEdgeVertexWriter
+      extends HBaseVertexWriter<Text, Text, Text> {
+
+    /** Column family of the written cell. */
+    private static final byte[] CF = Bytes.toBytes("cf");
+    /** Qualifier of the written cell. */
+    private static final byte[] PARENT = Bytes.toBytes("parent");
+
+    /**
+     * @param writer record writer handed to the superclass
+     */
+    public TableEdgeVertexWriter(
+        RecordWriter<ImmutableBytesWritable, Writable> writer) {
+      super(writer);
+    }
+
+    /**
+     * Records the vertex value as the value for a new qualifier 'parent'.
+     * Vertices with an empty value are not written.
+     *
+     * @param vertex vertex whose ID is the row key and whose value is stored
+     * @throws IOException if the record writer fails
+     * @throws InterruptedException if the record writer is interrupted
+     */
+    public void writeVertex(
+        BasicVertex<Text, Text, Text, ?> vertex)
+      throws IOException, InterruptedException {
+      Text value = vertex.getVertexValue();
+      if (value.getLength() == 0) {
+        return;
+      }
+      byte[] rowBytes = validBytes(vertex.getVertexId());
+      Put put = new Put(rowBytes);
+      put.add(CF, PARENT, validBytes(value));
+      getRecordWriter().write(new ImmutableBytesWritable(rowBytes), put);
+    }
+
+    /**
+     * Copies only the valid portion of a Text's backing array.
+     * Text.getBytes() exposes the whole backing array, which can be longer
+     * than getLength(); using it directly could append stale bytes to the
+     * row key or cell value.
+     *
+     * @param text text to copy
+     * @return new array holding exactly getLength() bytes
+     */
+    private static byte[] validBytes(Text text) {
+      byte[] copy = new byte[text.getLength()];
+      System.arraycopy(text.getBytes(), 0, copy, 0, copy.length);
+      return copy;
+    }
+  }
+}