You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by pu...@apache.org on 2017/01/23 16:21:17 UTC

[3/3] incubator-rya git commit: initial giraph support; closes #132

initial giraph support; closes #132


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/45efa55b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/45efa55b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/45efa55b

Branch: refs/heads/master
Commit: 45efa55b01e58343d33522937c964d7fe6acfbe9
Parents: ccb5a67
Author: pujav65 <pu...@gmail.com>
Authored: Tue Dec 20 16:35:06 2016 -0500
Committer: pujav65 <pu...@gmail.com>
Committed: Mon Jan 23 11:20:35 2017 -0500

----------------------------------------------------------------------
 .../apache/rya/sail/config/RyaSailFactory.java  |   2 +-
 extras/pom.xml                                  |   3 +-
 extras/rya.giraph/pom.xml                       |  85 ++++++++++++
 .../rya/giraph/format/RyaEdgeInputFormat.java   |  61 +++++++++
 .../apache/rya/giraph/format/RyaEdgeReader.java |  84 ++++++++++++
 .../rya/giraph/format/RyaGiraphUtils.java       |  78 +++++++++++
 .../rya/giraph/format/RyaVertexInputFormat.java |  75 +++++++++++
 .../rya/giraph/format/RyaVertexReader.java      |  94 +++++++++++++
 .../rya/giraph/format/TestTextOutputFormat.java |  44 +++++++
 .../rya/giraph/format/TestVertexFormat.java     | 132 +++++++++++++++++++
 .../apache/rya/accumulo/mr/RyaInputFormat.java  |  14 ++
 11 files changed, 670 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/45efa55b/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaSailFactory.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaSailFactory.java b/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaSailFactory.java
index 60ab615..8ce6694 100644
--- a/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaSailFactory.java
+++ b/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaSailFactory.java
@@ -133,7 +133,7 @@ public class RyaSailFactory {
         return dao;
     }
 
-    private static AccumuloRyaDAO getAccumuloDAO(final AccumuloRdfConfiguration config) throws AccumuloException, AccumuloSecurityException, RyaDAOException {
+    public static AccumuloRyaDAO getAccumuloDAO(final AccumuloRdfConfiguration config) throws AccumuloException, AccumuloSecurityException, RyaDAOException {
         final Connector connector = ConfigUtils.getConnector(config);
         final AccumuloRyaDAO dao = new AccumuloRyaDAO();
         dao.setConnector(connector);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/45efa55b/extras/pom.xml
----------------------------------------------------------------------
diff --git a/extras/pom.xml b/extras/pom.xml
index 2ac129a..6acb51f 100644
--- a/extras/pom.xml
+++ b/extras/pom.xml
@@ -42,7 +42,8 @@ under the License.
         <module>rya.pcj.fluo</module>
         <module>rya.export</module>
         <module>rya.merger</module>
+        <module>rya.giraph</module>
         <module>rya.benchmark</module>
     </modules>
 	
 	<profiles>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/45efa55b/extras/rya.giraph/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.giraph/pom.xml b/extras/rya.giraph/pom.xml
new file mode 100644
index 0000000..2615b34
--- /dev/null
+++ b/extras/rya.giraph/pom.xml
@@ -0,0 +1,85 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.rya</groupId>
+        <artifactId>rya.extras</artifactId>
+        <version>3.2.11-incubating-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>rya.giraph</artifactId>
+
+    <properties>
+        <!-- Keep giraph-core and giraph-accumulo on the same release. -->
+        <giraph.version>1.2.0</giraph.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.giraph</groupId>
+            <artifactId>giraph-core</artifactId>
+            <version>${giraph.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.giraph</groupId>
+            <artifactId>giraph-accumulo</artifactId>
+            <version>${giraph.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-common</artifactId>
+            <version>2.5.1</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>2.5.1</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>accumulo.rya</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-mapreduce-client-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.mapreduce</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.spark</groupId>
+                    <artifactId>spark-graphx_2.11</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/45efa55b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaEdgeInputFormat.java b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaEdgeInputFormat.java
new file mode 100644
index 0000000..761e409
--- /dev/null
+++ b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaEdgeInputFormat.java
@@ -0,0 +1,99 @@
+package org.apache.rya.giraph.format;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.EdgeReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.accumulo.mr.MRUtils;
+import org.apache.rya.accumulo.mr.RyaInputFormat;
+import org.apache.rya.accumulo.mr.RyaInputFormat.RyaStatementRecordReader;
+import org.apache.rya.accumulo.mr.RyaStatementWritable;
+import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import org.apache.rya.api.resolver.RyaTripleContext;
+
+/**
+ * Giraph {@link EdgeInputFormat} that reads Rya statements from Accumulo via
+ * {@link RyaInputFormat} and hands each one to a {@link RyaEdgeReader}.
+ * <p>
+ * The table layout and triple context needed by the record reader are derived
+ * from the Giraph configuration in {@link #setConf}, the same way
+ * {@code RyaVertexInputFormat} does it.
+ */
+public class RyaEdgeInputFormat extends EdgeInputFormat<Text, RyaStatementWritable> {
+
+    private final RyaInputFormat ryaInputFormat = new RyaInputFormat();
+    private TABLE_LAYOUT rdfTableLayout;
+    private RyaTripleContext tripleContext;
+
+    /**
+     * Stores the configuration, configures the Accumulo input settings and
+     * derives the table layout (SPO passed as the default) and triple context.
+     *
+     * @param conf the Giraph job configuration
+     */
+    @Override
+    public void setConf(ImmutableClassesGiraphConfiguration<Text, Writable, RyaStatementWritable> conf) {
+        super.setConf(conf);
+        RyaGiraphUtils.initializeAccumuloInputFormat(conf);
+        // Must be set before createEdgeReader: the record reader is initialized with both.
+        rdfTableLayout = MRUtils.getTableLayout(conf, TABLE_LAYOUT.SPO);
+        tripleContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(conf));
+    }
+
+    /**
+     * Creates a reader that wraps a {@link RyaStatementRecordReader} for the split.
+     *
+     * @param split the split to read
+     * @param context the task context
+     * @return an edge reader over the split's statements
+     * @throws IOException propagated from {@link RyaInputFormat#createRecordReader}
+     */
+    @Override
+    public EdgeReader<Text, RyaStatementWritable> createEdgeReader(InputSplit split, TaskAttemptContext context) throws IOException {
+        return new RyaEdgeReader((RyaStatementRecordReader) ryaInputFormat.createRecordReader(split, context),
+                rdfTableLayout, tripleContext,
+                context.getConfiguration());
+    }
+
+    /** Performs no validation of the input configuration. */
+    @Override
+    public void checkInputSpecs(Configuration conf) {
+        // nothing to do
+    }
+
+    /**
+     * Delegates split computation to {@link RyaInputFormat}; the split-count hint is not used.
+     */
+    @Override
+    public List<InputSplit> getSplits(JobContext context, int minSplitCountHint) throws IOException, InterruptedException {
+        return ryaInputFormat.getSplits(context);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/45efa55b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaEdgeReader.java
----------------------------------------------------------------------
diff --git a/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaEdgeReader.java b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaEdgeReader.java
new file mode 100644
index 0000000..f0f9017
--- /dev/null
+++ b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaEdgeReader.java
@@ -0,0 +1,100 @@
+package org.apache.rya.giraph.format;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.io.EdgeReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.rya.accumulo.mr.RyaInputFormat.RyaStatementRecordReader;
+import org.apache.rya.accumulo.mr.RyaStatementWritable;
+import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.resolver.RyaTripleContext;
+
+/**
+ * Adapts a {@link RyaStatementRecordReader} to Giraph's {@link EdgeReader} API:
+ * every statement from the record reader becomes one edge whose source vertex
+ * id is the statement subject.
+ */
+public class RyaEdgeReader extends EdgeReader<Text, RyaStatementWritable> {
+
+    private final RyaStatementRecordReader reader;
+    private final RyaTripleContext tripleContext;
+    private final TABLE_LAYOUT tableLayout;
+
+    /**
+     * Wraps the given record reader.
+     *
+     * @param recordReader reader supplying the statements
+     * @param rdfTableLayout table layout handed to the record reader on initialization
+     * @param tripleContext triple context handed to the record reader on initialization
+     * @param conf job configuration (currently unused)
+     */
+    public RyaEdgeReader(RyaStatementRecordReader recordReader,
+            TABLE_LAYOUT rdfTableLayout, RyaTripleContext tripleContext, Configuration conf) {
+        reader = recordReader;
+        tableLayout = rdfTableLayout;
+        this.tripleContext = tripleContext;
+    }
+
+    @Override
+    public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
+        reader.initialize(inputSplit, context, tripleContext, tableLayout);
+    }
+
+    @Override
+    public boolean nextEdge() throws IOException, InterruptedException {
+        return reader.nextKeyValue();
+    }
+
+    /** Returns the current statement's subject, used as the edge's source vertex id. */
+    @Override
+    public Text getCurrentSourceId() throws IOException, InterruptedException {
+        final RyaStatement statement = reader.getCurrentValue().getRyaStatement();
+        return new Text(statement.getSubject().getData());
+    }
+
+    /**
+     * Returns an edge whose target id is the current statement's string form and
+     * whose value is the current statement itself.
+     */
+    @Override
+    public Edge<Text, RyaStatementWritable> getCurrentEdge() throws IOException, InterruptedException {
+        final RyaStatementWritable current = reader.getCurrentValue();
+        final Text targetId = new Text(current.getRyaStatement().toString());
+        return EdgeFactory.create(targetId, current);
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+        return reader.getProgress();
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/45efa55b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaGiraphUtils.java
----------------------------------------------------------------------
diff --git a/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaGiraphUtils.java b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaGiraphUtils.java
new file mode 100644
index 0000000..37b44d7
--- /dev/null
+++ b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaGiraphUtils.java
@@ -0,0 +1,103 @@
+package org.apache.rya.giraph.format;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.rya.accumulo.AccumuloRdfConstants;
+import org.apache.rya.accumulo.mr.MRUtils;
+import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import org.apache.rya.api.RdfCloudTripleStoreUtils;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+
+/**
+ * Helpers for wiring Rya's Accumulo connection settings into a Giraph job's configuration.
+ */
+public class RyaGiraphUtils {
+
+    private RyaGiraphUtils() {
+        // static utility class; not instantiable
+    }
+
+    /**
+     * Configures {@link AccumuloInputFormat} (connector, input table, scan
+     * authorizations and instance) from the Rya MapReduce properties in {@code conf}.
+     * <p>
+     * {@link Job#getInstance(Configuration)} copies its argument, so the settings
+     * are written to a scratch job and then copied back into {@code conf}.
+     *
+     * @param conf configuration holding the Rya/Accumulo connection properties; updated in place
+     * @throws IllegalStateException if the Accumulo input format cannot be configured
+     */
+    public static void initializeAccumuloInputFormat(Configuration conf) {
+        // get accumulo connect information
+        boolean mock = MRUtils.getACMock(conf, false);
+        String zk = MRUtils.getACZK(conf);
+        String instance = MRUtils.getACInstance(conf);
+        String userName = MRUtils.getACUserName(conf);
+        String pwd = MRUtils.getACPwd(conf);
+        String tablePrefix = MRUtils.getTablePrefix(conf);
+        TABLE_LAYOUT rdfTableLayout = MRUtils.getTableLayout(conf, TABLE_LAYOUT.SPO);
+        String authString = conf.get(MRUtils.AC_AUTH_PROP);
+        Authorizations authorizations;
+        if (authString != null && !authString.isEmpty()) {
+            authorizations = new Authorizations(authString.split(","));
+            conf.set(ConfigUtils.CLOUDBASE_AUTHS, authString); // for consistency
+        } else {
+            authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS;
+        }
+
+        // set up the accumulo input format so that we know what table to use and everything
+        try {
+            // Start from an empty configuration so only the settings written below are copied into conf.
+            // NOTE(review): this should also keep AccumuloInputFormat's "set once per job" check from firing
+            // when this method runs again on a conf that already holds the copied settings -- confirm.
+            Job job = Job.getInstance(new Configuration(false));
+            AccumuloInputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd));
+            String tableName = RdfCloudTripleStoreUtils.layoutPrefixToTable(rdfTableLayout, tablePrefix);
+            AccumuloInputFormat.setInputTableName(job, tableName);
+            AccumuloInputFormat.setScanAuthorizations(job, authorizations);
+            if (mock) {
+                AccumuloInputFormat.setMockInstance(job, instance);
+            } else {
+                ClientConfiguration clientConfig = ClientConfiguration.loadDefault()
+                        .withInstance(instance).withZkHosts(zk);
+                AccumuloInputFormat.setZooKeeperInstance(job, clientConfig);
+            }
+            copyInto(job.getConfiguration(), conf);
+        } catch (IOException | AccumuloSecurityException e) {
+            throw new IllegalStateException("Unable to configure AccumuloInputFormat for table prefix " + tablePrefix, e);
+        }
+    }
+
+    /** Copies every entry of {@code source} into {@code target}, overwriting existing keys. */
+    private static void copyInto(Configuration source, Configuration target) {
+        for (Map.Entry<String, String> entry : source) {
+            target.set(entry.getKey(), entry.getValue());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/45efa55b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaVertexInputFormat.java b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaVertexInputFormat.java
new file mode 100644
index 0000000..ecd2c13
--- /dev/null
+++ b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaVertexInputFormat.java
@@ -0,0 +1,85 @@
+package org.apache.rya.giraph.format;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.VertexReader;
+import org.apache.giraph.io.accumulo.AccumuloVertexInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.accumulo.mr.MRUtils;
+import org.apache.rya.accumulo.mr.RyaInputFormat;
+import org.apache.rya.accumulo.mr.RyaInputFormat.RyaStatementRecordReader;
+import org.apache.rya.accumulo.mr.RyaStatementWritable;
+import org.apache.rya.accumulo.mr.RyaTypeWritable;
+import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import org.apache.rya.api.resolver.RyaTripleContext;
+
+/**
+ * Giraph vertex input format backed by a Rya triple table in Accumulo.
+ * <p>
+ * Split computation and record reading are delegated to {@link RyaInputFormat};
+ * each record is turned into a vertex by {@link RyaVertexReader}.
+ */
+public class RyaVertexInputFormat extends AccumuloVertexInputFormat<Text, RyaTypeWritable, RyaStatementWritable> {
+
+    private final RyaInputFormat ryaInputFormat = new RyaInputFormat();
+    // Both populated from the Giraph configuration in setConf.
+    private TABLE_LAYOUT rdfTableLayout;
+    private RyaTripleContext tripleContext;
+
+    /**
+     * Stores the configuration, configures the Accumulo input settings and
+     * derives the table layout (SPO passed as the default) and triple context.
+     */
+    @Override
+    public void setConf(ImmutableClassesGiraphConfiguration<Text, RyaTypeWritable, RyaStatementWritable> conf) {
+        super.setConf(conf);
+        RyaGiraphUtils.initializeAccumuloInputFormat(conf);
+        rdfTableLayout = MRUtils.getTableLayout(conf, TABLE_LAYOUT.SPO);
+        final AccumuloRdfConfiguration rdfConf = new AccumuloRdfConfiguration(conf);
+        tripleContext = RyaTripleContext.getInstance(rdfConf);
+    }
+
+    @Override
+    public VertexReader<Text, RyaTypeWritable, RyaStatementWritable> createVertexReader(InputSplit split, TaskAttemptContext context) throws IOException {
+        final RyaStatementRecordReader recordReader = (RyaStatementRecordReader) ryaInputFormat.createRecordReader(split, context);
+        return new RyaVertexReader(recordReader, rdfTableLayout, tripleContext, context.getConfiguration());
+    }
+
+    /** Delegates to {@link RyaInputFormat#getSplits}; the split-count hint is not used. */
+    @Override
+    public List<InputSplit> getSplits(JobContext context, int minSplitCountHint) throws IOException, InterruptedException {
+        return ryaInputFormat.getSplits(context);
+    }
+
+    /** Performs no validation of the input configuration. */
+    @Override
+    public void checkInputSpecs(Configuration conf) {
+        // don't need to do anything here
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/45efa55b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaVertexReader.java
----------------------------------------------------------------------
diff --git a/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaVertexReader.java b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaVertexReader.java
new file mode 100644
index 0000000..dbb404d
--- /dev/null
+++ b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaVertexReader.java
@@ -0,0 +1,112 @@
+package org.apache.rya.giraph.format;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.rya.accumulo.mr.RyaInputFormat.RyaStatementRecordReader;
+import org.apache.rya.accumulo.mr.RyaStatementWritable;
+import org.apache.rya.accumulo.mr.RyaTypeWritable;
+import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.resolver.RyaTripleContext;
+
+/**
+ * Adapts a {@link RyaStatementRecordReader} to Giraph's {@link VertexReader}
+ * API. Each statement produces one vertex, identified by the statement subject,
+ * carrying a single out-edge whose value is the statement itself.
+ */
+public class RyaVertexReader extends VertexReader<Text, RyaTypeWritable, RyaStatementWritable> {
+
+    private final RyaStatementRecordReader reader;
+    private final RyaTripleContext tripleContext;
+    private final TABLE_LAYOUT tableLayout;
+    // Used only as a factory for new Vertex instances.
+    private final ImmutableClassesGiraphConfiguration<Text, RyaTypeWritable, RyaStatementWritable> classesConfiguration;
+
+    /**
+     * Wraps the given record reader.
+     *
+     * @param recordReader reader supplying the statements
+     * @param rdfTableLayout table layout handed to the record reader on initialization
+     * @param tripleContext triple context handed to the record reader on initialization
+     * @param conf configuration used to build the vertex factory
+     */
+    public RyaVertexReader(RyaStatementRecordReader recordReader,
+            TABLE_LAYOUT rdfTableLayout, RyaTripleContext tripleContext, Configuration conf) {
+        reader = recordReader;
+        tableLayout = rdfTableLayout;
+        this.tripleContext = tripleContext;
+        classesConfiguration = new ImmutableClassesGiraphConfiguration<>(conf);
+    }
+
+    @Override
+    public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
+        reader.initialize(inputSplit, context, tripleContext, tableLayout);
+    }
+
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+        return reader.nextKeyValue();
+    }
+
+    /**
+     * Builds a vertex for the current statement. The id is the subject's data
+     * and the value wraps the subject. The only edge targets the statement's
+     * string form and has the statement as its value.
+     */
+    @Override
+    public Vertex<Text, RyaTypeWritable, RyaStatementWritable> getCurrentVertex() throws IOException, InterruptedException {
+        final RyaStatementWritable current = reader.getCurrentValue();
+        final RyaStatement statement = current.getRyaStatement();
+
+        final RyaTypeWritable value = new RyaTypeWritable();
+        value.setRyaType(statement.getSubject());
+        final Text id = new Text(statement.getSubject().getData());
+
+        final Vertex<Text, RyaTypeWritable, RyaStatementWritable> vertex = classesConfiguration.createVertex();
+        final List<Edge<Text, RyaStatementWritable>> outEdges = new ArrayList<>();
+        outEdges.add(EdgeFactory.create(new Text(statement.toString()), current));
+        vertex.initialize(id, value, outEdges);
+        return vertex;
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+        return reader.getProgress();
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/45efa55b/extras/rya.giraph/src/test/java/org/apache/rya/giraph/format/TestTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/extras/rya.giraph/src/test/java/org/apache/rya/giraph/format/TestTextOutputFormat.java b/extras/rya.giraph/src/test/java/org/apache/rya/giraph/format/TestTextOutputFormat.java
new file mode 100644
index 0000000..be297db
--- /dev/null
+++ b/extras/rya.giraph/src/test/java/org/apache/rya/giraph/format/TestTextOutputFormat.java
@@ -0,0 +1,52 @@
+package org.apache.rya.giraph.format;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.formats.TextVertexOutputFormat;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Test-only {@link TextVertexOutputFormat} that prints every vertex to {@link System#out}.
+ */
+public class TestTextOutputFormat extends TextVertexOutputFormat<Text, Text, Text> {
+
+    @Override
+    public TextVertexOutputFormat<Text, Text, Text>.TextVertexWriter createVertexWriter(TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        return new SystemOutVertexWriter();
+    }
+
+    /**
+     * Writer that prints each vertex with {@code System.out.println}. Declared
+     * as an inner (non-static) class because {@code TextVertexWriter} is an
+     * inner class of the enclosing output format.
+     */
+    public class SystemOutVertexWriter extends TextVertexWriter {
+
+        @Override
+        public void writeVertex(Vertex<Text, Text, Text> vertex) throws IOException, InterruptedException {
+            System.out.println(vertex);
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/45efa55b/extras/rya.giraph/src/test/java/org/apache/rya/giraph/format/TestVertexFormat.java
----------------------------------------------------------------------
diff --git a/extras/rya.giraph/src/test/java/org/apache/rya/giraph/format/TestVertexFormat.java b/extras/rya.giraph/src/test/java/org/apache/rya/giraph/format/TestVertexFormat.java
new file mode 100644
index 0000000..920e876
--- /dev/null
+++ b/extras/rya.giraph/src/test/java/org/apache/rya/giraph/format/TestVertexFormat.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rya.giraph.format;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.giraph.BspCase;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.accumulo.AccumuloVertexInputFormat;
+import org.apache.giraph.job.GiraphJob;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.accumulo.AccumuloRyaDAO;
+import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.giraph.format.RyaVertexInputFormat;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.junit.Test;
+
+/**
+ * Tests for the Rya vertex input format.
+ */
+public class TestVertexFormat extends BspCase {
+
+    private static final Logger log = Logger.getLogger(TestVertexFormat.class);
+
+    /**
+     * Creates the test case. Note that this sets the JVM-wide {@code java.io.tmpdir}
+     * system property to {@code target/test}.
+     */
+    public TestVertexFormat() {
+        super(TestVertexFormat.class.getName());
+        System.setProperty("java.io.tmpdir", "target/test");
+    }
+
+    /**
+     * Builds a configuration for a mock Accumulo instance named {@code test}: table prefix
+     * {@code rya_}, user {@code root} with an empty password and empty authorizations, and
+     * the PCJ, free-text and temporal indexers disabled.
+     */
+    private static AccumuloRdfConfiguration getConf() {
+        final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+        conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, true);
+        conf.set(ConfigUtils.USE_PCJ, "false");
+        conf.set(ConfigUtils.USE_FREETEXT, "false");
+        conf.set(ConfigUtils.USE_TEMPORAL, "false");
+        conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, "rya_");
+        conf.set(ConfigUtils.CLOUDBASE_USER, "root");
+        conf.set(ConfigUtils.CLOUDBASE_PASSWORD, "");
+        conf.set(ConfigUtils.CLOUDBASE_INSTANCE, "test");
+        conf.set(ConfigUtils.CLOUDBASE_AUTHS, "");
+        return conf;
+    }
+
+    /**
+     * Writes four statements sharing the subject {@code urn:test#1234} (predicates
+     * {@code urn:test#pred1..4}, objects {@code urn:test#obj1..4}) to a mock Rya store,
+     * then configures a Giraph job that reads them through {@link RyaVertexInputFormat}.
+     */
+    @Test
+    public void testRyaInput() throws Exception {
+        final AccumuloRdfConfiguration conf = getConf();
+        final AccumuloRyaDAO ryaDAO = RyaSailFactory.getAccumuloDAO(conf);
+        try {
+            final RyaURI subject = new RyaURI("urn:test#1234");
+            for (int i = 1; i <= 4; i++) {
+                ryaDAO.add(new RyaStatement(subject,
+                        new RyaURI("urn:test#pred" + i),
+                        new RyaURI("urn:test#obj" + i)));
+            }
+            ryaDAO.flush();
+        } finally {
+            // Release the DAO even if a write fails; it is not used after loading.
+            ryaDAO.destroy();
+        }
+
+        // Configure the job: Rya vertex input, EdgeNotification computation, stdout output.
+        final GiraphJob job = new GiraphJob(conf, getCallingMethodName());
+        setupConfiguration(job);
+        final GiraphConfiguration giraphConf = job.getConfiguration();
+        giraphConf.setComputationClass(EdgeNotification.class);
+        giraphConf.setVertexInputFormatClass(RyaVertexInputFormat.class);
+        giraphConf.setVertexOutputFormatClass(TestTextOutputFormat.class);
+
+        log.info("Running edge notification job using Rya Vertex input");
+        // NOTE(review): the job is configured but never run (no job.run(...)), so this test
+        // asserts nothing about the input format yet. TODO run it and assert on the result.
+    }
+
+    /**
+     * Test computation: each message a vertex receives overwrites its value (the last one
+     * wins), and in superstep 0 every vertex sends its own id along all of its edges.
+     * Every vertex votes to halt at the end of each call.
+     */
+    public static class EdgeNotification
+            extends BasicComputation<Text, Text, Text, Text> {
+        @Override
+        public void compute(final Vertex<Text, Text, Text> vertex,
+                final Iterable<Text> messages) throws IOException {
+            for (final Text message : messages) {
+                vertex.getValue().set(message);
+            }
+            if (getSuperstep() == 0) {
+                sendMessageToAllEdges(vertex, vertex.getId());
+            }
+            vertex.voteToHalt();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/45efa55b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaInputFormat.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaInputFormat.java b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaInputFormat.java
index baa7033..2fc2728 100644
--- a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaInputFormat.java
+++ b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaInputFormat.java
@@ -27,6 +27,7 @@ import org.apache.accumulo.core.client.mapreduce.AbstractInputFormat;
 import org.apache.accumulo.core.client.mapreduce.RangeInputSplit;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -95,6 +96,19 @@ public class RyaInputFormat extends AbstractInputFormat<Text, RyaStatementWritab
         }
 
         /**
+         * Initializes the RecordReader with a caller-supplied triple context and table layout.
+         * @param split         Defines the portion of data to read.
+         * @param taskAttempt   Context for this task attempt.
+         * @param tripleContext Triple context for this reader; assigned to {@code ryaContext}.
+         * @param layout        Table layout for this reader; assigned to {@code tableLayout}.
+         */
+        public void initialize(InputSplit split, TaskAttemptContext taskAttempt, RyaTripleContext tripleContext, TABLE_LAYOUT layout) throws IOException {
+            super.initialize(split, taskAttempt); // any IOException from the superclass propagates
+            this.ryaContext = tripleContext;
+            this.tableLayout = layout;
+        }
+
+        /**
          * Load the next statement by converting the next Accumulo row to a
          * statement, and make the new (key,value) pair available for retrieval.
          * @return true if another (key,value) pair was fetched and is ready to