Posted to commits@rya.apache.org by mi...@apache.org on 2017/08/04 02:10:28 UTC

[1/2] incubator-rya git commit: RYA-307 Improved Rya MongoDB ingest of statements ... Closes #181

Repository: incubator-rya
Updated Branches:
  refs/heads/master fa2aad55f -> 8def4caca


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java
----------------------------------------------------------------------
diff --git a/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java b/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java
index 44b0bd4..6784c90 100644
--- a/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java
+++ b/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java
@@ -8,9 +8,9 @@ package org.apache.rya.rdftriplestore;
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,15 +23,16 @@ package org.apache.rya.rdftriplestore;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
-import info.aduna.iteration.CloseableIteration;
 
 import java.lang.reflect.Constructor;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 
+import org.apache.hadoop.conf.Configurable;
 import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
 import org.apache.rya.api.RdfCloudTripleStoreConstants;
 import org.apache.rya.api.domain.RyaStatement;
@@ -62,8 +63,6 @@ import org.apache.rya.rdftriplestore.namespace.NamespaceManager;
 import org.apache.rya.rdftriplestore.provenance.ProvenanceCollectionException;
 import org.apache.rya.rdftriplestore.provenance.ProvenanceCollector;
 import org.apache.rya.rdftriplestore.utils.DefaultStatistics;
-
-import org.apache.hadoop.conf.Configurable;
 import org.openrdf.model.Namespace;
 import org.openrdf.model.Resource;
 import org.openrdf.model.Statement;
@@ -99,21 +98,23 @@ import org.openrdf.query.impl.EmptyBindingSet;
 import org.openrdf.sail.SailException;
 import org.openrdf.sail.helpers.SailConnectionBase;
 
+import info.aduna.iteration.CloseableIteration;
+
 public class RdfCloudTripleStoreConnection extends SailConnectionBase {
 
-    private RdfCloudTripleStore store;
+    private final RdfCloudTripleStore store;
 
     private RdfEvalStatsDAO rdfEvalStatsDAO;
     private SelectivityEvalDAO selectEvalDAO;
     private RyaDAO ryaDAO;
     private InferenceEngine inferenceEngine;
     private NamespaceManager namespaceManager;
-    private RdfCloudTripleStoreConfiguration conf;
-    
+    private final RdfCloudTripleStoreConfiguration conf;
+
 
 	private ProvenanceCollector provenanceCollector;
 
-    public RdfCloudTripleStoreConnection(RdfCloudTripleStore sailBase, RdfCloudTripleStoreConfiguration conf, ValueFactory vf)
+    public RdfCloudTripleStoreConnection(final RdfCloudTripleStore sailBase, final RdfCloudTripleStoreConfiguration conf, final ValueFactory vf)
             throws SailException {
         super(sailBase);
         this.store = sailBase;
@@ -138,54 +139,56 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
             this.namespaceManager = store.getNamespaceManager();
             this.provenanceCollector = store.getProvenanceCollector();
 
-        } catch (Exception e) {
+        } catch (final Exception e) {
             throw new SailException(e);
         }
     }
 
     @Override
-    protected void addStatementInternal(Resource subject, URI predicate,
-                                        Value object, Resource... contexts) throws SailException {
+    protected void addStatementInternal(final Resource subject, final URI predicate,
+                                        final Value object, final Resource... contexts) throws SailException {
         try {
-            String cv_s = conf.getCv();
-            byte[] cv = cv_s == null ? null : cv_s.getBytes();
+            final String cv_s = conf.getCv();
+            final byte[] cv = cv_s == null ? null : cv_s.getBytes();
+            final List<RyaStatement> ryaStatements = new ArrayList<>();
             if (contexts != null && contexts.length > 0) {
-                for (Resource context : contexts) {
-                    RyaStatement statement = new RyaStatement(
+                for (final Resource context : contexts) {
+                    final RyaStatement statement = new RyaStatement(
                             RdfToRyaConversions.convertResource(subject),
                             RdfToRyaConversions.convertURI(predicate),
                             RdfToRyaConversions.convertValue(object),
                             RdfToRyaConversions.convertResource(context),
                             null, new StatementMetadata(), cv);
 
-                    ryaDAO.add(statement);
+                    ryaStatements.add(statement);
                 }
             } else {
-                RyaStatement statement = new RyaStatement(
+                final RyaStatement statement = new RyaStatement(
                         RdfToRyaConversions.convertResource(subject),
                         RdfToRyaConversions.convertURI(predicate),
                         RdfToRyaConversions.convertValue(object),
                         null, null, new StatementMetadata(), cv);
 
-                ryaDAO.add(statement);
+                ryaStatements.add(statement);
             }
-        } catch (RyaDAOException e) {
+            ryaDAO.add(ryaStatements.iterator());
+        } catch (final RyaDAOException e) {
             throw new SailException(e);
         }
     }
 
-    
-    
-    
+
+
+
     @Override
-    protected void clearInternal(Resource... aresource) throws SailException {
+    protected void clearInternal(final Resource... aresource) throws SailException {
         try {
-            RyaURI[] graphs = new RyaURI[aresource.length];
+            final RyaURI[] graphs = new RyaURI[aresource.length];
             for (int i = 0 ; i < graphs.length ; i++){
                 graphs[i] = RdfToRyaConversions.convertResource(aresource[i]);
             }
             ryaDAO.dropGraph(conf, graphs);
-        } catch (RyaDAOException e) {
+        } catch (final RyaDAOException e) {
             throw new SailException(e);
         }
     }
@@ -208,63 +211,63 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
 
     @Override
     protected CloseableIteration<? extends BindingSet, QueryEvaluationException> evaluateInternal(
-            TupleExpr tupleExpr, Dataset dataset, BindingSet bindings,
-            boolean flag) throws SailException {
+            TupleExpr tupleExpr, final Dataset dataset, BindingSet bindings,
+            final boolean flag) throws SailException {
         verifyIsOpen();
         logger.trace("Incoming query model:\n{}", tupleExpr.toString());
         if (provenanceCollector != null){
         	try {
 				provenanceCollector.recordQuery(tupleExpr.toString());
-			} catch (ProvenanceCollectionException e) {
+			} catch (final ProvenanceCollectionException e) {
 				// TODO silent fail
 				e.printStackTrace();
 			}
         }
         tupleExpr = tupleExpr.clone();
 
-        RdfCloudTripleStoreConfiguration queryConf = store.getConf().clone();
+        final RdfCloudTripleStoreConfiguration queryConf = store.getConf().clone();
         if (bindings != null) {
-            Binding dispPlan = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_QUERYPLAN_FLAG);
+            final Binding dispPlan = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_QUERYPLAN_FLAG);
             if (dispPlan != null) {
                 queryConf.setDisplayQueryPlan(Boolean.parseBoolean(dispPlan.getValue().stringValue()));
             }
 
-            Binding authBinding = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH);
+            final Binding authBinding = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH);
             if (authBinding != null) {
                 queryConf.setAuths(authBinding.getValue().stringValue().split(","));
             }
 
-            Binding ttlBinding = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_TTL);
+            final Binding ttlBinding = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_TTL);
             if (ttlBinding != null) {
                 queryConf.setTtl(Long.valueOf(ttlBinding.getValue().stringValue()));
             }
 
-            Binding startTimeBinding = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_STARTTIME);
+            final Binding startTimeBinding = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_STARTTIME);
             if (startTimeBinding != null) {
                 queryConf.setStartTime(Long.valueOf(startTimeBinding.getValue().stringValue()));
             }
 
-            Binding performantBinding = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_PERFORMANT);
+            final Binding performantBinding = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_PERFORMANT);
             if (performantBinding != null) {
                 queryConf.setBoolean(RdfCloudTripleStoreConfiguration.CONF_PERFORMANT, Boolean.parseBoolean(performantBinding.getValue().stringValue()));
             }
 
-            Binding inferBinding = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_INFER);
+            final Binding inferBinding = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_INFER);
             if (inferBinding != null) {
                 queryConf.setInfer(Boolean.parseBoolean(inferBinding.getValue().stringValue()));
             }
 
-            Binding useStatsBinding = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_USE_STATS);
+            final Binding useStatsBinding = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_USE_STATS);
             if (useStatsBinding != null) {
                 queryConf.setUseStats(Boolean.parseBoolean(useStatsBinding.getValue().stringValue()));
             }
 
-            Binding offsetBinding = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_OFFSET);
+            final Binding offsetBinding = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_OFFSET);
             if (offsetBinding != null) {
                 queryConf.setOffset(Long.parseLong(offsetBinding.getValue().stringValue()));
             }
 
-            Binding limitBinding = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_LIMIT);
+            final Binding limitBinding = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_LIMIT);
             if (limitBinding != null) {
                 queryConf.setLimit(Long.parseLong(limitBinding.getValue().stringValue()));
             }
@@ -277,15 +280,15 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
         }
 
         try {
-            List<Class<QueryOptimizer>> optimizers = queryConf.getOptimizers();
-            Class<QueryOptimizer> pcjOptimizer = queryConf.getPcjOptimizer();
-            
+            final List<Class<QueryOptimizer>> optimizers = queryConf.getOptimizers();
+            final Class<QueryOptimizer> pcjOptimizer = queryConf.getPcjOptimizer();
+
             if(pcjOptimizer != null) {
                 QueryOptimizer opt = null;
                 try {
-                    Constructor<QueryOptimizer> construct = pcjOptimizer.getDeclaredConstructor(new Class[] {});
+                    final Constructor<QueryOptimizer> construct = pcjOptimizer.getDeclaredConstructor(new Class[] {});
                     opt = construct.newInstance();
-                } catch (Exception e) {
+                } catch (final Exception e) {
                 }
                 if (opt == null) {
                     throw new NoSuchMethodException("Could not find valid constructor for " + pcjOptimizer.getName());
@@ -295,10 +298,10 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
                 }
                 opt.optimize(tupleExpr, dataset, bindings);
             }
-            
+
             final ParallelEvaluationStrategyImpl strategy = new ParallelEvaluationStrategyImpl(
                     new StoreTripleSource(queryConf, ryaDAO), inferenceEngine, dataset, queryConf);
-            
+
                 (new BindingAssigner()).optimize(tupleExpr, dataset, bindings);
                 (new ConstantOptimizer(strategy)).optimize(tupleExpr, dataset,
                         bindings);
@@ -310,22 +313,22 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
                 (new SameTermFilterOptimizer()).optimize(tupleExpr, dataset,
                         bindings);
                 (new QueryModelNormalizer()).optimize(tupleExpr, dataset, bindings);
-    
+
                 (new IterativeEvaluationOptimizer()).optimize(tupleExpr, dataset,
                         bindings);
 
             if (!optimizers.isEmpty()) {
-                for (Class<QueryOptimizer> optclz : optimizers) {
+                for (final Class<QueryOptimizer> optclz : optimizers) {
                     QueryOptimizer result = null;
                     try {
-                        Constructor<QueryOptimizer> meth = optclz.getDeclaredConstructor(new Class[] {});
+                        final Constructor<QueryOptimizer> meth = optclz.getDeclaredConstructor(new Class[] {});
                         result = meth.newInstance();
-                    } catch (Exception e) {
+                    } catch (final Exception e) {
                     }
                     try {
-                        Constructor<QueryOptimizer> meth = optclz.getDeclaredConstructor(EvaluationStrategy.class);
+                        final Constructor<QueryOptimizer> meth = optclz.getDeclaredConstructor(EvaluationStrategy.class);
                         result = meth.newInstance(strategy);
-                    } catch (Exception e) {
+                    } catch (final Exception e) {
                     }
                     if (result == null) {
                         throw new NoSuchMethodException("Could not find valid constructor for " + optclz.getName());
@@ -339,7 +342,7 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
 
             (new FilterOptimizer()).optimize(tupleExpr, dataset, bindings);
             (new OrderLimitOptimizer()).optimize(tupleExpr, dataset, bindings);
-            
+
             logger.trace("Optimized query model:\n{}", tupleExpr.toString());
 
             if (queryConf.isInfer()
@@ -354,7 +357,7 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
                     tupleExpr.visit(new SubPropertyOfVisitor(queryConf, inferenceEngine));
                     tupleExpr.visit(new SubClassOfVisitor(queryConf, inferenceEngine));
                     tupleExpr.visit(new SameAsVisitor(queryConf, inferenceEngine));
-                } catch (Exception e) {
+                } catch (final Exception e) {
                     e.printStackTrace();
                 }
             }
@@ -363,7 +366,7 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
 //                tupleExpr.visit(new FilterTimeIndexVisitor(queryConf));
 //                tupleExpr.visit(new PartitionFilterTimeIndexVisitor(queryConf));
             }
-            FilterRangeVisitor rangeVisitor = new FilterRangeVisitor(queryConf);
+            final FilterRangeVisitor rangeVisitor = new FilterRangeVisitor(queryConf);
             tupleExpr.visit(rangeVisitor);
             tupleExpr.visit(rangeVisitor); //this has to be done twice to replace the statement patterns with the right ranges
             EvaluationStatistics stats = null;
@@ -382,7 +385,7 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
 
                 if (stats instanceof RdfCloudTripleStoreSelectivityEvaluationStatistics) {
 
-                    (new QueryJoinSelectOptimizer((RdfCloudTripleStoreSelectivityEvaluationStatistics) stats,
+                    (new QueryJoinSelectOptimizer(stats,
                             selectEvalDAO)).optimize(tupleExpr, dataset, bindings);
                 } else {
 
@@ -393,23 +396,23 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
 
             final CloseableIteration<BindingSet, QueryEvaluationException> iter = strategy
                     .evaluate(tupleExpr, EmptyBindingSet.getInstance());
-            CloseableIteration<BindingSet, QueryEvaluationException> iterWrap = new CloseableIteration<BindingSet, QueryEvaluationException>() {
-                
+            final CloseableIteration<BindingSet, QueryEvaluationException> iterWrap = new CloseableIteration<BindingSet, QueryEvaluationException>() {
+
                 @Override
                 public void remove() throws QueryEvaluationException {
                   iter.remove();
                 }
-                
+
                 @Override
                 public BindingSet next() throws QueryEvaluationException {
                     return iter.next();
                 }
-                
+
                 @Override
                 public boolean hasNext() throws QueryEvaluationException {
                     return iter.hasNext();
                 }
-                
+
                 @Override
                 public void close() throws QueryEvaluationException {
                     iter.close();
@@ -417,9 +420,9 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
                 }
             };
             return iterWrap;
-        } catch (QueryEvaluationException e) {
+        } catch (final QueryEvaluationException e) {
             throw new SailException(e);
-        } catch (Exception e) {
+        } catch (final Exception e) {
             throw new SailException(e);
         }
     }
@@ -434,7 +437,7 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
     }
 
     @Override
-    protected String getNamespaceInternal(String s) throws SailException {
+    protected String getNamespaceInternal(final String s) throws SailException {
         return namespaceManager.getNamespace(s);
     }
 
@@ -446,8 +449,8 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
 
     @Override
     protected CloseableIteration<? extends Statement, SailException> getStatementsInternal(
-            Resource subject, URI predicate, Value object, boolean flag,
-            Resource... contexts) throws SailException {
+            final Resource subject, final URI predicate, final Value object, final boolean flag,
+            final Resource... contexts) throws SailException {
 //        try {
         //have to do this to get the inferred values
         //TODO: Will this method reduce performance?
@@ -470,7 +473,7 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
             isClosed = true;
                 try {
                     evaluate.close();
-                } catch (QueryEvaluationException e) {
+                } catch (final QueryEvaluationException e) {
                     throw new SailException(e);
                 }
             }
@@ -479,7 +482,7 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
             public boolean hasNext() throws SailException {
                 try {
                     return evaluate.hasNext();
-                } catch (QueryEvaluationException e) {
+                } catch (final QueryEvaluationException e) {
                     throw new SailException(e);
                 }
             }
@@ -491,11 +494,11 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
                 }
 
                 try {
-                    BindingSet next = evaluate.next();
-                    Resource bs_subj = (Resource) ((subjVar.hasValue()) ? subjVar.getValue() : next.getBinding(subjVar.getName()).getValue());
-                    URI bs_pred = (URI) ((predVar.hasValue()) ? predVar.getValue() : next.getBinding(predVar.getName()).getValue());
-                    Value bs_obj = (objVar.hasValue()) ? objVar.getValue() : (Value) next.getBinding(objVar.getName()).getValue();
-                    Binding b_cntxt = next.getBinding(cntxtVar.getName());
+                    final BindingSet next = evaluate.next();
+                    final Resource bs_subj = (Resource) ((subjVar.hasValue()) ? subjVar.getValue() : next.getBinding(subjVar.getName()).getValue());
+                    final URI bs_pred = (URI) ((predVar.hasValue()) ? predVar.getValue() : next.getBinding(predVar.getName()).getValue());
+                    final Value bs_obj = (objVar.hasValue()) ? objVar.getValue() : (Value) next.getBinding(objVar.getName()).getValue();
+                    final Binding b_cntxt = next.getBinding(cntxtVar.getName());
 
                     //convert BindingSet to Statement
                     if (b_cntxt != null) {
@@ -503,7 +506,7 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
                     } else {
                         return new StatementImpl(bs_subj, bs_pred, bs_obj);
                     }
-                } catch (QueryEvaluationException e) {
+                } catch (final QueryEvaluationException e) {
                     throw new SailException(e);
                 }
             }
@@ -512,7 +515,7 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
             public void remove() throws SailException {
                 try {
                     evaluate.remove();
-                } catch (QueryEvaluationException e) {
+                } catch (final QueryEvaluationException e) {
                     throw new SailException(e);
                 }
             }
@@ -522,7 +525,7 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
 //        }
     }
 
-    protected Var decorateValue(Value val, String name) {
+    protected Var decorateValue(final Value val, final String name) {
         if (val == null) {
             return new Var(name);
         } else {
@@ -531,24 +534,24 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
     }
 
     @Override
-    protected void removeNamespaceInternal(String s) throws SailException {
+    protected void removeNamespaceInternal(final String s) throws SailException {
         namespaceManager.removeNamespace(s);
     }
 
     @Override
-    protected void removeStatementsInternal(Resource subject, URI predicate,
-                                            Value object, Resource... contexts) throws SailException {
+    protected void removeStatementsInternal(final Resource subject, final URI predicate,
+                                            final Value object, final Resource... contexts) throws SailException {
         if (!(subject instanceof URI)) {
             throw new SailException("Subject[" + subject + "] must be URI");
         }
 
         try {
             if (contexts != null && contexts.length > 0) {
-                for (Resource context : contexts) {
+                for (final Resource context : contexts) {
                     if (!(context instanceof URI)) {
                         throw new SailException("Context[" + context + "] must be URI");
                     }
-                    RyaStatement statement = new RyaStatement(
+                    final RyaStatement statement = new RyaStatement(
                             RdfToRyaConversions.convertResource(subject),
                             RdfToRyaConversions.convertURI(predicate),
                             RdfToRyaConversions.convertValue(object),
@@ -557,7 +560,7 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
                     ryaDAO.delete(statement, conf);
                 }
             } else {
-                RyaStatement statement = new RyaStatement(
+                final RyaStatement statement = new RyaStatement(
                         RdfToRyaConversions.convertResource(subject),
                         RdfToRyaConversions.convertURI(predicate),
                         RdfToRyaConversions.convertValue(object),
@@ -565,7 +568,7 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
 
                 ryaDAO.delete(statement, conf);
             }
-        } catch (RyaDAOException e) {
+        } catch (final RyaDAOException e) {
             throw new SailException(e);
         }
     }
@@ -576,13 +579,13 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
     }
 
     @Override
-    protected void setNamespaceInternal(String s, String s1)
+    protected void setNamespaceInternal(final String s, final String s1)
             throws SailException {
         namespaceManager.addNamespace(s, s1);
     }
 
     @Override
-    protected long sizeInternal(Resource... contexts) throws SailException {
+    protected long sizeInternal(final Resource... contexts) throws SailException {
         logger.error("Cannot determine size as of yet");
 
         return 0;
@@ -595,32 +598,34 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
 
     public static class StoreTripleSource implements TripleSource {
 
-        private RdfCloudTripleStoreConfiguration conf;
-        private RyaDAO<?> ryaDAO;
+        private final RdfCloudTripleStoreConfiguration conf;
+        private final RyaDAO<?> ryaDAO;
 
-        public StoreTripleSource(RdfCloudTripleStoreConfiguration conf, RyaDAO<?> ryaDAO) {
+        public StoreTripleSource(final RdfCloudTripleStoreConfiguration conf, final RyaDAO<?> ryaDAO) {
             this.conf = conf;
             this.ryaDAO = ryaDAO;
         }
 
-        public CloseableIteration<Statement, QueryEvaluationException> getStatements(
-                Resource subject, URI predicate, Value object,
-                Resource... contexts) throws QueryEvaluationException {
+        @Override
+		public CloseableIteration<Statement, QueryEvaluationException> getStatements(
+                final Resource subject, final URI predicate, final Value object,
+                final Resource... contexts) throws QueryEvaluationException {
             return RyaDAOHelper.query(ryaDAO, subject, predicate, object, conf, contexts);
         }
 
         public CloseableIteration<? extends Entry<Statement, BindingSet>, QueryEvaluationException> getStatements(
-                Collection<Map.Entry<Statement, BindingSet>> statements,
-                Resource... contexts) throws QueryEvaluationException {
+                final Collection<Map.Entry<Statement, BindingSet>> statements,
+                final Resource... contexts) throws QueryEvaluationException {
 
             return RyaDAOHelper.query(ryaDAO, statements, conf);
         }
 
-        public ValueFactory getValueFactory() {
+        @Override
+		public ValueFactory getValueFactory() {
             return RdfCloudTripleStoreConstants.VALUE_FACTORY;
         }
     }
-    
+
     public InferenceEngine getInferenceEngine() {
         return inferenceEngine;
     }


[2/2] incubator-rya git commit: RYA-307 Improved Rya MongoDB ingest of statements ... Closes #181

Posted by mi...@apache.org.
RYA-307 Improved Rya MongoDB ingest of statements ... Closes #181

...through the Sail Layer and Rya DAO by queueing up multiple inserts at a time so they can be written as a single batch.  If the queued statements have not been written after a set time limit, they are flushed to the datastore.  Both the batch size and the time limit are configurable.
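
The pattern described above reduces to a queue that is written out either
when it reaches a configured size or when a timer fires. Below is a minimal,
self-contained Java sketch of that queue-and-flush idea; the class and method
names are illustrative only and are not the actual MongoDbBatchWriter API
introduced by this commit.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Hypothetical sketch; the real logic lives in
    // org.apache.rya.mongodb.batch.MongoDbBatchWriter.
    public class BatchWriterSketch<T> {
        private final List<T> queue = new ArrayList<>();
        private final int batchSize;
        private final ScheduledExecutorService timer =
                Executors.newSingleThreadScheduledExecutor();

        public BatchWriterSketch(final int batchSize, final long flushTimeMs) {
            this.batchSize = batchSize;
            // Flush whatever is queued if nothing has been written within
            // the configured time limit.
            timer.scheduleAtFixedRate(this::flush, flushTimeMs, flushTimeMs,
                    TimeUnit.MILLISECONDS);
        }

        public synchronized void add(final T item) {
            queue.add(item);
            if (queue.size() >= batchSize) {
                flush(); // batch is full: write it out
            }
        }

        public synchronized void flush() {
            if (!queue.isEmpty()) {
                writeBatch(new ArrayList<>(queue)); // one bulk write
                queue.clear();
            }
        }

        protected void writeBatch(final List<T> batch) {
            // In Rya this is a single bulk insert into the MongoDB
            // collection backing the triple store.
        }
    }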


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/8def4cac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/8def4cac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/8def4cac

Branch: refs/heads/master
Commit: 8def4cacac7d0bb9ca3cc675c53d849261aa8029
Parents: fa2aad5
Author: eric.white <Er...@parsons.com>
Authored: Wed Jul 19 09:08:32 2017 -0400
Committer: Aaron Mihalik <aa...@gmail.com>
Committed: Thu Aug 3 15:45:05 2017 -0400

----------------------------------------------------------------------
 .../java/org/apache/rya/api/persist/RyaDAO.java |   7 +
 .../org/apache/rya/accumulo/AccumuloRyaDAO.java | 161 ++++++-------
 .../rya/mongodb/MongoDBRdfConfiguration.java    |  24 ++
 .../org/apache/rya/mongodb/MongoDBRyaDAO.java   |  63 ++++-
 .../rya/mongodb/batch/MongoDbBatchWriter.java   | 238 +++++++++++++++++++
 .../mongodb/batch/MongoDbBatchWriterConfig.java |  88 +++++++
 .../batch/MongoDbBatchWriterException.java      |  59 +++++
 .../mongodb/batch/MongoDbBatchWriterUtils.java  |  82 +++++++
 .../batch/collection/CollectionType.java        |  43 ++++
 .../batch/collection/DbCollectionType.java      |  53 +++++
 .../batch/collection/MongoCollectionType.java   |  52 ++++
 .../indexing/mongodb/AbstractMongoIndexer.java  |  60 ++++-
 .../rya/indexing/mongo/MongoEntityIndexIT.java  |  22 +-
 .../indexing/mongo/MongoIndexerDeleteIT.java    |  16 +-
 .../RdfCloudTripleStoreConnection.java          | 195 +++++++--------
 15 files changed, 965 insertions(+), 198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/common/rya.api/src/main/java/org/apache/rya/api/persist/RyaDAO.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/persist/RyaDAO.java b/common/rya.api/src/main/java/org/apache/rya/api/persist/RyaDAO.java
index 57aae1b..d83a5e9 100644
--- a/common/rya.api/src/main/java/org/apache/rya/api/persist/RyaDAO.java
+++ b/common/rya.api/src/main/java/org/apache/rya/api/persist/RyaDAO.java
@@ -123,4 +123,11 @@ public interface RyaDAO<C extends RdfCloudTripleStoreConfiguration> extends RyaC
     public void purge(RdfCloudTripleStoreConfiguration configuration);
 
     public void dropAndDestroy() throws RyaDAOException;
+
+    /**
+     * Flushes any RyaStatements queued for insertion and writes them to the
+     * datastore.
+     * @throws RyaDAOException
+     */
+    public void flush() throws RyaDAOException;
 }
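
With flush() now part of the RyaDAO contract, callers that turn off
per-statement flushing should flush explicitly before expecting the data to
be queryable. A hedged usage sketch (the dao and statements variables are
assumed to be an already-configured RyaDAO and a List<RyaStatement>):

    // Sketch only: queue a batch of statements, then force the write.
    dao.add(statements.iterator()); // may only queue, not write
    dao.flush();                    // push queued statements to the datastore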

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java
index bd7d2b3..f1f7c03 100644
--- a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java
@@ -59,12 +59,6 @@ import org.apache.accumulo.core.security.Authorizations;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
-import org.openrdf.model.Namespace;
-
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-
-import info.aduna.iteration.CloseableIteration;
 import org.apache.rya.accumulo.experimental.AccumuloIndexer;
 import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine;
 import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
@@ -76,6 +70,12 @@ import org.apache.rya.api.persist.RyaDAO;
 import org.apache.rya.api.persist.RyaDAOException;
 import org.apache.rya.api.persist.RyaNamespaceManager;
 import org.apache.rya.api.resolver.RyaTripleContext;
+import org.openrdf.model.Namespace;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+
+import info.aduna.iteration.CloseableIteration;
 
 public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaNamespaceManager<AccumuloRdfConfiguration> {
     private static final Log logger = LogFactory.getLog(AccumuloRyaDAO.class);
@@ -131,13 +131,13 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
 
             flushEachUpdate = conf.flushEachUpdate();
 
-            TableOperations tableOperations = connector.tableOperations();
+            final TableOperations tableOperations = connector.tableOperations();
             AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getSpo());
             AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getPo());
             AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getOsp());
             AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getNs());
 
-            for (AccumuloIndexer index : secondaryIndexers) {
+            for (final AccumuloIndexer index : secondaryIndexers) {
                 index.setConf(conf);
             }
 
@@ -150,7 +150,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
 
             bw_ns = mt_bw.getBatchWriter(tableLayoutStrategy.getNs());
 
-            for (AccumuloIndexer index : secondaryIndexers) {
+            for (final AccumuloIndexer index : secondaryIndexers) {
                index.setConnector(connector);
                index.setMultiTableBatchWriter(mt_bw);
                index.init();
@@ -161,7 +161,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
             checkVersion();
 
             initialized = true;
-        } catch (Exception e) {
+        } catch (final Exception e) {
             throw new RyaDAOException(e);
         }
     }
@@ -169,7 +169,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
     @Override
 	public String getVersion() throws RyaDAOException {
         String version = null;
-        CloseableIteration<RyaStatement, RyaDAOException> versIter = queryEngine.query(new RyaStatement(RTS_SUBJECT_RYA, RTS_VERSION_PREDICATE_RYA, null), conf);
+        final CloseableIteration<RyaStatement, RyaDAOException> versIter = queryEngine.query(new RyaStatement(RTS_SUBJECT_RYA, RTS_VERSION_PREDICATE_RYA, null), conf);
         if (versIter.hasNext()) {
             version = versIter.next().getObject().getData();
         }
@@ -179,43 +179,43 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
     }
 
     @Override
-    public void add(RyaStatement statement) throws RyaDAOException {
+    public void add(final RyaStatement statement) throws RyaDAOException {
         commit(Iterators.singletonIterator(statement));
     }
 
     @Override
-    public void add(Iterator<RyaStatement> iter) throws RyaDAOException {
+    public void add(final Iterator<RyaStatement> iter) throws RyaDAOException {
         commit(iter);
     }
 
     @Override
-    public void delete(RyaStatement stmt, AccumuloRdfConfiguration aconf) throws RyaDAOException {
+    public void delete(final RyaStatement stmt, final AccumuloRdfConfiguration aconf) throws RyaDAOException {
         this.delete(Iterators.singletonIterator(stmt), aconf);
     }
 
     @Override
-    public void delete(Iterator<RyaStatement> statements, AccumuloRdfConfiguration conf) throws RyaDAOException {
+    public void delete(final Iterator<RyaStatement> statements, final AccumuloRdfConfiguration conf) throws RyaDAOException {
         try {
             while (statements.hasNext()) {
-                RyaStatement stmt = statements.next();
+                final RyaStatement stmt = statements.next();
                 //query first
-                CloseableIteration<RyaStatement, RyaDAOException> query = this.queryEngine.query(stmt, conf);
+                final CloseableIteration<RyaStatement, RyaDAOException> query = this.queryEngine.query(stmt, conf);
                 while (query.hasNext()) {
                     deleteSingleRyaStatement(query.next());
                 }
 
-                for (AccumuloIndexer index : secondaryIndexers) {
+                for (final AccumuloIndexer index : secondaryIndexers) {
                     index.deleteStatement(stmt);
                 }
             }
             if (flushEachUpdate) { mt_bw.flush(); }
-        } catch (Exception e) {
+        } catch (final Exception e) {
             throw new RyaDAOException(e);
         }
     }
 
     @Override
-    public void dropGraph(AccumuloRdfConfiguration conf, RyaURI... graphs) throws RyaDAOException {
+    public void dropGraph(final AccumuloRdfConfiguration conf, final RyaURI... graphs) throws RyaDAOException {
         BatchDeleter bd_spo = null;
         BatchDeleter bd_po = null;
         BatchDeleter bd_osp = null;
@@ -229,7 +229,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
             bd_po.setRanges(Collections.singleton(new Range()));
             bd_osp.setRanges(Collections.singleton(new Range()));
 
-            for (RyaURI graph : graphs){
+            for (final RyaURI graph : graphs){
                 bd_spo.fetchColumnFamily(new Text(graph.getData()));
                 bd_po.fetchColumnFamily(new Text(graph.getData()));
                 bd_osp.fetchColumnFamily(new Text(graph.getData()));
@@ -244,7 +244,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
 //                index.dropGraph(graphs);
 //            }
 
-        } catch (Exception e) {
+        } catch (final Exception e) {
             throw new RyaDAOException(e);
         } finally {
             if (bd_spo != null) {
@@ -260,34 +260,34 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
 
     }
 
-    protected void deleteSingleRyaStatement(RyaStatement stmt) throws IOException, MutationsRejectedException {
-        Map<TABLE_LAYOUT, Collection<Mutation>> map = ryaTableMutationsFactory.serializeDelete(stmt);
+    protected void deleteSingleRyaStatement(final RyaStatement stmt) throws IOException, MutationsRejectedException {
+        final Map<TABLE_LAYOUT, Collection<Mutation>> map = ryaTableMutationsFactory.serializeDelete(stmt);
         bw_spo.addMutations(map.get(TABLE_LAYOUT.SPO));
         bw_po.addMutations(map.get(TABLE_LAYOUT.PO));
         bw_osp.addMutations(map.get(TABLE_LAYOUT.OSP));
     }
 
-    protected void commit(Iterator<RyaStatement> commitStatements) throws RyaDAOException {
+    protected void commit(final Iterator<RyaStatement> commitStatements) throws RyaDAOException {
         try {
             //TODO: Should have a lock here in case we are adding and committing at the same time
             while (commitStatements.hasNext()) {
-                RyaStatement stmt = commitStatements.next();
+                final RyaStatement stmt = commitStatements.next();
 
-                Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationsFactory.serialize(stmt);
-                Collection<Mutation> spo = mutationMap.get(TABLE_LAYOUT.SPO);
-                Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO);
-                Collection<Mutation> osp = mutationMap.get(TABLE_LAYOUT.OSP);
+                final Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationsFactory.serialize(stmt);
+                final Collection<Mutation> spo = mutationMap.get(TABLE_LAYOUT.SPO);
+                final Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO);
+                final Collection<Mutation> osp = mutationMap.get(TABLE_LAYOUT.OSP);
                 bw_spo.addMutations(spo);
                 bw_po.addMutations(po);
                 bw_osp.addMutations(osp);
 
-                for (AccumuloIndexer index : secondaryIndexers) {
+                for (final AccumuloIndexer index : secondaryIndexers) {
                     index.storeStatement(stmt);
                 }
             }
 
             if (flushEachUpdate) { mt_bw.flush(); }
-        } catch (Exception e) {
+        } catch (final Exception e) {
             throw new RyaDAOException(e);
         }
     }
@@ -303,57 +303,57 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
             mt_bw.flush();
 
             mt_bw.close();
-        } catch (Exception e) {
+        } catch (final Exception e) {
             throw new RyaDAOException(e);
         }
-        for(AccumuloIndexer indexer : this.secondaryIndexers) {
+        for(final AccumuloIndexer indexer : this.secondaryIndexers) {
             try {
                 indexer.destroy();
-            } catch(Exception e) {
+            } catch(final Exception e) {
                 logger.warn("Failed to destroy indexer", e);
             }
         }
     }
 
     @Override
-    public void addNamespace(String pfx, String namespace) throws RyaDAOException {
+    public void addNamespace(final String pfx, final String namespace) throws RyaDAOException {
         try {
-            Mutation m = new Mutation(new Text(pfx));
+            final Mutation m = new Mutation(new Text(pfx));
             m.put(INFO_NAMESPACE_TXT, EMPTY_TEXT, new Value(namespace.getBytes()));
             bw_ns.addMutation(m);
             if (flushEachUpdate) { mt_bw.flush(); }
-        } catch (Exception e) {
+        } catch (final Exception e) {
             throw new RyaDAOException(e);
         }
     }
 
     @Override
-    public String getNamespace(String pfx) throws RyaDAOException {
+    public String getNamespace(final String pfx) throws RyaDAOException {
         try {
-            Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(),
+            final Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(),
                     ALL_AUTHORIZATIONS);
             scanner.fetchColumn(INFO_NAMESPACE_TXT, EMPTY_TEXT);
             scanner.setRange(new Range(new Text(pfx)));
-            Iterator<Map.Entry<Key, Value>> iterator = scanner
+            final Iterator<Map.Entry<Key, Value>> iterator = scanner
                     .iterator();
 
             if (iterator.hasNext()) {
                 return new String(iterator.next().getValue().get());
             }
-        } catch (Exception e) {
+        } catch (final Exception e) {
             throw new RyaDAOException(e);
         }
         return null;
     }
 
     @Override
-    public void removeNamespace(String pfx) throws RyaDAOException {
+    public void removeNamespace(final String pfx) throws RyaDAOException {
         try {
-            Mutation del = new Mutation(new Text(pfx));
+            final Mutation del = new Mutation(new Text(pfx));
             del.putDelete(INFO_NAMESPACE_TXT, EMPTY_TEXT);
             bw_ns.addMutation(del);
             if (flushEachUpdate) { mt_bw.flush(); }
-        } catch (Exception e) {
+        } catch (final Exception e) {
             throw new RyaDAOException(e);
         }
     }
@@ -362,12 +362,12 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
     @Override
     public CloseableIteration<Namespace, RyaDAOException> iterateNamespace() throws RyaDAOException {
         try {
-            Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(),
+            final Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(),
                     ALL_AUTHORIZATIONS);
             scanner.fetchColumnFamily(INFO_NAMESPACE_TXT);
-            Iterator<Map.Entry<Key, Value>> result = scanner.iterator();
+            final Iterator<Map.Entry<Key, Value>> result = scanner.iterator();
             return new AccumuloNamespaceTableIterator(result);
-        } catch (Exception e) {
+        } catch (final Exception e) {
             throw new RyaDAOException(e);
         }
     }
@@ -378,21 +378,21 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
     }
 
     @Override
-    public void purge(RdfCloudTripleStoreConfiguration configuration) {
-        for (String tableName : getTables()) {
+    public void purge(final RdfCloudTripleStoreConfiguration configuration) {
+        for (final String tableName : getTables()) {
             try {
                 purge(tableName, configuration.getAuths());
                 compact(tableName);
-            } catch (TableNotFoundException e) {
+            } catch (final TableNotFoundException e) {
                 logger.error(e.getMessage());
-            } catch (MutationsRejectedException e) {
+            } catch (final MutationsRejectedException e) {
                 logger.error(e.getMessage());
             }
         }
-        for(AccumuloIndexer indexer : this.secondaryIndexers) {
+        for(final AccumuloIndexer indexer : this.secondaryIndexers) {
             try {
                 indexer.purge(configuration);
-            } catch(Exception e) {
+            } catch(final Exception e) {
                 logger.error("Failed to purge indexer", e);
             }
         }
@@ -400,24 +400,24 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
 
     @Override
     public void dropAndDestroy() throws RyaDAOException {
-        for (String tableName : getTables()) {
+        for (final String tableName : getTables()) {
             try {
                 drop(tableName);
-            } catch (AccumuloSecurityException e) {
+            } catch (final AccumuloSecurityException e) {
                 logger.error(e.getMessage());
                 throw new RyaDAOException(e);
-            } catch (AccumuloException e) {
+            } catch (final AccumuloException e) {
                 logger.error(e.getMessage());
                 throw new RyaDAOException(e);
-            } catch (TableNotFoundException e) {
+            } catch (final TableNotFoundException e) {
                 logger.warn(e.getMessage());
             }
         }
         destroy();
-        for(AccumuloIndexer indexer : this.secondaryIndexers) {
+        for(final AccumuloIndexer indexer : this.secondaryIndexers) {
             try {
                 indexer.dropAndDestroy();
-            } catch(Exception e) {
+            } catch(final Exception e) {
                 logger.error("Failed to drop and destroy indexer", e);
             }
         }
@@ -427,7 +427,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
         return connector;
     }
 
-    public void setConnector(Connector connector) {
+    public void setConnector(final Connector connector) {
         this.connector = connector;
     }
 
@@ -435,7 +435,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
         return batchWriterConfig;
     }
 
-    public void setBatchWriterConfig(BatchWriterConfig batchWriterConfig) {
+    public void setBatchWriterConfig(final BatchWriterConfig batchWriterConfig) {
         this.batchWriterConfig = batchWriterConfig;
     }
 
@@ -449,7 +449,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
     }
 
     @Override
-	public void setConf(AccumuloRdfConfiguration conf) {
+	public void setConf(final AccumuloRdfConfiguration conf) {
         this.conf = conf;
     }
 
@@ -457,7 +457,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
         return ryaTableMutationsFactory;
     }
 
-    public void setRyaTableMutationsFactory(RyaTableMutationsFactory ryaTableMutationsFactory) {
+    public void setRyaTableMutationsFactory(final RyaTableMutationsFactory ryaTableMutationsFactory) {
         this.ryaTableMutationsFactory = ryaTableMutationsFactory;
     }
 
@@ -466,21 +466,22 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
         return queryEngine;
     }
 
-    public void setQueryEngine(AccumuloRyaQueryEngine queryEngine) {
+    public void setQueryEngine(final AccumuloRyaQueryEngine queryEngine) {
         this.queryEngine = queryEngine;
     }
 
+    @Override
     public void flush() throws RyaDAOException {
         try {
             mt_bw.flush();
-        } catch (MutationsRejectedException e) {
+        } catch (final MutationsRejectedException e) {
             throw new RyaDAOException(e);
         }
     }
 
     protected String[] getTables() {
         // core tables
-        List<String> tableNames = Lists.newArrayList(
+        final List<String> tableNames = Lists.newArrayList(
                 tableLayoutStrategy.getSpo(),
                 tableLayoutStrategy.getPo(),
                 tableLayoutStrategy.getOsp(),
@@ -488,17 +489,17 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
                 tableLayoutStrategy.getEval());
 
         // Additional Tables
-        for (AccumuloIndexer index : secondaryIndexers) {
+        for (final AccumuloIndexer index : secondaryIndexers) {
             tableNames.add(index.getTableName());
         }
 
         return tableNames.toArray(new String[]{});
     }
 
-    private void purge(String tableName, String[] auths) throws TableNotFoundException, MutationsRejectedException {
+    private void purge(final String tableName, final String[] auths) throws TableNotFoundException, MutationsRejectedException {
         if (tableExists(tableName)) {
             logger.info("Purging accumulo table: " + tableName);
-            BatchDeleter batchDeleter = createBatchDeleter(tableName, new Authorizations(auths));
+            final BatchDeleter batchDeleter = createBatchDeleter(tableName, new Authorizations(auths));
             try {
                 batchDeleter.setRanges(Collections.singleton(new Range()));
                 batchDeleter.delete();
@@ -508,31 +509,31 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
         }
     }
 
-    private void compact(String tableName) {
+    private void compact(final String tableName) {
         logger.info("Requesting major compaction for table " + tableName);
         try {
             connector.tableOperations().compact(tableName, null, null, true, false);
-        } catch (Exception e) {
+        } catch (final Exception e) {
             logger.error(e.getMessage());
         }
     }
 
-    private boolean tableExists(String tableName) {
+    private boolean tableExists(final String tableName) {
         return getConnector().tableOperations().exists(tableName);
     }
 
-    private BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations) throws TableNotFoundException {
+    private BatchDeleter createBatchDeleter(final String tableName, final Authorizations authorizations) throws TableNotFoundException {
         return connector.createBatchDeleter(tableName, authorizations, NUM_THREADS, MAX_MEMORY, MAX_TIME, NUM_THREADS);
     }
 
     private void checkVersion() throws RyaDAOException, IOException, MutationsRejectedException {
-        String version = getVersion();
+        final String version = getVersion();
         if (version == null) {
             //adding to core Rya tables but not Indexes
-            Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationsFactory.serialize(getVersionRyaStatement());
-            Collection<Mutation> spo = mutationMap.get(TABLE_LAYOUT.SPO);
-            Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO);
-            Collection<Mutation> osp = mutationMap.get(TABLE_LAYOUT.OSP);
+            final Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationsFactory.serialize(getVersionRyaStatement());
+            final Collection<Mutation> spo = mutationMap.get(TABLE_LAYOUT.SPO);
+            final Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO);
+            final Collection<Mutation> osp = mutationMap.get(TABLE_LAYOUT.OSP);
             bw_spo.addMutations(spo);
             bw_po.addMutations(po);
             bw_osp.addMutations(osp);
@@ -544,7 +545,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
         return new RyaStatement(RTS_SUBJECT_RYA, RTS_VERSION_PREDICATE_RYA, VERSION_RYA);
     }
 
-    private void drop(String tableName) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+    private void drop(final String tableName) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
         logger.info("Dropping cloudbase table: " + tableName);
         connector.tableOperations().delete(tableName);
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java
index 067b682..418a155 100644
--- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java
@@ -40,6 +40,8 @@ public class MongoDBRdfConfiguration extends RdfCloudTripleStoreConfiguration {
     public static final String MONGO_USER_PASSWORD = "mongo.db.userpassword";
     public static final String CONF_ADDITIONAL_INDEXERS = "ac.additional.indexers";
     public static final String USE_MOCK_MONGO = ".useMockInstance";
+    public static final String CONF_FLUSH_EACH_UPDATE = "rya.mongodb.dao.flusheachupdate";
+
     private MongoClient mongoClient;
 
     public MongoDBRdfConfiguration() {
@@ -99,6 +101,28 @@ public class MongoDBRdfConfiguration extends RdfCloudTripleStoreConfiguration {
     }
 
     /**
+     * @return {@code true} if each statement added to the batch writer should
+     * be flushed and written right away to the datastore. {@code false} if the
+     * statements should be queued and written to the datastore when the queue
+     * is full or after enough time has passed without a write.<p>
+     * Defaults to {@code true} if nothing is specified.
+     */
+    public boolean flushEachUpdate(){
+        return getBoolean(CONF_FLUSH_EACH_UPDATE, true);
+    }
+
+    /**
+     * Sets the {@link #CONF_FLUSH_EACH_UPDATE} property of the configuration.
+     * @param flush {@code true} if each statement added to the batch writer
+     * should be flushed and written right away to the datastore. {@code false}
+     * if the statements should be queued and written to the datastore when the
+     * queue is full or after enough time has passed without a write.
+     */
+    public void setFlush(final boolean flush){
+        setBoolean(CONF_FLUSH_EACH_UPDATE, flush);
+    }
+
+    /**
      * @return name of Mongo Collection containing Rya triples
      */
     public String getTriplesCollectionName() {
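
Putting the new configuration hooks together, enabling batched writes on a
MongoDB-backed Rya instance might look like the sketch below (the rest of
the DAO setup is assumed):

    // Sketch: queue statements instead of writing them one at a time.
    final MongoDBRdfConfiguration conf = new MongoDBRdfConfiguration();
    conf.setFlush(false); // sets "rya.mongodb.dao.flusheachupdate" to false
    assert !conf.flushEachUpdate(); // defaults to true when unset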

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java
index daa8a67..a32651d 100644
--- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java
@@ -37,6 +37,11 @@ import org.apache.rya.api.persist.RyaDAOException;
 import org.apache.rya.api.persist.RyaNamespaceManager;
 import org.apache.rya.api.persist.index.RyaSecondaryIndexer;
 import org.apache.rya.api.persist.query.RyaQueryEngine;
+import org.apache.rya.mongodb.batch.MongoDbBatchWriter;
+import org.apache.rya.mongodb.batch.MongoDbBatchWriterConfig;
+import org.apache.rya.mongodb.batch.MongoDbBatchWriterException;
+import org.apache.rya.mongodb.batch.MongoDbBatchWriterUtils;
+import org.apache.rya.mongodb.batch.collection.DbCollectionType;
 import org.apache.rya.mongodb.dao.MongoDBNamespaceManager;
 import org.apache.rya.mongodb.dao.MongoDBStorageStrategy;
 import org.apache.rya.mongodb.dao.SimpleMongoDBNamespaceManager;
@@ -47,7 +52,6 @@ import com.mongodb.DB;
 import com.mongodb.DBCollection;
 import com.mongodb.DBObject;
 import com.mongodb.DuplicateKeyException;
-import com.mongodb.InsertOptions;
 import com.mongodb.MongoClient;
 
 /**
@@ -56,6 +60,8 @@ import com.mongodb.MongoClient;
 public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{
     private static final Logger log = Logger.getLogger(MongoDBRyaDAO.class);
 
+    private boolean isInitialized = false;
+    private boolean flushEachUpdate = true;
     private MongoDBRdfConfiguration conf;
     private final MongoClient mongoClient;
     private DB db;
@@ -67,6 +73,8 @@ public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{
     private List<MongoSecondaryIndex> secondaryIndexers;
     private Authorizations auths;
 
+    private MongoDbBatchWriter<DBObject> mongoDbBatchWriter;
+
     /**
      * Creates a new instance of {@link MongoDBRyaDAO}.
      * @param conf the {@link MongoDBRdfConfiguration}.
@@ -87,6 +95,7 @@ public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{
         this.mongoClient = mongoClient;
         conf.setMongoClient(mongoClient);
         auths = conf.getAuthorizations();
+        flushEachUpdate = conf.flushEachUpdate();
         init();
     }
 
@@ -116,6 +125,9 @@ public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{
 
     @Override
     public void init() throws RyaDAOException {
+        if (isInitialized) {
+            return;
+        }
         secondaryIndexers = conf.getAdditionalIndexers();
         for(final MongoSecondaryIndex index: secondaryIndexers) {
             index.setConf(conf);
@@ -131,15 +143,34 @@ public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{
         for(final MongoSecondaryIndex index: secondaryIndexers) {
             index.init();
         }
+
+        final MongoDbBatchWriterConfig mongoDbBatchWriterConfig = MongoDbBatchWriterUtils.getMongoDbBatchWriterConfig(conf);
+        mongoDbBatchWriter = new MongoDbBatchWriter<DBObject>(new DbCollectionType(coll), mongoDbBatchWriterConfig);
+        try {
+            mongoDbBatchWriter.start();
+        } catch (final MongoDbBatchWriterException e) {
+            throw new RyaDAOException("Error starting MongoDB batch writer", e);
+        }
+        isInitialized = true;
     }
 
     @Override
     public boolean isInitialized() throws RyaDAOException {
-        return true;
+        return isInitialized;
     }
 
     @Override
     public void destroy() throws RyaDAOException {
+        if (!isInitialized) {
+            return;
+        }
+        isInitialized = false;
+        flush();
+        try {
+            mongoDbBatchWriter.shutdown();
+        } catch (final MongoDbBatchWriterException e) {
+            throw new RyaDAOException("Error shutting down MongoDB batch writer", e);
+        }
         if (mongoClient != null) {
             mongoClient.close();
         }
@@ -153,7 +184,15 @@ public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{
         try {
             final boolean canAdd = DocumentVisibilityUtil.doesUserHaveDocumentAccess(auths, statement.getColumnVisibility());
             if (canAdd) {
-                coll.insert(storageStrategy.serialize(statement));
+                final DBObject obj = storageStrategy.serialize(statement);
+                try {
+                    mongoDbBatchWriter.addObjectToQueue(obj);
+                    if (flushEachUpdate) {
+                        flush();
+                    }
+                } catch (final MongoDbBatchWriterException e) {
+                    throw new RyaDAOException("Error adding statement", e);
+                }
                 for(final RyaSecondaryIndexer index: secondaryIndexers) {
                     index.storeStatement(statement);
                 }
@@ -190,7 +229,14 @@ public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{
                 throw new RyaDAOException("User does not have the required authorizations to add statement");
             }
         }
-        coll.insert(dbInserts, new InsertOptions().continueOnError(true));
+        try {
+            mongoDbBatchWriter.addObjectsToQueue(dbInserts);
+            if (flushEachUpdate) {
+                flush();
+            }
+        } catch (final MongoDbBatchWriterException e) {
+            throw new RyaDAOException("Error adding statements", e);
+        }
     }
 
     @Override
@@ -263,4 +309,13 @@ public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{
     public void dropAndDestroy() throws RyaDAOException {
         db.dropDatabase(); // this is dangerous!
     }
+
+    @Override
+    public void flush() throws RyaDAOException {
+        try {
+            mongoDbBatchWriter.flush();
+        } catch (final MongoDbBatchWriterException e) {
+            throw new RyaDAOException("Error flushing data.", e);
+        }
+    }
 }

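A minimal sketch of the DAO's new write path (illustrative; the
(conf, mongoClient) constructor matches the hunk above, while conf,
mongoClient, and ryaStatement are assumed to exist, and each call may throw
RyaDAOException):

    final MongoDBRyaDAO dao = new MongoDBRyaDAO(conf, mongoClient);
    try {
        // The statement is queued in the batch writer; when
        // conf.flushEachUpdate() is true (the default) it is written
        // to MongoDB right away.
        dao.add(ryaStatement);
        // Force out anything still queued (destroy() also flushes).
        dao.flush();
    } finally {
        dao.destroy();
    }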
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriter.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriter.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriter.java
new file mode 100644
index 0000000..2f52b5c
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriter.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.batch;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.mongodb.batch.collection.CollectionType;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Handles batch writing MongoDB statement objects to the repository. It takes
+ * a configurable batch size and flush time. If the number of objects placed
+ * in the queue reaches the batch size, the objects are bulk written to the
+ * datastore. If the queue has not filled up by the time the flush duration
+ * elapses, the queued statements are flushed and written to the datastore.
+ * @param <T> the type of object that the batch writer's internal collection
+ * type uses.
+ */
+public class MongoDbBatchWriter<T> {
+    private static final Logger log = Logger.getLogger(MongoDbBatchWriter.class);
+
+    private static final int CHECK_QUEUE_INTERVAL_MS = 10;
+
+    private final CollectionType<T> collectionType;
+    private final long batchFlushTimeMs;
+
+    private final ArrayBlockingQueue<T> statementInsertionQueue;
+    private final ScheduledThreadPoolExecutor scheduledExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(0);
+    private ScheduledFuture<?> flushBatchFuture;
+    private final Runnable flushBatchTask;
+    private Thread queueFullCheckerThread;
+
+    private final AtomicBoolean isInit = new AtomicBoolean();
+
+    /**
+     * Creates a new instance of {@link MongoDbBatchWriter}.
+     * @param collectionType the {@link CollectionType}. (not {@code null})
+     * @param mongoDbBatchWriterConfig the {@link MongoDbBatchWriterConfig}.
+     * (not {@code null})
+     */
+    public MongoDbBatchWriter(final CollectionType<T> collectionType, final MongoDbBatchWriterConfig mongoDbBatchWriterConfig) {
+        this.collectionType = checkNotNull(collectionType);
+        this.batchFlushTimeMs = checkNotNull(mongoDbBatchWriterConfig).getBatchFlushTimeMs();
+
+        statementInsertionQueue = new ArrayBlockingQueue<>(mongoDbBatchWriterConfig.getBatchSize());
+        flushBatchTask = new BatchFlusher();
+    }
+
+    /**
+     * Task used to flush statements if enough time has passed without an
+     * insertion while there are objects enqueued.
+     */
+    private class BatchFlusher implements Runnable {
+        @Override
+        public void run() {
+            try {
+                if (!statementInsertionQueue.isEmpty()) {
+                    log.trace("Running statement insertion flush task. Too much time has passed without any object insertions so all queued data is being flushed.");
+                    flush();
+                }
+            } catch (final Exception e) {
+                log.error("Error flush out the statements", e);
+            }
+        }
+    }
+
+    private static final ThreadFactory QUEUE_THREAD_FACTORY = new ThreadFactoryBuilder()
+        .setNameFormat("Queue Full Checker Thread - %d")
+        .setDaemon(true)
+        .build();
+
+    /**
+     * Monitors the queue and flushes the queued statements once the queue is full.
+     */
+    private class QueueFullChecker implements Runnable {
+        @Override
+        public void run() {
+            try {
+                while (isInit.get()) {
+                    // Check if the queue is full and, if it is, flush the
+                    // queued statements to the datastore.
+                    if (statementInsertionQueue.remainingCapacity() == 0) {
+                        log.trace("Statement queue is FULL -> going to empty it");
+                        try {
+                            flush();
+                        } catch (final MongoDbBatchWriterException e) {
+                            log.error("Error emptying queue", e);
+                        }
+                    }
+                    Thread.sleep(CHECK_QUEUE_INTERVAL_MS);
+                }
+            } catch (final InterruptedException e) {
+                log.error("Encountered an unexpected error while checking the batch queue.", e);
+            }
+        }
+    }
+
+    /**
+     * Starts the batch writer queue and processes.
+     */
+    public void start() throws MongoDbBatchWriterException {
+        if (!isInit.get()) {
+            if (flushBatchFuture == null) {
+                flushBatchFuture = startFlushTimer();
+            }
+            if (queueFullCheckerThread == null) {
+                queueFullCheckerThread = QUEUE_THREAD_FACTORY.newThread(new QueueFullChecker());
+            }
+            isInit.set(true);
+            queueFullCheckerThread.start();
+        }
+    }
+
+    /**
+     * Stops the batch writer processes.
+     */
+    public void shutdown() throws MongoDbBatchWriterException {
+        isInit.set(false);
+        if (flushBatchFuture != null) {
+            flushBatchFuture.cancel(true);
+            flushBatchFuture = null;
+        }
+        if (queueFullCheckerThread != null) {
+            if (queueFullCheckerThread.isAlive()) {
+                try {
+                    queueFullCheckerThread.join(2 * CHECK_QUEUE_INTERVAL_MS);
+                } catch (final InterruptedException e) {
+                    log.error("Error waiting for thread to finish", e);
+                }
+            }
+            // Clear the reference so a later start() creates a fresh thread.
+            queueFullCheckerThread = null;
+        }
+    }
+
+    /**
+     * Adds a MongoDB object to the queue; it will not be written until one of
+     * the following occurs:<br>
+     * <ul>
+     *  <li>The queue fills up</li>
+     *  <li>The flush time has been reached</li>
+     *  <li>A direct call to the {@link MongoDbBatchWriter#flush()} method
+     *  has been made</li>
+     * </ul>
+     * @param object the object to add to the queue.
+     * @throws MongoDbBatchWriterException if the object cannot be added to the queue.
+     */
+    public void addObjectToQueue(final T object) throws MongoDbBatchWriterException {
+        if (object != null) {
+            try {
+                // Place in the queue which will bulk write after the specified
+                // "batchSize" number of items have filled the queue or if more
+                // than "batchFlushTimeMs" milliseconds have passed since the
+                // last insertion.
+                resetFlushTimer();
+                statementInsertionQueue.put(object);
+            } catch (final Exception e) {
+                throw new MongoDbBatchWriterException("Error adding object to batch queue.", e);
+            }
+        }
+    }
+
+    /**
+     * Adds a list of MongoDB objects to the queue; they will not be written
+     * until one of the following occurs:<br>
+     * <ul>
+     *  <li>The queue fills up</li>
+     *  <li>The flush time has been reached</li>
+     *  <li>A direct call to the {@link MongoDbBatchWriter#flush()} method
+     *  has been made</li>
+     * </ul>
+     * @param objects a {@link List} of objects to add to the queue.
+     * @throws MongoDbBatchWriterException if an object cannot be added to the queue.
+     */
+    public void addObjectsToQueue(final List<T> objects) throws MongoDbBatchWriterException {
+        if (objects != null) {
+            for (final T object : objects) {
+                addObjectToQueue(object);
+            }
+        }
+    }
+
+    /**
+     * Flushes out statements that are in the queue.
+     */
+    public void flush() throws MongoDbBatchWriterException {
+        final List<T> batch = new ArrayList<>();
+        try {
+            statementInsertionQueue.drainTo(batch);
+            if (!batch.isEmpty()) {
+                collectionType.insertMany(batch);
+            }
+        } catch (final Exception e) {
+            throw new MongoDbBatchWriterException("Error flushing statements", e);
+        }
+    }
+
+    private void resetFlushTimer() throws MongoDbBatchWriterException {
+        flushBatchFuture.cancel(false);
+        flushBatchFuture = startFlushTimer();
+    }
+
+    private ScheduledFuture<?> startFlushTimer() throws MongoDbBatchWriterException {
+        try {
+            return scheduledExecutor.schedule(flushBatchTask, batchFlushTimeMs, TimeUnit.MILLISECONDS);
+        } catch (final Exception e) {
+            throw new MongoDbBatchWriterException("Error starting batch flusher", e);
+        }
+    }
+}
\ No newline at end of file

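Lifecycle sketch for the writer itself (illustrative; "coll" is an existing
DBCollection and every call may throw MongoDbBatchWriterException):

    final MongoDbBatchWriter<DBObject> writer = new MongoDbBatchWriter<>(
            new DbCollectionType(coll), new MongoDbBatchWriterConfig());
    writer.start();                    // spawns the queue-full checker thread
    writer.addObjectToQueue(dbObject); // held until a flush condition is met
    writer.flush();                    // drain the queue and bulk insert now
    writer.shutdown();                 // cancel the timer, stop the checker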
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterConfig.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterConfig.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterConfig.java
new file mode 100644
index 0000000..cec8b9a
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterConfig.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.batch;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Configuration for the MongoDB Batch Writer.
+ */
+public class MongoDbBatchWriterConfig {
+    /**
+     * The default number of statements to batch write at a time.
+     */
+    public static final int DEFAULT_BATCH_SIZE = 50000;
+    private Integer batchSize = null;
+
+    /**
+     * The default time to wait in milliseconds to flush all statements out that
+     * are queued for insertion if the queue has not filled up to its capacity
+     * of {@link #DEFAULT_BATCH_SIZE} or the user configured buffer size.
+     */
+    public static final long DEFAULT_BATCH_FLUSH_TIME_MS = 100L;
+    private Long batchFlushTimeMs = null;
+
+    /**
+     * Creates a new instance of {@link MongoDbBatchWriterConfig}.
+     */
+    public MongoDbBatchWriterConfig() {
+    }
+
+    /**
+     * Gets the configured number of statements to batch write at a time.
+     * @return the configured value or the default value.
+     */
+    public int getBatchSize() {
+        return batchSize != null ? batchSize : DEFAULT_BATCH_SIZE;
+    }
+
+    /**
+     * Sets the number of statements to batch write at a time.
+     * @param batchSize the number of statements in each batch.
+     * @return the {@link MongoDbBatchWriterConfig}.
+     */
+    public MongoDbBatchWriterConfig setBatchSize(final int batchSize) {
+        Preconditions.checkArgument(batchSize > 0, "Batch size must be positive.");
+        this.batchSize = batchSize;
+        return this;
+    }
+
+    /**
+     * Gets the configured time to wait in milliseconds to flush all statements
+     * out that are queued for insertion if the queue has not filled up to its
+     * capacity.
+     * @return the configured value or the default value.
+     */
+    public long getBatchFlushTimeMs() {
+        return batchFlushTimeMs != null ? batchFlushTimeMs : DEFAULT_BATCH_FLUSH_TIME_MS;
+    }
+
+    /**
+     * Sets the time to wait in milliseconds to flush all statements out that
+     * are queued for insertion if the queue has not filled up to its capacity.
+     * @param batchFlushTimeMs the time to wait before flushing all queued
+     * statements that have not been written.
+     * @return the {@link MongoDbBatchWriterConfig}.
+     */
+    public MongoDbBatchWriterConfig setBatchFlushTimeMs(final long batchFlushTimeMs) {
+        Preconditions.checkArgument(batchFlushTimeMs >= 0, "Batch flush time must be non-negative.");
+        this.batchFlushTimeMs = batchFlushTimeMs;
+        return this;
+    }
+}
\ No newline at end of file

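Configuration sketch (illustrative): the setters are fluent and the getters
fall back to the defaults above when nothing has been set:

    final MongoDbBatchWriterConfig config = new MongoDbBatchWriterConfig();
    config.getBatchSize();        // 50000 (DEFAULT_BATCH_SIZE)
    config.getBatchFlushTimeMs(); // 100L (DEFAULT_BATCH_FLUSH_TIME_MS)

    // Overrides chain; invalid values fail the precondition checks.
    config.setBatchSize(1000).setBatchFlushTimeMs(500L);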
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterException.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterException.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterException.java
new file mode 100644
index 0000000..d4de156
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterException.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.batch;
+
+/**
+ * An exception to be used when there is a problem running the MongoDB Batch
+ * Writer.
+ */
+public class MongoDbBatchWriterException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates a new instance of {@link MongoDbBatchWriterException}.
+     */
+    public MongoDbBatchWriterException() {
+        super();
+    }
+
+    /**
+     * Creates a new instance of {@link MongoDbBatchWriterException}.
+     * @param message the detail message.
+     */
+    public MongoDbBatchWriterException(final String message) {
+        super(message);
+    }
+
+    /**
+     * Creates a new instance of {@link MongoDbBatchWriterException}.
+     * @param message the detail message.
+     * @param source the {@link Throwable} source.
+     */
+    public MongoDbBatchWriterException(final String message, final Throwable source) {
+        super(message, source);
+    }
+
+    /**
+     * Creates a new instance of {@link MongoDbBatchWriterException}.
+     * @param source the {@link Throwable} source.
+     */
+    public MongoDbBatchWriterException(final Throwable source) {
+        super(source);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterUtils.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterUtils.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterUtils.java
new file mode 100644
index 0000000..99e8992
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterUtils.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.batch;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Constants and utility methods related to batch writing statements in a MongoDB
+ * Rya repository.
+ */
+public final class MongoDbBatchWriterUtils {
+    /**
+     * Config tag used to specify the number of statements to batch write at a
+     * time.
+     */
+    public static final String BATCH_SIZE_TAG = "rya.mongodb.dao.batchwriter.size";
+
+    /**
+     * Config tag used to specify the time to wait in milliseconds to flush all
+     * statements out that are queued for insertion if the queue has not filled
+     * up to its capacity.
+     */
+    public static final String BATCH_FLUSH_TIME_MS_TAG = "rya.mongodb.dao.batchwriter.flushtime";
+
+    /**
+     * Private constructor to prevent instantiation.
+     */
+    private MongoDbBatchWriterUtils() {
+    }
+
+    /**
+     * The number of statements to batch write at a time.
+     * @param conf the {@link Configuration} to check.
+     * @return the configured value or the default value.
+     */
+    public static int getConfigBatchSize(final Configuration conf) {
+        return conf.getInt(BATCH_SIZE_TAG, MongoDbBatchWriterConfig.DEFAULT_BATCH_SIZE);
+    }
+
+    /**
+     * The time to wait in milliseconds to flush all statements out that are
+     * queued for insertion if the queue has not filled up to its capacity.
+     * @param conf the {@link Configuration} to check.
+     * @return the configured value or the default value.
+     */
+    public static long getConfigBatchFlushTimeMs(final Configuration conf) {
+        return conf.getLong(BATCH_FLUSH_TIME_MS_TAG, MongoDbBatchWriterConfig.DEFAULT_BATCH_FLUSH_TIME_MS);
+    }
+
+    /**
+     * Reads the specified config to create and initialize a
+     * {@link MongoDbBatchWriterConfig}. If no values are found then the default
+     * values are used.
+     * @param conf the {@link Configuration} to check.
+     * @return the {@link MongoDbBatchWriterConfig} populated with configured
+     * values from the specified {@code conf}.
+     */
+    public static MongoDbBatchWriterConfig getMongoDbBatchWriterConfig(final Configuration conf) {
+        final int batchSize = getConfigBatchSize(conf);
+        final long batchFlushTimeMs = getConfigBatchFlushTimeMs(conf);
+        final MongoDbBatchWriterConfig mongoDbBatchWriterConfig = new MongoDbBatchWriterConfig();
+        mongoDbBatchWriterConfig.setBatchSize(batchSize);
+        mongoDbBatchWriterConfig.setBatchFlushTimeMs(batchFlushTimeMs);
+        return mongoDbBatchWriterConfig;
+    }
+}
\ No newline at end of file

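Sketch of deriving the writer settings from a Hadoop Configuration
(illustrative; the tags are the constants above and the values are made up):

    final Configuration conf = new Configuration();
    conf.setInt(MongoDbBatchWriterUtils.BATCH_SIZE_TAG, 25000);
    conf.setLong(MongoDbBatchWriterUtils.BATCH_FLUSH_TIME_MS_TAG, 250L);

    final MongoDbBatchWriterConfig config =
            MongoDbBatchWriterUtils.getMongoDbBatchWriterConfig(conf);
    // config.getBatchSize() == 25000
    // config.getBatchFlushTimeMs() == 250L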
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/CollectionType.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/CollectionType.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/CollectionType.java
new file mode 100644
index 0000000..9e6d6fb
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/CollectionType.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.batch.collection;
+
+import java.util.List;
+
+/**
+ * Wrapper for interacting with the new and legacy MongoDB collection types
+ * ({@link com.mongodb.client.MongoCollection} and
+ * {@link com.mongodb.DBCollection}, respectively) so that inserts can be
+ * handled uniformly regardless of which collection type and object type is
+ * in use.
+ * @param <T> the type of object the collection type inserts.
+ */
+public interface CollectionType<T> {
+    /**
+     * Insert one item.
+     * @param item the item to insert.
+     */
+    public void insertOne(final T item);
+
+    /**
+     * Insert a list of items.
+     * @param items the {@link List} of items.
+     */
+    public void insertMany(final List<T> items);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/DbCollectionType.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/DbCollectionType.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/DbCollectionType.java
new file mode 100644
index 0000000..ea00693
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/DbCollectionType.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.batch.collection;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.List;
+
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+import com.mongodb.InsertOptions;
+import com.mongodb.WriteConcern;
+
+/**
+ * Provides access to the {@link DBCollection} type.
+ */
+public class DbCollectionType implements CollectionType<DBObject> {
+    private final DBCollection collection;
+
+    /**
+     * Creates a new instance of {@link DbCollectionType}.
+     * @param collection the {@link DBCollection}. (not {@code null})
+     */
+    public DbCollectionType(final DBCollection collection) {
+        this.collection = checkNotNull(collection);
+    }
+
+    @Override
+    public void insertOne(final DBObject item) {
+        collection.insert(item, WriteConcern.ACKNOWLEDGED);
+    }
+
+    @Override
+    public void insertMany(final List<DBObject> items) {
+        collection.insert(items, new InsertOptions().continueOnError(true));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/MongoCollectionType.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/MongoCollectionType.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/MongoCollectionType.java
new file mode 100644
index 0000000..8fb796a
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/MongoCollectionType.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.batch.collection;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.List;
+
+import org.bson.Document;
+
+import com.mongodb.client.MongoCollection;
+
+/**
+ * Provides access to the {@link MongoCollection} type.
+ */
+public class MongoCollectionType implements CollectionType<Document> {
+    private final MongoCollection<Document> collection;
+
+    /**
+     * Creates a new instance of {@link MongoCollectionType}.
+     * @param collection the {@link MongoCollection}. (not {@code null})
+     */
+    public MongoCollectionType(final MongoCollection<Document> collection) {
+        this.collection = checkNotNull(collection);
+    }
+
+    @Override
+    public void insertOne(final Document item) {
+        collection.insertOne(item);
+    }
+
+    @Override
+    public void insertMany(final List<Document> items) {
+        collection.insertMany(items);
+    }
+}
\ No newline at end of file

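Sketch showing the two adapters side by side (illustrative; the database and
collection names are made up, and "db"/"mongoClient" are assumed to exist):

    // Legacy driver objects (DBCollection inserts DBObject):
    final CollectionType<DBObject> legacy =
            new DbCollectionType(db.getCollection("rya_triples"));

    // Current driver objects (MongoCollection inserts Document):
    final CollectionType<Document> current = new MongoCollectionType(
            mongoClient.getDatabase("rya").getCollection("rya_triples"));

    // Either adapter plugs into the same batch writer:
    final MongoDbBatchWriter<Document> writer =
            new MongoDbBatchWriter<>(current, new MongoDbBatchWriterConfig());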
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java
index f5372d1..69ca274 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java
@@ -33,6 +33,11 @@ import org.apache.rya.mongodb.MongoConnectorFactory;
 import org.apache.rya.mongodb.MongoDBRdfConfiguration;
 import org.apache.rya.mongodb.MongoDBRyaDAO;
 import org.apache.rya.mongodb.MongoSecondaryIndex;
+import org.apache.rya.mongodb.batch.MongoDbBatchWriter;
+import org.apache.rya.mongodb.batch.MongoDbBatchWriterConfig;
+import org.apache.rya.mongodb.batch.MongoDbBatchWriterException;
+import org.apache.rya.mongodb.batch.MongoDbBatchWriterUtils;
+import org.apache.rya.mongodb.batch.collection.DbCollectionType;
 import org.openrdf.model.Literal;
 import org.openrdf.model.Statement;
 import org.openrdf.model.URI;
@@ -46,7 +51,6 @@ import com.mongodb.DBObject;
 import com.mongodb.MongoClient;
 import com.mongodb.QueryBuilder;
 import com.mongodb.ServerAddress;
-import com.mongodb.WriteConcern;
 
 import info.aduna.iteration.CloseableIteration;
 
@@ -58,6 +62,7 @@ public abstract class AbstractMongoIndexer<T extends IndexingMongoDBStorageStrat
     private static final Logger LOG = Logger.getLogger(AbstractMongoIndexer.class);
 
     private boolean isInit = false;
+    private boolean flushEachUpdate = true;
     protected Configuration conf;
     protected MongoDBRyaDAO dao;
     protected MongoClient mongoClient;
@@ -68,15 +73,28 @@ public abstract class AbstractMongoIndexer<T extends IndexingMongoDBStorageStrat
 
     protected T storageStrategy;
 
+    private MongoDbBatchWriter<DBObject> mongoDbBatchWriter;
+
     protected void initCore() {
         dbName = conf.get(MongoDBRdfConfiguration.MONGO_DB_NAME);
         db = this.mongoClient.getDB(dbName);
-        collection = db.getCollection(conf.get(MongoDBRdfConfiguration.MONGO_COLLECTION_PREFIX, "rya") + getCollectionName());
+        final String collectionName = conf.get(MongoDBRdfConfiguration.MONGO_COLLECTION_PREFIX, "rya") + getCollectionName();
+        collection = db.getCollection(collectionName);
+
+        flushEachUpdate = ((MongoDBRdfConfiguration)conf).flushEachUpdate();
+
+        final MongoDbBatchWriterConfig mongoDbBatchWriterConfig = MongoDbBatchWriterUtils.getMongoDbBatchWriterConfig(conf);
+        mongoDbBatchWriter = new MongoDbBatchWriter<DBObject>(new DbCollectionType(collection), mongoDbBatchWriterConfig);
+        try {
+            mongoDbBatchWriter.start();
+        } catch (final MongoDbBatchWriterException e) {
+            LOG.error("Error start MongoDB batch writer", e);
+        }
     }
 
     @Override
     public void setClient(final MongoClient client){
-    	this.mongoClient = client;
+        this.mongoClient = client;
     }
 
     @VisibleForTesting
@@ -96,8 +114,8 @@ public abstract class AbstractMongoIndexer<T extends IndexingMongoDBStorageStrat
     public void setConf(final Configuration conf) {
         this.conf = conf;
         if (!isInit){
-        	setClient(MongoConnectorFactory.getMongoClient(conf));
-        	init();
+            setClient(MongoConnectorFactory.getMongoClient(conf));
+            init();
         }
     }
 
@@ -108,6 +126,11 @@ public abstract class AbstractMongoIndexer<T extends IndexingMongoDBStorageStrat
 
     @Override
     public void flush() throws IOException {
+        try {
+            mongoDbBatchWriter.flush();
+        } catch (final MongoDbBatchWriterException e) {
+            throw new IOException("Error flushing batch writer", e);
+        }
     }
 
     @Override
@@ -135,24 +158,43 @@ public abstract class AbstractMongoIndexer<T extends IndexingMongoDBStorageStrat
     public void storeStatements(final Collection<RyaStatement> ryaStatements)
             throws IOException {
         for (final RyaStatement ryaStatement : ryaStatements){
-            storeStatement(ryaStatement);
+            storeStatement(ryaStatement, false);
+        }
+        if (flushEachUpdate) {
+            flush();
         }
     }
 
     @Override
     public void storeStatement(final RyaStatement ryaStatement) throws IOException {
+        storeStatement(ryaStatement, flushEachUpdate);
+    }
+
+    private void storeStatement(final RyaStatement ryaStatement, final boolean flush) throws IOException {
+        final DBObject obj = prepareStatementForStorage(ryaStatement);
+        try {
+            mongoDbBatchWriter.addObjectToQueue(obj);
+            if (flush) {
+                flush();
+            }
+        } catch (final MongoDbBatchWriterException e) {
+            throw new IOException("Error storing statement", e);
+        }
+    }
+
+    private DBObject prepareStatementForStorage(final RyaStatement ryaStatement) {
         try {
             final Statement statement = RyaToRdfConversions.convertStatement(ryaStatement);
             final boolean isValidPredicate = predicates.isEmpty() || predicates.contains(statement.getPredicate());
             if (isValidPredicate && (statement.getObject() instanceof Literal)) {
                 final DBObject obj = storageStrategy.serialize(ryaStatement);
-                if (obj != null) {
-                    collection.insert(obj, WriteConcern.ACKNOWLEDGED);
-                }
+                return obj;
             }
         } catch (final IllegalArgumentException e) {
             LOG.error("Unable to parse the statement: " + ryaStatement.toString(), e);
         }
+
+        return null;
     }
 
     @Override

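Indexer-side sketch (illustrative; "indexer" stands for any concrete
AbstractMongoIndexer subclass and "ryaStatements" is assumed to exist):

    // storeStatements(...) now queues each serialized statement and, when
    // flushEachUpdate is enabled, performs one flush at the end of the batch
    // instead of one acknowledged insert per statement.
    indexer.storeStatements(ryaStatements);
    indexer.flush(); // batch-writer failures surface as IOException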
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoEntityIndexIT.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoEntityIndexIT.java b/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoEntityIndexIT.java
index 4ddb2a5..6fac386 100644
--- a/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoEntityIndexIT.java
+++ b/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoEntityIndexIT.java
@@ -21,9 +21,7 @@ package org.apache.rya.indexing.mongo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 
@@ -36,12 +34,13 @@ import org.apache.rya.indexing.entity.storage.EntityStorage;
 import org.apache.rya.indexing.entity.storage.TypeStorage;
 import org.apache.rya.indexing.entity.update.mongo.MongoEntityIndexer;
 import org.apache.rya.mongodb.MockMongoFactory;
+import org.apache.rya.mongodb.MongoConnectorFactory;
 import org.apache.rya.mongodb.MongoDBRdfConfiguration;
 import org.apache.rya.sail.config.RyaSailFactory;
 import org.bson.Document;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.openrdf.model.Statement;
 import org.openrdf.model.URI;
 import org.openrdf.model.Value;
 import org.openrdf.model.ValueFactory;
@@ -70,7 +69,7 @@ public class MongoEntityIndexIT {
     private MongoClient mongoClient;
 
     @Before
-    public void setUp() throws Exception{
+    public void setUp() throws Exception {
         mongoClient = MockMongoFactory.with(Version.Main.PRODUCTION).newMongoClient();
         conf = new MongoDBRdfConfiguration();
         conf.set(MongoDBRdfConfiguration.MONGO_DB_NAME, "test");
@@ -91,6 +90,19 @@ public class MongoEntityIndexIT {
         indexer.init();
     }
 
+    @After
+    public void tearDown() throws Exception {
+        if (mongoClient != null) {
+            MongoConnectorFactory.closeMongoClient();
+        }
+        if (conn != null) {
+            conn.clear();
+        }
+        if (indexer != null) {
+            indexer.close();
+        }
+    }
+
     @Test
     public void ensureInEntityStore_Test() throws Exception {
         setupTypes();
@@ -202,8 +214,6 @@ public class MongoEntityIndexIT {
     }
 
     private void addStatements() throws Exception {
-        final List<Statement> stmnts = new ArrayList<>();
-
         //alice
         URI subject = VF.createURI("urn:alice");
         URI predicate = VF.createURI("urn:name");

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/mongo/MongoIndexerDeleteIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/mongo/MongoIndexerDeleteIT.java b/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/mongo/MongoIndexerDeleteIT.java
index b533d42..4b66b5b 100644
--- a/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/mongo/MongoIndexerDeleteIT.java
+++ b/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/mongo/MongoIndexerDeleteIT.java
@@ -31,6 +31,8 @@ import org.apache.rya.indexing.TemporalInstantRfc3339;
 import org.apache.rya.indexing.accumulo.ConfigUtils;
 import org.apache.rya.indexing.mongodb.MongoIndexingConfiguration;
 import org.apache.rya.mongodb.MockMongoFactory;
+import org.apache.rya.mongodb.MongoConnectorFactory;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.openrdf.model.Resource;
@@ -81,6 +83,16 @@ public class MongoIndexerDeleteIT {
         conn.begin();
     }
 
+    @After
+    public void after() throws Exception {
+        if (client != null) {
+            MongoConnectorFactory.closeMongoClient();
+        }
+        if (conn != null) {
+            conn.clear();
+        }
+    }
+
     @Test
     public void deleteTest() throws Exception {
         populateRya();
@@ -150,14 +162,10 @@ public class MongoIndexerDeleteIT {
         uuid = "urn:people";
         conn.add(VF.createURI(uuid), RDF.TYPE, person);
         conn.add(VF.createURI(uuid), RDFS.LABEL, VF.createLiteral("Alice Palace Hose", VF.createURI("http://www.w3.org/2001/XMLSchema#string")));
-
-        uuid = "urn:people";
-        conn.add(VF.createURI(uuid), RDF.TYPE, person);
         conn.add(VF.createURI(uuid), RDFS.LABEL, VF.createLiteral("Bob Snob Hose", "en"));
 
         // temporal
         final TemporalInstant instant = new TemporalInstantRfc3339(1, 2, 3, 4, 5, 6);
-        final URI time = VF.createURI("Property:atTime");
         conn.add(VF.createURI("foo:time"), VF.createURI("Property:atTime"), VF.createLiteral(instant.toString()));
     }