Posted to commits@rya.apache.org by pu...@apache.org on 2016/12/20 21:37:51 UTC

[2/3] incubator-rya git commit: Bugfixes and updates for Spark support

Bugfixes and updates for Spark support


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/3f27536a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/3f27536a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/3f27536a

Branch: refs/heads/master
Commit: 3f27536a38af4983f697ecd0540e9a82185b3262
Parents: 77ff31e
Author: Jesse Hatfield <je...@parsons.com>
Authored: Thu Sep 8 16:40:55 2016 -0400
Committer: pujav65 <pu...@gmail.com>
Committed: Tue Dec 20 10:47:20 2016 -0500

----------------------------------------------------------------------
 .../rya/indexing/accumulo/ConfigUtils.java      |   1 -
 .../accumulo/entity/EntityCentricIndex.java     |  23 +-
 mapreduce/pom.xml                               |  20 +-
 .../rya/accumulo/mr/GraphXEdgeInputFormat.java  | 209 ------------------
 .../mvm/rya/accumulo/mr/GraphXInputFormat.java  | 132 ------------
 .../mvm/rya/accumulo/mr/RyaTypeWritable.java    |  74 -------
 .../rya/accumulo/mr/GraphXEdgeInputFormat.java  | 216 +++++++++++++++++++
 .../rya/accumulo/mr/GraphXInputFormat.java      | 147 +++++++++++++
 .../apache/rya/accumulo/mr/RyaTypeWritable.java | 123 +++++++++++
 .../accumulo/mr/GraphXEdgeInputFormatTest.java  | 134 ------------
 .../rya/accumulo/mr/GraphXInputFormatTest.java  | 144 -------------
 .../accumulo/mr/GraphXEdgeInputFormatTest.java  | 134 ++++++++++++
 .../rya/accumulo/mr/GraphXInputFormatTest.java  | 142 ++++++++++++
 pom.xml                                         |   1 +
 spark/pom.xml                                   |   6 +-
 .../accumulo/spark/GraphXGraphGenerator.java    | 188 ----------------
 .../accumulo/spark/GraphXGraphGenerator.java    | 183 ++++++++++++++++
 17 files changed, 978 insertions(+), 899 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java
index e9e6c31..61a1003 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java
@@ -365,7 +365,6 @@ public class ConfigUtils {
 
     public static void setIndexers(final RdfCloudTripleStoreConfiguration conf) {
 
-    	System.out.println("Testuing");
         final List<String> indexList = Lists.newArrayList();
         final List<String> optimizers = Lists.newArrayList();
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java
index d58b1f1..0676e3d 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java
@@ -24,6 +24,7 @@ import static org.apache.rya.accumulo.AccumuloRdfConstants.EMPTY_VALUE;
 import static org.apache.rya.api.RdfCloudTripleStoreConstants.DELIM_BYTES;
 import static org.apache.rya.api.RdfCloudTripleStoreConstants.EMPTY_BYTES;
 import static org.apache.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT;
+import static org.apache.rya.api.RdfCloudTripleStoreConstants.TYPE_DELIM_BYTES;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -281,24 +282,26 @@ public class EntityCentricIndex extends AbstractAccumuloIndexer {
         final byte[] columnFamily = Arrays.copyOf(data, split);
         final byte[] edgeBytes = Arrays.copyOfRange(data, split + DELIM_BYTES.length, data.length);
         split = Bytes.indexOf(edgeBytes, DELIM_BYTES);
-        final String otherNodeVar = new String(Arrays.copyOf(edgeBytes, split));
-        final byte[] otherNodeBytes = Arrays.copyOfRange(edgeBytes,  split + DELIM_BYTES.length, edgeBytes.length - 2);
-        final byte[] typeBytes = Arrays.copyOfRange(edgeBytes,  edgeBytes.length - 2, edgeBytes.length);
+        String otherNodeVar = new String(Arrays.copyOf(edgeBytes, split));
+        byte[] otherNodeBytes = Arrays.copyOfRange(edgeBytes,  split + DELIM_BYTES.length, edgeBytes.length);
+        split = Bytes.indexOf(otherNodeBytes, TYPE_DELIM_BYTES);
+        byte[] otherNodeData = Arrays.copyOf(otherNodeBytes,  split);
+        byte[] typeBytes = Arrays.copyOfRange(otherNodeBytes,  split, otherNodeBytes.length);
         byte[] objectBytes;
         RyaURI subject;
         final RyaURI predicate = new RyaURI(new String(predicateBytes));
         RyaType object;
         RyaURI context = null;
-        // Expect either: entity=subject.data, otherNodeVar="object", otherNodeBytes={object.data, object.datatype_marker}
-        //            or: entity=object.data, otherNodeVar="subject", otherNodeBytes={subject.data, object.datatype_marker}
+        // Expect either: entity=subject.data, otherNodeVar="object", otherNodeBytes={object.data, object.datatype}
+        //            or: entity=object.data, otherNodeVar="subject", otherNodeBytes={subject.data, object.datatype}
         switch (otherNodeVar) {
             case "subject":
-                subject = new RyaURI(new String(otherNodeBytes));
+                subject = new RyaURI(new String(otherNodeData));
                 objectBytes = Bytes.concat(entityBytes, typeBytes);
                 break;
             case "object":
                 subject = new RyaURI(new String(entityBytes));
-                objectBytes = Bytes.concat(otherNodeBytes, typeBytes);
+                objectBytes = Bytes.concat(otherNodeData, typeBytes);
                 break;
             default:
                 throw new IOException("Failed to deserialize entity-centric index row. "
@@ -311,7 +314,7 @@ public class EntityCentricIndex extends AbstractAccumuloIndexer {
         return new RyaStatement(subject, predicate, object, context,
                 null, columnVisibility, valueBytes, timestamp);
     }
-    
+
     /**
      * Return the RyaType of the Entity Centric Index row.
      * @param key Row key, contains statement data
@@ -332,7 +335,9 @@ public class EntityCentricIndex extends AbstractAccumuloIndexer {
         byte[] edgeBytes = Arrays.copyOfRange(data, split + DELIM_BYTES.length, data.length);
         split = Bytes.indexOf(edgeBytes, DELIM_BYTES);
         String otherNodeVar = new String(Arrays.copyOf(edgeBytes, split));
-        byte[] typeBytes = Arrays.copyOfRange(edgeBytes,  edgeBytes.length - 2, edgeBytes.length);
+        byte[] otherNodeBytes = Arrays.copyOfRange(edgeBytes,  split + DELIM_BYTES.length, edgeBytes.length);
+        split = Bytes.indexOf(otherNodeBytes, TYPE_DELIM_BYTES);
+        byte[] typeBytes = Arrays.copyOfRange(otherNodeBytes,  split, otherNodeBytes.length);
         byte[] objectBytes;
         RyaURI subject;
         RyaType object;
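
The substantive fix in this hunk: the old deserializer assumed the datatype marker always occupied the final two bytes of the row, which breaks for datatype markers of any other length. The new code locates TYPE_DELIM_BYTES explicitly and splits there. A minimal standalone sketch of the new split, with stand-in delimiter values (the real constants live in RdfCloudTripleStoreConstants; Bytes is Guava's, as in the diff):

    import java.util.Arrays;
    import com.google.common.primitives.Bytes;

    public class EdgeSplitSketch {
        // Stand-ins for Rya's DELIM_BYTES and TYPE_DELIM_BYTES; values assumed.
        static final byte[] DELIM = { 0x00 };
        static final byte[] TYPE_DELIM = { 0x01 };

        /** Splits "otherNodeVar DELIM data TYPE_DELIM datatype" into its three parts. */
        static byte[][] split(byte[] edgeBytes) {
            int d = Bytes.indexOf(edgeBytes, DELIM);
            byte[] varName = Arrays.copyOf(edgeBytes, d);
            byte[] rest = Arrays.copyOfRange(edgeBytes, d + DELIM.length, edgeBytes.length);
            // The fix: find the type delimiter instead of assuming the datatype
            // marker is always the last two bytes of the row.
            int t = Bytes.indexOf(rest, TYPE_DELIM);
            byte[] data = Arrays.copyOf(rest, t);                   // other node's value
            byte[] type = Arrays.copyOfRange(rest, t, rest.length); // delimiter + datatype
            return new byte[][] { varName, data, type };
        }
    }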

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/pom.xml
----------------------------------------------------------------------
diff --git a/mapreduce/pom.xml b/mapreduce/pom.xml
index d222845..c4f94f6 100644
--- a/mapreduce/pom.xml
+++ b/mapreduce/pom.xml
@@ -31,11 +31,11 @@ under the License.
     <name>Apache Rya MapReduce Tools</name>
 
     <dependencies>
-    	<dependency>
-		    <groupId>org.apache.spark</groupId>
-		    <artifactId>spark-graphx_2.11</artifactId>
-		    <version>1.6.2</version>
-		</dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-graphx_2.11</artifactId>
+            <version>1.6.2</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.rya</groupId>
             <artifactId>rya.api</artifactId>
@@ -116,6 +116,16 @@ under the License.
                         <executions>
                             <execution>
                                 <configuration>
+                                    <filters>
+                                        <filter>
+                                            <artifact>*:*</artifact>
+                                            <excludes>
+                                                <exclude>META-INF/*.SF</exclude>
+                                                <exclude>META-INF/*.DSA</exclude>
+                                                <exclude>META-INF/*.RSA</exclude>
+                                            </excludes>
+                                        </filter>
+                                    </filters>
                                     <transformers>
                                         <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                                     </transformers>
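
The added filter strips signed-jar metadata (META-INF/*.SF, *.DSA, *.RSA) that the shade plugin would otherwise copy out of signed dependencies into the uber jar; because the repackaged contents no longer match those digests, running the shaded jar would fail with a SecurityException ("Invalid signature file digest for Manifest main attributes") without this exclusion.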

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormat.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormat.java
deleted file mode 100644
index 79d6e82..0000000
--- a/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormat.java
+++ /dev/null
@@ -1,209 +0,0 @@
-package mvm.rya.accumulo.mr;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.IOException;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.List;
-import java.util.Map.Entry;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.resolver.RyaTripleContext;
-import mvm.rya.api.resolver.triple.TripleRow;
-import mvm.rya.api.resolver.triple.TripleRowResolverException;
-
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.mapreduce.AbstractInputFormat;
-import org.apache.accumulo.core.client.mapreduce.InputFormatBase;
-import org.apache.accumulo.core.client.mapreduce.RangeInputSplit;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.spark.graphx.Edge;
-
-/**
- * Subclass of {@link AbstractInputFormat} for reading
- * {@link RyaStatementWritable}s directly from a running Rya instance.
- */
-@SuppressWarnings("rawtypes")
-public class GraphXEdgeInputFormat extends InputFormatBase<Object, Edge> {
-	/**
-	 * Instantiates a RecordReader for this InputFormat and a given task and
-	 * input split.
-	 * 
-	 * @param split
-	 *            Defines the portion of the input this RecordReader is
-	 *            responsible for.
-	 * @param context
-	 *            The context of the task.
-	 * @return A RecordReader that can be used to fetch RyaStatementWritables.
-	 */
-	@Override
-	public RecordReader<Object, Edge> createRecordReader(InputSplit split,
-			TaskAttemptContext context) {
-		return new RyaStatementRecordReader();
-	}
-
-	/**
-	 * Sets the table layout to use.
-	 * 
-	 * @param conf
-	 *            Configuration to set the layout in.
-	 * @param layout
-	 *            Statements will be read from the Rya table associated with
-	 *            this layout.
-	 */
-	public static void setTableLayout(Job conf, TABLE_LAYOUT layout) {
-		conf.getConfiguration().set(MRUtils.TABLE_LAYOUT_PROP, layout.name());
-	}
-
-	/**
-	 * Retrieves RyaStatementWritable objects from Accumulo tables.
-	 */
-	public class RyaStatementRecordReader extends
-			AbstractRecordReader<Object, Edge> {
-		private RyaTripleContext ryaContext;
-		private TABLE_LAYOUT tableLayout;
-
-		protected void setupIterators(TaskAttemptContext context,
-				Scanner scanner, String tableName, RangeInputSplit split) {
-		}
-
-		/**
-		 * Initializes the RecordReader.
-		 * 
-		 * @param inSplit
-		 *            Defines the portion of data to read.
-		 * @param attempt
-		 *            Context for this task attempt.
-		 * @throws IOException
-		 *             if thrown by the superclass's initialize method.
-		 */
-		@Override
-		public void initialize(InputSplit inSplit, TaskAttemptContext attempt)
-				throws IOException {
-			super.initialize(inSplit, attempt);
-			this.tableLayout = MRUtils.getTableLayout(
-					attempt.getConfiguration(), TABLE_LAYOUT.SPO);
-			// TODO verify that this is correct
-			this.ryaContext = RyaTripleContext
-					.getInstance(new AccumuloRdfConfiguration(attempt
-							.getConfiguration()));
-		}
-
-		/**
-		 * Load the next statement by converting the next Accumulo row to a
-		 * statement, and make the new (key,value) pair available for retrieval.
-		 * 
-		 * @return true if another (key,value) pair was fetched and is ready to
-		 *         be retrieved, false if there was none.
-		 * @throws IOException
-		 *             if a row was loaded but could not be converted to a
-		 *             statement.
-		 */
-		@Override
-		public boolean nextKeyValue() throws IOException {
-			if (!scannerIterator.hasNext())
-				return false;
-			Entry<Key, Value> entry = scannerIterator.next();
-			++numKeysRead;
-			currentKey = entry.getKey();
-			try {
-				currentK = currentKey.getRow();
-				RyaTypeWritable rtw = null;
-				RyaStatement stmt = this.ryaContext.deserializeTriple(
-						this.tableLayout, new TripleRow(entry.getKey().getRow()
-								.getBytes(), entry.getKey().getColumnFamily()
-								.getBytes(), entry.getKey()
-								.getColumnQualifier().getBytes(), entry
-								.getKey().getTimestamp(), entry.getKey()
-								.getColumnVisibility().getBytes(), entry
-								.getValue().get()));
-
-				String subjURI = stmt.getSubject().getDataType().toString();
-				String objURI = stmt.getObject().getDataType().toString();
-
-				// SHA-256 the string value and then generate a hashcode from
-				// the digested string, the collision ratio is less than 0.0001%
-				// using custom hash function should significantly reduce the
-				// collision ratio
-				MessageDigest messageDigest = MessageDigest
-						.getInstance("SHA-256");
-
-				messageDigest.update(subjURI.getBytes());
-				String encryptedString = new String(messageDigest.digest());
-				long subHash = hash(encryptedString);
-
-				messageDigest.update(objURI.getBytes());
-				encryptedString = new String(messageDigest.digest());
-				long objHash = hash(encryptedString);
-
-				Edge<RyaTypeWritable> writable = new Edge<RyaTypeWritable>(
-						subHash, objHash, rtw);
-				currentV = writable;
-			} catch (TripleRowResolverException | NoSuchAlgorithmException e) {
-				throw new IOException(e);
-			}
-			return true;
-		}
-
-		protected List<IteratorSetting> contextIterators(
-				TaskAttemptContext context, String tableName) {
-			return getIterators(context);
-		}
-
-		@Override
-		protected void setupIterators(TaskAttemptContext context,
-				Scanner scanner, String tableName,
-				org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) {
-			List<IteratorSetting> iterators = null;
-
-			if (null == split) {
-				iterators = contextIterators(context, tableName);
-			} else {
-				iterators = split.getIterators();
-				if (null == iterators) {
-					iterators = contextIterators(context, tableName);
-				}
-			}
-
-			for (IteratorSetting iterator : iterators)
-				scanner.addScanIterator(iterator);
-		}
-
-	}
-
-	public static long hash(String string) {
-		long h = 1125899906842597L; // prime
-		int len = string.length();
-
-		for (int i = 0; i < len; i++) {
-			h = 31 * h + string.charAt(i);
-		}
-		return h;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXInputFormat.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXInputFormat.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXInputFormat.java
deleted file mode 100644
index 6ec5c74..0000000
--- a/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXInputFormat.java
+++ /dev/null
@@ -1,132 +0,0 @@
-package mvm.rya.accumulo.mr;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.SortedMap;
-
-import mvm.rya.api.domain.RyaType;
-import mvm.rya.api.resolver.RyaTypeResolverException;
-import mvm.rya.indexing.accumulo.entity.EntityCentricIndex;
-
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.mapreduce.InputFormatBase;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.user.WholeRowIterator;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-public class GraphXInputFormat extends InputFormatBase<Object, RyaTypeWritable> {
-
-	private static final int WHOLE_ROW_ITERATOR_PRIORITY = 23;
-
-	/**
-	 * Instantiates a RecordReader for this InputFormat and a given task and
-	 * input split.
-	 * 
-	 * @param split
-	 *            Defines the portion of the input this RecordReader is
-	 *            responsible for.
-	 * @param context
-	 *            The context of the task.
-	 * @return A RecordReader that can be used to fetch RyaStatementWritables.
-	 */
-	@Override
-	public RecordReader<Object, RyaTypeWritable> createRecordReader(
-			InputSplit split, TaskAttemptContext context) {
-		return new RyaStatementRecordReader();
-	}
-
-	
-	
-	/**
-	 * Retrieves RyaStatementWritable objects from Accumulo tables.
-	 */
-	public class RyaStatementRecordReader extends
-			AbstractRecordReader<Object, RyaTypeWritable> {
-		protected void setupIterators(TaskAttemptContext context,
-				Scanner scanner, String tableName,
-				@SuppressWarnings("deprecation") RangeInputSplit split) {
-			IteratorSetting iteratorSetting = new IteratorSetting(
-					WHOLE_ROW_ITERATOR_PRIORITY, WholeRowIterator.class);
-			scanner.addScanIterator(iteratorSetting);
-		}
-
-		/**
-		 * Initializes the RecordReader.
-		 * 
-		 * @param inSplit
-		 *            Defines the portion of data to read.
-		 * @param attempt
-		 *            Context for this task attempt.
-		 * @throws IOException
-		 *             if thrown by the superclass's initialize method.
-		 */
-		@Override
-		public void initialize(InputSplit inSplit, TaskAttemptContext attempt)
-				throws IOException {
-			super.initialize(inSplit, attempt);
-		}
-
-		/**
-		 * Load the next statement by converting the next Accumulo row to a
-		 * statement, and make the new (key,value) pair available for retrieval.
-		 * 
-		 * @return true if another (key,value) pair was fetched and is ready to
-		 *         be retrieved, false if there was none.
-		 * @throws IOException
-		 *             if a row was loaded but could not be converted to a
-		 *             statement.
-		 */
-		@Override
-		public boolean nextKeyValue() throws IOException {
-			if (!scannerIterator.hasNext())
-				return false;
-			Entry<Key, Value> entry = scannerIterator.next();
-			++numKeysRead;
-			currentKey = entry.getKey();
-
-			try {
-				currentK = currentKey.getRow();
-				SortedMap<Key, Value> wholeRow = WholeRowIterator.decodeRow(entry.getKey(), entry.getValue());
-				Key key = wholeRow.firstKey();
-				Value value = wholeRow.get(key);
-				RyaType type = EntityCentricIndex.getRyaType(key, value);
-				RyaTypeWritable writable = new RyaTypeWritable();
-				writable.setRyaType(type);
-				currentV = writable;
-			} catch (RyaTypeResolverException e) {
-				throw new IOException();
-			}
-			return true;
-		}
-
-		protected List<IteratorSetting> contextIterators(
-				TaskAttemptContext context, String tableName) {
-			return getIterators(context);
-		}
-
-		@Override
-		protected void setupIterators(TaskAttemptContext context,
-				Scanner scanner, String tableName,
-				org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) {
-
-			List<IteratorSetting> iterators = null;
-
-			if (null == split) {
-				iterators = contextIterators(context, tableName);
-			} else {
-				iterators = split.getIterators();
-				if (null == iterators) {
-					iterators = contextIterators(context, tableName);
-				}
-			}
-
-			for (IteratorSetting iterator : iterators)
-				scanner.addScanIterator(iterator);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaTypeWritable.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaTypeWritable.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaTypeWritable.java
deleted file mode 100644
index ddc0948..0000000
--- a/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaTypeWritable.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package mvm.rya.accumulo.mr;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import mvm.rya.api.RdfCloudTripleStoreConstants;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.RyaType;
-import mvm.rya.api.resolver.triple.TripleRow;
-import mvm.rya.api.resolver.triple.TripleRowResolverException;
-
-import org.apache.hadoop.io.WritableComparable;
-import org.openrdf.model.URI;
-import org.openrdf.model.impl.ValueFactoryImpl;
-
-public class RyaTypeWritable implements WritableComparable<RyaTypeWritable>{
-
-	private RyaType ryatype;
-	
-	/**
-     * Read part of a statement from an input stream.
-     * @param dataInput Stream for reading serialized statements.
-     * @return The next individual field, as a byte array.
-     * @throws IOException if reading from the stream fails.
-     */
-    protected byte[] read(DataInput dataInput) throws IOException {
-        if (dataInput.readBoolean()) {
-            int len = dataInput.readInt();
-            byte[] bytes = new byte[len];
-            dataInput.readFully(bytes);
-            return bytes;
-        }else {
-            return null;
-        }
-    }
-	
-	@Override
-	public void readFields(DataInput dataInput) throws IOException {
-		ValueFactoryImpl vfi = new ValueFactoryImpl();
-		String data = dataInput.readLine();
-		String dataTypeString = dataInput.readLine();
-		URI dataType = vfi.createURI(dataTypeString);
-		ryatype.setData(data);
-		ryatype.setDataType(dataType);
-	}
-
-	@Override
-	public void write(DataOutput dataOutput) throws IOException {
-		dataOutput.writeChars(ryatype.getData());
-		dataOutput.writeChars(ryatype.getDataType().toString());
-	}
-	
-	/**
-     * Gets the contained RyaStatement.
-     * @return The statement represented by this RyaStatementWritable.
-     */
-    public RyaType getRyaType() {
-        return ryatype;
-    }
-    /**
-     * Sets the contained RyaStatement.
-     * @param   ryaStatement    The statement to be represented by this
-     *                          RyaStatementWritable.
-     */
-    public void setRyaType(RyaType ryatype) {
-        this.ryatype = ryatype;
-    }
-
-	@Override
-	public int compareTo(RyaTypeWritable o) {
-		return ryatype.compareTo(o.ryatype);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormat.java b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormat.java
new file mode 100644
index 0000000..489fd34
--- /dev/null
+++ b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormat.java
@@ -0,0 +1,216 @@
+package org.apache.rya.accumulo.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.resolver.RyaTripleContext;
+import org.apache.rya.api.resolver.triple.TripleRow;
+import org.apache.rya.api.resolver.triple.TripleRowResolverException;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.mapreduce.AbstractInputFormat;
+import org.apache.accumulo.core.client.mapreduce.InputFormatBase;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.spark.graphx.Edge;
+
+/**
+ * Subclass of {@link AbstractInputFormat} for reading
+ * GraphX {@link Edge}s directly from a running Rya instance.
+ */
+@SuppressWarnings("rawtypes")
+public class GraphXEdgeInputFormat extends InputFormatBase<Object, Edge> {
+	/**
+	 * Instantiates a RecordReader for this InputFormat and a given task and
+	 * input split.
+	 *
+	 * @param split
+	 *            Defines the portion of the input this RecordReader is
+	 *            responsible for.
+	 * @param context
+	 *            The context of the task.
+	 * @return A RecordReader that can be used to fetch GraphX Edges.
+	 */
+	@Override
+	public RecordReader<Object, Edge> createRecordReader(InputSplit split,
+			TaskAttemptContext context) {
+		return new RyaStatementRecordReader();
+	}
+
+	/**
+	 * Sets the table layout to use.
+	 *
+	 * @param conf
+	 *            Configuration to set the layout in.
+	 * @param layout
+	 *            Statements will be read from the Rya table associated with
+	 *            this layout.
+	 */
+	public static void setTableLayout(Job conf, TABLE_LAYOUT layout) {
+		conf.getConfiguration().set(MRUtils.TABLE_LAYOUT_PROP, layout.name());
+	}
+
+	/**
+	 * Retrieves GraphX Edge objects from Accumulo tables.
+	 */
+	public class RyaStatementRecordReader extends
+			AbstractRecordReader<Object, Edge> {
+		private RyaTripleContext ryaContext;
+		private TABLE_LAYOUT tableLayout;
+
+		protected void setupIterators(TaskAttemptContext context,
+				Scanner scanner, String tableName, RangeInputSplit split) {
+		}
+
+		/**
+		 * Initializes the RecordReader.
+		 *
+		 * @param inSplit
+		 *            Defines the portion of data to read.
+		 * @param attempt
+		 *            Context for this task attempt.
+		 * @throws IOException
+		 *             if thrown by the superclass's initialize method.
+		 */
+		@Override
+		public void initialize(InputSplit inSplit, TaskAttemptContext attempt)
+				throws IOException {
+			super.initialize(inSplit, attempt);
+			this.tableLayout = MRUtils.getTableLayout(
+					attempt.getConfiguration(), TABLE_LAYOUT.SPO);
+			// TODO verify that this is correct
+			this.ryaContext = RyaTripleContext
+					.getInstance(new AccumuloRdfConfiguration(attempt
+							.getConfiguration()));
+		}
+
+		/**
+		 * Load the next statement by converting the next Accumulo row to a
+		 * statement, and make the new (key,value) pair available for retrieval.
+		 *
+		 * @return true if another (key,value) pair was fetched and is ready to
+		 *         be retrieved, false if there was none.
+		 * @throws IOException
+		 *             if a row was loaded but could not be converted to a
+		 *             statement.
+		 */
+		@Override
+		public boolean nextKeyValue() throws IOException {
+			if (!scannerIterator.hasNext())
+				return false;
+			Entry<Key, Value> entry = scannerIterator.next();
+			++numKeysRead;
+			currentKey = entry.getKey();
+			try {
+				currentK = currentKey.getRow();
+				RyaTypeWritable rtw = new RyaTypeWritable();
+				RyaStatement stmt = this.ryaContext.deserializeTriple(
+						this.tableLayout, new TripleRow(entry.getKey().getRow()
+								.getBytes(), entry.getKey().getColumnFamily()
+								.getBytes(), entry.getKey()
+								.getColumnQualifier().getBytes(), entry
+								.getKey().getTimestamp(), entry.getKey()
+								.getColumnVisibility().getBytes(), entry
+								.getValue().get()));
+
+				long subHash = getVertexId(stmt.getSubject());
+				long objHash = getVertexId(stmt.getObject());
+				rtw.setRyaType(stmt.getPredicate());
+
+				Edge<RyaTypeWritable> writable = new Edge<RyaTypeWritable>(
+						subHash, objHash, rtw);
+				currentV = writable;
+			} catch (TripleRowResolverException e) {
+				throw new IOException(e);
+			}
+			return true;
+		}
+
+		protected List<IteratorSetting> contextIterators(
+				TaskAttemptContext context, String tableName) {
+			return getIterators(context);
+		}
+
+		@Override
+		protected void setupIterators(TaskAttemptContext context,
+				Scanner scanner, String tableName,
+				org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) {
+			List<IteratorSetting> iterators = null;
+
+			if (null == split) {
+				iterators = contextIterators(context, tableName);
+			} else {
+				iterators = split.getIterators();
+				if (null == iterators) {
+					iterators = contextIterators(context, tableName);
+				}
+			}
+
+			for (IteratorSetting iterator : iterators)
+				scanner.addScanIterator(iterator);
+		}
+
+	}
+
+	public static long getVertexId(RyaType resource) throws IOException {
+		String uri = "";
+		if (resource != null) {
+			uri = resource.getData().toString();
+		}
+		try {
+			// SHA-256 the string value, then fold the digested string into a
+			// long with the custom hash function below; this keeps the
+			// collision ratio below 0.0001%.
+			MessageDigest messageDigest = MessageDigest
+					.getInstance("SHA-256");
+			messageDigest.update(uri.getBytes());
+			String encryptedString = new String(messageDigest.digest());
+			return hash(encryptedString);
+		}
+		catch (NoSuchAlgorithmException e) {
+			throw new IOException(e);
+		}
+	}
+
+	public static long hash(String string) {
+		long h = 1125899906842597L; // prime
+		int len = string.length();
+
+		for (int i = 0; i < len; i++) {
+			h = 31 * h + string.charAt(i);
+		}
+		return h;
+	}
+}
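
Beyond the package move, the rewritten reader makes two functional changes: the Edge value is now a populated RyaTypeWritable carrying the statement's predicate (the old version always emitted a null payload), and the SHA-256-plus-fold hashing has moved into the reusable getVertexId helper so GraphXInputFormat can key its vertices identically. A standalone sketch of that id scheme, with illustrative class and method names:

    import java.io.IOException;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    public class VertexIdSketch {
        // Same 31*h + c polynomial fold over the digest string as the input format.
        static long hash(String s) {
            long h = 1125899906842597L; // large prime seed
            for (int i = 0; i < s.length(); i++) {
                h = 31 * h + s.charAt(i);
            }
            return h;
        }

        // SHA-256 the node's URI, then fold the digest into a stable 64-bit id.
        static long vertexId(String uri) throws IOException {
            try {
                MessageDigest md = MessageDigest.getInstance("SHA-256");
                md.update(uri.getBytes());
                return hash(new String(md.digest()));
            } catch (NoSuchAlgorithmException e) {
                throw new IOException(e);
            }
        }

        public static void main(String[] args) throws IOException {
            // The subject and object URIs from the tests below map to fixed ids:
            System.out.println(vertexId("http://www.google.com"));
            System.out.println(vertexId("http://www.yahoo.com"));
        }
    }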

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXInputFormat.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXInputFormat.java b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXInputFormat.java
new file mode 100644
index 0000000..77f4e63
--- /dev/null
+++ b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXInputFormat.java
@@ -0,0 +1,147 @@
+package org.apache.rya.accumulo.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.resolver.RyaTypeResolverException;
+import org.apache.rya.indexing.accumulo.entity.EntityCentricIndex;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.mapreduce.InputFormatBase;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+public class GraphXInputFormat extends InputFormatBase<Object, RyaTypeWritable> {
+
+	private static final int WHOLE_ROW_ITERATOR_PRIORITY = 23;
+
+	/**
+	 * Instantiates a RecordReader for this InputFormat and a given task and
+	 * input split.
+	 *
+	 * @param split
+	 *            Defines the portion of the input this RecordReader is
+	 *            responsible for.
+	 * @param context
+	 *            The context of the task.
+	 * @return A RecordReader that can be used to fetch RyaTypeWritables.
+	 */
+	@Override
+	public RecordReader<Object, RyaTypeWritable> createRecordReader(
+			InputSplit split, TaskAttemptContext context) {
+		return new RyaStatementRecordReader();
+	}
+
+
+
+	/**
+	 * Retrieves RyaTypeWritable objects from Accumulo tables.
+	 */
+	public class RyaStatementRecordReader extends
+			AbstractRecordReader<Object, RyaTypeWritable> {
+		protected void setupIterators(TaskAttemptContext context,
+				Scanner scanner, String tableName,
+				@SuppressWarnings("deprecation") RangeInputSplit split) {
+			IteratorSetting iteratorSetting = new IteratorSetting(
+					WHOLE_ROW_ITERATOR_PRIORITY, WholeRowIterator.class);
+			scanner.addScanIterator(iteratorSetting);
+		}
+
+		/**
+		 * Initializes the RecordReader.
+		 *
+		 * @param inSplit
+		 *            Defines the portion of data to read.
+		 * @param attempt
+		 *            Context for this task attempt.
+		 * @throws IOException
+		 *             if thrown by the superclass's initialize method.
+		 */
+		@Override
+		public void initialize(InputSplit inSplit, TaskAttemptContext attempt)
+				throws IOException {
+			super.initialize(inSplit, attempt);
+		}
+
+		/**
+		 * Load the next statement by converting the next Accumulo row to a
+		 * statement, and make the new (key,value) pair available for retrieval.
+		 *
+		 * @return true if another (key,value) pair was fetched and is ready to
+		 *         be retrieved, false if there was none.
+		 * @throws IOException
+		 *             if a row was loaded but could not be converted to a
+		 *             statement.
+		 */
+		@Override
+		public boolean nextKeyValue() throws IOException {
+			if (!scannerIterator.hasNext())
+				return false;
+			Entry<Key, Value> entry = scannerIterator.next();
+			++numKeysRead;
+			currentKey = entry.getKey();
+
+			try {
+				RyaType type = EntityCentricIndex.getRyaType(currentKey, entry.getValue());
+				RyaTypeWritable writable = new RyaTypeWritable();
+				writable.setRyaType(type);
+				currentK = GraphXEdgeInputFormat.getVertexId(type);
+				currentV = writable;
+			} catch (RyaTypeResolverException e) {
+				throw new IOException(e);
+			}
+			return true;
+		}
+
+		protected List<IteratorSetting> contextIterators(
+				TaskAttemptContext context, String tableName) {
+			return getIterators(context);
+		}
+
+		@Override
+		protected void setupIterators(TaskAttemptContext context,
+				Scanner scanner, String tableName,
+				org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) {
+
+			List<IteratorSetting> iterators = null;
+
+			if (null == split) {
+				iterators = contextIterators(context, tableName);
+			} else {
+				iterators = split.getIterators();
+				if (null == iterators) {
+					iterators = contextIterators(context, tableName);
+				}
+			}
+
+			for (IteratorSetting iterator : iterators)
+				scanner.addScanIterator(iterator);
+		}
+	}
+}
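
Note that the vertex key is now GraphXEdgeInputFormat.getVertexId(type) rather than the raw Accumulo row, so the keys emitted here line up with the srcId/dstId values that GraphXEdgeInputFormat produces. A hedged sketch of consuming the two formats together from Spark (sc, conf, and the class names here are assumed; the spark module's GraphXGraphGenerator, also updated in this commit, is the real consumer):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.rya.accumulo.mr.GraphXEdgeInputFormat;
    import org.apache.rya.accumulo.mr.GraphXInputFormat;
    import org.apache.rya.accumulo.mr.RyaTypeWritable;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.graphx.Edge;

    @SuppressWarnings("rawtypes")
    public class GraphLoadSketch {
        public static void load(JavaSparkContext sc, Configuration conf) {
            // Vertices: one (vertexId, RyaTypeWritable) pair per entity-centric row.
            JavaPairRDD<Object, RyaTypeWritable> vertices = sc.newAPIHadoopRDD(
                    conf, GraphXInputFormat.class, Object.class, RyaTypeWritable.class);
            // Edges: one Edge<RyaTypeWritable> per SPO row; endpoint ids come from
            // the same getVertexId hash, so they join against the vertex keys.
            JavaRDD<Edge> edges = sc.newAPIHadoopRDD(
                    conf, GraphXEdgeInputFormat.class, Object.class, Edge.class).values();
        }
    }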

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaTypeWritable.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaTypeWritable.java b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaTypeWritable.java
new file mode 100644
index 0000000..ec47d82
--- /dev/null
+++ b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaTypeWritable.java
@@ -0,0 +1,123 @@
+package org.apache.rya.accumulo.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.rya.api.domain.RyaType;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.openrdf.model.URI;
+import org.openrdf.model.impl.ValueFactoryImpl;
+
+public class RyaTypeWritable implements WritableComparable<RyaTypeWritable>{
+
+    private RyaType ryatype;
+
+    /**
+     * Read part of a statement from an input stream.
+     * @param dataInput Stream for reading serialized statements.
+     * @return The next individual field, as a byte array.
+     * @throws IOException if reading from the stream fails.
+     */
+    protected byte[] read(DataInput dataInput) throws IOException {
+        if (dataInput.readBoolean()) {
+            int len = dataInput.readInt();
+            byte[] bytes = new byte[len];
+            dataInput.readFully(bytes);
+            return bytes;
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public void readFields(DataInput dataInput) throws IOException {
+        ValueFactoryImpl vfi = new ValueFactoryImpl();
+        String data = dataInput.readLine();
+        String dataTypeString = dataInput.readLine();
+        URI dataType = vfi.createURI(dataTypeString);
+        ryatype.setData(data);
+        ryatype.setDataType(dataType);
+    }
+
+    @Override
+    public void write(DataOutput dataOutput) throws IOException {
+        dataOutput.writeChars(ryatype.getData());
+        dataOutput.writeChars(ryatype.getDataType().toString());
+    }
+
+    /**
+     * Gets the contained RyaType.
+     * @return The RyaType represented by this RyaTypeWritable.
+     */
+    public RyaType getRyaType() {
+        return ryatype;
+    }
+    /**
+     * Sets the contained RyaType.
+     * @param   ryatype    The RyaType to be represented by this
+     *                     RyaTypeWritable.
+     */
+    public void setRyaType(RyaType ryatype) {
+        this.ryatype = ryatype;
+    }
+
+    @Override
+    public int compareTo(RyaTypeWritable o) {
+        return ryatype.compareTo(o.ryatype);
+    }
+
+    /**
+     * Tests for equality using the equals method of the enclosed RyaType.
+     * @param   o   Object to compare with
+     * @return  true if both objects are RyaTypeWritables containing equivalent
+     *          RyaTypes.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || !(o instanceof RyaTypeWritable)) {
+            return false;
+        }
+        RyaType rtThis = ryatype;
+        RyaType rtOther = ((RyaTypeWritable) o).ryatype;
+        if (rtThis == null) {
+            return rtOther == null;
+        }
+        else {
+            return rtThis.equals(rtOther);
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        if (ryatype == null) {
+            return 0;
+        }
+        else {
+            return ryatype.hashCode();
+        }
+    }
+}
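
One caveat worth flagging in this Writable: write() emits the fields with writeChars() (raw UTF-16, no terminator) while readFields() reads them back with readLine() (newline-terminated bytes), so the two methods do not round-trip, and readFields() would also hit a NullPointerException when ryatype has not been set. A minimal symmetric sketch using writeUTF/readUTF (not the committed implementation):

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(ryatype.getData());
        dataOutput.writeUTF(ryatype.getDataType().toString());
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        RyaType rt = new RyaType();
        rt.setData(dataInput.readUTF());
        rt.setDataType(new ValueFactoryImpl().createURI(dataInput.readUTF()));
        this.ryatype = rt; // replace rather than mutate, avoiding the NPE
    }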

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormatTest.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormatTest.java b/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormatTest.java
deleted file mode 100644
index 445499d..0000000
--- a/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormatTest.java
+++ /dev/null
@@ -1,134 +0,0 @@
-package mvm.rya.accumulo.mr;
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-import java.util.ArrayList;
-import java.util.List;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.AccumuloRyaDAO;
-import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.RyaURI;
-
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import org.apache.spark.graphx.Edge;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class GraphXEdgeInputFormatTest {
-
-    static String username = "root", table = "rya_spo";
-    static PasswordToken password = new PasswordToken("");
-
-    static Instance instance;
-    static AccumuloRyaDAO apiImpl;
-
-    @Before
-    public void init() throws Exception {
-        instance = new MockInstance(GraphXEdgeInputFormatTest.class.getName() + ".mock_instance");
-        Connector connector = instance.getConnector(username, password);
-        connector.tableOperations().create(table);
-
-        AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
-        conf.setTablePrefix("rya_");
-        conf.setDisplayQueryPlan(false);
-
-        apiImpl = new AccumuloRyaDAO();
-        apiImpl.setConf(conf);
-        apiImpl.setConnector(connector);
-        apiImpl.init();
-    }
-
-    @After
-    public void after() throws Exception {
-        apiImpl.dropAndDestroy();
-    }
-
-    @SuppressWarnings("rawtypes")
-	@Test
-    public void testInputFormat() throws Exception {
-    	RyaStatement input = RyaStatement.builder()
-            .setSubject(new RyaURI("http://www.google.com"))
-            .setPredicate(new RyaURI("http://some_other_uri"))
-            .setObject(new RyaURI("http://www.yahoo.com"))
-            .setColumnVisibility(new byte[0])
-            .setValue(new byte[0])
-            .build();
-
-        apiImpl.add(input);
-
-        Job jobConf = Job.getInstance();
-
-        GraphXEdgeInputFormat.setMockInstance(jobConf, instance.getInstanceName());
-        GraphXEdgeInputFormat.setConnectorInfo(jobConf, username, password);
-        GraphXEdgeInputFormat.setTableLayout(jobConf, TABLE_LAYOUT.SPO);
-        GraphXEdgeInputFormat.setInputTableName(jobConf, table);
-        GraphXEdgeInputFormat.setInputTableName(jobConf, table);
-
-        GraphXEdgeInputFormat.setScanIsolation(jobConf, false);
-        GraphXEdgeInputFormat.setLocalIterators(jobConf, false);
-        GraphXEdgeInputFormat.setOfflineTableScan(jobConf, false);
-
-        GraphXEdgeInputFormat inputFormat = new GraphXEdgeInputFormat();
-
-        JobContext context = new JobContextImpl(jobConf.getConfiguration(), jobConf.getJobID());
-
-        List<InputSplit> splits = inputFormat.getSplits(context);
-
-        Assert.assertEquals(1, splits.size());
-
-        TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl(context.getConfiguration(), new TaskAttemptID(new TaskID(), 1));
-
-        RecordReader reader = inputFormat.createRecordReader(splits.get(0), taskAttemptContext);
-
-        RecordReader ryaStatementRecordReader = (RecordReader) reader;
-        ryaStatementRecordReader.initialize(splits.get(0), taskAttemptContext);
-
-        List<Edge> results = new ArrayList<Edge>();
-        while(ryaStatementRecordReader.nextKeyValue()) {
-            Edge writable = (Edge) ryaStatementRecordReader.getCurrentValue();
-            long srcId = writable.srcId();
-            long destId = writable.dstId();
-			RyaTypeWritable rtw = null;
-            Object text = ryaStatementRecordReader.getCurrentKey();
-            Edge<RyaTypeWritable> edge = new Edge<RyaTypeWritable>(srcId, destId, rtw);
-            results.add(edge);
-
-            System.out.println(text);
-        }
-
-        System.out.println(results.size());
-        System.out.println(results);
-        Assert.assertTrue(results.size() == 2);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXInputFormatTest.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXInputFormatTest.java b/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXInputFormatTest.java
deleted file mode 100644
index a31b27f..0000000
--- a/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXInputFormatTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-package mvm.rya.accumulo.mr;
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-import java.util.ArrayList;
-import java.util.List;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.AccumuloRyaDAO;
-import mvm.rya.accumulo.mr.GraphXInputFormat.RyaStatementRecordReader;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.RyaType;
-import mvm.rya.api.domain.RyaURI;
-import mvm.rya.indexing.accumulo.ConfigUtils;
-import mvm.rya.indexing.accumulo.entity.EntityCentricIndex;
-
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class GraphXInputFormatTest {
-
-	private String username = "root", table = "rya_eci";
-    private PasswordToken password = new PasswordToken("");
-
-    private Instance instance;
-    private AccumuloRyaDAO apiImpl;
-
-    @Before
-    public void init() throws Exception {
-        instance = new MockInstance(GraphXInputFormatTest.class.getName() + ".mock_instance");
-        Connector connector = instance.getConnector(username, password);
-        connector.tableOperations().create(table);
-
-        AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
-        conf.setTablePrefix("rya_");
-        conf.setDisplayQueryPlan(false);
-        conf.setBoolean("sc.use_entity", true);
-
-        apiImpl = new AccumuloRyaDAO();
-        apiImpl.setConf(conf);
-        apiImpl.setConnector(connector);
-        apiImpl.init();
-    }
-
-    @After
-    public void after() throws Exception {
-        apiImpl.dropAndDestroy();
-    }
-
-    @Test
-    public void testInputFormat() throws Exception {
-    	RyaStatement input = RyaStatement.builder()
-            .setSubject(new RyaURI("http://www.google.com"))
-            .setPredicate(new RyaURI("http://some_other_uri"))
-            .setObject(new RyaURI("http://www.yahoo.com"))
-            .setColumnVisibility(new byte[0])
-            .setValue(new byte[0])
-            .build();
-
-        apiImpl.add(input);
-
-        Job jobConf = Job.getInstance();
-
-        GraphXInputFormat.setMockInstance(jobConf, instance.getInstanceName());
-        GraphXInputFormat.setConnectorInfo(jobConf, username, password);
-        GraphXInputFormat.setInputTableName(jobConf, table);
-        GraphXInputFormat.setInputTableName(jobConf, table);
-
-        GraphXInputFormat.setScanIsolation(jobConf, false);
-        GraphXInputFormat.setLocalIterators(jobConf, false);
-        GraphXInputFormat.setOfflineTableScan(jobConf, false);
-
-        GraphXInputFormat inputFormat = new GraphXInputFormat();
-
-        JobContext context = new JobContextImpl(jobConf.getConfiguration(), jobConf.getJobID());
-
-        List<InputSplit> splits = inputFormat.getSplits(context);
-
-        Assert.assertEquals(1, splits.size());
-
-        TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl(context.getConfiguration(), new TaskAttemptID(new TaskID(), 1));
-
-        RecordReader<Object, RyaTypeWritable> reader = inputFormat.createRecordReader(splits.get(0), taskAttemptContext);
-
-        RyaStatementRecordReader ryaStatementRecordReader = (RyaStatementRecordReader)reader;
-        ryaStatementRecordReader.initialize(splits.get(0), taskAttemptContext);
-
-        List<RyaType> results = new ArrayList<RyaType>();
-        System.out.println("before while");
-        while(ryaStatementRecordReader.nextKeyValue()) {
-        	System.out.println("in while");
-            RyaTypeWritable writable = ryaStatementRecordReader.getCurrentValue();
-            RyaType value = writable.getRyaType();
-            Object text = ryaStatementRecordReader.getCurrentKey();
-            RyaType type = new RyaType();
-            type.setData(value.getData());
-            type.setDataType(value.getDataType());
-            results.add(type);
-            
-            System.out.println(value.getData());
-            System.out.println(value.getDataType());
-            System.out.println(results);
-            System.out.println(type);
-            System.out.println(text);
-            System.out.println(value);
-        }
-        System.out.println("after while");
-
-        System.out.println(results.size());
-        System.out.println(results);
-//        Assert.assertTrue(results.size() == 2);
-//        Assert.assertTrue(results.contains(input));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/test/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormatTest.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/test/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormatTest.java b/mapreduce/src/test/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormatTest.java
new file mode 100644
index 0000000..6686c8f
--- /dev/null
+++ b/mapreduce/src/test/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormatTest.java
@@ -0,0 +1,134 @@
+package org.apache.rya.accumulo.mr;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.accumulo.AccumuloRyaDAO;
+import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaURI;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.spark.graphx.Edge;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class GraphXEdgeInputFormatTest {
+
+    static String username = "root", table = "rya_spo";
+    static PasswordToken password = new PasswordToken("");
+
+    static Instance instance;
+    static AccumuloRyaDAO apiImpl;
+
+    @Before
+    public void init() throws Exception {
+        instance = new MockInstance(GraphXEdgeInputFormatTest.class.getName() + ".mock_instance");
+        Connector connector = instance.getConnector(username, password);
+        connector.tableOperations().create(table);
+
+        AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+        conf.setTablePrefix("rya_");
+        conf.setDisplayQueryPlan(false);
+
+        apiImpl = new AccumuloRyaDAO();
+        apiImpl.setConf(conf);
+        apiImpl.setConnector(connector);
+        apiImpl.init();
+    }
+
+    @After
+    public void after() throws Exception {
+        apiImpl.dropAndDestroy();
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Test
+    public void testInputFormat() throws Exception {
+        RyaStatement input = RyaStatement.builder()
+            .setSubject(new RyaURI("http://www.google.com"))
+            .setPredicate(new RyaURI("http://some_other_uri"))
+            .setObject(new RyaURI("http://www.yahoo.com"))
+            .setColumnVisibility(new byte[0])
+            .setValue(new byte[0])
+            .build();
+
+        apiImpl.add(input);
+
+        Job jobConf = Job.getInstance();
+
+        GraphXEdgeInputFormat.setMockInstance(jobConf, instance.getInstanceName());
+        GraphXEdgeInputFormat.setConnectorInfo(jobConf, username, password);
+        GraphXEdgeInputFormat.setTableLayout(jobConf, TABLE_LAYOUT.SPO);
+        GraphXEdgeInputFormat.setInputTableName(jobConf, table);
+
+        GraphXEdgeInputFormat.setScanIsolation(jobConf, false);
+        GraphXEdgeInputFormat.setLocalIterators(jobConf, false);
+        GraphXEdgeInputFormat.setOfflineTableScan(jobConf, false);
+
+        GraphXEdgeInputFormat inputFormat = new GraphXEdgeInputFormat();
+
+        JobContext context = new JobContextImpl(jobConf.getConfiguration(), jobConf.getJobID());
+
+        List<InputSplit> splits = inputFormat.getSplits(context);
+
+        Assert.assertEquals(1, splits.size());
+
+        TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl(context.getConfiguration(), new TaskAttemptID(new TaskID(), 1));
+
+        RecordReader reader = inputFormat.createRecordReader(splits.get(0), taskAttemptContext);
+        reader.initialize(splits.get(0), taskAttemptContext);
+
+        List<Edge> results = new ArrayList<Edge>();
+        while (reader.nextKeyValue()) {
+            // Only the vertex IDs are checked here, so each edge is rebuilt
+            // with a null property.
+            Edge writable = (Edge) reader.getCurrentValue();
+            results.add(new Edge<RyaTypeWritable>(writable.srcId(), writable.dstId(), null));
+        }
+
+        Assert.assertEquals(2, results.size());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/test/java/org/apache/rya/accumulo/mr/GraphXInputFormatTest.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/test/java/org/apache/rya/accumulo/mr/GraphXInputFormatTest.java b/mapreduce/src/test/java/org/apache/rya/accumulo/mr/GraphXInputFormatTest.java
new file mode 100644
index 0000000..b2a663c
--- /dev/null
+++ b/mapreduce/src/test/java/org/apache/rya/accumulo/mr/GraphXInputFormatTest.java
@@ -0,0 +1,129 @@
+package org.apache.rya.accumulo.mr;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.accumulo.AccumuloRyaDAO;
+import org.apache.rya.accumulo.mr.GraphXInputFormat.RyaStatementRecordReader;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.domain.RyaURI;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class GraphXInputFormatTest {
+
+    private String username = "root", table = "rya_eci";
+    private PasswordToken password = new PasswordToken("");
+
+    private Instance instance;
+    private AccumuloRyaDAO apiImpl;
+
+    @Before
+    public void init() throws Exception {
+        instance = new MockInstance(GraphXInputFormatTest.class.getName() + ".mock_instance");
+        Connector connector = instance.getConnector(username, password);
+        connector.tableOperations().create(table);
+
+        AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+        conf.setTablePrefix("rya_");
+        conf.setDisplayQueryPlan(false);
+        conf.setBoolean("sc.use_entity", true);
+
+        apiImpl = new AccumuloRyaDAO();
+        apiImpl.setConf(conf);
+        apiImpl.setConnector(connector);
+        apiImpl.init();
+    }
+
+    @After
+    public void after() throws Exception {
+        apiImpl.dropAndDestroy();
+    }
+
+    @Test
+    public void testInputFormat() throws Exception {
+        RyaStatement input = RyaStatement.builder()
+            .setSubject(new RyaURI("http://www.google.com"))
+            .setPredicate(new RyaURI("http://some_other_uri"))
+            .setObject(new RyaURI("http://www.yahoo.com"))
+            .setColumnVisibility(new byte[0])
+            .setValue(new byte[0])
+            .build();
+
+        apiImpl.add(input);
+
+        Job jobConf = Job.getInstance();
+
+        GraphXInputFormat.setMockInstance(jobConf, instance.getInstanceName());
+        GraphXInputFormat.setConnectorInfo(jobConf, username, password);
+        GraphXInputFormat.setInputTableName(jobConf, table);
+
+        GraphXInputFormat.setScanIsolation(jobConf, false);
+        GraphXInputFormat.setLocalIterators(jobConf, false);
+        GraphXInputFormat.setOfflineTableScan(jobConf, false);
+
+        GraphXInputFormat inputFormat = new GraphXInputFormat();
+
+        JobContext context = new JobContextImpl(jobConf.getConfiguration(), jobConf.getJobID());
+
+        List<InputSplit> splits = inputFormat.getSplits(context);
+
+        Assert.assertEquals(1, splits.size());
+
+        TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl(context.getConfiguration(), new TaskAttemptID(new TaskID(), 1));
+
+        RecordReader<Object, RyaTypeWritable> reader = inputFormat.createRecordReader(splits.get(0), taskAttemptContext);
+
+        RyaStatementRecordReader ryaStatementRecordReader = (RyaStatementRecordReader)reader;
+        ryaStatementRecordReader.initialize(splits.get(0), taskAttemptContext);
+
+        List<RyaType> results = new ArrayList<RyaType>();
+        while (ryaStatementRecordReader.nextKeyValue()) {
+            // Copy each value into a plain RyaType so the collected results do
+            // not depend on the Writable wrapper instance being reused.
+            RyaTypeWritable writable = ryaStatementRecordReader.getCurrentValue();
+            RyaType value = writable.getRyaType();
+            RyaType type = new RyaType();
+            type.setData(value.getData());
+            type.setDataType(value.getDataType());
+            results.add(type);
+        }
+//        Assert.assertTrue(results.size() == 2);
+//        Assert.assertTrue(results.contains(input));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5ab29aa..43cf6f5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -64,6 +64,7 @@ under the License.
         <module>osgi</module>
         <module>pig</module>
         <module>sail</module>
+        <module>spark</module>
         <module>web</module>
     </modules>
     <properties>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/spark/pom.xml
----------------------------------------------------------------------
diff --git a/spark/pom.xml b/spark/pom.xml
index 02bdb37..04c8bb5 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -24,11 +24,11 @@ under the License.
     <parent>
         <groupId>org.apache.rya</groupId>
         <artifactId>rya-project</artifactId>
-        <version>3.2.10-SNAPSHOT</version>
+        <version>3.2.10-incubating-SNAPSHOT</version>
     </parent>
 
     <artifactId>rya.spark</artifactId>
-    <name>Apache Rya MapReduce Tools</name>
+    <name>Apache Rya Spark Support</name>
 
     <dependencies>
 	    <dependency>
@@ -39,7 +39,7 @@ under the License.
 	    <dependency>
 		    <groupId>org.apache.spark</groupId>
 		    <artifactId>spark-core_2.11</artifactId>
-		    <version>1.2.2</version>
+		    <version>1.6.2</version>
 		</dependency>
         <dependency>
             <groupId>org.apache.rya</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/spark/src/main/java/mvm/rya/accumulo/spark/GraphXGraphGenerator.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/mvm/rya/accumulo/spark/GraphXGraphGenerator.java b/spark/src/main/java/mvm/rya/accumulo/spark/GraphXGraphGenerator.java
deleted file mode 100644
index f4b7860..0000000
--- a/spark/src/main/java/mvm/rya/accumulo/spark/GraphXGraphGenerator.java
+++ /dev/null
@@ -1,188 +0,0 @@
-package mvm.rya.accumulo.spark;
-
-import java.io.IOException;
-
-import mvm.rya.accumulo.AccumuloRdfConstants;
-import mvm.rya.accumulo.mr.GraphXEdgeInputFormat;
-import mvm.rya.accumulo.mr.GraphXInputFormat;
-import mvm.rya.accumulo.mr.MRUtils;
-import mvm.rya.accumulo.mr.RyaInputFormat;
-import mvm.rya.accumulo.mr.RyaTypeWritable;
-import mvm.rya.api.RdfCloudTripleStoreConfiguration;
-import mvm.rya.api.RdfCloudTripleStoreConstants;
-import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
-import mvm.rya.indexing.accumulo.ConfigUtils;
-import mvm.rya.indexing.accumulo.entity.EntityCentricIndex;
-
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.ClientConfiguration;
-import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.graphx.Edge;
-import org.apache.spark.graphx.Graph;
-import org.apache.spark.rdd.RDD;
-import org.apache.spark.storage.StorageLevel;
-
-import scala.Tuple2;
-import scala.reflect.ClassTag;
-
-import com.google.common.base.Preconditions;
-
-@SuppressWarnings({ "unchecked", "rawtypes" })
-public class GraphXGraphGenerator {
-	
-	public String zk;
-	public String instance;
-	public String userName;
-	public String pwd;
-	public boolean mock;
-	public String tablePrefix;
-	public Authorizations authorizations;
-	
-	public RDD<Tuple2<Object, RyaTypeWritable>> getVertexRDD(SparkContext sc, Configuration conf) throws IOException, AccumuloSecurityException{
-		// Load configuration parameters
-        zk = MRUtils.getACZK(conf);
-        instance = MRUtils.getACInstance(conf);
-        userName = MRUtils.getACUserName(conf);
-        pwd = MRUtils.getACPwd(conf);
-        mock = MRUtils.getACMock(conf, false);
-        tablePrefix = MRUtils.getTablePrefix(conf);
-        // Set authorizations if specified
-        String authString = conf.get(MRUtils.AC_AUTH_PROP);
-		if (authString != null && !authString.isEmpty()) {
-            authorizations = new Authorizations(authString.split(","));
-            conf.set(ConfigUtils.CLOUDBASE_AUTHS, authString); // for consistency
-        }
-        else {
-            authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS;
-        }
-        // Set table prefix to the default if not set
-        if (tablePrefix == null) {
-            tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
-            MRUtils.setTablePrefix(conf, tablePrefix);
-        }
-        // Check for required configuration parameters
-        Preconditions.checkNotNull(instance, "Accumulo instance name [" + MRUtils.AC_INSTANCE_PROP + "] not set.");
-        Preconditions.checkNotNull(userName, "Accumulo username [" + MRUtils.AC_USERNAME_PROP + "] not set.");
-        Preconditions.checkNotNull(pwd, "Accumulo password [" + MRUtils.AC_PWD_PROP + "] not set.");
-        Preconditions.checkNotNull(tablePrefix, "Table prefix [" + MRUtils.TABLE_PREFIX_PROPERTY + "] not set.");
-        RdfCloudTripleStoreConstants.prefixTables(tablePrefix);
-        // If connecting to real accumulo, set additional parameters and require zookeepers
-        if (!mock) conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zk); // for consistency
-        // Ensure consistency between alternative configuration properties
-        conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instance);
-        conf.set(ConfigUtils.CLOUDBASE_USER, userName);
-        conf.set(ConfigUtils.CLOUDBASE_PASSWORD, pwd);
-        conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, mock);
-        conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, tablePrefix);
-
-		Job job = Job.getInstance(conf, sc.appName());
-		
-		ClientConfiguration clientConfig = new ClientConfiguration().with(ClientProperty.INSTANCE_NAME, instance).with(ClientProperty.INSTANCE_ZK_HOST, zk);
-		
-		//maybe ask conf for correct suffix?
-		GraphXInputFormat.setInputTableName(job, EntityCentricIndex.CONF_TABLE_SUFFIX);
-		GraphXInputFormat.setConnectorInfo(job, userName, pwd);
-		GraphXInputFormat.setZooKeeperInstance(job, clientConfig);
-		GraphXInputFormat.setScanAuthorizations(job, authorizations);
-		
-		return sc.newAPIHadoopRDD(job.getConfiguration(), GraphXInputFormat.class, Object.class, RyaTypeWritable.class);
-	}
-	
-	public RDD<Tuple2<Object, Edge>> getEdgeRDD(SparkContext sc, Configuration conf) throws IOException, AccumuloSecurityException{
-		// Load configuration parameters
-        zk = MRUtils.getACZK(conf);
-        instance = MRUtils.getACInstance(conf);
-        userName = MRUtils.getACUserName(conf);
-        pwd = MRUtils.getACPwd(conf);
-        mock = MRUtils.getACMock(conf, false);
-        tablePrefix = MRUtils.getTablePrefix(conf);
-        // Set authorizations if specified
-        String authString = conf.get(MRUtils.AC_AUTH_PROP);
-		if (authString != null && !authString.isEmpty()) {
-            authorizations = new Authorizations(authString.split(","));
-            conf.set(ConfigUtils.CLOUDBASE_AUTHS, authString); // for consistency
-        }
-        else {
-            authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS;
-        }
-        // Set table prefix to the default if not set
-        if (tablePrefix == null) {
-            tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
-            MRUtils.setTablePrefix(conf, tablePrefix);
-        }
-        // Check for required configuration parameters
-        Preconditions.checkNotNull(instance, "Accumulo instance name [" + MRUtils.AC_INSTANCE_PROP + "] not set.");
-        Preconditions.checkNotNull(userName, "Accumulo username [" + MRUtils.AC_USERNAME_PROP + "] not set.");
-        Preconditions.checkNotNull(pwd, "Accumulo password [" + MRUtils.AC_PWD_PROP + "] not set.");
-        Preconditions.checkNotNull(tablePrefix, "Table prefix [" + MRUtils.TABLE_PREFIX_PROPERTY + "] not set.");
-        RdfCloudTripleStoreConstants.prefixTables(tablePrefix);
-        // If connecting to real accumulo, set additional parameters and require zookeepers
-        if (!mock) conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zk); // for consistency
-        // Ensure consistency between alternative configuration properties
-        conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instance);
-        conf.set(ConfigUtils.CLOUDBASE_USER, userName);
-        conf.set(ConfigUtils.CLOUDBASE_PASSWORD, pwd);
-        conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, mock);
-        conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, tablePrefix);
-
-		Job job = Job.getInstance(conf, sc.appName());
-		
-		ClientConfiguration clientConfig = new ClientConfiguration().with(ClientProperty.INSTANCE_NAME, instance).with(ClientProperty.INSTANCE_ZK_HOST, zk);
-		
-		RyaInputFormat.setTableLayout(job, TABLE_LAYOUT.SPO);
-		RyaInputFormat.setConnectorInfo(job, userName, pwd);
-		RyaInputFormat.setZooKeeperInstance(job, clientConfig);
-		RyaInputFormat.setScanAuthorizations(job, authorizations);
-		return sc.newAPIHadoopRDD(job.getConfiguration(), GraphXEdgeInputFormat.class, Object.class, Edge.class);
-	}
-	
-	public Graph<RyaTypeWritable, RyaTypeWritable> createGraph(SparkContext sc, Configuration conf) throws IOException, AccumuloSecurityException{
-		StorageLevel storageLvl1 = StorageLevel.MEMORY_ONLY();
-		StorageLevel storageLvl2 = StorageLevel.MEMORY_ONLY();
-		ClassTag<RyaTypeWritable> RTWTag = null;
-		RyaTypeWritable rtw = null;
-		RDD<Tuple2<Object, RyaTypeWritable>> vertexRDD = getVertexRDD(sc, conf);
-		
-		RDD<Tuple2<Object, Edge>> edgeRDD = getEdgeRDD(sc, conf);
-		JavaRDD<Tuple2<Object, Edge>> jrddTuple = edgeRDD.toJavaRDD();
-		JavaRDD<Edge<RyaTypeWritable>> jrdd = jrddTuple.map(tuple -> tuple._2);
-		
-		RDD<Edge<RyaTypeWritable>> goodERDD = JavaRDD.toRDD(jrdd);
-		
-		return Graph.apply(vertexRDD, goodERDD, rtw, storageLvl1, storageLvl2, RTWTag, RTWTag);
-	}
-	
-	
-	
-	
-	
-	
-	
-	
-	
-	
-	
-	
-	
-	
-	
-	
-	
-	
-	
-	
-	
-	
-	
-	
-	
-	
-	
-	
-	
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/spark/src/main/java/org/apache/rya/accumulo/spark/GraphXGraphGenerator.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/rya/accumulo/spark/GraphXGraphGenerator.java b/spark/src/main/java/org/apache/rya/accumulo/spark/GraphXGraphGenerator.java
new file mode 100644
index 0000000..b1889b8
--- /dev/null
+++ b/spark/src/main/java/org/apache/rya/accumulo/spark/GraphXGraphGenerator.java
@@ -0,0 +1,183 @@
+package org.apache.rya.accumulo.spark;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.rya.accumulo.AccumuloRdfConstants;
+import org.apache.rya.accumulo.mr.GraphXEdgeInputFormat;
+import org.apache.rya.accumulo.mr.GraphXInputFormat;
+import org.apache.rya.accumulo.mr.MRUtils;
+import org.apache.rya.accumulo.mr.RyaInputFormat;
+import org.apache.rya.accumulo.mr.RyaTypeWritable;
+import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.rya.api.RdfCloudTripleStoreConstants;
+import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import org.apache.rya.api.RdfCloudTripleStoreUtils;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.accumulo.entity.EntityCentricIndex;
+
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.client.mapreduce.InputFormatBase;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.graphx.Edge;
+import org.apache.spark.graphx.Graph;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.storage.StorageLevel;
+
+import scala.Tuple2;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
+
+import com.google.common.base.Preconditions;
+
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public class GraphXGraphGenerator {
+
+    public String zk;
+    public String instance;
+    public String userName;
+    public String pwd;
+    public boolean mock;
+    public String tablePrefix;
+    public Authorizations authorizations;
+
+    public RDD<Tuple2<Object, RyaTypeWritable>> getVertexRDD(SparkContext sc, Configuration conf) throws IOException, AccumuloSecurityException{
+        // Load configuration parameters
+        zk = MRUtils.getACZK(conf);
+        instance = MRUtils.getACInstance(conf);
+        userName = MRUtils.getACUserName(conf);
+        pwd = MRUtils.getACPwd(conf);
+        mock = MRUtils.getACMock(conf, false);
+        tablePrefix = MRUtils.getTablePrefix(conf);
+        // Set authorizations if specified
+        String authString = conf.get(MRUtils.AC_AUTH_PROP);
+        if (authString != null && !authString.isEmpty()) {
+            authorizations = new Authorizations(authString.split(","));
+            conf.set(ConfigUtils.CLOUDBASE_AUTHS, authString); // for consistency
+        }
+        else {
+            authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS;
+        }
+        // Set table prefix to the default if not set
+        if (tablePrefix == null) {
+            tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
+            MRUtils.setTablePrefix(conf, tablePrefix);
+        }
+        // Check for required configuration parameters
+        Preconditions.checkNotNull(instance, "Accumulo instance name [" + MRUtils.AC_INSTANCE_PROP + "] not set.");
+        Preconditions.checkNotNull(userName, "Accumulo username [" + MRUtils.AC_USERNAME_PROP + "] not set.");
+        Preconditions.checkNotNull(pwd, "Accumulo password [" + MRUtils.AC_PWD_PROP + "] not set.");
+        Preconditions.checkNotNull(tablePrefix, "Table prefix [" + MRUtils.TABLE_PREFIX_PROPERTY + "] not set.");
+        RdfCloudTripleStoreConstants.prefixTables(tablePrefix);
+        // If connecting to real accumulo, set additional parameters and require zookeepers
+        if (!mock) conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zk); // for consistency
+        // Ensure consistency between alternative configuration properties
+        conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instance);
+        conf.set(ConfigUtils.CLOUDBASE_USER, userName);
+        conf.set(ConfigUtils.CLOUDBASE_PASSWORD, pwd);
+        conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, mock);
+        conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, tablePrefix);
+
+        Job job = Job.getInstance(conf, sc.appName());
+
+        ClientConfiguration clientConfig = new ClientConfiguration().with(ClientProperty.INSTANCE_NAME, instance).with(ClientProperty.INSTANCE_ZK_HOST, zk);
+
+        GraphXInputFormat.setInputTableName(job, EntityCentricIndex.getTableName(conf));
+        GraphXInputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd));
+        GraphXInputFormat.setZooKeeperInstance(job, clientConfig);
+        GraphXInputFormat.setScanAuthorizations(job, authorizations);
+
+        return sc.newAPIHadoopRDD(job.getConfiguration(), GraphXInputFormat.class, Object.class, RyaTypeWritable.class);
+    }
+
+    public RDD<Tuple2<Object, Edge>> getEdgeRDD(SparkContext sc, Configuration conf) throws IOException, AccumuloSecurityException{
+        // Load configuration parameters
+        zk = MRUtils.getACZK(conf);
+        instance = MRUtils.getACInstance(conf);
+        userName = MRUtils.getACUserName(conf);
+        pwd = MRUtils.getACPwd(conf);
+        mock = MRUtils.getACMock(conf, false);
+        tablePrefix = MRUtils.getTablePrefix(conf);
+        // Set authorizations if specified
+        String authString = conf.get(MRUtils.AC_AUTH_PROP);
+        if (authString != null && !authString.isEmpty()) {
+            authorizations = new Authorizations(authString.split(","));
+            conf.set(ConfigUtils.CLOUDBASE_AUTHS, authString); // for consistency
+        }
+        else {
+            authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS;
+        }
+        // Set table prefix to the default if not set
+        if (tablePrefix == null) {
+            tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
+            MRUtils.setTablePrefix(conf, tablePrefix);
+        }
+        // Check for required configuration parameters
+        Preconditions.checkNotNull(instance, "Accumulo instance name [" + MRUtils.AC_INSTANCE_PROP + "] not set.");
+        Preconditions.checkNotNull(userName, "Accumulo username [" + MRUtils.AC_USERNAME_PROP + "] not set.");
+        Preconditions.checkNotNull(pwd, "Accumulo password [" + MRUtils.AC_PWD_PROP + "] not set.");
+        Preconditions.checkNotNull(tablePrefix, "Table prefix [" + MRUtils.TABLE_PREFIX_PROPERTY + "] not set.");
+        RdfCloudTripleStoreConstants.prefixTables(tablePrefix);
+        // If connecting to real accumulo, set additional parameters and require zookeepers
+        if (!mock) conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zk); // for consistency
+        // Ensure consistency between alternative configuration properties
+        conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instance);
+        conf.set(ConfigUtils.CLOUDBASE_USER, userName);
+        conf.set(ConfigUtils.CLOUDBASE_PASSWORD, pwd);
+        conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, mock);
+        conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, tablePrefix);
+
+        Job job = Job.getInstance(conf, sc.appName());
+
+        ClientConfiguration clientConfig = new ClientConfiguration().with(ClientProperty.INSTANCE_NAME, instance).with(ClientProperty.INSTANCE_ZK_HOST, zk);
+
+        RyaInputFormat.setTableLayout(job, TABLE_LAYOUT.SPO);
+        RyaInputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd));
+        RyaInputFormat.setZooKeeperInstance(job, clientConfig);
+        RyaInputFormat.setScanAuthorizations(job, authorizations);
+        String tableName = RdfCloudTripleStoreUtils.layoutPrefixToTable(TABLE_LAYOUT.SPO, tablePrefix);
+        InputFormatBase.setInputTableName(job, tableName);
+        return sc.newAPIHadoopRDD(job.getConfiguration(), GraphXEdgeInputFormat.class, Object.class, Edge.class);
+    }
+
+    public Graph<RyaTypeWritable, RyaTypeWritable> createGraph(SparkContext sc, Configuration conf) throws IOException, AccumuloSecurityException{
+        StorageLevel storageLvl1 = StorageLevel.MEMORY_ONLY();
+        StorageLevel storageLvl2 = StorageLevel.MEMORY_ONLY();
+        ClassTag<RyaTypeWritable> RTWTag = ClassTag$.MODULE$.apply(RyaTypeWritable.class);
+        RyaTypeWritable rtw = null;
+        RDD<Tuple2<Object, RyaTypeWritable>> vertexRDD = getVertexRDD(sc, conf);
+
+        RDD<Tuple2<Object, Edge>> edgeRDD = getEdgeRDD(sc, conf);
+        JavaRDD<Tuple2<Object, Edge>> jrddTuple = edgeRDD.toJavaRDD();
+        JavaRDD<Edge<RyaTypeWritable>> jrdd = jrddTuple.map(tuple -> tuple._2);
+
+        RDD<Edge<RyaTypeWritable>> goodERDD = JavaRDD.toRDD(jrdd);
+
+        return Graph.apply(vertexRDD, goodERDD, rtw, storageLvl1, storageLvl2, RTWTag, RTWTag);
+    }
+}
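
For reference, a minimal driver sketch (not part of this commit) showing how the new GraphXGraphGenerator is meant to be invoked. The connection values are placeholders, and MRUtils.AC_ZK_PROP is assumed to be the zookeeper property read by MRUtils.getACZK(); everything else is taken from the code above.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.rya.accumulo.mr.MRUtils;
    import org.apache.rya.accumulo.mr.RyaTypeWritable;
    import org.apache.rya.accumulo.spark.GraphXGraphGenerator;
    import org.apache.spark.SparkConf;
    import org.apache.spark.SparkContext;
    import org.apache.spark.graphx.Graph;

    public class RyaGraphXExample {
        public static void main(String[] args) throws Exception {
            // Accumulo connection settings; the values here are placeholders.
            Configuration conf = new Configuration();
            conf.set(MRUtils.AC_INSTANCE_PROP, "accumulo");
            conf.set(MRUtils.AC_USERNAME_PROP, "root");
            conf.set(MRUtils.AC_PWD_PROP, "secret");
            conf.set(MRUtils.AC_ZK_PROP, "zoo1:2181"); // assumed zookeeper property
            MRUtils.setTablePrefix(conf, "rya_");

            SparkContext sc = new SparkContext(new SparkConf().setAppName("RyaGraphX"));
            GraphXGraphGenerator generator = new GraphXGraphGenerator();
            Graph<RyaTypeWritable, RyaTypeWritable> graph = generator.createGraph(sc, conf);

            // Vertices come from the entity-centric index; edges from the SPO table.
            System.out.println("vertices: " + graph.vertices().count()
                    + ", edges: " + graph.edges().count());
            sc.stop();
        }
    }

Since getVertexRDD reads the entity-centric index table, this assumes the data was ingested with that index enabled (see the "sc.use_entity" flag in GraphXInputFormatTest above).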