You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by mi...@apache.org on 2017/08/04 02:10:29 UTC

[2/2] incubator-rya git commit: RYA-307 Improved Rya MongoDB ingest of statements ... Closes #181

RYA-307 Improved Rya MongoDB ingest of statements ... Closes #181

...through the Sail Layer and Rya DAO by queueing up multiple inserts at a time so can be written as a single batch.  If no statements in the batch have been written after a set time limit then they are flushed out into the datastore.  The size of the batch and the time limit are configurable.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/8def4cac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/8def4cac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/8def4cac

Branch: refs/heads/master
Commit: 8def4cacac7d0bb9ca3cc675c53d849261aa8029
Parents: fa2aad5
Author: eric.white <Er...@parsons.com>
Authored: Wed Jul 19 09:08:32 2017 -0400
Committer: Aaron Mihalik <aa...@gmail.com>
Committed: Thu Aug 3 15:45:05 2017 -0400

----------------------------------------------------------------------
 .../java/org/apache/rya/api/persist/RyaDAO.java |   7 +
 .../org/apache/rya/accumulo/AccumuloRyaDAO.java | 161 ++++++-------
 .../rya/mongodb/MongoDBRdfConfiguration.java    |  24 ++
 .../org/apache/rya/mongodb/MongoDBRyaDAO.java   |  63 ++++-
 .../rya/mongodb/batch/MongoDbBatchWriter.java   | 238 +++++++++++++++++++
 .../mongodb/batch/MongoDbBatchWriterConfig.java |  88 +++++++
 .../batch/MongoDbBatchWriterException.java      |  59 +++++
 .../mongodb/batch/MongoDbBatchWriterUtils.java  |  82 +++++++
 .../batch/collection/CollectionType.java        |  43 ++++
 .../batch/collection/DbCollectionType.java      |  53 +++++
 .../batch/collection/MongoCollectionType.java   |  52 ++++
 .../indexing/mongodb/AbstractMongoIndexer.java  |  60 ++++-
 .../rya/indexing/mongo/MongoEntityIndexIT.java  |  22 +-
 .../indexing/mongo/MongoIndexerDeleteIT.java    |  16 +-
 .../RdfCloudTripleStoreConnection.java          | 195 +++++++--------
 15 files changed, 965 insertions(+), 198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/common/rya.api/src/main/java/org/apache/rya/api/persist/RyaDAO.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/persist/RyaDAO.java b/common/rya.api/src/main/java/org/apache/rya/api/persist/RyaDAO.java
index 57aae1b..d83a5e9 100644
--- a/common/rya.api/src/main/java/org/apache/rya/api/persist/RyaDAO.java
+++ b/common/rya.api/src/main/java/org/apache/rya/api/persist/RyaDAO.java
@@ -123,4 +123,11 @@ public interface RyaDAO<C extends RdfCloudTripleStoreConfiguration> extends RyaC
     public void purge(RdfCloudTripleStoreConfiguration configuration);
 
     public void dropAndDestroy() throws RyaDAOException;
+
+    /**
+     * Flushes any RyaStatements queued for insertion and writes them to the
+     * datastore.
+     * @throws RyaDAOException
+     */
+    public void flush() throws RyaDAOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java
index bd7d2b3..f1f7c03 100644
--- a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java
@@ -59,12 +59,6 @@ import org.apache.accumulo.core.security.Authorizations;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
-import org.openrdf.model.Namespace;
-
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-
-import info.aduna.iteration.CloseableIteration;
 import org.apache.rya.accumulo.experimental.AccumuloIndexer;
 import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine;
 import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
@@ -76,6 +70,12 @@ import org.apache.rya.api.persist.RyaDAO;
 import org.apache.rya.api.persist.RyaDAOException;
 import org.apache.rya.api.persist.RyaNamespaceManager;
 import org.apache.rya.api.resolver.RyaTripleContext;
+import org.openrdf.model.Namespace;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+
+import info.aduna.iteration.CloseableIteration;
 
 public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaNamespaceManager<AccumuloRdfConfiguration> {
     private static final Log logger = LogFactory.getLog(AccumuloRyaDAO.class);
@@ -131,13 +131,13 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
 
             flushEachUpdate = conf.flushEachUpdate();
 
-            TableOperations tableOperations = connector.tableOperations();
+            final TableOperations tableOperations = connector.tableOperations();
             AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getSpo());
             AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getPo());
             AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getOsp());
             AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getNs());
 
-            for (AccumuloIndexer index : secondaryIndexers) {
+            for (final AccumuloIndexer index : secondaryIndexers) {
                 index.setConf(conf);
             }
 
@@ -150,7 +150,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
 
             bw_ns = mt_bw.getBatchWriter(tableLayoutStrategy.getNs());
 
-            for (AccumuloIndexer index : secondaryIndexers) {
+            for (final AccumuloIndexer index : secondaryIndexers) {
                index.setConnector(connector);
                index.setMultiTableBatchWriter(mt_bw);
                index.init();
@@ -161,7 +161,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
             checkVersion();
 
             initialized = true;
-        } catch (Exception e) {
+        } catch (final Exception e) {
             throw new RyaDAOException(e);
         }
     }
@@ -169,7 +169,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
     @Override
 	public String getVersion() throws RyaDAOException {
         String version = null;
-        CloseableIteration<RyaStatement, RyaDAOException> versIter = queryEngine.query(new RyaStatement(RTS_SUBJECT_RYA, RTS_VERSION_PREDICATE_RYA, null), conf);
+        final CloseableIteration<RyaStatement, RyaDAOException> versIter = queryEngine.query(new RyaStatement(RTS_SUBJECT_RYA, RTS_VERSION_PREDICATE_RYA, null), conf);
         if (versIter.hasNext()) {
             version = versIter.next().getObject().getData();
         }
@@ -179,43 +179,43 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
     }
 
     @Override
-    public void add(RyaStatement statement) throws RyaDAOException {
+    public void add(final RyaStatement statement) throws RyaDAOException {
         commit(Iterators.singletonIterator(statement));
     }
 
     @Override
-    public void add(Iterator<RyaStatement> iter) throws RyaDAOException {
+    public void add(final Iterator<RyaStatement> iter) throws RyaDAOException {
         commit(iter);
     }
 
     @Override
-    public void delete(RyaStatement stmt, AccumuloRdfConfiguration aconf) throws RyaDAOException {
+    public void delete(final RyaStatement stmt, final AccumuloRdfConfiguration aconf) throws RyaDAOException {
         this.delete(Iterators.singletonIterator(stmt), aconf);
     }
 
     @Override
-    public void delete(Iterator<RyaStatement> statements, AccumuloRdfConfiguration conf) throws RyaDAOException {
+    public void delete(final Iterator<RyaStatement> statements, final AccumuloRdfConfiguration conf) throws RyaDAOException {
         try {
             while (statements.hasNext()) {
-                RyaStatement stmt = statements.next();
+                final RyaStatement stmt = statements.next();
                 //query first
-                CloseableIteration<RyaStatement, RyaDAOException> query = this.queryEngine.query(stmt, conf);
+                final CloseableIteration<RyaStatement, RyaDAOException> query = this.queryEngine.query(stmt, conf);
                 while (query.hasNext()) {
                     deleteSingleRyaStatement(query.next());
                 }
 
-                for (AccumuloIndexer index : secondaryIndexers) {
+                for (final AccumuloIndexer index : secondaryIndexers) {
                     index.deleteStatement(stmt);
                 }
             }
             if (flushEachUpdate) { mt_bw.flush(); }
-        } catch (Exception e) {
+        } catch (final Exception e) {
             throw new RyaDAOException(e);
         }
     }
 
     @Override
-    public void dropGraph(AccumuloRdfConfiguration conf, RyaURI... graphs) throws RyaDAOException {
+    public void dropGraph(final AccumuloRdfConfiguration conf, final RyaURI... graphs) throws RyaDAOException {
         BatchDeleter bd_spo = null;
         BatchDeleter bd_po = null;
         BatchDeleter bd_osp = null;
@@ -229,7 +229,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
             bd_po.setRanges(Collections.singleton(new Range()));
             bd_osp.setRanges(Collections.singleton(new Range()));
 
-            for (RyaURI graph : graphs){
+            for (final RyaURI graph : graphs){
                 bd_spo.fetchColumnFamily(new Text(graph.getData()));
                 bd_po.fetchColumnFamily(new Text(graph.getData()));
                 bd_osp.fetchColumnFamily(new Text(graph.getData()));
@@ -244,7 +244,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
 //                index.dropGraph(graphs);
 //            }
 
-        } catch (Exception e) {
+        } catch (final Exception e) {
             throw new RyaDAOException(e);
         } finally {
             if (bd_spo != null) {
@@ -260,34 +260,34 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
 
     }
 
-    protected void deleteSingleRyaStatement(RyaStatement stmt) throws IOException, MutationsRejectedException {
-        Map<TABLE_LAYOUT, Collection<Mutation>> map = ryaTableMutationsFactory.serializeDelete(stmt);
+    protected void deleteSingleRyaStatement(final RyaStatement stmt) throws IOException, MutationsRejectedException {
+        final Map<TABLE_LAYOUT, Collection<Mutation>> map = ryaTableMutationsFactory.serializeDelete(stmt);
         bw_spo.addMutations(map.get(TABLE_LAYOUT.SPO));
         bw_po.addMutations(map.get(TABLE_LAYOUT.PO));
         bw_osp.addMutations(map.get(TABLE_LAYOUT.OSP));
     }
 
-    protected void commit(Iterator<RyaStatement> commitStatements) throws RyaDAOException {
+    protected void commit(final Iterator<RyaStatement> commitStatements) throws RyaDAOException {
         try {
             //TODO: Should have a lock here in case we are adding and committing at the same time
             while (commitStatements.hasNext()) {
-                RyaStatement stmt = commitStatements.next();
+                final RyaStatement stmt = commitStatements.next();
 
-                Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationsFactory.serialize(stmt);
-                Collection<Mutation> spo = mutationMap.get(TABLE_LAYOUT.SPO);
-                Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO);
-                Collection<Mutation> osp = mutationMap.get(TABLE_LAYOUT.OSP);
+                final Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationsFactory.serialize(stmt);
+                final Collection<Mutation> spo = mutationMap.get(TABLE_LAYOUT.SPO);
+                final Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO);
+                final Collection<Mutation> osp = mutationMap.get(TABLE_LAYOUT.OSP);
                 bw_spo.addMutations(spo);
                 bw_po.addMutations(po);
                 bw_osp.addMutations(osp);
 
-                for (AccumuloIndexer index : secondaryIndexers) {
+                for (final AccumuloIndexer index : secondaryIndexers) {
                     index.storeStatement(stmt);
                 }
             }
 
             if (flushEachUpdate) { mt_bw.flush(); }
-        } catch (Exception e) {
+        } catch (final Exception e) {
             throw new RyaDAOException(e);
         }
     }
@@ -303,57 +303,57 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
             mt_bw.flush();
 
             mt_bw.close();
-        } catch (Exception e) {
+        } catch (final Exception e) {
             throw new RyaDAOException(e);
         }
-        for(AccumuloIndexer indexer : this.secondaryIndexers) {
+        for(final AccumuloIndexer indexer : this.secondaryIndexers) {
             try {
                 indexer.destroy();
-            } catch(Exception e) {
+            } catch(final Exception e) {
                 logger.warn("Failed to destroy indexer", e);
             }
         }
     }
 
     @Override
-    public void addNamespace(String pfx, String namespace) throws RyaDAOException {
+    public void addNamespace(final String pfx, final String namespace) throws RyaDAOException {
         try {
-            Mutation m = new Mutation(new Text(pfx));
+            final Mutation m = new Mutation(new Text(pfx));
             m.put(INFO_NAMESPACE_TXT, EMPTY_TEXT, new Value(namespace.getBytes()));
             bw_ns.addMutation(m);
             if (flushEachUpdate) { mt_bw.flush(); }
-        } catch (Exception e) {
+        } catch (final Exception e) {
             throw new RyaDAOException(e);
         }
     }
 
     @Override
-    public String getNamespace(String pfx) throws RyaDAOException {
+    public String getNamespace(final String pfx) throws RyaDAOException {
         try {
-            Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(),
+            final Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(),
                     ALL_AUTHORIZATIONS);
             scanner.fetchColumn(INFO_NAMESPACE_TXT, EMPTY_TEXT);
             scanner.setRange(new Range(new Text(pfx)));
-            Iterator<Map.Entry<Key, Value>> iterator = scanner
+            final Iterator<Map.Entry<Key, Value>> iterator = scanner
                     .iterator();
 
             if (iterator.hasNext()) {
                 return new String(iterator.next().getValue().get());
             }
-        } catch (Exception e) {
+        } catch (final Exception e) {
             throw new RyaDAOException(e);
         }
         return null;
     }
 
     @Override
-    public void removeNamespace(String pfx) throws RyaDAOException {
+    public void removeNamespace(final String pfx) throws RyaDAOException {
         try {
-            Mutation del = new Mutation(new Text(pfx));
+            final Mutation del = new Mutation(new Text(pfx));
             del.putDelete(INFO_NAMESPACE_TXT, EMPTY_TEXT);
             bw_ns.addMutation(del);
             if (flushEachUpdate) { mt_bw.flush(); }
-        } catch (Exception e) {
+        } catch (final Exception e) {
             throw new RyaDAOException(e);
         }
     }
@@ -362,12 +362,12 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
     @Override
     public CloseableIteration<Namespace, RyaDAOException> iterateNamespace() throws RyaDAOException {
         try {
-            Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(),
+            final Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(),
                     ALL_AUTHORIZATIONS);
             scanner.fetchColumnFamily(INFO_NAMESPACE_TXT);
-            Iterator<Map.Entry<Key, Value>> result = scanner.iterator();
+            final Iterator<Map.Entry<Key, Value>> result = scanner.iterator();
             return new AccumuloNamespaceTableIterator(result);
-        } catch (Exception e) {
+        } catch (final Exception e) {
             throw new RyaDAOException(e);
         }
     }
@@ -378,21 +378,21 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
     }
 
     @Override
-    public void purge(RdfCloudTripleStoreConfiguration configuration) {
-        for (String tableName : getTables()) {
+    public void purge(final RdfCloudTripleStoreConfiguration configuration) {
+        for (final String tableName : getTables()) {
             try {
                 purge(tableName, configuration.getAuths());
                 compact(tableName);
-            } catch (TableNotFoundException e) {
+            } catch (final TableNotFoundException e) {
                 logger.error(e.getMessage());
-            } catch (MutationsRejectedException e) {
+            } catch (final MutationsRejectedException e) {
                 logger.error(e.getMessage());
             }
         }
-        for(AccumuloIndexer indexer : this.secondaryIndexers) {
+        for(final AccumuloIndexer indexer : this.secondaryIndexers) {
             try {
                 indexer.purge(configuration);
-            } catch(Exception e) {
+            } catch(final Exception e) {
                 logger.error("Failed to purge indexer", e);
             }
         }
@@ -400,24 +400,24 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
 
     @Override
     public void dropAndDestroy() throws RyaDAOException {
-        for (String tableName : getTables()) {
+        for (final String tableName : getTables()) {
             try {
                 drop(tableName);
-            } catch (AccumuloSecurityException e) {
+            } catch (final AccumuloSecurityException e) {
                 logger.error(e.getMessage());
                 throw new RyaDAOException(e);
-            } catch (AccumuloException e) {
+            } catch (final AccumuloException e) {
                 logger.error(e.getMessage());
                 throw new RyaDAOException(e);
-            } catch (TableNotFoundException e) {
+            } catch (final TableNotFoundException e) {
                 logger.warn(e.getMessage());
             }
         }
         destroy();
-        for(AccumuloIndexer indexer : this.secondaryIndexers) {
+        for(final AccumuloIndexer indexer : this.secondaryIndexers) {
             try {
                 indexer.dropAndDestroy();
-            } catch(Exception e) {
+            } catch(final Exception e) {
                 logger.error("Failed to drop and destroy indexer", e);
             }
         }
@@ -427,7 +427,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
         return connector;
     }
 
-    public void setConnector(Connector connector) {
+    public void setConnector(final Connector connector) {
         this.connector = connector;
     }
 
@@ -435,7 +435,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
         return batchWriterConfig;
     }
 
-    public void setBatchWriterConfig(BatchWriterConfig batchWriterConfig) {
+    public void setBatchWriterConfig(final BatchWriterConfig batchWriterConfig) {
         this.batchWriterConfig = batchWriterConfig;
     }
 
@@ -449,7 +449,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
     }
 
     @Override
-	public void setConf(AccumuloRdfConfiguration conf) {
+	public void setConf(final AccumuloRdfConfiguration conf) {
         this.conf = conf;
     }
 
@@ -457,7 +457,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
         return ryaTableMutationsFactory;
     }
 
-    public void setRyaTableMutationsFactory(RyaTableMutationsFactory ryaTableMutationsFactory) {
+    public void setRyaTableMutationsFactory(final RyaTableMutationsFactory ryaTableMutationsFactory) {
         this.ryaTableMutationsFactory = ryaTableMutationsFactory;
     }
 
@@ -466,21 +466,22 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
         return queryEngine;
     }
 
-    public void setQueryEngine(AccumuloRyaQueryEngine queryEngine) {
+    public void setQueryEngine(final AccumuloRyaQueryEngine queryEngine) {
         this.queryEngine = queryEngine;
     }
 
+    @Override
     public void flush() throws RyaDAOException {
         try {
             mt_bw.flush();
-        } catch (MutationsRejectedException e) {
+        } catch (final MutationsRejectedException e) {
             throw new RyaDAOException(e);
         }
     }
 
     protected String[] getTables() {
         // core tables
-        List<String> tableNames = Lists.newArrayList(
+        final List<String> tableNames = Lists.newArrayList(
                 tableLayoutStrategy.getSpo(),
                 tableLayoutStrategy.getPo(),
                 tableLayoutStrategy.getOsp(),
@@ -488,17 +489,17 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
                 tableLayoutStrategy.getEval());
 
         // Additional Tables
-        for (AccumuloIndexer index : secondaryIndexers) {
+        for (final AccumuloIndexer index : secondaryIndexers) {
             tableNames.add(index.getTableName());
         }
 
         return tableNames.toArray(new String[]{});
     }
 
-    private void purge(String tableName, String[] auths) throws TableNotFoundException, MutationsRejectedException {
+    private void purge(final String tableName, final String[] auths) throws TableNotFoundException, MutationsRejectedException {
         if (tableExists(tableName)) {
             logger.info("Purging accumulo table: " + tableName);
-            BatchDeleter batchDeleter = createBatchDeleter(tableName, new Authorizations(auths));
+            final BatchDeleter batchDeleter = createBatchDeleter(tableName, new Authorizations(auths));
             try {
                 batchDeleter.setRanges(Collections.singleton(new Range()));
                 batchDeleter.delete();
@@ -508,31 +509,31 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
         }
     }
 
-    private void compact(String tableName) {
+    private void compact(final String tableName) {
         logger.info("Requesting major compaction for table " + tableName);
         try {
             connector.tableOperations().compact(tableName, null, null, true, false);
-        } catch (Exception e) {
+        } catch (final Exception e) {
             logger.error(e.getMessage());
         }
     }
 
-    private boolean tableExists(String tableName) {
+    private boolean tableExists(final String tableName) {
         return getConnector().tableOperations().exists(tableName);
     }
 
-    private BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations) throws TableNotFoundException {
+    private BatchDeleter createBatchDeleter(final String tableName, final Authorizations authorizations) throws TableNotFoundException {
         return connector.createBatchDeleter(tableName, authorizations, NUM_THREADS, MAX_MEMORY, MAX_TIME, NUM_THREADS);
     }
 
     private void checkVersion() throws RyaDAOException, IOException, MutationsRejectedException {
-        String version = getVersion();
+        final String version = getVersion();
         if (version == null) {
             //adding to core Rya tables but not Indexes
-            Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationsFactory.serialize(getVersionRyaStatement());
-            Collection<Mutation> spo = mutationMap.get(TABLE_LAYOUT.SPO);
-            Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO);
-            Collection<Mutation> osp = mutationMap.get(TABLE_LAYOUT.OSP);
+            final Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationsFactory.serialize(getVersionRyaStatement());
+            final Collection<Mutation> spo = mutationMap.get(TABLE_LAYOUT.SPO);
+            final Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO);
+            final Collection<Mutation> osp = mutationMap.get(TABLE_LAYOUT.OSP);
             bw_spo.addMutations(spo);
             bw_po.addMutations(po);
             bw_osp.addMutations(osp);
@@ -544,7 +545,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
         return new RyaStatement(RTS_SUBJECT_RYA, RTS_VERSION_PREDICATE_RYA, VERSION_RYA);
     }
 
-    private void drop(String tableName) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+    private void drop(final String tableName) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
         logger.info("Dropping cloudbase table: " + tableName);
         connector.tableOperations().delete(tableName);
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java
index 067b682..418a155 100644
--- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java
@@ -40,6 +40,8 @@ public class MongoDBRdfConfiguration extends RdfCloudTripleStoreConfiguration {
     public static final String MONGO_USER_PASSWORD = "mongo.db.userpassword";
     public static final String CONF_ADDITIONAL_INDEXERS = "ac.additional.indexers";
     public static final String USE_MOCK_MONGO = ".useMockInstance";
+    public static final String CONF_FLUSH_EACH_UPDATE = "rya.mongodb.dao.flusheachupdate";
+
     private MongoClient mongoClient;
 
     public MongoDBRdfConfiguration() {
@@ -99,6 +101,28 @@ public class MongoDBRdfConfiguration extends RdfCloudTripleStoreConfiguration {
     }
 
     /**
+     * @return {@code true} if each statement added to the batch writer should
+     * be flushed and written right away to the datastore. {@code false} if the
+     * statements should be queued and written to the datastore when the queue
+     * is full or after enough time has passed without a write.<p>
+     * Defaults to {@code true} if nothing is specified.
+     */
+    public boolean flushEachUpdate(){
+        return getBoolean(CONF_FLUSH_EACH_UPDATE, true);
+    }
+
+    /**
+     * Sets the {@link #CONF_FLUSH_EACH_UPDATE} property of the configuration.
+     * @param flush {@code true} if each statement added to the batch writer
+     * should be flushed and written right away to the datastore. {@code false}
+     * if the statements should be queued and written to the datastore when the
+     * queue is full or after enough time has passed without a write.
+     */
+    public void setFlush(final boolean flush){
+        setBoolean(CONF_FLUSH_EACH_UPDATE, flush);
+    }
+
+    /**
      * @return name of Mongo Collection containing Rya triples
      */
     public String getTriplesCollectionName() {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java
index daa8a67..a32651d 100644
--- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java
@@ -37,6 +37,11 @@ import org.apache.rya.api.persist.RyaDAOException;
 import org.apache.rya.api.persist.RyaNamespaceManager;
 import org.apache.rya.api.persist.index.RyaSecondaryIndexer;
 import org.apache.rya.api.persist.query.RyaQueryEngine;
+import org.apache.rya.mongodb.batch.MongoDbBatchWriter;
+import org.apache.rya.mongodb.batch.MongoDbBatchWriterConfig;
+import org.apache.rya.mongodb.batch.MongoDbBatchWriterException;
+import org.apache.rya.mongodb.batch.MongoDbBatchWriterUtils;
+import org.apache.rya.mongodb.batch.collection.DbCollectionType;
 import org.apache.rya.mongodb.dao.MongoDBNamespaceManager;
 import org.apache.rya.mongodb.dao.MongoDBStorageStrategy;
 import org.apache.rya.mongodb.dao.SimpleMongoDBNamespaceManager;
@@ -47,7 +52,6 @@ import com.mongodb.DB;
 import com.mongodb.DBCollection;
 import com.mongodb.DBObject;
 import com.mongodb.DuplicateKeyException;
-import com.mongodb.InsertOptions;
 import com.mongodb.MongoClient;
 
 /**
@@ -56,6 +60,8 @@ import com.mongodb.MongoClient;
 public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{
     private static final Logger log = Logger.getLogger(MongoDBRyaDAO.class);
 
+    private boolean isInitialized = false;
+    private boolean flushEachUpdate = true;
     private MongoDBRdfConfiguration conf;
     private final MongoClient mongoClient;
     private DB db;
@@ -67,6 +73,8 @@ public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{
     private List<MongoSecondaryIndex> secondaryIndexers;
     private Authorizations auths;
 
+    private MongoDbBatchWriter<DBObject> mongoDbBatchWriter;
+
     /**
      * Creates a new instance of {@link MongoDBRyaDAO}.
      * @param conf the {@link MongoDBRdfConfiguration}.
@@ -87,6 +95,7 @@ public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{
         this.mongoClient = mongoClient;
         conf.setMongoClient(mongoClient);
         auths = conf.getAuthorizations();
+        flushEachUpdate = conf.flushEachUpdate();
         init();
     }
 
@@ -116,6 +125,9 @@ public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{
 
     @Override
     public void init() throws RyaDAOException {
+        if (isInitialized) {
+            return;
+        }
         secondaryIndexers = conf.getAdditionalIndexers();
         for(final MongoSecondaryIndex index: secondaryIndexers) {
             index.setConf(conf);
@@ -131,15 +143,34 @@ public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{
         for(final MongoSecondaryIndex index: secondaryIndexers) {
             index.init();
         }
+
+        final MongoDbBatchWriterConfig mongoDbBatchWriterConfig = MongoDbBatchWriterUtils.getMongoDbBatchWriterConfig(conf);
+        mongoDbBatchWriter = new MongoDbBatchWriter<DBObject>(new DbCollectionType(coll), mongoDbBatchWriterConfig);
+        try {
+            mongoDbBatchWriter.start();
+        } catch (final MongoDbBatchWriterException e) {
+            throw new RyaDAOException("Error starting MongoDB batch writer", e);
+        }
+        isInitialized = true;
     }
 
     @Override
     public boolean isInitialized() throws RyaDAOException {
-        return true;
+        return isInitialized;
     }
 
     @Override
     public void destroy() throws RyaDAOException {
+        if (!isInitialized) {
+            return;
+        }
+        isInitialized = false;
+        flush();
+        try {
+            mongoDbBatchWriter.shutdown();
+        } catch (final MongoDbBatchWriterException e) {
+            throw new RyaDAOException("Error shutting down MongoDB batch writer", e);
+        }
         if (mongoClient != null) {
             mongoClient.close();
         }
@@ -153,7 +184,15 @@ public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{
         try {
             final boolean canAdd = DocumentVisibilityUtil.doesUserHaveDocumentAccess(auths, statement.getColumnVisibility());
             if (canAdd) {
-                coll.insert(storageStrategy.serialize(statement));
+                final DBObject obj = storageStrategy.serialize(statement);
+                try {
+                    mongoDbBatchWriter.addObjectToQueue(obj);
+                    if (flushEachUpdate) {
+                        flush();
+                    }
+                } catch (final MongoDbBatchWriterException e) {
+                    throw new RyaDAOException("Error adding statement", e);
+                }
                 for(final RyaSecondaryIndexer index: secondaryIndexers) {
                     index.storeStatement(statement);
                 }
@@ -190,7 +229,14 @@ public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{
                 throw new RyaDAOException("User does not have the required authorizations to add statement");
             }
         }
-        coll.insert(dbInserts, new InsertOptions().continueOnError(true));
+        try {
+            mongoDbBatchWriter.addObjectsToQueue(dbInserts);
+            if (flushEachUpdate) {
+                flush();
+            }
+        } catch (final MongoDbBatchWriterException e) {
+            throw new RyaDAOException("Error adding statements", e);
+        }
     }
 
     @Override
@@ -263,4 +309,13 @@ public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{
     public void dropAndDestroy() throws RyaDAOException {
         db.dropDatabase(); // this is dangerous!
     }
+
+    @Override
+    public void flush() throws RyaDAOException {
+        try {
+            mongoDbBatchWriter.flush();
+        } catch (final MongoDbBatchWriterException e) {
+            throw new RyaDAOException("Error flushing data.", e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriter.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriter.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriter.java
new file mode 100644
index 0000000..2f52b5c
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriter.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.batch;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.mongodb.batch.collection.CollectionType;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Handles batch writing MongoDB statement objects to the repository. It takes
+ * in a configurable batch size and flush time. If the number of objects placed
+ * in the queue reaches the batch size then the objects are bulk written to the
+ * datastore. Or if the queue has not filled up after the batch time duration
+ * has passed then the statements are flushed out and written to the datastore.
+ * @param <T> the type of object that the batch writer's internal collection
+ * type uses.
+ */
+public class MongoDbBatchWriter<T> {
+    private static final Logger log = Logger.getLogger(MongoDbBatchWriter.class);
+
+    private static final int CHECK_QUEUE_INTERVAL_MS = 10;
+
+    private final CollectionType<T> collectionType;
+    private final long batchFlushTimeMs;
+
+    private final ArrayBlockingQueue<T> statementInsertionQueue;
+    private final ScheduledThreadPoolExecutor scheduledExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(0);
+    private ScheduledFuture<?> flushBatchFuture;
+    private final Runnable flushBatchTask;
+    private Thread queueFullCheckerThread;
+
+    private final AtomicBoolean isInit = new AtomicBoolean();
+
+    /**
+     * Creates a new instance of {@link MongoDbBatchWriter}.
+     * @param collectionType the {@link CollectionType}. (not {@code null})
+     * @param mongoDbBatchWriterConfig the {@link MongoDbBatchWriterConfig}.
+     * (not {@code null})
+     */
+    public MongoDbBatchWriter(final CollectionType<T> collectionType, final MongoDbBatchWriterConfig mongoDbBatchWriterConfig) {
+        this.collectionType = checkNotNull(collectionType);
+        this.batchFlushTimeMs = checkNotNull(mongoDbBatchWriterConfig).getBatchFlushTimeMs();
+
+        statementInsertionQueue = new ArrayBlockingQueue<>(mongoDbBatchWriterConfig.getBatchSize());
+        flushBatchTask = new BatchFlusher();
+    }
+
+    /**
+     * Task used to flush statements if enough time has passed without an
+     * insertion while there are objects enqueued.
+     */
+    private class BatchFlusher implements Runnable {
+        @Override
+        public void run() {
+            try {
+                if (!statementInsertionQueue.isEmpty()) {
+                    log.trace("Running statement insertion flush task. Too much time has passed without any object insertions so all queued data is being flushed.");
+                    flush();
+                }
+            } catch (final Exception e) {
+                log.error("Error flush out the statements", e);
+            }
+        }
+    }
+
+    private static final ThreadFactory QUEUE_THREAD_FACTORY = new ThreadFactoryBuilder()
+        .setNameFormat("Queue Full Checker Thread - %d")
+        .setDaemon(true)
+        .build();
+
+    /**
+     * Checks the queue for statements to insert if the queue is full.
+     */
+    private class QueueFullChecker implements Runnable {
+        @Override
+        public void run() {
+            try {
+                while (isInit.get()) {
+                    // Check if the queue is full and if it is then insert the
+                    // statements. Otherwise reset the insertion timer.
+                    if (statementInsertionQueue.remainingCapacity() == 0) {
+                        log.trace("Statement queue is FULL -> going to empty it");
+                        try {
+                            flush();
+                        } catch (final MongoDbBatchWriterException e) {
+                            log.error("Error emptying queue", e);
+                        }
+                    }
+                    Thread.sleep(CHECK_QUEUE_INTERVAL_MS);
+                }
+            } catch (final InterruptedException e) {
+                log.error("Encountered an unexpected error while checking the batch queue.", e);
+            }
+        }
+    }
+
+    /**
+     * Starts the batch writer queue and processes.
+     */
+    public void start() throws MongoDbBatchWriterException {
+        if (!isInit.get()) {
+            if (flushBatchFuture == null) {
+                flushBatchFuture = startFlushTimer();
+            }
+            if (queueFullCheckerThread == null) {
+                queueFullCheckerThread = QUEUE_THREAD_FACTORY.newThread(new QueueFullChecker());
+            }
+            isInit.set(true);
+            queueFullCheckerThread.start();
+        }
+    }
+
+    /**
+     * Stops the batch writer processes.
+     */
+    public void shutdown() throws MongoDbBatchWriterException {
+        isInit.set(false);
+        if (flushBatchFuture != null) {
+            flushBatchFuture.cancel(true);
+            flushBatchFuture = null;
+        }
+        if (queueFullCheckerThread != null) {
+            if (queueFullCheckerThread.isAlive()) {
+                try {
+                    queueFullCheckerThread.join(2 * CHECK_QUEUE_INTERVAL_MS);
+                } catch (final InterruptedException e) {
+                    log.error("Error waiting for thread to finish", e);
+                }
+                queueFullCheckerThread = null;
+            }
+        }
+    }
+
+    /**
+     * Adds a MongoDB object to the queue which will not be written until one of
+     * the following occur:<br>
+     * <ul>
+     *  <li>The queue fills up</li>
+     *  <li>The flush time has been reached</li>
+     *  <li>A direct call to the {@link MongoDbBatchWriter#flush()} method
+     *  has been made</li>
+     * </ul>
+     * @param object the object to add to the queue.
+     * @throws IOException
+     */
+    public void addObjectToQueue(final T object) throws MongoDbBatchWriterException {
+        if (object != null) {
+            try {
+                // Place in the queue which will bulk write after the specified
+                // "batchSize" number of items have filled the queue or if more
+                // than "batchFlushTimeMs" milliseconds have passed since the
+                // last insertion.
+                resetFlushTimer();
+                statementInsertionQueue.put(object);
+            } catch (final Exception e) {
+                throw new MongoDbBatchWriterException("Error adding object to batch queue.", e);
+            }
+        }
+    }
+
+    /**
+     * Adds a list of MongoDB objects to the queue which will not be written
+     * until one of the following occur:<br>
+     * <ul>
+     *  <li>The queue fills up</li>
+     *  <li>The flush time has been reached</li>
+     *  <li>A direct call to the {@link MongoDbBatchWriter#flush()} method
+     *  has been made</li>
+     * </ul>
+     * @param objects a {@link List} of objects to add to the queue.
+     * @throws IOException
+     */
+    public void addObjectsToQueue(final List<T> objects) throws MongoDbBatchWriterException {
+        if (objects != null) {
+            for (final T object : objects) {
+                addObjectToQueue(object);
+            }
+        }
+    }
+
+    /**
+     * Flushes out statements that are in the queue.
+     */
+    public void flush() throws MongoDbBatchWriterException {
+        final List<T> batch = new ArrayList<>();
+        try {
+            statementInsertionQueue.drainTo(batch);
+            if (!batch.isEmpty()) {
+                collectionType.insertMany(batch);
+            }
+        } catch (final Exception e) {
+            throw new MongoDbBatchWriterException("Error flushing statements", e);
+        }
+    }
+
+    private void resetFlushTimer() throws MongoDbBatchWriterException {
+        flushBatchFuture.cancel(false);
+        flushBatchFuture = startFlushTimer();
+    }
+
+    private ScheduledFuture<?> startFlushTimer() throws MongoDbBatchWriterException {
+        try {
+            return scheduledExecutor.schedule(flushBatchTask, batchFlushTimeMs, TimeUnit.MILLISECONDS);
+        } catch (final Exception e) {
+            throw new MongoDbBatchWriterException("Error starting batch flusher", e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterConfig.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterConfig.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterConfig.java
new file mode 100644
index 0000000..cec8b9a
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterConfig.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.batch;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Configuration for the MongoDB Batch Writer.
+ */
+public class MongoDbBatchWriterConfig {
+    /**
+     * The default number of statements to batch write at a time.
+     */
+    public static final int DEFAULT_BATCH_SIZE = 50000;
+    private Integer batchSize = null;
+
+    /**
+     * The default time to wait in milliseconds to flush all statements out that
+     * are queued for insertion if the queue has not filled up to its capacity
+     * of {@link #DEFAULT_BATCH_SIZE} or the user configured buffer size.
+     */
+    public static final long DEFAULT_BATCH_FLUSH_TIME_MS = 100L;
+    private Long batchFlushTimeMs = null;
+
+    /**
+     * Creates a new instance of {@link MongoDbBatchWriterConfig}.
+     */
+    public MongoDbBatchWriterConfig() {
+    }
+
+    /**
+     * Gets the configured number of statements to batch write at a time.
+     * @return the configured value or the default value.
+     */
+    public int getBatchSize() {
+        return batchSize != null ? batchSize : DEFAULT_BATCH_SIZE;
+    }
+
+    /**
+     * Sets the number of statements to batch write at a time.
+     * @param batchSize the number of statements in each batch.
+     * @return the {@link MongoDbBatchWriterConfig}.
+     */
+    public MongoDbBatchWriterConfig setBatchSize(final int batchSize) {
+        Preconditions.checkArgument(batchSize > 0, "Batch size must be positive.");
+        this.batchSize = batchSize;
+        return this;
+    }
+
+    /**
+     * Gets the configured time to wait in milliseconds to flush all statements
+     * out that are queued for insertion if the queue has not filled up to its
+     * capacity.
+     * @return the configured value or the default value.
+     */
+    public long getBatchFlushTimeMs() {
+        return batchFlushTimeMs != null ? batchFlushTimeMs : DEFAULT_BATCH_FLUSH_TIME_MS;
+    }
+
+    /**
+     * Sets the time to wait in milliseconds to flush all statements out that
+     * are queued for insertion if the queue has not filled up to its capacity.
+     * @param batchFlushTimeMs the time to wait before flushing all queued
+     * statements that have not been written.
+     * @return the {@link MongoDbBatchWriterConfig}.
+     */
+    public MongoDbBatchWriterConfig setBatchFlushTimeMs(final long batchFlushTimeMs) {
+        Preconditions.checkArgument(batchFlushTimeMs >= 0, "Batch flush time must be non-negative.");
+        this.batchFlushTimeMs = batchFlushTimeMs;
+        return this;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterException.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterException.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterException.java
new file mode 100644
index 0000000..d4de156
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterException.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.batch;
+
+/**
+ * An exception to be used when there is a problem running the MongoDB Batch
+ * Writer.
+ */
+public class MongoDbBatchWriterException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates a new instance of {@link MongoDbBatchWriterException}.
+     */
+    public MongoDbBatchWriterException() {
+        super();
+    }
+
+    /**
+     * Creates a new instance of {@link MongoDbBatchWriterException}.
+     * @param message the detail message.
+     */
+    public MongoDbBatchWriterException(final String message) {
+        super(message);
+    }
+
+    /**
+     * Creates a new instance of {@link MongoDbBatchWriterException}.
+     * @param message the detail message.
+     * @param throwable the {@link Throwable} source.
+     */
+    public MongoDbBatchWriterException(final String message, final Throwable source) {
+        super(message, source);
+    }
+
+    /**
+     * Creates a new instance of {@link MongoDbBatchWriterException}.
+     * @param source the {@link Throwable} source.
+     */
+    public MongoDbBatchWriterException(final Throwable source) {
+        super(source);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterUtils.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterUtils.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterUtils.java
new file mode 100644
index 0000000..99e8992
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterUtils.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.batch;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Constants and utility methods related to batch writing statements in a MongoDB
+ * Rya repository.
+ */
+public final class MongoDbBatchWriterUtils {
+    /**
+     * Config tag used to specify the number of statements to batch write at a
+     * time.
+     */
+    public static final String BATCH_SIZE_TAG = "rya.mongodb.dao.batchwriter.size";
+
+    /**
+     * Config tag used to specify the time to wait in milliseconds to flush all
+     * statements out that are queued for insertion if the queue has not filled
+     * up to its capacity.
+     */
+    public static final String BATCH_FLUSH_TIME_MS_TAG = "rya.mongodb.dao.batchwriter.flushtime";
+
+    /**
+     * Private constructor to prevent instantiation.
+     */
+    private MongoDbBatchWriterUtils() {
+    }
+
+    /**
+     * The number of statements to batch write at a time.
+     * @param conf the {@link Configuration} to check.
+     * @return the configured value or the default value.
+     */
+    public static int getConfigBatchSize(final Configuration conf) {
+        return conf.getInt(BATCH_SIZE_TAG, MongoDbBatchWriterConfig.DEFAULT_BATCH_SIZE);
+    }
+
+    /**
+     * The time to wait in milliseconds to flush all statements out that are
+     * queued for insertion if the queue has not filled up to its capacity.
+     * @param conf the {@link Configuration} to check.
+     * @return the configured value or the default value.
+     */
+    public static long getConfigBatchFlushTimeMs(final Configuration conf) {
+        return conf.getLong(BATCH_FLUSH_TIME_MS_TAG, MongoDbBatchWriterConfig.DEFAULT_BATCH_FLUSH_TIME_MS);
+    }
+
+    /**
+     * Reads the specified configed to create and initialize a
+     * {@link MongoDbBatchWriterConfig}. If no values are found then the default
+     * values are used.
+     * @param conf the {@link Configuration} to check.
+     * @return  the {@link MongoDbBatchWriterConfig} populated with configured
+     * values for the specified {@code conf}.
+     */
+    public static MongoDbBatchWriterConfig getMongoDbBatchWriterConfig(final Configuration conf) {
+        final int batchSize = getConfigBatchSize(conf);
+        final long batchFlushTimeMs = getConfigBatchFlushTimeMs(conf);
+        final MongoDbBatchWriterConfig mongoDbBatchWriterConfig = new MongoDbBatchWriterConfig();
+        mongoDbBatchWriterConfig.setBatchSize(batchSize);
+        mongoDbBatchWriterConfig.setBatchFlushTimeMs(batchFlushTimeMs);
+        return mongoDbBatchWriterConfig;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/CollectionType.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/CollectionType.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/CollectionType.java
new file mode 100644
index 0000000..9e6d6fb
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/CollectionType.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.batch.collection;
+
+import java.util.List;
+
+/**
+ * Wrapper for interacting with the new and legacy MongoDB collection types
+ * ({@link com.mongodb.client.MongoCollection} and
+ * {@link com.mongodb.DBCollection} respectively)
+ * in order to handle inserts from both types and the object types they
+ * utilize.
+ * @param <T> the type of object the collection type inserts.
+ */
+public interface CollectionType<T> {
+    /**
+     * Insert one item.
+     * @param item the item to insert.
+     */
+    public void insertOne(final T item);
+
+    /**
+     * Insert a list of items.
+     * @param items the {@link List} of items.
+     */
+    public void insertMany(final List<T> items);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/DbCollectionType.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/DbCollectionType.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/DbCollectionType.java
new file mode 100644
index 0000000..ea00693
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/DbCollectionType.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.batch.collection;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.List;
+
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+import com.mongodb.InsertOptions;
+import com.mongodb.WriteConcern;
+
+/**
+ * Provides access to the {@link DBCollection} type.
+ */
+public class DbCollectionType implements CollectionType<DBObject> {
+    private final DBCollection collection;
+
+    /**
+     * Creates a new instance of {@link DbCollectionType}.
+     * @param collection the {@link DBCollection}. (not {@code null})
+     */
+    public DbCollectionType(final DBCollection collection) {
+        this.collection = checkNotNull(collection);
+    }
+
+    @Override
+    public void insertOne(final DBObject item) {
+        collection.insert(item, WriteConcern.ACKNOWLEDGED);
+    }
+
+    @Override
+    public void insertMany(final List<DBObject> items) {
+        collection.insert(items, new InsertOptions().continueOnError(true));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/MongoCollectionType.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/MongoCollectionType.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/MongoCollectionType.java
new file mode 100644
index 0000000..8fb796a
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/MongoCollectionType.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.batch.collection;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.List;
+
+import org.bson.Document;
+
+import com.mongodb.client.MongoCollection;
+
+/**
+ * Provides access to the {@link MongoCollection} type.
+ */
+public class MongoCollectionType implements CollectionType<Document> {
+    private final MongoCollection<Document> collection;
+
+    /**
+     * Creates a new instance of {@link MongoCollectionType}.
+     * @param collection the {@link MongoCollection}. (not {@code null})
+     */
+    public MongoCollectionType(final MongoCollection<Document> collection) {
+        this.collection = checkNotNull(collection);
+    }
+
+    @Override
+    public void insertOne(final Document item) {
+        collection.insertOne(item);
+    }
+
+    @Override
+    public void insertMany(final List<Document> items) {
+        collection.insertMany(items);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java
index f5372d1..69ca274 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java
@@ -33,6 +33,11 @@ import org.apache.rya.mongodb.MongoConnectorFactory;
 import org.apache.rya.mongodb.MongoDBRdfConfiguration;
 import org.apache.rya.mongodb.MongoDBRyaDAO;
 import org.apache.rya.mongodb.MongoSecondaryIndex;
+import org.apache.rya.mongodb.batch.MongoDbBatchWriter;
+import org.apache.rya.mongodb.batch.MongoDbBatchWriterConfig;
+import org.apache.rya.mongodb.batch.MongoDbBatchWriterException;
+import org.apache.rya.mongodb.batch.MongoDbBatchWriterUtils;
+import org.apache.rya.mongodb.batch.collection.DbCollectionType;
 import org.openrdf.model.Literal;
 import org.openrdf.model.Statement;
 import org.openrdf.model.URI;
@@ -46,7 +51,6 @@ import com.mongodb.DBObject;
 import com.mongodb.MongoClient;
 import com.mongodb.QueryBuilder;
 import com.mongodb.ServerAddress;
-import com.mongodb.WriteConcern;
 
 import info.aduna.iteration.CloseableIteration;
 
@@ -58,6 +62,7 @@ public abstract class AbstractMongoIndexer<T extends IndexingMongoDBStorageStrat
     private static final Logger LOG = Logger.getLogger(AbstractMongoIndexer.class);
 
     private boolean isInit = false;
+    private boolean flushEachUpdate = true;
     protected Configuration conf;
     protected MongoDBRyaDAO dao;
     protected MongoClient mongoClient;
@@ -68,15 +73,28 @@ public abstract class AbstractMongoIndexer<T extends IndexingMongoDBStorageStrat
 
     protected T storageStrategy;
 
+    private MongoDbBatchWriter<DBObject> mongoDbBatchWriter;
+
     protected void initCore() {
         dbName = conf.get(MongoDBRdfConfiguration.MONGO_DB_NAME);
         db = this.mongoClient.getDB(dbName);
-        collection = db.getCollection(conf.get(MongoDBRdfConfiguration.MONGO_COLLECTION_PREFIX, "rya") + getCollectionName());
+        final String collectionName = conf.get(MongoDBRdfConfiguration.MONGO_COLLECTION_PREFIX, "rya") + getCollectionName();
+        collection = db.getCollection(collectionName);
+
+        flushEachUpdate = ((MongoDBRdfConfiguration)conf).flushEachUpdate();
+
+        final MongoDbBatchWriterConfig mongoDbBatchWriterConfig = MongoDbBatchWriterUtils.getMongoDbBatchWriterConfig(conf);
+        mongoDbBatchWriter = new MongoDbBatchWriter<DBObject>(new DbCollectionType(collection), mongoDbBatchWriterConfig);
+        try {
+            mongoDbBatchWriter.start();
+        } catch (final MongoDbBatchWriterException e) {
+            LOG.error("Error start MongoDB batch writer", e);
+        }
     }
 
     @Override
     public void setClient(final MongoClient client){
-    	this.mongoClient = client;
+        this.mongoClient = client;
     }
 
     @VisibleForTesting
@@ -96,8 +114,8 @@ public abstract class AbstractMongoIndexer<T extends IndexingMongoDBStorageStrat
     public void setConf(final Configuration conf) {
         this.conf = conf;
         if (!isInit){
-        	setClient(MongoConnectorFactory.getMongoClient(conf));
-        	init();
+            setClient(MongoConnectorFactory.getMongoClient(conf));
+            init();
         }
     }
 
@@ -108,6 +126,11 @@ public abstract class AbstractMongoIndexer<T extends IndexingMongoDBStorageStrat
 
     @Override
     public void flush() throws IOException {
+        try {
+            mongoDbBatchWriter.flush();
+        } catch (final MongoDbBatchWriterException e) {
+            throw new IOException("Error flushing batch writer", e);
+        }
     }
 
     @Override
@@ -135,24 +158,43 @@ public abstract class AbstractMongoIndexer<T extends IndexingMongoDBStorageStrat
     public void storeStatements(final Collection<RyaStatement> ryaStatements)
             throws IOException {
         for (final RyaStatement ryaStatement : ryaStatements){
-            storeStatement(ryaStatement);
+            storeStatement(ryaStatement, false);
+        }
+        if (flushEachUpdate) {
+            flush();
         }
     }
 
     @Override
     public void storeStatement(final RyaStatement ryaStatement) throws IOException {
+        storeStatement(ryaStatement, flushEachUpdate);
+    }
+
+    private void storeStatement(final RyaStatement ryaStatement, final boolean flush) throws IOException {
+        final DBObject obj = prepareStatementForStorage(ryaStatement);
+        try {
+            mongoDbBatchWriter.addObjectToQueue(obj);
+            if (flush) {
+                flush();
+            }
+        } catch (final MongoDbBatchWriterException e) {
+            throw new IOException("Error storing statement", e);
+        }
+    }
+
+    private DBObject prepareStatementForStorage(final RyaStatement ryaStatement) {
         try {
             final Statement statement = RyaToRdfConversions.convertStatement(ryaStatement);
             final boolean isValidPredicate = predicates.isEmpty() || predicates.contains(statement.getPredicate());
             if (isValidPredicate && (statement.getObject() instanceof Literal)) {
                 final DBObject obj = storageStrategy.serialize(ryaStatement);
-                if (obj != null) {
-                    collection.insert(obj, WriteConcern.ACKNOWLEDGED);
-                }
+                return obj;
             }
         } catch (final IllegalArgumentException e) {
             LOG.error("Unable to parse the statement: " + ryaStatement.toString(), e);
         }
+
+        return null;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoEntityIndexIT.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoEntityIndexIT.java b/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoEntityIndexIT.java
index 4ddb2a5..6fac386 100644
--- a/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoEntityIndexIT.java
+++ b/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoEntityIndexIT.java
@@ -21,9 +21,7 @@ package org.apache.rya.indexing.mongo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 
@@ -36,12 +34,13 @@ import org.apache.rya.indexing.entity.storage.EntityStorage;
 import org.apache.rya.indexing.entity.storage.TypeStorage;
 import org.apache.rya.indexing.entity.update.mongo.MongoEntityIndexer;
 import org.apache.rya.mongodb.MockMongoFactory;
+import org.apache.rya.mongodb.MongoConnectorFactory;
 import org.apache.rya.mongodb.MongoDBRdfConfiguration;
 import org.apache.rya.sail.config.RyaSailFactory;
 import org.bson.Document;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.openrdf.model.Statement;
 import org.openrdf.model.URI;
 import org.openrdf.model.Value;
 import org.openrdf.model.ValueFactory;
@@ -70,7 +69,7 @@ public class MongoEntityIndexIT {
     private MongoClient mongoClient;
 
     @Before
-    public void setUp() throws Exception{
+    public void setUp() throws Exception {
         mongoClient = MockMongoFactory.with(Version.Main.PRODUCTION).newMongoClient();
         conf = new MongoDBRdfConfiguration();
         conf.set(MongoDBRdfConfiguration.MONGO_DB_NAME, "test");
@@ -91,6 +90,19 @@ public class MongoEntityIndexIT {
         indexer.init();
     }
 
+    @After
+    public void tearDown() throws Exception {
+        if (mongoClient != null) {
+            MongoConnectorFactory.closeMongoClient();
+        }
+        if (conn != null) {
+            conn.clear();
+        }
+        if (indexer != null) {
+            indexer.close();
+        }
+    }
+
     @Test
     public void ensureInEntityStore_Test() throws Exception {
         setupTypes();
@@ -202,8 +214,6 @@ public class MongoEntityIndexIT {
     }
 
     private void addStatements() throws Exception {
-        final List<Statement> stmnts = new ArrayList<>();
-
         //alice
         URI subject = VF.createURI("urn:alice");
         URI predicate = VF.createURI("urn:name");

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/mongo/MongoIndexerDeleteIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/mongo/MongoIndexerDeleteIT.java b/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/mongo/MongoIndexerDeleteIT.java
index b533d42..4b66b5b 100644
--- a/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/mongo/MongoIndexerDeleteIT.java
+++ b/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/mongo/MongoIndexerDeleteIT.java
@@ -31,6 +31,8 @@ import org.apache.rya.indexing.TemporalInstantRfc3339;
 import org.apache.rya.indexing.accumulo.ConfigUtils;
 import org.apache.rya.indexing.mongodb.MongoIndexingConfiguration;
 import org.apache.rya.mongodb.MockMongoFactory;
+import org.apache.rya.mongodb.MongoConnectorFactory;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.openrdf.model.Resource;
@@ -81,6 +83,16 @@ public class MongoIndexerDeleteIT {
         conn.begin();
     }
 
+    @After
+    public void after() throws Exception {
+        if (client != null) {
+            MongoConnectorFactory.closeMongoClient();
+        }
+        if (conn != null) {
+            conn.clear();
+        }
+    }
+
     @Test
     public void deleteTest() throws Exception {
         populateRya();
@@ -150,14 +162,10 @@ public class MongoIndexerDeleteIT {
         uuid = "urn:people";
         conn.add(VF.createURI(uuid), RDF.TYPE, person);
         conn.add(VF.createURI(uuid), RDFS.LABEL, VF.createLiteral("Alice Palace Hose", VF.createURI("http://www.w3.org/2001/XMLSchema#string")));
-
-        uuid = "urn:people";
-        conn.add(VF.createURI(uuid), RDF.TYPE, person);
         conn.add(VF.createURI(uuid), RDFS.LABEL, VF.createLiteral("Bob Snob Hose", "en"));
 
         // temporal
         final TemporalInstant instant = new TemporalInstantRfc3339(1, 2, 3, 4, 5, 6);
-        final URI time = VF.createURI("Property:atTime");
         conn.add(VF.createURI("foo:time"), VF.createURI("Property:atTime"), VF.createLiteral(instant.toString()));
     }