Posted to commits@rya.apache.org by pu...@apache.org on 2016/07/21 12:50:31 UTC

[6/6] incubator-rya git commit: Consolidated MapReduce API and applications into top-level project.

Consolidated MapReduce API and applications into top-level project.

Changes include:
- Made RdfFileInputFormat threaded, allowing it to handle all RDF input
- Added entity-centric indexing to RyaOutputFormat
- Added methods for using Rya input and output formats to AbstractAccumuloMRTool (renamed setupInputFormat to setupAccumuloInput since there are now multiple input options)
- Removed *NullIndexer classes, which were only used in RyaOutputFormat
- Removed StatementWritable, changed to RyaStatementWritable where applicable (for consistency)
- Fixed hashCode/compareTo/equals methods in RyaStatementWritable, RyaType, and RyaURI
- Minor renaming and repackaging (standalone tools go in "tools", "fileinput" and "utils" removed)
- Minor tweaks to AccumuloHDFSFileInputFormat, which seemed to be broken with Accumulo 1.6
- Documented code and added a MapReduce page to the manual
- Added "examples" package with one example
- Removed RyaStatementMapper and RyaStatementReducer: these simply inserted records
  into Rya. The same functionality can be achieved with RyaOutputFormat and the default
  mapper/reducer (see the sketch after this list), making the two classes redundant.
- Removed BulkNtripsInputTool, BulkNtripsInputToolIndexing, RdfFileInputByLineTool,
  and RyaBatchWriterInputTool: Fixes to RdfFileInputFormat now allow RdfFileInputTool
  to handle all the file input use cases (configurable secondary indexers, support for
  any RDF format, scalability), rendering the other file import tools redundant.
  (Previously, all five tools had largely overlapping but subtly different behavior.)
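
As a concrete sketch of the mapper/reducer consolidation above: the pattern below
uses RyaOutputFormat with Hadoop's default (identity) Mapper in place of the removed
RyaStatementMapper/RyaStatementReducer. The key type and the commented-out input
format are assumptions for illustration, not taken from this commit.

    // Hypothetical sketch: load RyaStatementWritable records through RyaOutputFormat
    // using the default identity Mapper (no custom mapper/reducer classes needed).
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;

    import mvm.rya.accumulo.mr.RyaOutputFormat;
    import mvm.rya.accumulo.mr.RyaStatementWritable;

    public class IdentityLoadSketch {
        public static Job buildJob(Configuration conf) throws Exception {
            Job job = Job.getInstance(conf, "load statements via RyaOutputFormat (sketch)");
            job.setJarByClass(IdentityLoadSketch.class);
            // job.setInputFormatClass(...); // any input producing (Text, RyaStatementWritable)
            job.setMapperClass(Mapper.class); // identity: statements pass straight through
            job.setNumReduceTasks(0);         // map-only; no reducer is required
            job.setOutputKeyClass(Text.class);            // assumed key type
            job.setOutputValueClass(RyaStatementWritable.class);
            job.setOutputFormatClass(RyaOutputFormat.class);
            return job;
        }
    }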


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/42895eac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/42895eac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/42895eac

Branch: refs/heads/develop
Commit: 42895eac0be3eabb836bd1cf0130cdd5951835c2
Parents: c698b56
Author: Jesse Hatfield <je...@parsons.com>
Authored: Thu Jun 30 15:50:08 2016 -0400
Committer: pujav65 <pu...@gmail.com>
Committed: Thu Jul 21 08:45:17 2016 -0400

----------------------------------------------------------------------
 .../main/java/mvm/rya/api/domain/RyaType.java   |  38 +-
 .../java/mvm/rya/api/domain/RyaTypeTest.java    | 119 ++++
 dao/accumulo.rya/pom.xml                        |  69 ---
 .../rya/accumulo/mr/AbstractAccumuloMRTool.java | 164 -----
 .../accumulo/mr/RyaStatementInputFormat.java    |  94 ---
 .../mvm/rya/accumulo/mr/RyaStatementMapper.java |  85 ---
 .../rya/accumulo/mr/RyaStatementReducer.java    |  87 ---
 .../rya/accumulo/mr/RyaStatementWritable.java   | 150 -----
 .../accumulo/mr/eval/AccumuloRdfCountTool.java  | 258 --------
 .../mr/fileinput/BulkNtripsInputTool.java       | 369 ------------
 .../mr/fileinput/RdfFileInputByLineTool.java    | 251 --------
 .../mr/fileinput/RdfFileInputFormat.java        | 146 -----
 .../accumulo/mr/fileinput/RdfFileInputTool.java | 175 ------
 .../rya/accumulo/mr/upgrade/Upgrade322Tool.java | 240 --------
 .../mr/utils/AccumuloHDFSFileInputFormat.java   | 206 -------
 .../rya/accumulo/mr/utils/AccumuloProps.java    |  58 --
 .../java/mvm/rya/accumulo/mr/utils/MRUtils.java | 119 ----
 .../mvm/rya/accumulo/mr/RyaInputFormatTest.java | 225 -------
 .../mr/eval/AccumuloRdfCountToolTest.java       | 282 ---------
 .../mr/fileinput/RdfFileInputToolTest.java      | 146 -----
 .../accumulo/mr/upgrade/Upgrade322ToolTest.java | 319 ----------
 .../upgrade/UpgradeObjectSerializationTest.java | 119 ----
 .../src/test/resources/namedgraphs.trig         |   7 -
 .../src/test/resources/test.ntriples            |   1 -
 .../rya/accumulo/mr/NullFreeTextIndexer.java    | 102 ----
 .../mvm/rya/accumulo/mr/NullGeoIndexer.java     | 153 -----
 .../rya/accumulo/mr/NullTemporalIndexer.java    | 186 ------
 .../mvm/rya/accumulo/mr/RyaOutputFormat.java    | 329 ----------
 .../mvm/rya/accumulo/mr/StatementWritable.java  |  86 ---
 .../fileinput/BulkNtripsInputToolIndexing.java  | 227 -------
 .../mr/fileinput/RyaBatchWriterInputTool.java   | 243 --------
 extras/rya.manual/src/site/markdown/_index.md   |   1 +
 extras/rya.manual/src/site/markdown/index.md    |   1 +
 extras/rya.manual/src/site/markdown/loaddata.md |  15 +-
 .../rya.manual/src/site/markdown/mapreduce.md   | 107 ++++
 .../src/site/markdown/sm-firststeps.md          |   6 +-
 extras/rya.manual/src/site/site.xml             |   1 +
 extras/rya.reasoning/pom.xml                    |   4 +
 .../rya/reasoning/mr/AbstractReasoningTool.java |   4 +-
 .../mvm/rya/reasoning/mr/ConformanceTest.java   |   2 +-
 .../mvm/rya/reasoning/mr/MRReasoningUtils.java  |   2 +-
 extras/tinkerpop.rya/pom.xml                    |   4 +
 .../config/RyaGraphConfiguration.groovy         |   2 +-
 .../mvm/rya/blueprints/TstGremlinRya.groovy     |   2 +-
 .../config/RyaGraphConfigurationTest.groovy     |   2 +-
 .../sail/RyaSailVertexSequenceTest.groovy       |   2 +-
 mapreduce/pom.xml                               | 125 ++++
 .../rya/accumulo/mr/AbstractAccumuloMRTool.java | 305 ++++++++++
 .../mr/AccumuloHDFSFileInputFormat.java         | 161 +++++
 .../main/java/mvm/rya/accumulo/mr/MRUtils.java  | 317 ++++++++++
 .../mvm/rya/accumulo/mr/RdfFileInputFormat.java | 443 ++++++++++++++
 .../mvm/rya/accumulo/mr/RyaInputFormat.java     | 130 ++++
 .../mvm/rya/accumulo/mr/RyaOutputFormat.java    | 597 +++++++++++++++++++
 .../rya/accumulo/mr/RyaStatementWritable.java   | 256 ++++++++
 .../accumulo/mr/examples/TextOutputExample.java | 196 ++++++
 .../accumulo/mr/tools/AccumuloRdfCountTool.java | 258 ++++++++
 .../rya/accumulo/mr/tools/RdfFileInputTool.java |  91 +++
 .../rya/accumulo/mr/tools/Upgrade322Tool.java   | 241 ++++++++
 .../rya/accumulo/mr/RdfFileInputFormatTest.java | 180 ++++++
 .../mvm/rya/accumulo/mr/RyaInputFormatTest.java | 156 +++++
 .../rya/accumulo/mr/RyaOutputFormatTest.java    | 324 ++++++++++
 .../accumulo/mr/RyaStatementWritableTest.java   | 146 +++++
 .../java/mvm/rya/accumulo/mr/TestUtils.java     | 113 ++++
 .../mr/tools/AccumuloRdfCountToolTest.java      | 283 +++++++++
 .../accumulo/mr/tools/RdfFileInputToolTest.java | 131 ++++
 .../accumulo/mr/tools/Upgrade322ToolTest.java   | 275 +++++++++
 .../tools/UpgradeObjectSerializationTest.java   | 119 ++++
 mapreduce/src/test/resources/namedgraphs.trig   |   7 +
 mapreduce/src/test/resources/test.ntriples      |   3 +
 pom.xml                                         |   6 +
 70 files changed, 5152 insertions(+), 4919 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/common/rya.api/src/main/java/mvm/rya/api/domain/RyaType.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/mvm/rya/api/domain/RyaType.java b/common/rya.api/src/main/java/mvm/rya/api/domain/RyaType.java
index ab580d6..7a35253 100644
--- a/common/rya.api/src/main/java/mvm/rya/api/domain/RyaType.java
+++ b/common/rya.api/src/main/java/mvm/rya/api/domain/RyaType.java
@@ -80,19 +80,25 @@ public class RyaType implements Comparable {
         return sb.toString();
     }
 
+    /**
+     * Determine equality based on string representations of data and datatype.
+     * @param o The object to compare with
+     * @return true if the other object is also a RyaType and both data and datatype match.
+     */
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-
+        if (o == null || !(o instanceof RyaType)) return false;
         RyaType ryaType = (RyaType) o;
-
         if (data != null ? !data.equals(ryaType.data) : ryaType.data != null) return false;
         if (dataType != null ? !dataType.equals(ryaType.dataType) : ryaType.dataType != null) return false;
-
         return true;
     }
 
+    /**
+     * Generate a hash based on the string representations of both data and datatype.
+     * @return A hash consistent with equals.
+     */
     @Override
     public int hashCode() {
         int result = dataType != null ? dataType.hashCode() : 0;
@@ -100,12 +106,30 @@ public class RyaType implements Comparable {
         return result;
     }
 
+    /**
+     * Define a natural ordering based on data and datatype.
+     * @param o The object to compare with
+     * @return 0 if both the data string and the datatype string representation match between the objects,
+     *          where matching is defined by string comparison or both being null;
+     *          Otherwise, an integer whose sign yields a consistent ordering.
+     */
     @Override
     public int compareTo(Object o) {
-        if (o != null && this.getClass().isInstance(o)) {
+        int result = -1;
+        if (o != null && o instanceof RyaType) {
+            result = 0;
             RyaType other = (RyaType) o;
-            return this.getData().compareTo(other.getData());
+            if (this.data != other.data) {
+                if (this.data == null) return 1;
+                if (other.data == null) return -1;
+                result = this.data.compareTo(other.data);
+            }
+            if (result == 0 && this.dataType != other.dataType) {
+                if (this.dataType == null) return 1;
+                if (other.dataType == null) return -1;
+                result = this.dataType.toString().compareTo(other.dataType.toString());
+            }
         }
-        return -1;
+        return result;
     }
 }

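For reference, a minimal sketch of the ordering contract the revised compareTo
defines (consistent with the RyaTypeTest added below): equal data and datatype
compare as 0, the datatype string breaks ties when data matches, and null fields
sort after non-null ones. Constructor usage mirrors the test class; the expected
values in the comments are inferred from the diff above.

    // Illustrative sketch of the revised RyaType.compareTo contract.
    import org.openrdf.model.vocabulary.XMLSchema;
    import mvm.rya.api.domain.RyaType;

    public class RyaTypeOrderingDemo {
        public static void main(String[] args) {
            RyaType a = new RyaType(XMLSchema.STRING, "http://www.example.com/Alice");
            RyaType same = new RyaType(XMLSchema.STRING, "http://www.example.com/Alice");
            RyaType aUri = new RyaType(XMLSchema.ANYURI, "http://www.example.com/Alice");
            RyaType nullData = new RyaType(XMLSchema.STRING, null);

            System.out.println(a.compareTo(same));     // 0: data and datatype both match
            System.out.println(a.compareTo(aUri));     // nonzero: datatype breaks the data tie
            System.out.println(a.compareTo(nullData)); // -1: non-null data orders first
            System.out.println(nullData.compareTo(a)); // 1: consistent with the line above
        }
    }
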
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/common/rya.api/src/test/java/mvm/rya/api/domain/RyaTypeTest.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/test/java/mvm/rya/api/domain/RyaTypeTest.java b/common/rya.api/src/test/java/mvm/rya/api/domain/RyaTypeTest.java
new file mode 100644
index 0000000..6db6053
--- /dev/null
+++ b/common/rya.api/src/test/java/mvm/rya/api/domain/RyaTypeTest.java
@@ -0,0 +1,119 @@
+package mvm.rya.api.domain;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.openrdf.model.vocabulary.XMLSchema;
+
+public class RyaTypeTest {
+    static RyaType a = new RyaType(XMLSchema.STRING, "http://www.example.com/Alice");
+    static RyaType b = new RyaType(XMLSchema.STRING, "http://www.example.com/Bob");
+    static RyaType c = new RyaType(XMLSchema.STRING, "http://www.example.com/Carol");
+    static RyaType aUri = new RyaType(XMLSchema.ANYURI, "http://www.example.com/Alice");
+    static RyaType bUri = new RyaType(XMLSchema.ANYURI, "http://www.example.com/Bob");
+    RyaType nullData = new RyaType(XMLSchema.STRING, null);
+    RyaType nullType = new RyaType(null, "http://www.example.com/Alice");
+    RyaType nullBoth = new RyaType(null, null);
+    RyaType same = new RyaType(XMLSchema.STRING, "http://www.example.com/Alice");
+
+    @Test
+    public void testCompareTo() throws Exception {
+        Assert.assertEquals("compareTo(self) should return zero.", 0, aUri.compareTo(aUri));
+        Assert.assertFalse("compareTo should return nonzero for different data and type.", aUri.compareTo(b) == 0);
+        Assert.assertFalse("compareTo should return nonzero for same data and different datatypes.", a.compareTo(aUri) == 0);
+        Assert.assertFalse("compareTo should return nonzero for same datatype and different data.", bUri.compareTo(aUri) == 0);
+        Assert.assertEquals("compareTo should return zero for different objects with matching data and datatype.",
+                0, a.compareTo(same));
+    }
+
+    @Test
+    public void testCompareToNullFields() throws Exception {
+        Assert.assertEquals("[has no nulls].compareTo([has null data]) should return -1", -1, a.compareTo(nullData));
+        Assert.assertEquals("[has no nulls].compareTo([has null type]) should return -1 if data is equal",
+                -1, a.compareTo(nullType));
+        Assert.assertEquals("[has null data].compareTo([has no nulls]) should return 1", 1, nullData.compareTo(a));
+        Assert.assertEquals("[has null type].compareTo([has no nulls]) should return 1 if data is equal",
+                 1, nullType.compareTo(a));
+        Assert.assertEquals("[has null type].compareTo([has null data]) should return -1", -1, nullType.compareTo(nullData));
+    }
+
+    @Test
+    public void testCompareToSymmetry() throws Exception {
+        int forward = Integer.signum(a.compareTo(b));
+        int backward = Integer.signum(b.compareTo(a));
+        Assert.assertEquals("Comparison of different values with same type should yield opposite signs.", forward, backward * -1);
+        forward = Integer.signum(bUri.compareTo(b));
+        backward = Integer.signum(b.compareTo(bUri));
+        Assert.assertEquals("Comparison of same values with different types should yield opposite signs.", forward, backward*-1);
+        forward = Integer.signum(aUri.compareTo(b));
+        backward = Integer.signum(b.compareTo(aUri));
+        Assert.assertEquals("Comparison of different values with different types should yield opposite signs.",
+                forward, backward * -1);
+    }
+
+    @Test
+    public void testCompareToTransitive() throws Exception {
+        int sign = Integer.signum(a.compareTo(b));
+        Assert.assertEquals("compareTo(a,b) and compareTo(b,c) should have the same sign.",
+                sign, Integer.signum(b.compareTo(c)));
+        Assert.assertEquals("if a > b > c, compareTo(a,c) should be consistent.", sign, Integer.signum(a.compareTo(c)));
+    }
+
+    @Test
+    public void testEquals() throws Exception {
+        Assert.assertTrue("Same data and datatype should be equal.", a.equals(same));
+        Assert.assertFalse("Same data, different datatype should be unequal.", a.equals(aUri));
+        Assert.assertFalse("Same datatype, different data should be unequal.", a.equals(b));
+    }
+
+    @Test
+    public void testEqualsNullFields() throws Exception {
+        Assert.assertFalse("equals(null) should return false.", a.equals(null));
+        Assert.assertFalse("Same data, one null datatype should be unequal.", a.equals(nullType));
+        Assert.assertFalse("Same datatype, one null data should be unequal.", a.equals(nullData));
+        RyaType sameNull = new RyaType(null, null);
+        Assert.assertTrue("Matching null fields should be equal.", sameNull.equals(nullBoth));
+    }
+
+    @Test
+    public void testEqualsCompareToConsistency() throws Exception {
+        Assert.assertEquals("equals and compareTo inconsistent for matching values and types.",
+                a.equals(same), a.compareTo(same) == 0);
+        Assert.assertEquals("equals and compareTo inconsistent for different values with same types.",
+                a.equals(b), a.compareTo(b) == 0);
+        Assert.assertEquals("equals and compareTo inconsistent for same values having different types.",
+                a.equals(aUri), a.compareTo(aUri) == 0);
+        Assert.assertEquals("equals and compareTo inconsistent for different values and different types.",
+                a.equals(bUri), a.compareTo(bUri) == 0);
+    }
+
+    @Test
+    public void testHashCodeEquals() throws Exception {
+        Assert.assertEquals("Same data and same type should yield same hash code.",
+                a.hashCode(), same.hashCode());
+        Assert.assertEquals("Same type and both null data should yield same hash code.",
+                nullData.hashCode(), new RyaType(XMLSchema.STRING, null).hashCode());
+        Assert.assertEquals("Same data and both null type should yield same hash code.",
+                nullType.hashCode(), new RyaType(null, "http://www.example.com/Alice").hashCode());
+        Assert.assertEquals("Null type and null data should yield same hash code.",
+                nullBoth.hashCode(), new RyaType(null, null).hashCode());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/pom.xml
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/pom.xml b/dao/accumulo.rya/pom.xml
index c089cfe..295b755 100644
--- a/dao/accumulo.rya/pom.xml
+++ b/dao/accumulo.rya/pom.xml
@@ -36,84 +36,15 @@ under the License.
             <artifactId>rya.api</artifactId>
         </dependency>
         
-        <!-- Accumulo deps -->
         <dependency>
             <groupId>org.apache.accumulo</groupId>
             <artifactId>accumulo-core</artifactId>
         </dependency>
 
         <dependency>
-            <groupId>org.openrdf.sesame</groupId>
-            <artifactId>sesame-rio-ntriples</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.openrdf.sesame</groupId>
-            <artifactId>sesame-rio-nquads</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.openrdf.sesame</groupId>
-            <artifactId>sesame-queryalgebra-evaluation</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.openrdf.sesame</groupId>
-            <artifactId>sesame-rio-trig</artifactId>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>org.apache.mrunit</groupId>
-            <artifactId>mrunit</artifactId>
-            <classifier>hadoop2</classifier>
-            <version>1.1.0</version>
-            <scope>test</scope>
-        </dependency>
     </dependencies>
-
-    <build>
-        <pluginManagement>
-            <plugins>
-                <plugin>
-                    <groupId>org.apache.rat</groupId>
-                    <artifactId>apache-rat-plugin</artifactId>
-                    <configuration>
-                        <excludes>
-                            <!-- RDF data Files -->
-                            <exclude>**/*.ntriples</exclude>
-                            <exclude>**/*.trig</exclude>
-                        </excludes>
-                    </configuration>
-                </plugin>
-            </plugins>
-        </pluginManagement>
-    </build>
-
-    <profiles>
-        <profile>
-            <id>mr</id>
-            <build>
-                <plugins>
-                    <plugin>
-                        <groupId>org.apache.maven.plugins</groupId>
-                        <artifactId>maven-shade-plugin</artifactId>
-                        <executions>
-                            <execution>
-                                <configuration>
-                                    <transformers>
-                                        <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
-                                    </transformers>
-                                </configuration>
-                            </execution>
-                        </executions>
-
-                    </plugin>
-                </plugins>
-            </build>
-        </profile>
-    </profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/AbstractAccumuloMRTool.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/AbstractAccumuloMRTool.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/AbstractAccumuloMRTool.java
deleted file mode 100644
index 000c08a..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/AbstractAccumuloMRTool.java
+++ /dev/null
@@ -1,164 +0,0 @@
-package mvm.rya.accumulo.mr;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import mvm.rya.accumulo.AccumuloRdfConstants;
-import mvm.rya.accumulo.mr.utils.AccumuloHDFSFileInputFormat;
-import mvm.rya.accumulo.mr.utils.MRUtils;
-import mvm.rya.api.RdfCloudTripleStoreConstants;
-import mvm.rya.api.RdfCloudTripleStoreUtils;
-
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.iterators.user.AgeOffFilter;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-
-/**
- */
-public abstract class AbstractAccumuloMRTool {
-
-    protected Configuration conf;
-    protected RdfCloudTripleStoreConstants.TABLE_LAYOUT rdfTableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP;
-    protected String userName = "root";
-    protected String pwd = "root";
-    protected String instance = "instance";
-    protected String zk = "zoo";
-    protected Authorizations authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS;
-    protected String ttl = null;
-    protected boolean mock = false;
-    protected boolean hdfsInput = false;
-    protected String tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
-
-    protected void init() {
-        zk = conf.get(MRUtils.AC_ZK_PROP, zk);
-        ttl = conf.get(MRUtils.AC_TTL_PROP, ttl);
-        instance = conf.get(MRUtils.AC_INSTANCE_PROP, instance);
-        userName = conf.get(MRUtils.AC_USERNAME_PROP, userName);
-        pwd = conf.get(MRUtils.AC_PWD_PROP, pwd);
-        mock = conf.getBoolean(MRUtils.AC_MOCK_PROP, mock);
-        hdfsInput = conf.getBoolean(MRUtils.AC_HDFS_INPUT_PROP, hdfsInput);
-        tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, tablePrefix);
-        if (tablePrefix != null)
-            RdfCloudTripleStoreConstants.prefixTables(tablePrefix);
-        rdfTableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.valueOf(
-                conf.get(MRUtils.TABLE_LAYOUT_PROP, RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP.toString()));
-        String auth = conf.get(MRUtils.AC_AUTH_PROP);
-        if (auth != null)
-            authorizations = new Authorizations(auth.split(","));
-
-        if (!mock) {
-            conf.setBoolean("mapred.map.tasks.speculative.execution", false);
-            conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
-            conf.set("io.sort.mb", "256");
-        }
-
-        //set ttl
-        ttl = conf.get(MRUtils.AC_TTL_PROP);
-    }
-
-    protected void setupInputFormat(Job job) throws AccumuloSecurityException {
-        // set up accumulo input
-        if (!hdfsInput) {
-            job.setInputFormatClass(AccumuloInputFormat.class);
-        } else {
-            job.setInputFormatClass(AccumuloHDFSFileInputFormat.class);
-        }
-        AccumuloInputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd));
-        AccumuloInputFormat.setInputTableName(job, RdfCloudTripleStoreUtils.layoutPrefixToTable(rdfTableLayout, tablePrefix));
-        AccumuloInputFormat.setScanAuthorizations(job, authorizations);
-        if (!mock) {
-            AccumuloInputFormat.setZooKeeperInstance(job, instance, zk);
-        } else {
-            AccumuloInputFormat.setMockInstance(job, instance);
-        }
-        if (ttl != null) {
-            IteratorSetting setting = new IteratorSetting(1, "fi", AgeOffFilter.class.getName());
-            AgeOffFilter.setTTL(setting, Long.valueOf(ttl));
-            AccumuloInputFormat.addIterator(job, setting);
-        }
-    }
-
-    protected void setupOutputFormat(Job job, String outputTable) throws AccumuloSecurityException {
-        AccumuloOutputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd));
-        AccumuloOutputFormat.setCreateTables(job, true);
-        AccumuloOutputFormat.setDefaultTableName(job, outputTable);
-        if (!mock) {
-            AccumuloOutputFormat.setZooKeeperInstance(job, instance, zk);
-        } else {
-            AccumuloOutputFormat.setMockInstance(job, instance);
-        }
-        job.setOutputFormatClass(AccumuloOutputFormat.class);
-    }
-
-    public void setConf(Configuration configuration) {
-        this.conf = configuration;
-    }
-
-    public Configuration getConf() {
-        return conf;
-    }
-
-    public String getInstance() {
-        return instance;
-    }
-
-    public void setInstance(String instance) {
-        this.instance = instance;
-    }
-
-    public String getPwd() {
-        return pwd;
-    }
-
-    public void setPwd(String pwd) {
-        this.pwd = pwd;
-    }
-
-    public String getZk() {
-        return zk;
-    }
-
-    public void setZk(String zk) {
-        this.zk = zk;
-    }
-
-    public String getTtl() {
-        return ttl;
-    }
-
-    public void setTtl(String ttl) {
-        this.ttl = ttl;
-    }
-
-    public String getUserName() {
-        return userName;
-    }
-
-    public void setUserName(String userName) {
-        this.userName = userName;
-    }
-}
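
Since the commit notes rename setupInputFormat to setupAccumuloInput, a tool built
on the consolidated class would follow the shape below. This is a hypothetical
sketch: it assumes setupAccumuloInput and setupOutputFormat keep the (Job)-style
signatures of the removed class above; the actual API lives in the new
mapreduce/src/main/java/mvm/rya/accumulo/mr/AbstractAccumuloMRTool.java.

    // Hypothetical subclass of the consolidated AbstractAccumuloMRTool.
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.util.Tool;

    public class ExampleTool extends AbstractAccumuloMRTool implements Tool {
        @Override
        public int run(String[] args) throws Exception {
            init();                                  // read connection settings from conf
            Job job = Job.getInstance(conf, "example tool (sketch)");
            job.setJarByClass(ExampleTool.class);
            setupAccumuloInput(job);                 // renamed from setupInputFormat
            setupOutputFormat(job, "rya_eval");      // hypothetical output table name
            return job.waitForCompletion(true) ? 0 : 1;
        }
    }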

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementInputFormat.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementInputFormat.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementInputFormat.java
deleted file mode 100644
index 3399166..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementInputFormat.java
+++ /dev/null
@@ -1,94 +0,0 @@
-package mvm.rya.accumulo.mr;
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-import java.io.IOException;
-import java.util.Map.Entry;
-
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.mapreduce.AbstractInputFormat;
-import org.apache.accumulo.core.client.mapreduce.RangeInputSplit;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.mr.utils.MRUtils;
-import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.resolver.RyaTripleContext;
-import mvm.rya.api.resolver.triple.TripleRow;
-import mvm.rya.api.resolver.triple.TripleRowResolverException;
-
-public class RyaStatementInputFormat extends AbstractInputFormat<Text, RyaStatementWritable> {
-    @Override
-    public RecordReader<Text, RyaStatementWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
-        return new RyaStatementRecordReader();
-    }
-
-
-    public static void setTableLayout(Job conf, TABLE_LAYOUT layout) {
-        conf.getConfiguration().set(MRUtils.TABLE_LAYOUT_PROP, layout.name());
-    }
-
-    public class RyaStatementRecordReader extends AbstractRecordReader<Text, RyaStatementWritable> {
-
-        private RyaTripleContext ryaContext;
-        private TABLE_LAYOUT tableLayout;
-
-        @Override
-        protected void setupIterators(TaskAttemptContext context, Scanner scanner, String tableName, RangeInputSplit split) {
-
-        }
-
-        @Override
-        public void initialize(InputSplit inSplit, TaskAttemptContext attempt) throws IOException {
-            super.initialize(inSplit, attempt);
-            this.tableLayout = TABLE_LAYOUT.valueOf(attempt.getConfiguration().get(MRUtils.TABLE_LAYOUT_PROP, TABLE_LAYOUT.OSP.toString()));
-            //TODO verify that this is correct
-            this.ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(attempt.getConfiguration()));
-        }
-
-        @Override
-        public boolean nextKeyValue() throws IOException {
-            if (!scannerIterator.hasNext())
-                return false;
-
-            Entry<Key, Value> entry = scannerIterator.next();
-            ++numKeysRead;
-            currentKey = entry.getKey();
-
-            try {
-                currentK = currentKey.getRow();
-                RyaStatement stmt = this.ryaContext.deserializeTriple(this.tableLayout, new TripleRow(entry.getKey().getRow().getBytes(), entry.getKey().getColumnFamily().getBytes(), entry.getKey().getColumnQualifier().getBytes(), entry.getKey().getTimestamp(), entry.getKey().getColumnVisibility().getBytes(), entry.getValue().get()));
-                RyaStatementWritable writable = new RyaStatementWritable();
-                writable.setRyaStatement(stmt);
-                currentV = writable;
-            } catch(TripleRowResolverException e) {
-                throw new IOException(e);
-            }
-            return true;
-        }
-
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementMapper.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementMapper.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementMapper.java
deleted file mode 100644
index d90215b..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementMapper.java
+++ /dev/null
@@ -1,85 +0,0 @@
-  
-package mvm.rya.accumulo.mr;
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-import static mvm.rya.api.RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX;
-import static mvm.rya.api.RdfCloudTripleStoreConstants.TBL_PO_SUFFIX;
-import static mvm.rya.api.RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
-import static mvm.rya.api.RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.RyaTableMutationsFactory;
-import mvm.rya.accumulo.mr.utils.MRUtils;
-import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
-import mvm.rya.api.resolver.RyaTripleContext;
-
-public class RyaStatementMapper extends Mapper<Text, RyaStatementWritable, Text, Mutation> {
-
-    private Text spoTable;
-    private Text poTable;
-    private Text ospTable;
-    private RyaTableMutationsFactory mutationsFactory;
-
-    @Override
-    protected void setup(Context context) throws IOException, InterruptedException {
-        super.setup(context);
-
-        String tablePrefix = context.getConfiguration().get(MRUtils.TABLE_PREFIX_PROPERTY, TBL_PRFX_DEF);
-        spoTable = new Text(tablePrefix + TBL_SPO_SUFFIX);
-        poTable = new Text(tablePrefix + TBL_PO_SUFFIX);
-        ospTable = new Text(tablePrefix + TBL_OSP_SUFFIX);
-
-        RyaTripleContext ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(context.getConfiguration()));
-        mutationsFactory = new RyaTableMutationsFactory(ryaContext);
-    }
-
-    @Override
-    protected void map(Text key, RyaStatementWritable value, Context context) throws IOException, InterruptedException {
-
-        Map<TABLE_LAYOUT, Collection<Mutation>> mutations = mutationsFactory.serialize(value.getRyaStatement());
-
-        for(TABLE_LAYOUT layout : mutations.keySet()) {
-
-            Text table = null;
-            switch (layout) {
-                case SPO:
-                    table = spoTable;
-                    break;
-                case OSP:
-                    table = ospTable;
-                    break;
-                case PO:
-                    table = poTable;
-                    break;
-            }
-
-            for(Mutation mutation : mutations.get(layout)) {
-                context.write(table, mutation);
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementReducer.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementReducer.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementReducer.java
deleted file mode 100644
index e353528..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementReducer.java
+++ /dev/null
@@ -1,87 +0,0 @@
-package mvm.rya.accumulo.mr;
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.RyaTableMutationsFactory;
-import mvm.rya.accumulo.mr.utils.MRUtils;
-import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
-import mvm.rya.api.resolver.RyaTripleContext;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Reducer;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-
-import static mvm.rya.api.RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX;
-import static mvm.rya.api.RdfCloudTripleStoreConstants.TBL_PO_SUFFIX;
-import static mvm.rya.api.RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
-import static mvm.rya.api.RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX;
-
-public class RyaStatementReducer extends Reducer<WritableComparable, RyaStatementWritable, Text, Mutation> {
-
-    private Text spoTable;
-    private Text poTable;
-    private Text ospTable;
-    private RyaTableMutationsFactory mutationsFactory;
-
-    @Override
-    protected void setup(Context context) throws IOException, InterruptedException {
-        super.setup(context);
-
-        String tablePrefix = context.getConfiguration().get(MRUtils.TABLE_PREFIX_PROPERTY, TBL_PRFX_DEF);
-        spoTable = new Text(tablePrefix + TBL_SPO_SUFFIX);
-        poTable = new Text(tablePrefix + TBL_PO_SUFFIX);
-        ospTable = new Text(tablePrefix + TBL_OSP_SUFFIX);
-
-        RyaTripleContext ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(context.getConfiguration()));
-        mutationsFactory = new RyaTableMutationsFactory(ryaContext);
-    }
-
-    @Override
-    protected void reduce(WritableComparable key, Iterable<RyaStatementWritable> values, Context context) throws IOException, InterruptedException {
-
-        for(RyaStatementWritable value : values) {
-            Map<TABLE_LAYOUT, Collection<Mutation>> mutations = mutationsFactory.serialize(value.getRyaStatement());
-
-            for(TABLE_LAYOUT layout : mutations.keySet()) {
-
-                Text table = null;
-                switch (layout) {
-                    case SPO:
-                        table = spoTable;
-                        break;
-                    case OSP:
-                        table = ospTable;
-                        break;
-                    case PO:
-                        table = poTable;
-                        break;
-                }
-
-                for(Mutation mutation : mutations.get(layout)) {
-                    context.write(table, mutation);
-                }
-            }
-
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementWritable.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementWritable.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementWritable.java
deleted file mode 100644
index 87a9433..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementWritable.java
+++ /dev/null
@@ -1,150 +0,0 @@
-package mvm.rya.accumulo.mr;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.WritableComparable;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.api.RdfCloudTripleStoreConstants;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.resolver.RyaTripleContext;
-import mvm.rya.api.resolver.triple.TripleRow;
-import mvm.rya.api.resolver.triple.TripleRowResolverException;
-
-/**
- * Date: 7/17/12
- * Time: 1:29 PM
- */
-public class RyaStatementWritable implements WritableComparable {
-
-    private RyaTripleContext ryaContext;
-    private RyaStatement ryaStatement;
-    
-    public RyaStatementWritable(Configuration conf) {
-        this();
-    }
-     
-    public RyaStatementWritable(RyaTripleContext ryaContext) {
-     	this.ryaContext = ryaContext;
-    }
-    
-    public RyaStatementWritable() {
-        this.ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration());
-    }
-    
-    public RyaStatementWritable(RyaStatement ryaStatement, RyaTripleContext ryaContext) {
-    	this(ryaContext);
-        this.ryaStatement = ryaStatement;
-    }
-
-    public RyaStatement getRyaStatement() {
-        return ryaStatement;
-    }
-
-    public void setRyaStatement(RyaStatement ryaStatement) {
-        this.ryaStatement = ryaStatement;
-    }
-
-    @Override
-    public int compareTo(Object o) {
-        if (o instanceof RyaStatementWritable) {
-            return (getRyaStatement().equals(((RyaStatementWritable) o).getRyaStatement())) ? (0) : (-1);
-        }
-        return -1;
-    }
-
-    @Override
-    public void write(DataOutput dataOutput) throws IOException {
-        if (ryaStatement == null) {
-            throw new IOException("Rya Statement is null");
-        }
-        try {
-            Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, TripleRow> map = ryaContext.serializeTriple(ryaStatement);
-            TripleRow tripleRow = map.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO);
-            byte[] row = tripleRow.getRow();
-            byte[] columnFamily = tripleRow.getColumnFamily();
-            byte[] columnQualifier = tripleRow.getColumnQualifier();
-            write(dataOutput, row);
-            write(dataOutput, columnFamily);
-            write(dataOutput, columnQualifier);
-            write(dataOutput, ryaStatement.getColumnVisibility());
-            write(dataOutput, ryaStatement.getValue());
-            Long timestamp = ryaStatement.getTimestamp();
-            boolean b = timestamp != null;
-            dataOutput.writeBoolean(b);
-            if (b) {
-                dataOutput.writeLong(timestamp);
-            }
-        } catch (TripleRowResolverException e) {
-            throw new IOException(e);
-        }
-    }
-
-    protected void write(DataOutput dataOutput, byte[] row) throws IOException {
-        boolean b = row != null;
-        dataOutput.writeBoolean(b);
-        if (b) {
-            dataOutput.writeInt(row.length);
-            dataOutput.write(row);
-        }
-    }
-
-    protected byte[] read(DataInput dataInput) throws IOException {
-        if (dataInput.readBoolean()) {
-            int len = dataInput.readInt();
-            byte[] bytes = new byte[len];
-            dataInput.readFully(bytes);
-            return bytes;
-        }else {
-            return null;
-        }
-    }
-
-    @Override
-    public void readFields(DataInput dataInput) throws IOException {
-        byte[] row = read(dataInput);
-        byte[] columnFamily = read(dataInput);
-        byte[] columnQualifier = read(dataInput);
-        byte[] columnVisibility = read(dataInput);
-        byte[] value = read(dataInput);
-        boolean b = dataInput.readBoolean();
-        Long timestamp = null;
-        if (b) {
-            timestamp = dataInput.readLong();
-        }
-        try {
-            ryaStatement = ryaContext.deserializeTriple(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO,
-                    new TripleRow(row, columnFamily, columnQualifier));
-            ryaStatement.setColumnVisibility(columnVisibility);
-            ryaStatement.setValue(value);
-            ryaStatement.setTimestamp(timestamp);
-        } catch (TripleRowResolverException e) {
-            throw new IOException(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/eval/AccumuloRdfCountTool.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/eval/AccumuloRdfCountTool.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/eval/AccumuloRdfCountTool.java
deleted file mode 100644
index ee1004d..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/eval/AccumuloRdfCountTool.java
+++ /dev/null
@@ -1,258 +0,0 @@
-package mvm.rya.accumulo.mr.eval;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import java.io.IOException;
-import java.util.Date;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.AccumuloRdfConstants;
-import mvm.rya.accumulo.mr.AbstractAccumuloMRTool;
-import mvm.rya.accumulo.mr.utils.MRUtils;
-import mvm.rya.api.RdfCloudTripleStoreConstants;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.RyaURI;
-import mvm.rya.api.resolver.RyaTripleContext;
-import mvm.rya.api.resolver.triple.TripleRow;
-import mvm.rya.api.resolver.triple.TripleRowResolverException;
-
-import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-
-import com.google.common.collect.Lists;
-import com.google.common.io.ByteArrayDataInput;
-import com.google.common.io.ByteArrayDataOutput;
-import com.google.common.io.ByteStreams;
-
-/**
- * Count subject, predicate, object. Save in table
- * Class RdfCloudTripleStoreCountTool
- * Date: Apr 12, 2011
- * Time: 10:39:40 AM
- * @deprecated
- */
-public class AccumuloRdfCountTool extends AbstractAccumuloMRTool implements Tool {
-
-    public static void main(String[] args) {
-        try {
-
-            ToolRunner.run(new Configuration(), new AccumuloRdfCountTool(), args);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    /**
-     * cloudbase props
-     */
-
-    @Override
-    public int run(String[] strings) throws Exception {
-        conf.set(MRUtils.JOB_NAME_PROP, "Gather Evaluation Statistics");
-
-        //initialize
-        init();
-
-        Job job = new Job(conf);
-        job.setJarByClass(AccumuloRdfCountTool.class);
-        setupInputFormat(job);
-
-        AccumuloInputFormat.setRanges(job, Lists.newArrayList(new Range(new Text(new byte[]{}), new Text(new byte[]{Byte.MAX_VALUE}))));
-        // set input output of the particular job
-        job.setMapOutputKeyClass(Text.class);
-        job.setMapOutputValueClass(LongWritable.class);
-        job.setOutputKeyClass(Text.class);
-        job.setOutputValueClass(Mutation.class);
-
-        // set mapper and reducer classes
-        job.setMapperClass(CountPiecesMapper.class);
-        job.setCombinerClass(CountPiecesCombiner.class);
-        job.setReducerClass(CountPiecesReducer.class);
-
-        String outputTable = tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX;
-        setupOutputFormat(job, outputTable);
-
-        // Submit the job
-        Date startTime = new Date();
-        System.out.println("Job started: " + startTime);
-        int exitCode = job.waitForCompletion(true) ? 0 : 1;
-
-        if (exitCode == 0) {
-            Date end_time = new Date();
-            System.out.println("Job ended: " + end_time);
-            System.out.println("The job took "
-                    + (end_time.getTime() - startTime.getTime()) / 1000
-                    + " seconds.");
-            return 0;
-        } else {
-            System.out.println("Job Failed!!!");
-        }
-
-        return -1;
-    }
-
-    public static class CountPiecesMapper extends Mapper<Key, Value, Text, LongWritable> {
-
-        public static final byte[] EMPTY_BYTES = new byte[0];
-        private RdfCloudTripleStoreConstants.TABLE_LAYOUT tableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP;
-
-        ValueFactoryImpl vf = new ValueFactoryImpl();
-
-        private Text keyOut = new Text();
-        private LongWritable valOut = new LongWritable(1);
-        private RyaTripleContext ryaContext;
-
-        @Override
-        protected void setup(Context context) throws IOException, InterruptedException {
-            super.setup(context);
-            Configuration conf = context.getConfiguration();
-            tableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.valueOf(
-                    conf.get(MRUtils.TABLE_LAYOUT_PROP, RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP.toString()));
-            ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(conf));
-        }
-
-        @Override
-        protected void map(Key key, Value value, Context context) throws IOException, InterruptedException {
-            try {
-                RyaStatement statement = ryaContext.deserializeTriple(tableLayout, new TripleRow(key.getRow().getBytes(), key.getColumnFamily().getBytes(), key.getColumnQualifier().getBytes()));
-                //count each piece subject, pred, object
-
-                String subj = statement.getSubject().getData();
-                String pred = statement.getPredicate().getData();
-//                byte[] objBytes = tripleFormat.getValueFormat().serialize(statement.getObject());
-                RyaURI scontext = statement.getContext();
-                boolean includesContext = scontext != null;
-                String scontext_str = (includesContext) ? scontext.getData() : null;
-
-                ByteArrayDataOutput output = ByteStreams.newDataOutput();
-                output.writeUTF(subj);
-                output.writeUTF(RdfCloudTripleStoreConstants.SUBJECT_CF);
-                output.writeBoolean(includesContext);
-                if (includesContext)
-                    output.writeUTF(scontext_str);
-                keyOut.set(output.toByteArray());
-                context.write(keyOut, valOut);
-
-                output = ByteStreams.newDataOutput();
-                output.writeUTF(pred);
-                output.writeUTF(RdfCloudTripleStoreConstants.PRED_CF);
-                output.writeBoolean(includesContext);
-                if (includesContext)
-                    output.writeUTF(scontext_str);
-                keyOut.set(output.toByteArray());
-                context.write(keyOut, valOut);
-            } catch (TripleRowResolverException e) {
-                throw new IOException(e);
-            }
-        }
-    }
-
-    public static class CountPiecesCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
-
-        private LongWritable valOut = new LongWritable();
-
-        // TODO: can still add up to be large I guess
-        // any count lower than this does not need to be saved
-        public static final int TOO_LOW = 2;
-
-        @Override
-        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
-            long count = 0;
-            for (LongWritable lw : values) {
-                count += lw.get();
-            }
-
-            if (count <= TOO_LOW)
-                return;
-
-            valOut.set(count);
-            context.write(key, valOut);
-        }
-
-    }
-
-    public static class CountPiecesReducer extends Reducer<Text, LongWritable, Text, Mutation> {
-
-        Text row = new Text();
-        Text cat_txt = new Text();
-        Value v_out = new Value();
-        ValueFactory vf = new ValueFactoryImpl();
-
-        // any count lower than this does not need to be saved
-        public static final int TOO_LOW = 10;
-        private String tablePrefix;
-        protected Text table;
-        private ColumnVisibility cv = AccumuloRdfConstants.EMPTY_CV;
-
-        @Override
-        protected void setup(Context context) throws IOException, InterruptedException {
-            super.setup(context);
-            tablePrefix = context.getConfiguration().get(MRUtils.TABLE_PREFIX_PROPERTY, RdfCloudTripleStoreConstants.TBL_PRFX_DEF);
-            table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX);
-            final String cv_s = context.getConfiguration().get(MRUtils.AC_CV_PROP);
-            if (cv_s != null)
-                cv = new ColumnVisibility(cv_s);
-        }
-
-        @Override
-        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
-            long count = 0;
-            for (LongWritable lw : values) {
-                count += lw.get();
-            }
-
-            if (count <= TOO_LOW)
-                return;
-
-            ByteArrayDataInput badi = ByteStreams.newDataInput(key.getBytes());
-            String v = badi.readUTF();
-            cat_txt.set(badi.readUTF());
-
-            Text columnQualifier = RdfCloudTripleStoreConstants.EMPTY_TEXT;
-            boolean includesContext = badi.readBoolean();
-            if (includesContext) {
-                columnQualifier = new Text(badi.readUTF());
-            }
-
-            row.set(v);
-            Mutation m = new Mutation(row);
-            v_out.set((count + "").getBytes());
-            m.put(cat_txt, columnQualifier, cv, v_out);
-            context.write(table, m);
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/BulkNtripsInputTool.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/BulkNtripsInputTool.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/BulkNtripsInputTool.java
deleted file mode 100644
index c3ddcfd..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/BulkNtripsInputTool.java
+++ /dev/null
@@ -1,369 +0,0 @@
-package mvm.rya.accumulo.mr.fileinput;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static mvm.rya.accumulo.AccumuloRdfUtils.extractValue;
-import static mvm.rya.accumulo.AccumuloRdfUtils.from;
-
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.io.StringReader;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.AccumuloRdfConstants;
-import mvm.rya.accumulo.mr.utils.MRUtils;
-import mvm.rya.api.RdfCloudTripleStoreConstants;
-import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.RyaURI;
-import mvm.rya.api.resolver.RdfToRyaConversions;
-import mvm.rya.api.resolver.RyaTripleContext;
-import mvm.rya.api.resolver.triple.TripleRow;
-import mvm.rya.api.resolver.triple.TripleRowResolver;
-
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.accumulo.core.client.admin.TableOperations;
-import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
-import org.apache.accumulo.core.client.mapreduce.lib.partition.KeyRangePartitioner;
-import org.apache.accumulo.core.client.mapreduce.lib.partition.RangePartitioner;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.TextUtil;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.openrdf.model.Statement;
-import org.openrdf.rio.ParserConfig;
-import org.openrdf.rio.RDFFormat;
-import org.openrdf.rio.RDFHandler;
-import org.openrdf.rio.RDFHandlerException;
-import org.openrdf.rio.RDFParseException;
-import org.openrdf.rio.RDFParser;
-import org.openrdf.rio.nquads.NQuadsParser;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Takes large N-Triples files and uses MapReduce and Accumulo bulk ingest
- * techniques to load the statements into the triple tables.
- * <p/>
- * Input: N-Triples file
- * Map:
- * - key : serialized triple row - Accumulo Key
- * - value : serialized triple value - Accumulo Value
- * Partitioner: RangePartitioner
- * Reduce:
- * - key : all the entries for each triple - Accumulo Key
- * Class BulkNtripsInputTool
- * Date: Sep 13, 2011
- * Time: 10:00:17 AM
- */
-public class BulkNtripsInputTool extends Configured implements Tool {
-
-    public static final String WORKDIR_PROP = "bulk.n3.workdir";
-
-    private String userName = "root";
-    private String pwd = "root";
-    private String instance = "instance";
-    private String zk = "zoo";
-    private String ttl = null;
-    private String workDirBase = "/temp/bulkcb/work";
-    private String format = RDFFormat.NQUADS.getName();
-
-    @Override
-    public int run(final String[] args) throws Exception {
-        final Configuration conf = getConf();
-        try {
-            //conf
-            zk = conf.get(MRUtils.AC_ZK_PROP, zk);
-            ttl = conf.get(MRUtils.AC_TTL_PROP, ttl);
-            instance = conf.get(MRUtils.AC_INSTANCE_PROP, instance);
-            userName = conf.get(MRUtils.AC_USERNAME_PROP, userName);
-            pwd = conf.get(MRUtils.AC_PWD_PROP, pwd);
-            workDirBase = conf.get(WORKDIR_PROP, workDirBase);
-            format = conf.get(MRUtils.FORMAT_PROP, format);
-            conf.set(MRUtils.FORMAT_PROP, format);
-            final String inputDir = args[0];
-
-            ZooKeeperInstance zooKeeperInstance = new ZooKeeperInstance(instance, zk);
-            Connector connector = zooKeeperInstance.getConnector(userName, new PasswordToken(pwd));
-            TableOperations tableOperations = connector.tableOperations();
-            
-            if (conf.get(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS) != null ) {
-                throw new IllegalArgumentException("Cannot use Bulk N Trips tool with Additional Indexers");
-            }
-
-            String tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, null);
-            if (tablePrefix != null)
-                RdfCloudTripleStoreConstants.prefixTables(tablePrefix);
-            String[] tables = {tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX,
-                    tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX,
-                    tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX};
-            Collection<Job> jobs = new ArrayList<Job>();
-            for (final String tableName : tables) {
-                PrintStream out = null;
-                try {
-                    String workDir = workDirBase + "/" + tableName;
-                    System.out.println("Loading data into table[" + tableName + "]");
-
-                    Job job = new Job(new Configuration(conf), "Bulk Ingest load data to Generic RDF Table[" + tableName + "]");
-                    job.setJarByClass(this.getClass());
-                    // configure the job for a long-running bulk load
-                    Configuration jobConf = job.getConfiguration();
-                    jobConf.setBoolean("mapred.map.tasks.speculative.execution", false);
-                    jobConf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
-                    jobConf.set("io.sort.mb", jobConf.get("io.sort.mb", "256"));
-                    jobConf.setBoolean("mapred.compress.map.output", true);
-//                    jobConf.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec"); //TODO: I would like LZO compression
-
-                    job.setInputFormatClass(TextInputFormat.class);
-
-                    job.setMapperClass(ParseNtripsMapper.class);
-                    job.setMapOutputKeyClass(Key.class);
-                    job.setMapOutputValueClass(Value.class);
-
-                    job.setCombinerClass(OutStmtMutationsReducer.class);
-                    job.setReducerClass(OutStmtMutationsReducer.class);
-                    job.setOutputFormatClass(AccumuloFileOutputFormat.class);
-                   // AccumuloFileOutputFormat.setZooKeeperInstance(jobConf, instance, zk);
-
-                    jobConf.set(ParseNtripsMapper.TABLE_PROPERTY, tableName);
-
-                    TextInputFormat.setInputPaths(job, new Path(inputDir));
-
-                    FileSystem fs = FileSystem.get(conf);
-                    Path workPath = new Path(workDir);
-                    if (fs.exists(workPath))
-                        fs.delete(workPath, true);
-
-                    //make failures dir
-                    Path failures = new Path(workDir, "failures");
-                    fs.delete(failures, true);
-                    fs.mkdirs(new Path(workDir, "failures"));
-
-                    AccumuloFileOutputFormat.setOutputPath(job, new Path(workDir + "/files"));
-
-                    out = new PrintStream(new BufferedOutputStream(fs.create(new Path(workDir + "/splits.txt"))));
-
-                    if (!tableOperations.exists(tableName))
-                        tableOperations.create(tableName);
-                    Collection<Text> splits = tableOperations.getSplits(tableName, Integer.MAX_VALUE);
-                    for (Text split : splits)
-                        out.println(new String(Base64.encodeBase64(TextUtil.getBytes(split))));
-
-                    job.setNumReduceTasks(splits.size() + 1);
-                    out.close();
-
-                    job.setPartitionerClass(KeyRangePartitioner.class);
-                    RangePartitioner.setSplitFile(job, workDir + "/splits.txt");
-
-                    jobConf.set(WORKDIR_PROP, workDir);
-
-                    job.submit();
-                    jobs.add(job);
-
-                } catch (Exception re) {
-                    throw new RuntimeException(re);
-                } finally {
-                    if (out != null)
-                        out.close();
-                }
-            }
-
-            for (Job job : jobs) {
-                while (!job.isComplete()) {
-                    Thread.sleep(1000);
-                }
-            }
-
-            for(String tableName : tables) {
-                String workDir = workDirBase + "/" + tableName;
-                String filesDir = workDir + "/files";
-                String failuresDir = workDir + "/failures";
-                
-                FileSystem fs = FileSystem.get(conf);
-                
-                //make sure the "accumulo" user can read/write/execute these directories
-                fs.setPermission(new Path(filesDir), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
-                fs.setPermission(new Path(failuresDir), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
-                
-                tableOperations.importDirectory(
-                        tableName,
-                        filesDir,
-                        failuresDir,
-                        false);
-                
-            }
-
-        } catch (Exception e ){
-            throw new RuntimeException(e);
-        }
-
-        return 0;
-    }
-
-    public static void main(String[] args) throws Exception {
-        ToolRunner.run(new Configuration(), new BulkNtripsInputTool(), args);
-    }
-
-    /**
-     * input: one line of N-Triples text
-     * <p/>
-     * output: key: serialized triple row - Accumulo Key
-     * value: serialized triple value - Accumulo Value
-     */
-    public static class ParseNtripsMapper extends Mapper<LongWritable, Text, Key, Value> {
-        public static final String TABLE_PROPERTY = "parsentripsmapper.table";
-
-        private RDFParser parser;
-        private String rdfFormat;
-        private String namedGraph;
-        private RyaTripleContext ryaContext;
-        private TripleRowResolver rowResolver;
-
-        @Override
-        protected void setup(final Context context) throws IOException, InterruptedException {
-            super.setup(context);
-            Configuration conf = context.getConfiguration();
-            final String table = conf.get(TABLE_PROPERTY);
-            Preconditions.checkNotNull(table, "Set the " + TABLE_PROPERTY + " property on the map reduce job");
-            this.ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(conf));
-            rowResolver = ryaContext.getTripleResolver();
-
-            final String cv_s = conf.get(MRUtils.AC_CV_PROP);
-            final byte[] cv = cv_s == null ? null : cv_s.getBytes();
-            rdfFormat = conf.get(MRUtils.FORMAT_PROP);
-            checkNotNull(rdfFormat, "Rdf format cannot be null");
-
-            namedGraph = conf.get(MRUtils.NAMED_GRAPH_PROP);
-
-            parser = new NQuadsParser();
-    		parser.setParserConfig(new ParserConfig(true, true, true, RDFParser.DatatypeHandling.VERIFY));
-            parser.setRDFHandler(new RDFHandler() {
-
-                @Override
-                public void startRDF() throws RDFHandlerException {
-
-                }
-
-                @Override
-                public void endRDF() throws RDFHandlerException {
-
-                }
-
-                @Override
-                public void handleNamespace(String s, String s1) throws RDFHandlerException {
-
-                }
-
-                @Override
-                public void handleStatement(Statement statement) throws RDFHandlerException {
-                    try {
-                        RyaStatement rs = RdfToRyaConversions.convertStatement(statement);
-                        if(rs.getColumnVisibility() == null) {
-                            rs.setColumnVisibility(cv);
-                        }
-
-                    	// Inject the specified context into the statement.
-                        if(namedGraph != null){
-                            rs.setContext(new RyaURI(namedGraph));
-                        } else if (statement.getContext() != null) {
-                            rs.setContext(new RyaURI(statement.getContext().toString()));
-                        } 
-
-                        Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT,TripleRow> serialize = rowResolver.serialize(rs);
-
-                        if (table.contains(RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX)) {
-                            TripleRow tripleRow = serialize.get(TABLE_LAYOUT.SPO);
-                            context.write(
-                                    from(tripleRow),
-                                    extractValue(tripleRow)
-                            );
-                        } else if (table.contains(RdfCloudTripleStoreConstants.TBL_PO_SUFFIX)) {
-                            TripleRow tripleRow = serialize.get(TABLE_LAYOUT.PO);
-                            context.write(
-                                    from(tripleRow),
-                                    extractValue(tripleRow)
-                            );
-                        } else if (table.contains(RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX)) {
-                            TripleRow tripleRow = serialize.get(TABLE_LAYOUT.OSP);
-                            context.write(
-                                    from(tripleRow),
-                                    extractValue(tripleRow)
-                            );
-                        } else
-                            throw new IllegalArgumentException("Unrecognized table[" + table + "]");
-
-                    } catch (Exception e) {
-                        throw new RDFHandlerException(e);
-                    }
-                }
-
-                @Override
-                public void handleComment(String s) throws RDFHandlerException {
-
-                }
-            });
-        }
-
-        @Override
-        public void map(LongWritable key, Text value, Context output)
-                throws IOException, InterruptedException {
-            String rdf = value.toString();
-            try {
-                parser.parse(new StringReader(rdf), "");
-            } catch (RDFParseException e) {
-                System.out.println("Line[" + rdf + "] cannot be formatted with format[" + rdfFormat + "]. Exception[" + e.getMessage() + "]");
-            } catch (Exception e) {
-                e.printStackTrace();
-                throw new IOException("Exception occurred parsing triple[" + rdf + "]");
-            }
-        }
-    }
-
-    public static class OutStmtMutationsReducer extends Reducer<Key, Value, Key, Value> {
-
-        public void reduce(Key key, Iterable<Value> values, Context output)
-                throws IOException, InterruptedException {
-            output.write(key, AccumuloRdfConstants.EMPTY_VALUE);
-        }
-    }
-}
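
The tool above follows the standard Accumulo bulk-ingest recipe: write sorted RFiles with AccumuloFileOutputFormat, partition reducer output on the table's existing split points (serialized to a Base64-encoded split file, as above), then load the finished directory with importDirectory. A condensed sketch of that wiring, with illustrative paths:

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
    import org.apache.accumulo.core.client.mapreduce.lib.partition.KeyRangePartitioner;
    import org.apache.accumulo.core.client.mapreduce.lib.partition.RangePartitioner;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;

    public class BulkIngestSketch {

        // Configure a job to write sorted key/value RFiles instead of live mutations.
        static void configureOutput(Job job) throws Exception {
            job.setMapOutputKeyClass(Key.class);
            job.setMapOutputValueClass(Value.class);
            job.setOutputFormatClass(AccumuloFileOutputFormat.class);
            AccumuloFileOutputFormat.setOutputPath(job, new Path("/tmp/bulk/files"));
            // Partition on the table's current splits so each reducer covers one tablet.
            job.setPartitionerClass(KeyRangePartitioner.class);
            RangePartitioner.setSplitFile(job, "/tmp/bulk/splits.txt");
        }

        // After the job completes, hand the finished RFiles to the tablet servers.
        static void importFiles(Connector connector, String table) throws Exception {
            connector.tableOperations().importDirectory(
                    table, "/tmp/bulk/files", "/tmp/bulk/failures", false);
        }
    }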

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputByLineTool.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputByLineTool.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputByLineTool.java
deleted file mode 100644
index 5a872a0..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputByLineTool.java
+++ /dev/null
@@ -1,251 +0,0 @@
-package mvm.rya.accumulo.mr.fileinput;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import java.io.IOException;
-import java.io.StringReader;
-import java.util.Collection;
-import java.util.Date;
-import java.util.Map;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.AccumuloRdfConstants;
-import mvm.rya.accumulo.RyaTableMutationsFactory;
-import mvm.rya.accumulo.mr.utils.MRUtils;
-import mvm.rya.api.RdfCloudTripleStoreConstants;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.resolver.RdfToRyaConversions;
-import mvm.rya.api.resolver.RyaTripleContext;
-
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.openrdf.model.Statement;
-import org.openrdf.rio.RDFFormat;
-import org.openrdf.rio.RDFHandler;
-import org.openrdf.rio.RDFHandlerException;
-import org.openrdf.rio.RDFParser;
-import org.openrdf.rio.Rio;
-
-/**
- * Bulk import of RDF files, parsing each input line as a statement.
- * Class RdfFileInputByLineTool
- * Date: May 16, 2011
- * Time: 3:12:16 PM
- */
-public class RdfFileInputByLineTool implements Tool {
-
-    private Configuration conf = new Configuration();
-
-    private String userName = "root";
-    private String pwd = "password";
-    private String instance = "instance";
-    private String zk = "zoo";
-    private String tablePrefix = null;
-    private RDFFormat format = RDFFormat.NTRIPLES;
-
-    public Configuration getConf() {
-        return conf;
-    }
-
-    public void setConf(Configuration conf) {
-        this.conf = conf;
-    }
-
-    public static void main(String[] args) {
-        try {
-            ToolRunner.run(new Configuration(), new RdfFileInputByLineTool(), args);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    public long runJob(String[] args) throws IOException, ClassNotFoundException, InterruptedException, AccumuloSecurityException {
-        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
-        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
-        conf.set("io.sort.mb", "256");
-        conf.setLong("mapred.task.timeout", 600000000);
-
-        zk = conf.get(MRUtils.AC_ZK_PROP, zk);
-        instance = conf.get(MRUtils.AC_INSTANCE_PROP, instance);
-        userName = conf.get(MRUtils.AC_USERNAME_PROP, userName);
-        pwd = conf.get(MRUtils.AC_PWD_PROP, pwd);
-        format = RDFFormat.valueOf(conf.get(MRUtils.FORMAT_PROP, RDFFormat.NTRIPLES.toString()));
-
-        String tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, RdfCloudTripleStoreConstants.TBL_PRFX_DEF);
-
-        Job job = new Job(conf);
-        job.setJarByClass(RdfFileInputByLineTool.class);
-
-        // set up text file input
-        job.setInputFormatClass(TextInputFormat.class);
-        FileInputFormat.addInputPath(job, new Path(args[0]));
-
-        // set map output key/value classes for the job
-        job.setMapOutputKeyClass(Text.class);
-        job.setMapOutputValueClass(Mutation.class);
-
-        job.setOutputFormatClass(AccumuloOutputFormat.class);
-        AccumuloOutputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd.getBytes()));
-        AccumuloOutputFormat.setCreateTables(job, true);
-        AccumuloOutputFormat.setDefaultTableName(job, tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX);
-        AccumuloOutputFormat.setZooKeeperInstance(job, instance, zk);
-
-        // set mapper and reducer classes
-        job.setMapperClass(TextToMutationMapper.class);
-        job.setNumReduceTasks(0);
-
-        // Submit the job
-        Date startTime = new Date();
-        System.out.println("Job started: " + startTime);
-        int exitCode = job.waitForCompletion(true) ? 0 : 1;
-
-        if (exitCode == 0) {
-            Date end_time = new Date();
-            System.out.println("Job ended: " + end_time);
-            System.out.println("The job took "
-                    + (end_time.getTime() - startTime.getTime()) / 1000
-                    + " seconds.");
-            return job
-                    .getCounters()
-                    .findCounter("org.apache.hadoop.mapred.Task$Counter",
-                            "REDUCE_OUTPUT_RECORDS").getValue();
-        } else {
-            System.out.println("Job Failed!!!");
-        }
-
-        return -1;
-    }
-
-    @Override
-    public int run(String[] args) throws Exception {
-        return (int) runJob(args);
-    }
-
-    public static class TextToMutationMapper extends Mapper<LongWritable, Text, Text, Mutation> {
-        protected RDFParser parser;
-        private String prefix;
-        private RDFFormat rdfFormat;
-        protected Text spo_table;
-        private Text po_table;
-        private Text osp_table;
-        private byte[] cv = AccumuloRdfConstants.EMPTY_CV.getExpression();
-
-        public TextToMutationMapper() {
-        }
-
-        @Override
-        protected void setup(final Context context) throws IOException, InterruptedException {
-            super.setup(context);
-            Configuration conf = context.getConfiguration();
-            prefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, null);
-            if (prefix != null) {
-                RdfCloudTripleStoreConstants.prefixTables(prefix);
-            }
-
-            spo_table = new Text(prefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX);
-            po_table = new Text(prefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX);
-            osp_table = new Text(prefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX);
-
-            final String cv_s = conf.get(MRUtils.AC_CV_PROP);
-            if (cv_s != null)
-                cv = cv_s.getBytes();
-
-            rdfFormat = RDFFormat.valueOf(conf.get(MRUtils.FORMAT_PROP, RDFFormat.NTRIPLES.toString()));
-            parser = Rio.createParser(rdfFormat);
-            RyaTripleContext tripleContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(conf));
-            final RyaTableMutationsFactory mut = new RyaTableMutationsFactory(tripleContext);
-
-            parser.setRDFHandler(new RDFHandler() {
-
-                @Override
-                public void startRDF() throws RDFHandlerException {
-
-                }
-
-                @Override
-                public void endRDF() throws RDFHandlerException {
-
-                }
-
-                @Override
-                public void handleNamespace(String s, String s1) throws RDFHandlerException {
-
-                }
-
-                @Override
-                public void handleStatement(Statement statement) throws RDFHandlerException {
-                    try {
-                        RyaStatement ryaStatement = RdfToRyaConversions.convertStatement(statement);
-                        if(ryaStatement.getColumnVisibility() == null) {
-                            ryaStatement.setColumnVisibility(cv);
-                        }
-                        Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> mutationMap =
-                                mut.serialize(ryaStatement);
-                        Collection<Mutation> spo = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO);
-                        Collection<Mutation> po = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO);
-                        Collection<Mutation> osp = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP);
-
-                        for (Mutation m : spo) {
-                            context.write(spo_table, m);
-                        }
-                        for (Mutation m : po) {
-                            context.write(po_table, m);
-                        }
-                        for (Mutation m : osp) {
-                            context.write(osp_table, m);
-                        }
-                    } catch (Exception e) {
-                        throw new RDFHandlerException(e);
-                    }
-                }
-
-                @Override
-                public void handleComment(String s) throws RDFHandlerException {
-
-                }
-            });
-        }
-
-        @Override
-        protected void map(LongWritable key, Text value, final Context context) throws IOException, InterruptedException {
-            try {
-                parser.parse(new StringReader(value.toString()), "");
-            } catch (Exception e) {
-                throw new IOException(e);
-            }
-        }
-
-    }
-}
-
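
The core of the mapper above is Rio's push-style parsing API: each text line is fed to an RDFParser whose RDFHandler callback receives the parsed statement. A self-contained sketch of just that step, substituting OpenRDF's stock StatementCollector for the custom handler (the Rya conversion and Mutation output are omitted):

    import java.io.StringReader;
    import java.util.ArrayList;
    import java.util.List;

    import org.openrdf.model.Statement;
    import org.openrdf.rio.RDFFormat;
    import org.openrdf.rio.RDFParser;
    import org.openrdf.rio.Rio;
    import org.openrdf.rio.helpers.StatementCollector;

    public class LineParseSketch {
        public static void main(String[] args) throws Exception {
            String line = "<urn:s> <urn:p> <urn:o> .";
            RDFParser parser = Rio.createParser(RDFFormat.NTRIPLES);
            List<Statement> statements = new ArrayList<Statement>();
            parser.setRDFHandler(new StatementCollector(statements));
            parser.parse(new StringReader(line), ""); // empty base URI, as above
            System.out.println(statements.get(0));    // (urn:s, urn:p, urn:o)
        }
    }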

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputFormat.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputFormat.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputFormat.java
deleted file mode 100644
index 3d2fd78..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputFormat.java
+++ /dev/null
@@ -1,146 +0,0 @@
-package mvm.rya.accumulo.mr.fileinput;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.openrdf.model.Statement;
-import org.openrdf.rio.RDFFormat;
-import org.openrdf.rio.RDFHandler;
-import org.openrdf.rio.RDFHandlerException;
-import org.openrdf.rio.RDFParser;
-import org.openrdf.rio.Rio;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.mr.RyaStatementWritable;
-import mvm.rya.accumulo.mr.utils.MRUtils;
-import mvm.rya.api.resolver.RdfToRyaConversions;
-import mvm.rya.api.resolver.RyaTripleContext;
-
-/**
- * Reads RDF files in any supported serialization and converts the parsed statements into RyaStatementWritable values.
- * Class RdfFileInputFormat
- * Date: May 16, 2011
- * Time: 2:11:24 PM
- */
-public class RdfFileInputFormat extends FileInputFormat<LongWritable, RyaStatementWritable> {
-
-    @Override
-    public RecordReader<LongWritable, RyaStatementWritable> createRecordReader(InputSplit inputSplit,
-                                                                               TaskAttemptContext taskAttemptContext)
-            throws IOException, InterruptedException {
-        return new RdfFileRecordReader();
-    }
-
-    private class RdfFileRecordReader extends RecordReader<LongWritable, RyaStatementWritable> implements RDFHandler {
-
-        boolean closed = false;
-        long count = 0;
-        BlockingQueue<RyaStatementWritable> queue = new LinkedBlockingQueue<RyaStatementWritable>();
-        int total = 0;
-        private RyaTripleContext tripleContext;
-
-
-        @Override
-        public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
-            FileSplit fileSplit = (FileSplit) inputSplit;
-            Configuration conf = taskAttemptContext.getConfiguration();
-            String rdfForm_s = conf.get(MRUtils.FORMAT_PROP, RDFFormat.RDFXML.getName());
-            RDFFormat rdfFormat = RDFFormat.valueOf(rdfForm_s);
-            tripleContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(conf));
-
-            Path file = fileSplit.getPath();
-            FileSystem fs = file.getFileSystem(conf);
-            FSDataInputStream fileIn = fs.open(fileSplit.getPath());
-
-            RDFParser rdfParser = Rio.createParser(rdfFormat);
-            rdfParser.setRDFHandler(this);
-            try {
-                rdfParser.parse(fileIn, "");
-            } catch (Exception e) {
-                throw new IOException(e);
-            }
-            fileIn.close();
-            total = queue.size();
-            //TODO: Make this threaded so that you don't hold too many statements before sending them
-        }
-
-        @Override
-        public boolean nextKeyValue() throws IOException, InterruptedException {
-            return queue.size() > 0;
-        }
-
-        @Override
-        public LongWritable getCurrentKey() throws IOException, InterruptedException {
-            return new LongWritable(count++);
-        }
-
-        @Override
-        public RyaStatementWritable getCurrentValue() throws IOException, InterruptedException {
-            return queue.poll();
-        }
-
-        @Override
-        public float getProgress() throws IOException, InterruptedException {
-            return ((float) (total - queue.size())) / ((float) total);
-        }
-
-        @Override
-        public void close() throws IOException {
-            closed = true;
-        }
-
-        @Override
-        public void startRDF() throws RDFHandlerException {
-        }
-
-        @Override
-        public void endRDF() throws RDFHandlerException {
-        }
-
-        @Override
-        public void handleNamespace(String s, String s1) throws RDFHandlerException {
-        }
-
-        @Override
-        public void handleStatement(Statement statement) throws RDFHandlerException {
-            queue.add(new RyaStatementWritable(RdfToRyaConversions.convertStatement(statement), tripleContext));
-        }
-
-        @Override
-        public void handleComment(String s) throws RDFHandlerException {
-        }
-    }
-
-}
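
For reference, the Hadoop RecordReader contract expects nextKeyValue() to do all advancement, with getCurrentKey()/getCurrentValue() acting as side-effect-free accessors; the reader above instead increments its counter inside getCurrentKey(). A minimal queue-backed sketch of the contract, with stubbed records standing in for parsed RDF:

    import java.io.IOException;
    import java.util.LinkedList;
    import java.util.Queue;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public class QueueBackedRecordReader extends RecordReader<LongWritable, Text> {
        private final Queue<Text> queue = new LinkedList<Text>();
        private final LongWritable key = new LongWritable(-1);
        private Text value;
        private int total;

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            // A real reader would parse the split here; stub in two records.
            queue.add(new Text("record1"));
            queue.add(new Text("record2"));
            total = queue.size();
        }

        @Override
        public boolean nextKeyValue() {
            value = queue.poll();          // advance exactly once per call
            if (value == null) {
                return false;
            }
            key.set(key.get() + 1);
            return true;
        }

        @Override
        public LongWritable getCurrentKey() { return key; }   // no side effects

        @Override
        public Text getCurrentValue() { return value; }       // no side effects

        @Override
        public float getProgress() {
            return total == 0 ? 1.0f : (total - queue.size()) / (float) total;
        }

        @Override
        public void close() { }
    }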