Posted to commits@rya.apache.org by pu...@apache.org on 2016/07/21 12:50:27 UTC

[2/6] incubator-rya git commit: Consolidated MapReduce API and applications into toplevel project.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaStatementWritable.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaStatementWritable.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaStatementWritable.java
new file mode 100644
index 0000000..cdc3235
--- /dev/null
+++ b/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaStatementWritable.java
@@ -0,0 +1,256 @@
+package mvm.rya.accumulo.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.lang.builder.CompareToBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.resolver.RyaTripleContext;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+
+/**
+ * Basic {@link WritableComparable} for using Rya data with Hadoop.
+ * RyaStatementWritable wraps a {@link RyaStatement}, which in turn represents a
+ * statement as a collection of {@link mvm.rya.api.domain.RyaURI} and
+ * {@link mvm.rya.api.domain.RyaType} objects.
+ * <p>
+ * This class is mutable, like all {@link org.apache.hadoop.io.Writable}s. When
+ * used as Mapper or Reducer input, the Hadoop framework will typically reuse
+ * the same object to load the next record. However, loading the next record
+ * will create a new RyaStatement internally. Therefore, if a statement must be
+ * stored for any length of time, be sure to extract the internal RyaStatement.
+ */
+public class RyaStatementWritable implements WritableComparable<RyaStatementWritable> {
+    private RyaTripleContext ryaContext;
+    private RyaStatement ryaStatement;
+
+    /**
+     * Instantiates a RyaStatementWritable with the default RyaTripleContext.
+     * @param conf  Unused.
+     */
+    public RyaStatementWritable(Configuration conf) {
+        this();
+    }
+    /**
+     * Instantiates a RyaStatementWritable with a given context.
+     * @param ryaContext    Context used for reading and writing the statement.
+     */
+    public RyaStatementWritable(RyaTripleContext ryaContext) {
+        this.ryaContext = ryaContext;
+    }
+    /**
+     * Instantiates a RyaStatementWritable with the default RyaTripleContext.
+     */
+    public RyaStatementWritable() {
+        this.ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration());
+    }
+    /**
+     * Instantiates a RyaStatementWritable with a given statement and context.
+     * @param ryaStatement  The statement (triple) represented by this object.
+     * @param ryaContext    Context used for reading and writing the statement.
+     */
+    public RyaStatementWritable(RyaStatement ryaStatement, RyaTripleContext ryaContext) {
+        this(ryaContext);
+        this.ryaStatement = ryaStatement;
+    }
+
+    /**
+     * Gets the contained RyaStatement.
+     * @return The statement represented by this RyaStatementWritable.
+     */
+    public RyaStatement getRyaStatement() {
+        return ryaStatement;
+    }
+    /**
+     * Sets the contained RyaStatement.
+     * @param   ryaStatement    The statement to be represented by this
+     *                          RyaStatementWritable.
+     */
+    public void setRyaStatement(RyaStatement ryaStatement) {
+        this.ryaStatement = ryaStatement;
+    }
+
+    /**
+     * Comparison method for natural ordering. Compares based on the logical
+     * triple (the s/p/o/context information in the underlying RyaStatement)
+     * and then by the metadata contained in the RyaStatement if the triples are
+     * the same.
+     * @return  Zero if both RyaStatementWritables contain equivalent statements
+     *          or both have null statements; otherwise, an integer whose sign
+     *          corresponds to a consistent ordering.
+     */
+    @Override
+    public int compareTo(RyaStatementWritable other) {
+        CompareToBuilder builder = new CompareToBuilder();
+        RyaStatement rsThis = this.getRyaStatement();
+        RyaStatement rsOther = other.getRyaStatement(); // should throw NPE if other is null, as per Comparable contract
+        builder.append(rsThis == null, rsOther == null);
+        if (rsThis != null && rsOther != null) {
+            builder.append(rsThis.getSubject(), rsOther.getSubject());
+            builder.append(rsThis.getPredicate(), rsOther.getPredicate());
+            builder.append(rsThis.getObject(), rsOther.getObject());
+            builder.append(rsThis.getContext(), rsOther.getContext());
+            builder.append(rsThis.getQualifer(), rsOther.getQualifer());
+            builder.append(rsThis.getColumnVisibility(), rsOther.getColumnVisibility());
+            builder.append(rsThis.getValue(), rsOther.getValue());
+            builder.append(rsThis.getTimestamp(), rsOther.getTimestamp());
+        }
+        return builder.toComparison();
+    }
+
+    /**
+     * Returns a hash based on the hashCode method in RyaStatement.
+     * @return  A hash that should be consistent for equivalent RyaStatements.
+     */
+    @Override
+    public int hashCode() {
+        if (ryaStatement == null) {
+            return 0;
+        }
+        return ryaStatement.hashCode();
+    }
+
+    /**
+     * Tests for equality using the equals method in RyaStatement.
+     * @param   o   Object to compare with
+     * @return  true if both objects are RyaStatementWritables containing
+     *          equivalent RyaStatements.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (!(o instanceof RyaStatementWritable)) {
+            return false;
+        }
+        RyaStatement rsThis = this.getRyaStatement();
+        RyaStatement rsOther = ((RyaStatementWritable) o).getRyaStatement();
+        if (rsThis == null) {
+            return rsOther == null;
+        }
+        else {
+            return rsThis.equals(rsOther);
+        }
+    }
+
+    /**
+     * Serializes this RyaStatementWritable.
+     * @param   dataOutput  An output stream for serialized statement data.
+     * @throws  IOException if the RyaStatement is null or otherwise can't be
+     *          serialized.
+     */
+    @Override
+    public void write(DataOutput dataOutput) throws IOException {
+        if (ryaStatement == null) {
+            throw new IOException("Rya Statement is null");
+        }
+        try {
+            Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, TripleRow> map = ryaContext.serializeTriple(ryaStatement);
+            TripleRow tripleRow = map.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO);
+            byte[] row = tripleRow.getRow();
+            byte[] columnFamily = tripleRow.getColumnFamily();
+            byte[] columnQualifier = tripleRow.getColumnQualifier();
+            write(dataOutput, row);
+            write(dataOutput, columnFamily);
+            write(dataOutput, columnQualifier);
+            write(dataOutput, ryaStatement.getColumnVisibility());
+            write(dataOutput, ryaStatement.getValue());
+            Long timestamp = ryaStatement.getTimestamp();
+            boolean b = timestamp != null;
+            dataOutput.writeBoolean(b);
+            if (b) {
+                dataOutput.writeLong(timestamp);
+            }
+        } catch (TripleRowResolverException e) {
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Write part of a statement to an output stream.
+     * @param dataOutput Stream for writing serialized statements.
+     * @param row   Individual field to write, as a byte array.
+     * @throws IOException if writing to the stream fails.
+     */
+    protected void write(DataOutput dataOutput, byte[] row) throws IOException {
+        boolean b = row != null;
+        dataOutput.writeBoolean(b);
+        if (b) {
+            dataOutput.writeInt(row.length);
+            dataOutput.write(row);
+        }
+    }
+
+    /**
+     * Read part of a statement from an input stream.
+     * @param dataInput Stream for reading serialized statements.
+     * @return The next individual field, as a byte array.
+     * @throws IOException if reading from the stream fails.
+     */
+    protected byte[] read(DataInput dataInput) throws IOException {
+        if (dataInput.readBoolean()) {
+            int len = dataInput.readInt();
+            byte[] bytes = new byte[len];
+            dataInput.readFully(bytes);
+            return bytes;
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Loads a RyaStatementWritable by reading data from an input stream.
+     * Creates a new RyaStatement and assigns it to this RyaStatementWritable.
+     * @param   dataInput   A stream containing serialized statement data.
+     * @throws  IOException if reading fails or the data cannot be
+     *          deserialized into a valid statement.
+     */
+    @Override
+    public void readFields(DataInput dataInput) throws IOException {
+        byte[] row = read(dataInput);
+        byte[] columnFamily = read(dataInput);
+        byte[] columnQualifier = read(dataInput);
+        byte[] columnVisibility = read(dataInput);
+        byte[] value = read(dataInput);
+        boolean b = dataInput.readBoolean();
+        Long timestamp = null;
+        if (b) {
+            timestamp = dataInput.readLong();
+        }
+        try {
+            ryaStatement = ryaContext.deserializeTriple(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO,
+                    new TripleRow(row, columnFamily, columnQualifier));
+            ryaStatement.setColumnVisibility(columnVisibility);
+            ryaStatement.setValue(value);
+            ryaStatement.setTimestamp(timestamp);
+        } catch (TripleRowResolverException e) {
+            throw new IOException(e);
+        }
+    }
+}
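
The Javadoc above warns that Hadoop reuses Writable instances between records, and write()/readFields() round-trip a statement through its SPO row serialization. A minimal sketch of that round trip using Hadoop's in-memory buffers (the statement values are illustrative, and it assumes RyaStatement.equals covers all of the round-tripped fields):

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;

    import mvm.rya.accumulo.AccumuloRdfConfiguration;
    import mvm.rya.accumulo.mr.RyaStatementWritable;
    import mvm.rya.api.domain.RyaStatement;
    import mvm.rya.api.domain.RyaURI;
    import mvm.rya.api.resolver.RyaTripleContext;

    public class WritableRoundTrip {
        public static void main(String[] args) throws Exception {
            RyaTripleContext context =
                    RyaTripleContext.getInstance(new AccumuloRdfConfiguration());
            RyaStatement statement = new RyaStatement(
                    new RyaURI("http://example.com/s"),
                    new RyaURI("http://example.com/p"),
                    new RyaURI("http://example.com/o"));
            RyaStatementWritable original = new RyaStatementWritable(statement, context);

            // Serialize to an in-memory buffer, as the framework would to disk.
            DataOutputBuffer out = new DataOutputBuffer();
            original.write(out);

            // Read back into a fresh writable; readFields builds a new RyaStatement.
            DataInputBuffer in = new DataInputBuffer();
            in.reset(out.getData(), out.getLength());
            RyaStatementWritable copy = new RyaStatementWritable(context);
            copy.readFields(in);

            // equals() delegates to the underlying RyaStatement.
            System.out.println(original.equals(copy));
        }
    }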

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/main/java/mvm/rya/accumulo/mr/examples/TextOutputExample.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/examples/TextOutputExample.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/examples/TextOutputExample.java
new file mode 100644
index 0000000..bc3af58
--- /dev/null
+++ b/mapreduce/src/main/java/mvm/rya/accumulo/mr/examples/TextOutputExample.java
@@ -0,0 +1,196 @@
+package mvm.rya.accumulo.mr.examples;
+
+import java.io.BufferedReader;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.charset.Charset;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.util.Date;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+import org.openrdf.model.Statement;
+import org.openrdf.rio.RDFFormat;
+import org.openrdf.rio.RDFHandlerException;
+import org.openrdf.rio.RDFWriter;
+import org.openrdf.rio.Rio;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.AccumuloRyaDAO;
+import mvm.rya.accumulo.mr.AbstractAccumuloMRTool;
+import mvm.rya.accumulo.mr.MRUtils;
+import mvm.rya.accumulo.mr.RyaStatementWritable;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.api.resolver.RyaToRdfConversions;
+
+/**
+ * Example of using a MapReduce tool to get triples from a Rya instance and serialize them to a text file as RDF.
+ */
+public class TextOutputExample extends AbstractAccumuloMRTool {
+    private static Logger logger = Logger.getLogger(TextOutputExample.class);
+    private static RDFFormat rdfFormat = RDFFormat.NQUADS;
+    private static String tempDir;
+
+    // Connection information
+    private static final String USERNAME = "root";
+    private static final String PASSWORD = "";
+    private static final String INSTANCE_NAME = "instanceName";
+    private static final String PREFIX = "rya_example_";
+
+    public static void main(String[] args) throws Exception {
+        setUpRya();
+        TextOutputExample tool = new TextOutputExample();
+        ToolRunner.run(new Configuration(), tool, args);
+    }
+
+    static void setUpRya() throws AccumuloException, AccumuloSecurityException, RyaDAOException {
+        MockInstance mock = new MockInstance(INSTANCE_NAME);
+        Connector conn = mock.getConnector(USERNAME, new PasswordToken(PASSWORD));
+        AccumuloRyaDAO dao = new AccumuloRyaDAO();
+        dao.setConnector(conn);
+        AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+        conf.setTablePrefix(PREFIX);
+        dao.setConf(conf);
+        dao.init();
+        String ns = "http://example.com/";
+        dao.add(new RyaStatement(new RyaURI(ns+"s1"), new RyaURI(ns+"p1"), new RyaURI(ns+"o1")));
+        dao.add(new RyaStatement(new RyaURI(ns+"s1"), new RyaURI(ns+"p2"), new RyaURI(ns+"o2")));
+        dao.add(new RyaStatement(new RyaURI(ns+"s2"), new RyaURI(ns+"p1"), new RyaURI(ns+"o3"),
+                new RyaURI(ns+"g1")));
+        dao.add(new RyaStatement(new RyaURI(ns+"s3"), new RyaURI(ns+"p3"), new RyaURI(ns+"o3"),
+                new RyaURI(ns+"g2")));
+        dao.destroy();
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+        logger.info("Configuring tool to connect to mock instance...");
+        MRUtils.setACUserName(conf, USERNAME);
+        MRUtils.setACPwd(conf, PASSWORD);
+        MRUtils.setACInstance(conf, INSTANCE_NAME);
+        MRUtils.setACMock(conf, true);
+        MRUtils.setTablePrefix(conf, PREFIX);
+
+        logger.info("Initializing tool and checking configuration...");
+        init();
+
+        logger.info("Creating Job, setting Mapper class, and setting no Reducer...");
+        Job job = Job.getInstance(conf);
+        job.setJarByClass(TextOutputExample.class);
+        job.setMapperClass(RyaToRdfMapper.class);
+        job.setNumReduceTasks(0);
+
+        logger.info("Configuring Job to take input from the mock Rya instance...");
+        setupRyaInput(job);
+
+        logger.info("Configuring Job to output Text to a new temporary directory...");
+        tempDir = Files.createTempDirectory("rya-mr-example").toString();
+        Path outputPath = new Path(tempDir, "rdf-output");
+        job.setOutputFormatClass(TextOutputFormat.class);
+        TextOutputFormat.setOutputPath(job, outputPath);
+        job.setOutputKeyClass(NullWritable.class);
+        job.setOutputValueClass(Text.class);
+
+        Date start = new Date();
+        logger.info("Starting Job at: start");
+        boolean success = job.waitForCompletion(true);
+
+        if (!success) {
+            System.out.println("Job Failed!!!");
+            return 1;
+        }
+
+        Date end = new Date();
+        logger.info("Job ended: " + end);
+        logger.info("The job took " + (end.getTime() - start.getTime()) / 1000 + " seconds.");
+        // Print output and then delete temp files:
+        java.nio.file.Path tempPath = FileSystems.getDefault().getPath(tempDir);
+        for (java.nio.file.Path subdir : Files.newDirectoryStream(tempPath)) {
+            logger.info("");
+            logger.info("Output files:");
+            for (java.nio.file.Path outputFile : Files.newDirectoryStream(subdir)) {
+                logger.info("\t" + outputFile);
+            }
+            for (java.nio.file.Path outputFile : Files.newDirectoryStream(subdir, "part*")) {
+                logger.info("");
+                logger.info("Contents of " + outputFile + ":");
+                BufferedReader reader = Files.newBufferedReader(outputFile, Charset.defaultCharset());
+                String line;
+                while ((line = reader.readLine()) != null) {
+                    logger.info("\t" + line);
+                }
+                reader.close();
+            }
+            for (java.nio.file.Path outputFile : Files.newDirectoryStream(subdir)) {
+                Files.deleteIfExists(outputFile);
+            }
+            Files.deleteIfExists(subdir);
+        }
+        Files.deleteIfExists(tempPath);
+        logger.info("");
+        logger.info("Temporary directory " + tempDir + " deleted.");
+
+        return 0;
+    }
+
+    static class RyaToRdfMapper extends Mapper<Text, RyaStatementWritable, NullWritable, Text> {
+        Text textOut = new Text();
+        @Override
+        protected void map(Text key, RyaStatementWritable value, Context context) throws IOException, InterruptedException {
+            // receives a RyaStatementWritable; convert to a Statement
+            RyaStatement rstmt = value.getRyaStatement();
+            Statement st = RyaToRdfConversions.convertStatement(rstmt);
+            logger.info("Mapper receives: " + rstmt);
+            // then convert to an RDF string
+            StringWriter writer = new StringWriter();
+            try {
+                RDFWriter rdfWriter = Rio.createWriter(rdfFormat, writer);
+                rdfWriter.startRDF();
+                rdfWriter.handleStatement(st);
+                rdfWriter.endRDF();
+            } catch (RDFHandlerException e) {
+                throw new IOException("Error writing RDF data", e);
+            }
+            // Write the string to the output
+            String line = writer.toString().trim();
+            logger.info("Serialized to RDF: " + line);
+            textOut.set(line);
+            context.write(NullWritable.get(), textOut);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/AccumuloRdfCountTool.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/AccumuloRdfCountTool.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/AccumuloRdfCountTool.java
new file mode 100644
index 0000000..ee4e00b
--- /dev/null
+++ b/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/AccumuloRdfCountTool.java
@@ -0,0 +1,258 @@
+package mvm.rya.accumulo.mr.tools;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+
+import java.io.IOException;
+import java.util.Date;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.AccumuloRdfConstants;
+import mvm.rya.accumulo.mr.AbstractAccumuloMRTool;
+import mvm.rya.accumulo.mr.MRUtils;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.resolver.RyaTripleContext;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+
+/**
+ * Counts the number of statements using each subject, predicate, and object,
+ * and saves the totals to the evaluation statistics table.
+ * (Formerly RdfCloudTripleStoreCountTool, Apr 12, 2011.)
+ * @deprecated
+ */
+public class AccumuloRdfCountTool extends AbstractAccumuloMRTool implements Tool {
+
+    public static void main(String[] args) {
+        try {
+
+            ToolRunner.run(new Configuration(), new AccumuloRdfCountTool(), args);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public int run(String[] strings) throws Exception {
+        conf.set(MRUtils.JOB_NAME_PROP, "Gather Evaluation Statistics");
+
+        //initialize
+        init();
+
+        Job job = new Job(conf);
+        job.setJarByClass(AccumuloRdfCountTool.class);
+        setupAccumuloInput(job);
+
+        AccumuloInputFormat.setRanges(job, Lists.newArrayList(new Range(new Text(new byte[]{}), new Text(new byte[]{Byte.MAX_VALUE}))));
+        // set input output of the particular job
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(LongWritable.class);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(Mutation.class);
+
+        // set mapper and reducer classes
+        job.setMapperClass(CountPiecesMapper.class);
+        job.setCombinerClass(CountPiecesCombiner.class);
+        job.setReducerClass(CountPiecesReducer.class);
+
+        String outputTable = MRUtils.getTablePrefix(conf) + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX;
+        setupAccumuloOutput(job, outputTable);
+
+        // Submit the job
+        Date startTime = new Date();
+        System.out.println("Job started: " + startTime);
+        int exitCode = job.waitForCompletion(true) ? 0 : 1;
+
+        if (exitCode == 0) {
+            Date end_time = new Date();
+            System.out.println("Job ended: " + end_time);
+            System.out.println("The job took "
+                    + (end_time.getTime() - startTime.getTime()) / 1000
+                    + " seconds.");
+            return 0;
+        } else {
+            System.out.println("Job Failed!!!");
+        }
+
+        return -1;
+    }
+
+    public static class CountPiecesMapper extends Mapper<Key, Value, Text, LongWritable> {
+
+        public static final byte[] EMPTY_BYTES = new byte[0];
+        private RdfCloudTripleStoreConstants.TABLE_LAYOUT tableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP;
+
+        ValueFactoryImpl vf = new ValueFactoryImpl();
+
+        private Text keyOut = new Text();
+        private LongWritable valOut = new LongWritable(1);
+        private RyaTripleContext ryaContext;
+
+        @Override
+        protected void setup(Context context) throws IOException, InterruptedException {
+            super.setup(context);
+            Configuration conf = context.getConfiguration();
+            tableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.valueOf(
+                    conf.get(MRUtils.TABLE_LAYOUT_PROP, RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP.toString()));
+            ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(conf));
+        }
+
+        @Override
+        protected void map(Key key, Value value, Context context) throws IOException, InterruptedException {
+            try {
+                RyaStatement statement = ryaContext.deserializeTriple(tableLayout, new TripleRow(key.getRow().getBytes(), key.getColumnFamily().getBytes(), key.getColumnQualifier().getBytes()));
+                //count each piece subject, pred, object
+
+                String subj = statement.getSubject().getData();
+                String pred = statement.getPredicate().getData();
+//                byte[] objBytes = tripleFormat.getValueFormat().serialize(statement.getObject());
+                RyaURI scontext = statement.getContext();
+                boolean includesContext = scontext != null;
+                String scontext_str = (includesContext) ? scontext.getData() : null;
+
+                ByteArrayDataOutput output = ByteStreams.newDataOutput();
+                output.writeUTF(subj);
+                output.writeUTF(RdfCloudTripleStoreConstants.SUBJECT_CF);
+                output.writeBoolean(includesContext);
+                if (includesContext)
+                    output.writeUTF(scontext_str);
+                keyOut.set(output.toByteArray());
+                context.write(keyOut, valOut);
+
+                output = ByteStreams.newDataOutput();
+                output.writeUTF(pred);
+                output.writeUTF(RdfCloudTripleStoreConstants.PRED_CF);
+                output.writeBoolean(includesContext);
+                if (includesContext)
+                    output.writeUTF(scontext_str);
+                keyOut.set(output.toByteArray());
+                context.write(keyOut, valOut);
+            } catch (TripleRowResolverException e) {
+                throw new IOException(e);
+            }
+        }
+    }
+
+    public static class CountPiecesCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
+
+        private LongWritable valOut = new LongWritable();
+
+        // TODO: can still add up to be large I guess
+        // any count lower than this does not need to be saved
+        public static final int TOO_LOW = 2;
+
+        @Override
+        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
+            long count = 0;
+            for (LongWritable lw : values) {
+                count += lw.get();
+            }
+
+            if (count <= TOO_LOW)
+                return;
+
+            valOut.set(count);
+            context.write(key, valOut);
+        }
+
+    }
+
+    public static class CountPiecesReducer extends Reducer<Text, LongWritable, Text, Mutation> {
+
+        Text row = new Text();
+        Text cat_txt = new Text();
+        Value v_out = new Value();
+        ValueFactory vf = new ValueFactoryImpl();
+
+        // any count lower than this does not need to be saved
+        public static final int TOO_LOW = 10;
+        private String tablePrefix;
+        protected Text table;
+        private ColumnVisibility cv = AccumuloRdfConstants.EMPTY_CV;
+
+        @Override
+        protected void setup(Context context) throws IOException, InterruptedException {
+            super.setup(context);
+            tablePrefix = context.getConfiguration().get(MRUtils.TABLE_PREFIX_PROPERTY, RdfCloudTripleStoreConstants.TBL_PRFX_DEF);
+            table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX);
+            final String cv_s = context.getConfiguration().get(MRUtils.AC_CV_PROP);
+            if (cv_s != null)
+                cv = new ColumnVisibility(cv_s);
+        }
+
+        @Override
+        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
+            long count = 0;
+            for (LongWritable lw : values) {
+                count += lw.get();
+            }
+
+            if (count <= TOO_LOW)
+                return;
+
+            ByteArrayDataInput badi = ByteStreams.newDataInput(key.getBytes());
+            String v = badi.readUTF();
+            cat_txt.set(badi.readUTF());
+
+            Text columnQualifier = RdfCloudTripleStoreConstants.EMPTY_TEXT;
+            boolean includesContext = badi.readBoolean();
+            if (includesContext) {
+                columnQualifier = new Text(badi.readUTF());
+            }
+
+            row.set(v);
+            Mutation m = new Mutation(row);
+            v_out.set((count + "").getBytes());
+            m.put(cat_txt, columnQualifier, cv, v_out);
+            context.write(table, m);
+        }
+
+    }
+}
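
CountPiecesReducer above writes one Mutation per counted term: row = the term itself, column family = SUBJECT_CF or PRED_CF, column qualifier = the named graph (empty when the statement had no context), value = the count. A sketch of scanning those rows back out with a plain Accumulo Scanner (the table name "rya_eval" is hypothetical; it is really tablePrefix + TBL_EVAL_SUFFIX, and obtaining the Connector is assumed):

    import java.util.Map;

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.Scanner;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.security.Authorizations;

    public class PrintEvalStats {
        static void printCounts(Connector connector) throws Exception {
            Scanner scanner = connector.createScanner("rya_eval", new Authorizations());
            for (Map.Entry<Key, Value> entry : scanner) {
                Key key = entry.getKey();
                // row = term, CF = which position was counted,
                // CQ = named graph (empty when there was no context).
                System.out.printf("%s [%s/%s] -> %s%n",
                        key.getRow(), key.getColumnFamily(),
                        key.getColumnQualifier(), entry.getValue());
            }
        }
    }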

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/RdfFileInputTool.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/RdfFileInputTool.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/RdfFileInputTool.java
new file mode 100644
index 0000000..7857e45
--- /dev/null
+++ b/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/RdfFileInputTool.java
@@ -0,0 +1,91 @@
+package mvm.rya.accumulo.mr.tools;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.openrdf.rio.RDFFormat;
+
+import mvm.rya.accumulo.mr.AbstractAccumuloMRTool;
+import mvm.rya.accumulo.mr.MRUtils;
+
+/**
+ * Reads RDF data from one or more file(s) and inserts statements into Rya.
+ * <p>
+ * Uses {@link mvm.rya.accumulo.mr.RdfFileInputFormat} to read data.
+ * <p>
+ * Takes one argument: the file or directory to read (from HDFS).
+ * <p>
+ * Expects configuration:
+ * <p>
+ * - RDF format, named by parameter "rdf.format"; see {@link RDFFormat}.
+ *   Defaults to RDF/XML. If using multiple files, all must be the same format.
+ * <p>
+ * - Accumulo and Rya configuration parameters as named in {@link MRUtils}
+ *   (username, password, instance name, zookeepers, and Rya prefix)
+ * <p>
+ * - Indexing configuration parameters as named in
+ *   {@link mvm.rya.indexing.accumulo.ConfigUtils} (enable or disable freetext,
+ *   geo, temporal, and entity indexing, and specify predicates for each
+ *   indexer). If not given, no secondary indexing is done.
+ */
+public class RdfFileInputTool extends AbstractAccumuloMRTool implements Tool {
+    public static void main(String[] args) {
+        try {
+            ToolRunner.run(new Configuration(), new RdfFileInputTool(), args);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+        init();
+        Job job = Job.getInstance(conf, "Rdf File Input");
+        job.setJarByClass(RdfFileInputTool.class);
+
+        String inputPath = conf.get(MRUtils.INPUT_PATH, args[0]);
+        setupFileInput(job, inputPath, RDFFormat.RDFXML);
+        setupRyaOutput(job);
+        job.setNumReduceTasks(0);
+
+        Date startTime = new Date();
+        System.out.println("Job started: " + startTime);
+        int exitCode = job.waitForCompletion(true) ? 0 : 1;
+
+        if (exitCode == 0) {
+            Date end_time = new Date();
+            System.out.println("Job ended: " + end_time);
+            System.out.println("The job took "
+                    + (end_time.getTime() - startTime.getTime()) / 1000
+                    + " seconds.");
+            long n = job.getCounters()
+                    .findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS").getValue();
+            System.out.println(n + " statement(s) inserted to Rya.");
+        } else {
+            System.out.println("Job Failed!!!");
+        }
+        return exitCode;
+    }
+}
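
Putting the Javadoc's configuration parameters together, a minimal driver sketch for loading an N-Triples file (the HDFS path and credentials are hypothetical; the MRUtils setters are the ones used elsewhere in this commit, and a real cluster would also set the zookeeper property named in MRUtils):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;
    import org.openrdf.rio.RDFFormat;

    import mvm.rya.accumulo.mr.MRUtils;
    import mvm.rya.accumulo.mr.tools.RdfFileInputTool;

    public class LoadNTriples {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Accumulo connection and Rya prefix, as named in MRUtils.
            MRUtils.setACUserName(conf, "root");
            MRUtils.setACPwd(conf, "secret");
            MRUtils.setACInstance(conf, "instanceName");
            MRUtils.setTablePrefix(conf, "rya_");
            // RDF serialization of the input; defaults to RDF/XML if unset.
            // (Assumes the parameter takes the format's canonical name.)
            conf.set("rdf.format", RDFFormat.NTRIPLES.getName());
            // The tool reads the input path from its first argument
            // (or from the MRUtils.INPUT_PATH property).
            int exitCode = ToolRunner.run(conf, new RdfFileInputTool(),
                    new String[] { "hdfs:///data/triples.nt" });
            System.exit(exitCode);
        }
    }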

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/Upgrade322Tool.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/Upgrade322Tool.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/Upgrade322Tool.java
new file mode 100644
index 0000000..d713d85
--- /dev/null
+++ b/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/Upgrade322Tool.java
@@ -0,0 +1,241 @@
+package mvm.rya.accumulo.mr.tools;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+
+import mvm.rya.accumulo.mr.AbstractAccumuloMRTool;
+import mvm.rya.accumulo.mr.MRUtils;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.RegExFilter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.calrissian.mango.types.LexiTypeEncoders;
+import org.calrissian.mango.types.TypeEncoder;
+
+import java.io.IOException;
+import java.util.Date;
+
+import static mvm.rya.api.RdfCloudTripleStoreConstants.*;
+
+/**
+ * Upgrades object serializations in a Rya instance from the Rya 3.2.1 format
+ * to the 3.2.2 lexicographic encodings, rewriting the SPO, PO, and OSP tables.
+ */
+public class Upgrade322Tool extends AbstractAccumuloMRTool implements Tool {
+    @Override
+    public int run(String[] strings) throws Exception {
+        conf.set(MRUtils.JOB_NAME_PROP, "Upgrade to Rya 3.2.2");
+        init();
+
+        Job job = new Job(conf);
+        job.setJarByClass(Upgrade322Tool.class);
+
+        setupAccumuloInput(job);
+        AccumuloInputFormat.setInputTableName(job, MRUtils.getTablePrefix(conf) + TBL_OSP_SUFFIX);
+
+        //we do not need to change any row that is a string, custom, or uri type
+        IteratorSetting regex = new IteratorSetting(30, "regex",
+                                                    RegExFilter.class);
+        RegExFilter.setRegexs(regex, "\\w*" + TYPE_DELIM + "[\u0003\u0008\u0002]", null, null, null, false);
+        RegExFilter.setNegate(regex, true);
+
+        // set input output of the particular job
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(Mutation.class);
+
+        setupAccumuloOutput(job, MRUtils.getTablePrefix(conf) +
+                               TBL_SPO_SUFFIX);
+
+        // set mapper and reducer classes
+        job.setMapperClass(Upgrade322Mapper.class);
+        job.setReducerClass(Reducer.class);
+
+        // Submit the job
+        return job.waitForCompletion(true) ? 0 : 1;
+    }
+
+    public static void main(String[] args) {
+        try {
+            ToolRunner.run(new Configuration(), new Upgrade322Tool(), args);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Reading from the OSP table
+     */
+    public static class Upgrade322Mapper extends Mapper<Key, Value, Text, Mutation> {
+
+        private String tablePrefix;
+        private Text spoTable;
+        private Text poTable;
+        private Text ospTable;
+
+        private final UpgradeObjectSerialization upgradeObjectSerialization;
+
+        public Upgrade322Mapper() {
+            this(new UpgradeObjectSerialization());
+        }
+
+        public Upgrade322Mapper(
+          UpgradeObjectSerialization upgradeObjectSerialization) {
+            this.upgradeObjectSerialization = upgradeObjectSerialization;
+        }
+
+        @Override
+        protected void setup(
+          Context context) throws IOException, InterruptedException {
+            super.setup(context);
+
+            tablePrefix = context.getConfiguration().get(
+              MRUtils.TABLE_PREFIX_PROPERTY, TBL_PRFX_DEF);
+            spoTable = new Text(tablePrefix + TBL_SPO_SUFFIX);
+            poTable = new Text(tablePrefix + TBL_PO_SUFFIX);
+            ospTable = new Text(tablePrefix + TBL_OSP_SUFFIX);
+        }
+
+        @Override
+        protected void map(
+          Key key, Value value, Context context)
+          throws IOException, InterruptedException {
+
+            //read the key, expect OSP
+            final String row = key.getRow().toString();
+            final int firstDelim = row.indexOf(DELIM);
+            final int secondDelim = row.indexOf(DELIM, firstDelim + 1);
+            final int typeDelim = row.lastIndexOf(TYPE_DELIM);
+            final String oldSerialization = row.substring(0, firstDelim);
+            char typeMarker = row.charAt(row.length() - 1);
+
+            final String subject = row.substring(firstDelim + 1, secondDelim);
+            final String predicate = row.substring(secondDelim + 1, typeDelim);
+            final String typeSuffix = TYPE_DELIM + typeMarker;
+
+            String newSerialization = upgradeObjectSerialization.upgrade(oldSerialization, typeMarker);
+            if(newSerialization == null) {
+                return;
+            }
+
+            //write out delete Mutations
+            Mutation deleteOldSerialization_osp = new Mutation(key.getRow());
+            deleteOldSerialization_osp.putDelete(key.getColumnFamily(), key.getColumnQualifier(),
+                               key.getColumnVisibilityParsed());
+            Mutation deleteOldSerialization_po = new Mutation(predicate + DELIM + oldSerialization + DELIM + subject + typeSuffix);
+            deleteOldSerialization_po.putDelete(key.getColumnFamily(),
+                                                key.getColumnQualifier(),
+                                                key.getColumnVisibilityParsed());
+            Mutation deleteOldSerialization_spo = new Mutation(subject + DELIM + predicate + DELIM + oldSerialization + typeSuffix);
+            deleteOldSerialization_spo.putDelete(key.getColumnFamily(), key.getColumnQualifier(),
+                                                key.getColumnVisibilityParsed());
+
+            //write out new serialization
+            Mutation putNewSerialization_osp = new Mutation(newSerialization + DELIM + subject + DELIM + predicate + typeSuffix);
+            putNewSerialization_osp.put(key.getColumnFamily(),
+                                        key.getColumnQualifier(),
+                                        key.getColumnVisibilityParsed(),
+                                        key.getTimestamp(), value);
+            Mutation putNewSerialization_po = new Mutation(predicate + DELIM + newSerialization + DELIM + subject + typeSuffix);
+            putNewSerialization_po.put(key.getColumnFamily(),
+                                       key.getColumnQualifier(),
+                                       key.getColumnVisibilityParsed(),
+                                       key.getTimestamp(), value);
+            Mutation putNewSerialization_spo = new Mutation(subject + DELIM + predicate + DELIM + newSerialization + typeSuffix);
+            putNewSerialization_spo.put(key.getColumnFamily(),
+                                        key.getColumnQualifier(),
+                                        key.getColumnVisibilityParsed(),
+                                        key.getTimestamp(), value);
+
+            //write out deletes to all tables
+            context.write(ospTable, deleteOldSerialization_osp);
+            context.write(poTable, deleteOldSerialization_po);
+            context.write(spoTable, deleteOldSerialization_spo);
+
+            //write out inserts to all tables
+            context.write(ospTable, putNewSerialization_osp);
+            context.write(poTable, putNewSerialization_po);
+            context.write(spoTable, putNewSerialization_spo);
+        }
+    }
+
+    public static class UpgradeObjectSerialization {
+
+        public static final TypeEncoder<Boolean, String>
+          BOOLEAN_STRING_TYPE_ENCODER = LexiTypeEncoders.booleanEncoder();
+        public static final TypeEncoder<Byte, String> BYTE_STRING_TYPE_ENCODER
+          = LexiTypeEncoders.byteEncoder();
+        public static final TypeEncoder<Date, String> DATE_STRING_TYPE_ENCODER
+          = LexiTypeEncoders.dateEncoder();
+        public static final TypeEncoder<Integer, String>
+          INTEGER_STRING_TYPE_ENCODER = LexiTypeEncoders.integerEncoder();
+        public static final TypeEncoder<Long, String> LONG_STRING_TYPE_ENCODER
+          = LexiTypeEncoders.longEncoder();
+        public static final TypeEncoder<Double, String>
+          DOUBLE_STRING_TYPE_ENCODER = LexiTypeEncoders.doubleEncoder();
+
+        public String upgrade(String object, int typeMarker) {
+            switch(typeMarker) {
+                case 10: //boolean
+                    final boolean bool = Boolean.parseBoolean(object);
+                    return BOOLEAN_STRING_TYPE_ENCODER.encode(bool);
+                case 9: //byte
+                    final byte b = Byte.parseByte(object);
+                    return BYTE_STRING_TYPE_ENCODER.encode(b);
+                case 4: //long
+                    final Long lng = Long.parseLong(object);
+                    return LONG_STRING_TYPE_ENCODER.encode(lng);
+                case 5: //int
+                    final Integer i = Integer.parseInt(object);
+                    return INTEGER_STRING_TYPE_ENCODER.encode(i);
+                case 6: //double
+                    String exp = object.substring(2, 5);
+                    char valueSign = object.charAt(0);
+                    char expSign = object.charAt(1);
+                    Integer expInt = Integer.parseInt(exp);
+                    if (expSign == '-') {
+                        expInt = 999 - expInt;
+                    }
+                    final String expDoubleStr =
+                      String.format("%s%sE%s%d", valueSign,
+                                    object.substring(6),
+                                    expSign, expInt);
+                    return DOUBLE_STRING_TYPE_ENCODER
+                      .encode(Double.parseDouble(expDoubleStr));
+                case 7: //datetime
+                    //check to see if it is an early release that includes the exact term xsd:dateTime
+                    final Long l = Long.MAX_VALUE - Long.parseLong(object);
+                    Date date = new Date(l);
+                    return DATE_STRING_TYPE_ENCODER.encode(date);
+                default:
+                    return null;
+            }
+        }
+    }
+}
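
The upgrade logic is pure string re-encoding, so it can be exercised without a cluster. For instance, the long case (type marker 4 in the switch above) re-encodes the value with LexiTypeEncoders.longEncoder(); values here are illustrative:

    import mvm.rya.accumulo.mr.tools.Upgrade322Tool.UpgradeObjectSerialization;

    public class UpgradeDemo {
        public static void main(String[] args) {
            UpgradeObjectSerialization upgrade = new UpgradeObjectSerialization();
            // Marker 4: old long serialization, re-encoded lexicographically.
            System.out.println(upgrade.upgrade("1234567890", 4));
            // Unhandled markers fall through to the default case and return
            // null, which the mapper treats as "no rewrite needed".
            System.out.println(upgrade.upgrade("anything", 8));
        }
    }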

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/test/java/mvm/rya/accumulo/mr/RdfFileInputFormatTest.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/test/java/mvm/rya/accumulo/mr/RdfFileInputFormatTest.java b/mapreduce/src/test/java/mvm/rya/accumulo/mr/RdfFileInputFormatTest.java
new file mode 100644
index 0000000..cda66bd
--- /dev/null
+++ b/mapreduce/src/test/java/mvm/rya/accumulo/mr/RdfFileInputFormatTest.java
@@ -0,0 +1,180 @@
+package mvm.rya.accumulo.mr;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.model.impl.ContextStatementImpl;
+import org.openrdf.model.impl.LiteralImpl;
+import org.openrdf.model.impl.StatementImpl;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.rio.RDFFormat;
+
+import mvm.rya.api.resolver.RyaToRdfConversions;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public class RdfFileInputFormatTest {
+    static String NT_INPUT = "src/test/resources/test.ntriples";
+    static String TRIG_INPUT = "src/test/resources/namedgraphs.trig";
+
+    Configuration conf;
+    Job job;
+    FileSystem fs;
+    RdfFileInputFormat.RdfFileRecordReader reader;
+
+    @Rule
+    public ExpectedException expected = ExpectedException.none();
+
+    @Before
+    public void before() throws IOException {
+        conf = new Configuration();
+        conf.set("fs.default.name", "file:///");
+        fs = FileSystem.get(conf);
+        job = Job.getInstance(conf);
+    }
+
+    void init(String filename) throws IOException, InterruptedException {
+        conf = job.getConfiguration();
+        File inputFile = new File(filename);
+        Path inputPath = new Path(inputFile.getAbsoluteFile().toURI());
+        InputSplit split = new FileSplit(inputPath, 0, inputFile.length(), null);
+        TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
+        reader = (RdfFileInputFormat.RdfFileRecordReader) new RdfFileInputFormat().createRecordReader(split, context);
+        reader.initialize(split, context);
+    }
+
+    @Test
+    public void testStatementInput() throws Exception {
+        RdfFileInputFormat.setRDFFormat(job, RDFFormat.NTRIPLES);
+        init(NT_INPUT);
+        String prefix = "urn:lubm:rdfts#";
+        URI[] gs = {
+                new URIImpl(prefix + "GraduateStudent01"),
+                new URIImpl(prefix + "GraduateStudent02"),
+                new URIImpl(prefix + "GraduateStudent03"),
+                new URIImpl(prefix + "GraduateStudent04")
+        };
+        URI hasFriend = new URIImpl(prefix + "hasFriend");
+        Statement[] statements = {
+                new StatementImpl(gs[0], hasFriend, gs[1]),
+                new StatementImpl(gs[1], hasFriend, gs[2]),
+                new StatementImpl(gs[2], hasFriend, gs[3])
+        };
+        int count = 0;
+        while (reader.nextKeyValue()) {
+            Assert.assertEquals(statements[count],
+                    RyaToRdfConversions.convertStatement(reader.getCurrentValue().getRyaStatement()));
+            count++;
+            Assert.assertEquals(count, reader.getCurrentKey().get());
+        }
+        Assert.assertEquals(3, count);
+    }
+
+    @Test
+    public void testTrigInput() throws Exception {
+        RdfFileInputFormat.setRDFFormat(job, RDFFormat.TRIG);
+        init(TRIG_INPUT);
+        Assert.assertTrue(reader.nextKeyValue());
+        Assert.assertEquals(1, reader.getCurrentKey().get());
+        Statement expected = new ContextStatementImpl(
+            new URIImpl("http://www.example.org/exampleDocument#Monica"),
+            new URIImpl("http://www.example.org/vocabulary#name"),
+            new LiteralImpl("Monica Murphy"),
+            new URIImpl("http://www.example.org/exampleDocument#G1"));
+        Statement actual = RyaToRdfConversions.convertStatement(
+            reader.getCurrentValue().getRyaStatement());
+        Assert.assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testBlockStatementQueue() throws Exception {
+        RdfFileInputFormat.setRDFFormat(job, RDFFormat.NTRIPLES);
+        RdfFileInputFormat.setStatementBufferSize(job, 2);
+        init(NT_INPUT);
+        // 3 statements in total, plus done signal: should fill up three times
+        int interval = 100; // ms to sleep per iteration while waiting for statement cache to fill
+        int maxSeconds = 120; // timeout that should never be reached
+        for (int i = 0; i < 3; i++) {
+            long t = 0;
+            while (reader.statementCache.remainingCapacity() > 0) {
+                if (t >= (maxSeconds*1000)) {
+                    Assert.fail("Statement buffer still hasn't filled up after " + maxSeconds + " seconds.");
+                }
+                Assert.assertTrue(reader.statementCache.size() <= 2);
+                Thread.sleep(interval);
+                t += interval;
+            }
+            Assert.assertEquals(2, reader.statementCache.size());
+            Assert.assertEquals(0, reader.statementCache.remainingCapacity());
+            Assert.assertTrue(reader.nextKeyValue());
+        }
+        // Then the only thing in the queue should be the done signal
+        Assert.assertSame(RdfFileInputFormat.DONE, reader.statementCache.peek());
+        Assert.assertEquals(1, reader.statementCache.size());
+        Assert.assertFalse(reader.nextKeyValue());
+        Assert.assertTrue(reader.statementCache.isEmpty());
+    }
+
+    @Test
+    public void testFailGracefully() throws Exception {
+        // Pass the wrong RDF format and make sure all threads terminate
+        int interval = 100; // ms to sleep per iteration while waiting for statement cache to fill
+        int maxSeconds = 60; // timeout that should never be reached
+        RdfFileInputFormat.setRDFFormat(job, RDFFormat.RDFXML);
+        RdfFileInputFormat.setTimeout(job, maxSeconds*2);
+        init(NT_INPUT);
+        long t = 0;
+        while (reader.statementCache.isEmpty()) {
+            if (t >= (maxSeconds*1000)) {
+                Assert.fail("Statement buffer still hasn't been sent error signal after " + maxSeconds + " seconds.");
+            }
+            Thread.sleep(interval);
+            t += interval;
+        }
+        Assert.assertSame(RdfFileInputFormat.ERROR, reader.statementCache.peek());
+        expected.expect(IOException.class);
+        try {
+            Assert.assertFalse(reader.nextKeyValue());
+        }
+        catch (Exception e) {
+            Assert.assertNull(reader.getCurrentKey());
+            Assert.assertNull(reader.getCurrentValue());
+            Assert.assertFalse(reader.readerThread.isAlive());
+            Assert.assertFalse(reader.parserThread.isAlive());
+            throw e;
+        }
+    }
+}
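
The tests above drive RdfFileInputFormat.RdfFileRecordReader directly; in an actual job the format is configured through the static setters the tests use. A sketch (the input path is hypothetical, setTimeout appears to take seconds based on the tests, and addInputPath assumes the format follows the standard FileInputFormat contract, as its use of FileSplit suggests):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.openrdf.rio.RDFFormat;

    import mvm.rya.accumulo.mr.RdfFileInputFormat;

    public class ConfigureRdfInput {
        static Job buildJob(Configuration conf) throws Exception {
            Job job = Job.getInstance(conf, "Parse RDF files");
            job.setInputFormatClass(RdfFileInputFormat.class);
            FileInputFormat.addInputPath(job, new Path("hdfs:///data/triples.nt"));
            RdfFileInputFormat.setRDFFormat(job, RDFFormat.NTRIPLES); // must match the files
            RdfFileInputFormat.setStatementBufferSize(job, 1000);     // bound the parsed-statement queue
            RdfFileInputFormat.setTimeout(job, 120);                  // fail if the parser stalls
            return job;
        }
    }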

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaInputFormatTest.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaInputFormatTest.java b/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaInputFormatTest.java
new file mode 100644
index 0000000..2755732
--- /dev/null
+++ b/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaInputFormatTest.java
@@ -0,0 +1,156 @@
+package mvm.rya.accumulo.mr;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.AccumuloRyaDAO;
+import mvm.rya.accumulo.RyaTableMutationsFactory;
+import mvm.rya.accumulo.mr.RyaInputFormat.RyaStatementRecordReader;
+import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.resolver.RyaTripleContext;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mrunit.mapreduce.MapDriver;
+import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+public class RyaInputFormatTest {
+
+    static String username = "root", table = "rya_spo";
+    static PasswordToken password = new PasswordToken("");
+
+    static Instance instance;
+    static AccumuloRyaDAO apiImpl;
+
+    @BeforeClass
+    public static void init() throws Exception {
+        instance = new MockInstance(RyaInputFormatTest.class.getName() + ".mock_instance");
+        Connector connector = instance.getConnector(username, password);
+        connector.tableOperations().create(table);
+
+        AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+        conf.setTablePrefix("rya_");
+        conf.setDisplayQueryPlan(false);
+
+        apiImpl = new AccumuloRyaDAO();
+        apiImpl.setConf(conf);
+        apiImpl.setConnector(connector);
+    }
+
+    @Before
+    public void before() throws Exception {
+        apiImpl.init();
+    }
+
+    @After
+    public void after() throws Exception {
+        apiImpl.dropAndDestroy();
+    }
+
+    @Test
+    public void testInputFormat() throws Exception {
+
+        RyaStatement input = RyaStatement.builder()
+            .setSubject(new RyaURI("http://www.google.com"))
+            .setPredicate(new RyaURI("http://some_other_uri"))
+            .setObject(new RyaURI("http://www.yahoo.com"))
+            .setColumnVisibility(new byte[0])
+            .setValue(new byte[0])
+            .build();
+
+        apiImpl.add(input);
+
+        Job jobConf = Job.getInstance();
+
+        RyaInputFormat.setMockInstance(jobConf, instance.getInstanceName());
+        RyaInputFormat.setConnectorInfo(jobConf, username, password);
+        RyaInputFormat.setTableLayout(jobConf, TABLE_LAYOUT.SPO);
+
+        AccumuloInputFormat.setInputTableName(jobConf, table);
+        AccumuloInputFormat.setScanIsolation(jobConf, false);
+        AccumuloInputFormat.setLocalIterators(jobConf, false);
+        AccumuloInputFormat.setOfflineTableScan(jobConf, false);
+
+        RyaInputFormat inputFormat = new RyaInputFormat();
+
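+        // Build a JobContext from the configured job so the input format can compute splits against the mock instance.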
+        JobContext context = new JobContextImpl(jobConf.getConfiguration(), jobConf.getJobID());
+
+        List<InputSplit> splits = inputFormat.getSplits(context);
+
+        Assert.assertEquals(1, splits.size());
+
+        TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl(context.getConfiguration(), new TaskAttemptID(new TaskID(), 1));
+
+        RecordReader<Text, RyaStatementWritable> reader = inputFormat.createRecordReader(splits.get(0), taskAttemptContext);
+
+        RyaStatementRecordReader ryaStatementRecordReader = (RyaStatementRecordReader)reader;
+        ryaStatementRecordReader.initialize(splits.get(0), taskAttemptContext);
+
+        List<RyaStatement> results = new ArrayList<RyaStatement>();
+        while(ryaStatementRecordReader.nextKeyValue()) {
+            RyaStatementWritable writable = ryaStatementRecordReader.getCurrentValue();
+            RyaStatement value = writable.getRyaStatement();
+            Text text = ryaStatementRecordReader.getCurrentKey();
+            RyaStatement stmt = RyaStatement.builder()
+                .setSubject(value.getSubject())
+                .setPredicate(value.getPredicate())
+                .setObject(value.getObject())
+                .setContext(value.getContext())
+                .setQualifier(value.getQualifer())
+                .setColumnVisibility(value.getColumnVisibility())
+                .setValue(value.getValue())
+                .build();
+            results.add(stmt);
+
+            System.out.println(text);
+            System.out.println(value);
+        }
+
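+        // Expect two rows: the inserted triple plus the version statement AccumuloRyaDAO writes during init().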
+        Assert.assertEquals(2, results.size());
+        Assert.assertTrue(results.contains(input));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaOutputFormatTest.java b/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaOutputFormatTest.java
new file mode 100644
index 0000000..a48afa3
--- /dev/null
+++ b/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaOutputFormatTest.java
@@ -0,0 +1,324 @@
+package mvm.rya.accumulo.mr;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.openrdf.model.Statement;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+
+import com.vividsolutions.jts.geom.Coordinate;
+import com.vividsolutions.jts.geom.GeometryFactory;
+import com.vividsolutions.jts.geom.Point;
+import com.vividsolutions.jts.geom.PrecisionModel;
+
+import info.aduna.iteration.CloseableIteration;
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.AccumuloRyaDAO;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaType;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.resolver.RdfToRyaConversions;
+import mvm.rya.api.resolver.RyaToRdfConversions;
+import mvm.rya.api.resolver.RyaTripleContext;
+import mvm.rya.indexing.StatementConstraints;
+import mvm.rya.indexing.TemporalInstant;
+import mvm.rya.indexing.TemporalInstantRfc3339;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+import mvm.rya.indexing.accumulo.entity.EntityCentricIndex;
+import mvm.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer;
+import mvm.rya.indexing.accumulo.freetext.SimpleTokenizer;
+import mvm.rya.indexing.accumulo.freetext.Tokenizer;
+import mvm.rya.indexing.accumulo.geo.GeoConstants;
+import mvm.rya.indexing.accumulo.geo.GeoMesaGeoIndexer;
+import mvm.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public class RyaOutputFormatTest {
+    private static final String CV = "test_auth";
+    private static final String GRAPH = "http://example.org/graph";
+    private static final String USERNAME = "root";
+    private static final String PASSWORD = "";
+    private static final String INSTANCE_NAME = RyaOutputFormatTest.class.getSimpleName() + ".rya_output";
+    private static final String PREFIX = "ryaoutputformattest_";
+
+    MockInstance instance;
+    Connector connector;
+    AccumuloRyaDAO dao;
+    AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+    Job job;
+    RyaTripleContext ryaContext;
+
+    @Before
+    public void init() throws Exception {
+        MRUtils.setACMock(conf, true);
+        MRUtils.setACInstance(conf, INSTANCE_NAME);
+        MRUtils.setACUserName(conf, USERNAME);
+        MRUtils.setACPwd(conf, PASSWORD);
+        MRUtils.setTablePrefix(conf, PREFIX);
+        conf.setTablePrefix(PREFIX);
+        conf.setAuths(CV);
+        conf.set(ConfigUtils.CLOUDBASE_INSTANCE, INSTANCE_NAME);
+        conf.set(ConfigUtils.CLOUDBASE_USER, USERNAME);
+        conf.set(ConfigUtils.CLOUDBASE_PASSWORD, PASSWORD);
+        conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, true);
+        conf.setClass(ConfigUtils.TOKENIZER_CLASS, SimpleTokenizer.class, Tokenizer.class);
+        ryaContext = RyaTripleContext.getInstance(conf);
+        instance = new MockInstance(INSTANCE_NAME);
+        connector = instance.getConnector(USERNAME, new PasswordToken(PASSWORD));
+        job = Job.getInstance(conf);
+        RyaOutputFormat.setMockInstance(job, instance.getInstanceName());
+        AccumuloOutputFormat.setConnectorInfo(job, USERNAME, new PasswordToken(PASSWORD));
+        AccumuloOutputFormat.setCreateTables(job, true);
+        AccumuloOutputFormat.setDefaultTableName(job, PREFIX + "default");
+        RyaOutputFormat.setTablePrefix(job, PREFIX);
+    }
+
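+    // Helper: feeds statements through a RyaRecordWriter, mimicking how the framework drives the output format.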
+    private void write(RyaStatement... input) throws IOException, InterruptedException {
+        RecordWriter<Writable, RyaStatementWritable> writer =
+                new RyaOutputFormat.RyaRecordWriter(job.getConfiguration());
+        for (RyaStatement rstmt : input) {
+            RyaStatementWritable rsw = new RyaStatementWritable(rstmt, ryaContext);
+            writer.write(new Text("unused"), rsw);
+        }
+        writer.close(null);
+    }
+
+    @Test
+    public void testOutputFormat() throws Exception {
+        RyaStatement input = RyaStatement.builder()
+            .setSubject(new RyaURI("http://www.google.com"))
+            .setPredicate(new RyaURI("http://some_other_uri"))
+            .setObject(new RyaURI("http://www.yahoo.com"))
+            .setColumnVisibility(CV.getBytes())
+            .setValue(new byte[0])
+            .setContext(new RyaURI(GRAPH))
+            .build();
+        RyaOutputFormat.setCoreTablesEnabled(job, true);
+        RyaOutputFormat.setFreeTextEnabled(job, false);
+        RyaOutputFormat.setTemporalEnabled(job, false);
+        RyaOutputFormat.setGeoEnabled(job, false);
+        RyaOutputFormat.setEntityEnabled(job, false);
+        write(input);
+        TestUtils.verify(connector, conf, input);
+    }
+
+    @Test
+    public void testDefaultCV() throws Exception {
+        RyaStatement input = RyaStatement.builder()
+            .setSubject(new RyaURI("http://www.google.com"))
+            .setPredicate(new RyaURI("http://some_other_uri"))
+            .setObject(new RyaURI("http://www.yahoo.com"))
+            .setValue(new byte[0])
+            .setContext(new RyaURI(GRAPH))
+            .build();
+        RyaStatement expected = RyaStatement.builder()
+            .setSubject(new RyaURI("http://www.google.com"))
+            .setPredicate(new RyaURI("http://some_other_uri"))
+            .setObject(new RyaURI("http://www.yahoo.com"))
+            .setValue(new byte[0])
+            .setContext(new RyaURI(GRAPH))
+            .setColumnVisibility(CV.getBytes())
+            .build();
+        RyaOutputFormat.setCoreTablesEnabled(job, true);
+        RyaOutputFormat.setFreeTextEnabled(job, false);
+        RyaOutputFormat.setTemporalEnabled(job, false);
+        RyaOutputFormat.setGeoEnabled(job, false);
+        RyaOutputFormat.setEntityEnabled(job, false);
+        RyaOutputFormat.setDefaultVisibility(job, CV);
+        write(input);
+        TestUtils.verify(connector, conf, expected);
+    }
+
+    @Test
+    public void testDefaultGraph() throws Exception {
+        RyaStatement input = RyaStatement.builder()
+            .setSubject(new RyaURI("http://www.google.com"))
+            .setPredicate(new RyaURI("http://some_other_uri"))
+            .setObject(new RyaURI("http://www.yahoo.com"))
+            .setValue(new byte[0])
+            .setColumnVisibility(CV.getBytes())
+            .build();
+        RyaStatement expected = RyaStatement.builder()
+            .setSubject(new RyaURI("http://www.google.com"))
+            .setPredicate(new RyaURI("http://some_other_uri"))
+            .setObject(new RyaURI("http://www.yahoo.com"))
+            .setValue(new byte[0])
+            .setColumnVisibility(CV.getBytes())
+            .setContext(new RyaURI(GRAPH))
+            .build();
+        RyaOutputFormat.setCoreTablesEnabled(job, true);
+        RyaOutputFormat.setFreeTextEnabled(job, false);
+        RyaOutputFormat.setTemporalEnabled(job, false);
+        RyaOutputFormat.setGeoEnabled(job, false);
+        RyaOutputFormat.setEntityEnabled(job, false);
+        RyaOutputFormat.setDefaultContext(job, GRAPH);
+        write(input);
+        TestUtils.verify(connector, conf, expected);
+    }
+
+    @Test
+    public void testFreeTextIndexing() throws Exception {
+        AccumuloFreeTextIndexer ft = new AccumuloFreeTextIndexer();
+        ft.setConf(conf);
+        RyaStatement input = RyaStatement.builder()
+                .setSubject(new RyaURI(GRAPH + ":s"))
+                .setPredicate(new RyaURI(GRAPH + ":p"))
+                .setObject(new RyaType(XMLSchema.STRING, "one two three four five"))
+                .build();
+        RyaOutputFormat.setCoreTablesEnabled(job, false);
+        RyaOutputFormat.setFreeTextEnabled(job, true);
+        RyaOutputFormat.setTemporalEnabled(job, false);
+        RyaOutputFormat.setGeoEnabled(job, false);
+        RyaOutputFormat.setEntityEnabled(job, false);
+        write(input);
+        Set<Statement> empty = new HashSet<>();
+        Set<Statement> expected = new HashSet<>();
+        expected.add(RyaToRdfConversions.convertStatement(input));
+        Assert.assertEquals(expected, getSet(ft.queryText("one", new StatementConstraints())));
+        Assert.assertEquals(empty, getSet(ft.queryText("!two", new StatementConstraints())));
+        Assert.assertEquals(expected, getSet(ft.queryText("*r", new StatementConstraints())));
+        Assert.assertEquals(empty, getSet(ft.queryText("r*", new StatementConstraints())));
+        Assert.assertEquals(expected, getSet(ft.queryText("!r*", new StatementConstraints())));
+        Assert.assertEquals(expected, getSet(ft.queryText("t* & !s*", new StatementConstraints())));
+        ft.close();
+    }
+
+    @Test
+    public void testTemporalIndexing() throws Exception {
+        TemporalInstant[] instants = {
+                new TemporalInstantRfc3339(2015, 12, 30, 12, 0, 1),
+                new TemporalInstantRfc3339(2015, 12, 30, 12, 0, 2),
+                new TemporalInstantRfc3339(2015, 12, 30, 12, 0, 3),
+                new TemporalInstantRfc3339(2015, 12, 30, 12, 0, 3)
+        };
+        Statement[] statements = new Statement[instants.length];
+        RyaOutputFormat.setCoreTablesEnabled(job, false);
+        RyaOutputFormat.setFreeTextEnabled(job, false);
+        RyaOutputFormat.setTemporalEnabled(job, true);
+        RyaOutputFormat.setGeoEnabled(job, false);
+        RyaOutputFormat.setEntityEnabled(job, false);
+        ValueFactory vf = new ValueFactoryImpl();
+        for (int i = 0; i < instants.length; i++) {
+            RyaType time = RdfToRyaConversions.convertLiteral(vf.createLiteral(instants[i].toString()));
+            RyaStatement input = RyaStatement.builder()
+                    .setSubject(new RyaURI(GRAPH + ":s"))
+                    .setPredicate(new RyaURI(GRAPH + ":p"))
+                    .setObject(time)
+                    .build();
+            write(input);
+            statements[i] = RyaToRdfConversions.convertStatement(input);
+        }
+        AccumuloTemporalIndexer temporal = new AccumuloTemporalIndexer();
+        temporal.setConf(conf);
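+        // Partition the expected results: "head" is the lone statement before instants[1], "tail" those strictly after it.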
+        Set<Statement> empty = new HashSet<>();
+        Set<Statement> head = new HashSet<>();
+        Set<Statement> tail = new HashSet<>();
+        head.add(statements[0]);
+        tail.add(statements[2]);
+        tail.add(statements[3]);
+        Assert.assertEquals(empty, getSet(temporal.queryInstantBeforeInstant(instants[0], new StatementConstraints())));
+        Assert.assertEquals(empty, getSet(temporal.queryInstantAfterInstant(instants[3], new StatementConstraints())));
+        Assert.assertEquals(head, getSet(temporal.queryInstantBeforeInstant(instants[1], new StatementConstraints())));
+        Assert.assertEquals(tail, getSet(temporal.queryInstantAfterInstant(instants[1], new StatementConstraints())));
+        temporal.close();
+    }
+
+    @Test
+    public void testGeoIndexing() throws Exception {
+        GeometryFactory gf = new GeometryFactory(new PrecisionModel(), 4326);
+        Point p1 = gf.createPoint(new Coordinate(1, 1));
+        Point p2 = gf.createPoint(new Coordinate(2, 2));
+        GeoMesaGeoIndexer geo = new GeoMesaGeoIndexer();
+        geo.setConf(conf);
+        RyaStatement input = RyaStatement.builder()
+                .setSubject(new RyaURI(GRAPH + ":s"))
+                .setPredicate(new RyaURI(GRAPH + ":p"))
+                .setObject(new RyaType(GeoConstants.XMLSCHEMA_OGC_WKT, "Point(2 2)"))
+                .build();
+        RyaOutputFormat.setCoreTablesEnabled(job, false);
+        RyaOutputFormat.setFreeTextEnabled(job, false);
+        RyaOutputFormat.setTemporalEnabled(job, false);
+        RyaOutputFormat.setGeoEnabled(job, true);
+        RyaOutputFormat.setEntityEnabled(job, false);
+        write(input);
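+        // No indexed geometry contains p1 at (1,1); the indexed point should compare equal to p2 at (2,2).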
+        Set<Statement> expected = new HashSet<>();
+        Assert.assertEquals(expected, getSet(geo.queryContains(p1, new StatementConstraints())));
+        expected.add(RyaToRdfConversions.convertStatement(input));
+        Assert.assertEquals(expected, getSet(geo.queryEquals(p2, new StatementConstraints())));
+        geo.close();
+    }
+
+    @Test
+    public void testEntityIndexing() throws Exception {
+        EntityCentricIndex entity = new EntityCentricIndex();
+        entity.setConf(conf);
+        RyaStatement input = RyaStatement.builder()
+                .setSubject(new RyaURI(GRAPH + ":s"))
+                .setPredicate(new RyaURI(GRAPH + ":p"))
+                .setObject(new RyaURI(GRAPH + ":o"))
+                .build();
+        RyaOutputFormat.setCoreTablesEnabled(job, false);
+        RyaOutputFormat.setFreeTextEnabled(job, false);
+        RyaOutputFormat.setTemporalEnabled(job, false);
+        RyaOutputFormat.setGeoEnabled(job, false);
+        RyaOutputFormat.setEntityEnabled(job, true);
+        write(input);
+        entity.close();
+        Set<Statement> expected = new HashSet<>();
+        Set<Statement> inserted = new HashSet<>();
+        expected.add(RyaToRdfConversions.convertStatement(input));
+        String table = ConfigUtils.getEntityTableName(conf);
+        Scanner scanner = connector.createScanner(table, new Authorizations(CV));
+        for (Map.Entry<Key, Value> row : scanner) {
+            System.out.println(row);
+            inserted.add(RyaToRdfConversions.convertStatement(
+                    EntityCentricIndex.deserializeStatement(row.getKey(), row.getValue())));
+        }
+        Assert.assertEquals(expected, inserted);
+    }
+
+    private static <X> Set<X> getSet(CloseableIteration<X, ?> iter) throws Exception {
+        Set<X> set = new HashSet<X>();
+        while (iter.hasNext()) {
+            set.add(iter.next());
+        }
+        return set;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaStatementWritableTest.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaStatementWritableTest.java b/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaStatementWritableTest.java
new file mode 100644
index 0000000..8bebdf4
--- /dev/null
+++ b/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaStatementWritableTest.java
@@ -0,0 +1,146 @@
+package mvm.rya.accumulo.mr;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.junit.Assert;
+import org.junit.Rule;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaType;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.resolver.RyaTripleContext;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public class RyaStatementWritableTest {
+    private static final RyaURI s1 = new RyaURI(":s");
+    private static final RyaURI p1 = new RyaURI(":p");
+    private static final RyaType o1 = new RyaType(XMLSchema.INTEGER, "123");
+    private static final RyaURI s2 = new RyaURI(":s2");
+    private static final RyaURI p2 = new RyaURI(":p2");
+    private static final RyaType o2 = new RyaType(XMLSchema.STRING, "123");
+    private static final RyaURI graph1 = new RyaURI("http://example.org/graph1");
+    private static final RyaURI graph2 = new RyaURI("http://example.org/graph2");
+    private static final byte[] cv1 = "test_visibility".getBytes();
+    private static final long t1 = 1000;
+    private static final long t2 = 1001;
+    private static final RyaStatement rs1 = RyaStatement.builder().setSubject(s1).setPredicate(p1).setObject(o1)
+            .setContext(graph1).setColumnVisibility(cv1).setQualifier("q1").setTimestamp(t1).build();
+    // Equivalent:
+    private static final RyaStatement rs1b = RyaStatement.builder().setSubject(s1).setPredicate(p1).setObject(o1)
+            .setContext(graph1).setColumnVisibility(cv1).setQualifier("q1").setTimestamp(t1).build();
+    // Differ in one way each:
+    private static final RyaStatement rsGraph = RyaStatement.builder().setSubject(s1).setPredicate(p1).setObject(o1)
+            .setContext(graph2).setColumnVisibility(cv1).setQualifier("q1").setTimestamp(t1).build();
+    private static final RyaStatement rsCv = RyaStatement.builder().setSubject(s1).setPredicate(p1).setObject(o1)
+            .setContext(graph1).setColumnVisibility(null).setQualifier("q1").setTimestamp(t1).build();
+    private static final RyaStatement rsQualifier = RyaStatement.builder().setSubject(s1).setPredicate(p1).setObject(o1)
+            .setContext(graph1).setColumnVisibility(cv1).setQualifier("q2").setTimestamp(t1).build();
+    private static final RyaStatement rsTimestamp = RyaStatement.builder().setSubject(s1).setPredicate(p1).setObject(o1)
+            .setContext(graph1).setColumnVisibility(cv1).setQualifier("q1").setTimestamp(t2).build();
+    // Different triple:
+    private static final RyaStatement rs2 = RyaStatement.builder().setSubject(s2).setPredicate(p2).setObject(o2)
+            .setContext(graph1).setColumnVisibility(null).setQualifier("q1").setTimestamp(t1).build();
+
+    private static final RyaTripleContext ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration());
+
+    @Rule
+    public ExpectedException expected = ExpectedException.none();
+
+    @Test
+    public void testEquals() throws Exception {
+        RyaStatementWritable rsw1 = new RyaStatementWritable(rs1, ryaContext);
+        RyaStatementWritable rsw1b = new RyaStatementWritable(rs1b, null);
+        RyaStatementWritable rsw2 = new RyaStatementWritable(rs2, ryaContext);
+        RyaStatementWritable rswNull = new RyaStatementWritable(null, ryaContext);
+        Assert.assertEquals("Equivalent statements should be equal", rsw1, rsw1b);
+        Assert.assertFalse("equals(null) should be false", rsw1.equals(null));
+        Assert.assertNotEquals("Statements representing different triples are not equal", rsw1, rsw2);
+        Assert.assertNotEquals("Statements representing different triples are not equal", rsw1, rswNull);
+        Assert.assertNotEquals("Statements with different named graphs are not equal", rsw1,
+                new RyaStatementWritable(rsGraph, ryaContext));
+        Assert.assertNotEquals("Statements with different column visibilities are not equal", rsw1,
+                new RyaStatementWritable(rsCv, ryaContext));
+        Assert.assertNotEquals("Statements with different column qualifiers are not equal", rsw1,
+                new RyaStatementWritable(rsQualifier, ryaContext));
+        Assert.assertNotEquals("Statements with different timestamps are not equal", rsw1,
+                new RyaStatementWritable(rsTimestamp, ryaContext));
+    }
+
+    @Test
+    public void testCompareTo() throws Exception {
+        RyaStatementWritable rsw1 = new RyaStatementWritable(rs1, ryaContext);
+        RyaStatementWritable rsw1b = new RyaStatementWritable(rs1b, null);
+        RyaStatementWritable rsw2 = new RyaStatementWritable(rs2, null);
+        RyaStatementWritable rswGraph = new RyaStatementWritable(rsGraph, ryaContext);
+        RyaStatementWritable rswCv = new RyaStatementWritable(rsCv, ryaContext);
+        RyaStatementWritable rswQualifier = new RyaStatementWritable(rsQualifier, ryaContext);
+        RyaStatementWritable rswTimestamp = new RyaStatementWritable(rsTimestamp, ryaContext);
+        Assert.assertEquals("x.compareTo(x) should always return 0", 0, rsw1.compareTo(rsw1));
+        Assert.assertEquals("x.compareTo(x') where x and x' are equal should return 0", 0, rsw1.compareTo(rsw1b));
+        Assert.assertEquals("x.compareTo(x') where x and x' are equal should return 0", 0, rsw1b.compareTo(rsw1));
+        Assert.assertNotEquals("Statements with different named graphs are not equal", 0, rsw1.compareTo(rswGraph));
+        Assert.assertNotEquals("Statements with different column visibilities are not equal", 0, rsw1.compareTo(rswCv));
+        Assert.assertNotEquals("Statements with different column qualifiers are not equal", 0, rsw1.compareTo(rswQualifier));
+        Assert.assertNotEquals("Statements with different timestamps are not equal", 0, rsw1.compareTo(rswTimestamp));
+        Assert.assertEquals("compareTo in opposite directions should yield opposite signs",
+                Integer.signum(rsw1.compareTo(rsw2))*-1, Integer.signum(rsw2.compareTo(rsw1)));
+        // cycles shouldn't be possible; these comparisons can't all be negative or all be positive:
+        int x = Integer.signum(rsw1.compareTo(rsw2))
+                + Integer.signum(rsw2.compareTo(rsw1b))
+                + Integer.signum(rsw1b.compareTo(rsw1));
+        Assert.assertNotEquals("compareTo cycle detected", 3, Math.abs(x));
+        // compareTo(null) should always throw an exception:
+        expected.expect(NullPointerException.class);
+        rsw1.compareTo(null);
+    }
+
+    @Test
+    public void testSerializeDeserialize() throws Exception {
+        RyaStatementWritable rsw1 = new RyaStatementWritable(rs1, ryaContext);
+        RyaStatementWritable rsw2 = new RyaStatementWritable(rs2, ryaContext);
+        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+        DataOutputStream bytesOut = new DataOutputStream(bytes);
+        rsw1.write(bytesOut);
+        rsw2.write(bytesOut);
+        DataInputStream bytesIn = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
+        RyaStatementWritable deserialized = new RyaStatementWritable();
+        // Verify initial deserialization:
+        deserialized.readFields(bytesIn);
+        Assert.assertEquals("Deserialized statement not equal to original", rsw1, deserialized);
+        Assert.assertEquals("Deserialized statement has different hash code", rsw1.hashCode(), deserialized.hashCode());
+        Assert.assertEquals("original.compareTo(deserialized) should equal 0", 0, rsw1.compareTo(deserialized));
+        // Verify that a second read mutates the Writable object into the correct second record:
+        RyaStatement deserializedStatement = deserialized.getRyaStatement();
+        deserialized.readFields(bytesIn);
+        Assert.assertEquals("Deserialized statement not overwritten on second read", rsw2, deserialized);
+        // Verify that the internal RyaStatement object is recreated, not overwritten:
+        RyaStatement deserializedStatement2 = deserialized.getRyaStatement();
+        Assert.assertNotSame("Reading a second record should create a new internal RyaStatement",
+                deserializedStatement, deserializedStatement2);
+    }
+}
\ No newline at end of file