You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by mi...@apache.org on 2017/08/04 02:10:28 UTC
[1/2] incubator-rya git commit: RYA-307 Improved Rya MongoDB ingest of statements ... Closes #181
Repository: incubator-rya
Updated Branches:
refs/heads/master fa2aad55f -> 8def4caca
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java
----------------------------------------------------------------------
diff --git a/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java b/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java
index 44b0bd4..6784c90 100644
--- a/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java
+++ b/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java
@@ -8,9 +8,9 @@ package org.apache.rya.rdftriplestore;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,15 +23,16 @@ package org.apache.rya.rdftriplestore;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
-import info.aduna.iteration.CloseableIteration;
import java.lang.reflect.Constructor;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
+import org.apache.hadoop.conf.Configurable;
import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
import org.apache.rya.api.RdfCloudTripleStoreConstants;
import org.apache.rya.api.domain.RyaStatement;
@@ -62,8 +63,6 @@ import org.apache.rya.rdftriplestore.namespace.NamespaceManager;
import org.apache.rya.rdftriplestore.provenance.ProvenanceCollectionException;
import org.apache.rya.rdftriplestore.provenance.ProvenanceCollector;
import org.apache.rya.rdftriplestore.utils.DefaultStatistics;
-
-import org.apache.hadoop.conf.Configurable;
import org.openrdf.model.Namespace;
import org.openrdf.model.Resource;
import org.openrdf.model.Statement;
@@ -99,21 +98,23 @@ import org.openrdf.query.impl.EmptyBindingSet;
import org.openrdf.sail.SailException;
import org.openrdf.sail.helpers.SailConnectionBase;
+import info.aduna.iteration.CloseableIteration;
+
public class RdfCloudTripleStoreConnection extends SailConnectionBase {
- private RdfCloudTripleStore store;
+ private final RdfCloudTripleStore store;
private RdfEvalStatsDAO rdfEvalStatsDAO;
private SelectivityEvalDAO selectEvalDAO;
private RyaDAO ryaDAO;
private InferenceEngine inferenceEngine;
private NamespaceManager namespaceManager;
- private RdfCloudTripleStoreConfiguration conf;
-
+ private final RdfCloudTripleStoreConfiguration conf;
+
private ProvenanceCollector provenanceCollector;
- public RdfCloudTripleStoreConnection(RdfCloudTripleStore sailBase, RdfCloudTripleStoreConfiguration conf, ValueFactory vf)
+ public RdfCloudTripleStoreConnection(final RdfCloudTripleStore sailBase, final RdfCloudTripleStoreConfiguration conf, final ValueFactory vf)
throws SailException {
super(sailBase);
this.store = sailBase;
@@ -138,54 +139,56 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
this.namespaceManager = store.getNamespaceManager();
this.provenanceCollector = store.getProvenanceCollector();
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new SailException(e);
}
}
@Override
- protected void addStatementInternal(Resource subject, URI predicate,
- Value object, Resource... contexts) throws SailException {
+ protected void addStatementInternal(final Resource subject, final URI predicate,
+ final Value object, final Resource... contexts) throws SailException {
try {
- String cv_s = conf.getCv();
- byte[] cv = cv_s == null ? null : cv_s.getBytes();
+ final String cv_s = conf.getCv();
+ final byte[] cv = cv_s == null ? null : cv_s.getBytes();
+ final List<RyaStatement> ryaStatements = new ArrayList<>();
if (contexts != null && contexts.length > 0) {
- for (Resource context : contexts) {
- RyaStatement statement = new RyaStatement(
+ for (final Resource context : contexts) {
+ final RyaStatement statement = new RyaStatement(
RdfToRyaConversions.convertResource(subject),
RdfToRyaConversions.convertURI(predicate),
RdfToRyaConversions.convertValue(object),
RdfToRyaConversions.convertResource(context),
null, new StatementMetadata(), cv);
- ryaDAO.add(statement);
+ ryaStatements.add(statement);
}
} else {
- RyaStatement statement = new RyaStatement(
+ final RyaStatement statement = new RyaStatement(
RdfToRyaConversions.convertResource(subject),
RdfToRyaConversions.convertURI(predicate),
RdfToRyaConversions.convertValue(object),
null, null, new StatementMetadata(), cv);
- ryaDAO.add(statement);
+ ryaStatements.add(statement);
}
- } catch (RyaDAOException e) {
+ ryaDAO.add(ryaStatements.iterator());
+ } catch (final RyaDAOException e) {
throw new SailException(e);
}
}
-
-
-
+
+
+
@Override
- protected void clearInternal(Resource... aresource) throws SailException {
+ protected void clearInternal(final Resource... aresource) throws SailException {
try {
- RyaURI[] graphs = new RyaURI[aresource.length];
+ final RyaURI[] graphs = new RyaURI[aresource.length];
for (int i = 0 ; i < graphs.length ; i++){
graphs[i] = RdfToRyaConversions.convertResource(aresource[i]);
}
ryaDAO.dropGraph(conf, graphs);
- } catch (RyaDAOException e) {
+ } catch (final RyaDAOException e) {
throw new SailException(e);
}
}
@@ -208,63 +211,63 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
@Override
protected CloseableIteration<? extends BindingSet, QueryEvaluationException> evaluateInternal(
- TupleExpr tupleExpr, Dataset dataset, BindingSet bindings,
- boolean flag) throws SailException {
+ TupleExpr tupleExpr, final Dataset dataset, BindingSet bindings,
+ final boolean flag) throws SailException {
verifyIsOpen();
logger.trace("Incoming query model:\n{}", tupleExpr.toString());
if (provenanceCollector != null){
try {
provenanceCollector.recordQuery(tupleExpr.toString());
- } catch (ProvenanceCollectionException e) {
+ } catch (final ProvenanceCollectionException e) {
// TODO silent fail
e.printStackTrace();
}
}
tupleExpr = tupleExpr.clone();
- RdfCloudTripleStoreConfiguration queryConf = store.getConf().clone();
+ final RdfCloudTripleStoreConfiguration queryConf = store.getConf().clone();
if (bindings != null) {
- Binding dispPlan = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_QUERYPLAN_FLAG);
+ final Binding dispPlan = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_QUERYPLAN_FLAG);
if (dispPlan != null) {
queryConf.setDisplayQueryPlan(Boolean.parseBoolean(dispPlan.getValue().stringValue()));
}
- Binding authBinding = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH);
+ final Binding authBinding = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH);
if (authBinding != null) {
queryConf.setAuths(authBinding.getValue().stringValue().split(","));
}
- Binding ttlBinding = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_TTL);
+ final Binding ttlBinding = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_TTL);
if (ttlBinding != null) {
queryConf.setTtl(Long.valueOf(ttlBinding.getValue().stringValue()));
}
- Binding startTimeBinding = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_STARTTIME);
+ final Binding startTimeBinding = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_STARTTIME);
if (startTimeBinding != null) {
queryConf.setStartTime(Long.valueOf(startTimeBinding.getValue().stringValue()));
}
- Binding performantBinding = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_PERFORMANT);
+ final Binding performantBinding = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_PERFORMANT);
if (performantBinding != null) {
queryConf.setBoolean(RdfCloudTripleStoreConfiguration.CONF_PERFORMANT, Boolean.parseBoolean(performantBinding.getValue().stringValue()));
}
- Binding inferBinding = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_INFER);
+ final Binding inferBinding = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_INFER);
if (inferBinding != null) {
queryConf.setInfer(Boolean.parseBoolean(inferBinding.getValue().stringValue()));
}
- Binding useStatsBinding = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_USE_STATS);
+ final Binding useStatsBinding = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_USE_STATS);
if (useStatsBinding != null) {
queryConf.setUseStats(Boolean.parseBoolean(useStatsBinding.getValue().stringValue()));
}
- Binding offsetBinding = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_OFFSET);
+ final Binding offsetBinding = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_OFFSET);
if (offsetBinding != null) {
queryConf.setOffset(Long.parseLong(offsetBinding.getValue().stringValue()));
}
- Binding limitBinding = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_LIMIT);
+ final Binding limitBinding = bindings.getBinding(RdfCloudTripleStoreConfiguration.CONF_LIMIT);
if (limitBinding != null) {
queryConf.setLimit(Long.parseLong(limitBinding.getValue().stringValue()));
}
@@ -277,15 +280,15 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
}
try {
- List<Class<QueryOptimizer>> optimizers = queryConf.getOptimizers();
- Class<QueryOptimizer> pcjOptimizer = queryConf.getPcjOptimizer();
-
+ final List<Class<QueryOptimizer>> optimizers = queryConf.getOptimizers();
+ final Class<QueryOptimizer> pcjOptimizer = queryConf.getPcjOptimizer();
+
if(pcjOptimizer != null) {
QueryOptimizer opt = null;
try {
- Constructor<QueryOptimizer> construct = pcjOptimizer.getDeclaredConstructor(new Class[] {});
+ final Constructor<QueryOptimizer> construct = pcjOptimizer.getDeclaredConstructor(new Class[] {});
opt = construct.newInstance();
- } catch (Exception e) {
+ } catch (final Exception e) {
}
if (opt == null) {
throw new NoSuchMethodException("Could not find valid constructor for " + pcjOptimizer.getName());
@@ -295,10 +298,10 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
}
opt.optimize(tupleExpr, dataset, bindings);
}
-
+
final ParallelEvaluationStrategyImpl strategy = new ParallelEvaluationStrategyImpl(
new StoreTripleSource(queryConf, ryaDAO), inferenceEngine, dataset, queryConf);
-
+
(new BindingAssigner()).optimize(tupleExpr, dataset, bindings);
(new ConstantOptimizer(strategy)).optimize(tupleExpr, dataset,
bindings);
@@ -310,22 +313,22 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
(new SameTermFilterOptimizer()).optimize(tupleExpr, dataset,
bindings);
(new QueryModelNormalizer()).optimize(tupleExpr, dataset, bindings);
-
+
(new IterativeEvaluationOptimizer()).optimize(tupleExpr, dataset,
bindings);
if (!optimizers.isEmpty()) {
- for (Class<QueryOptimizer> optclz : optimizers) {
+ for (final Class<QueryOptimizer> optclz : optimizers) {
QueryOptimizer result = null;
try {
- Constructor<QueryOptimizer> meth = optclz.getDeclaredConstructor(new Class[] {});
+ final Constructor<QueryOptimizer> meth = optclz.getDeclaredConstructor(new Class[] {});
result = meth.newInstance();
- } catch (Exception e) {
+ } catch (final Exception e) {
}
try {
- Constructor<QueryOptimizer> meth = optclz.getDeclaredConstructor(EvaluationStrategy.class);
+ final Constructor<QueryOptimizer> meth = optclz.getDeclaredConstructor(EvaluationStrategy.class);
result = meth.newInstance(strategy);
- } catch (Exception e) {
+ } catch (final Exception e) {
}
if (result == null) {
throw new NoSuchMethodException("Could not find valid constructor for " + optclz.getName());
@@ -339,7 +342,7 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
(new FilterOptimizer()).optimize(tupleExpr, dataset, bindings);
(new OrderLimitOptimizer()).optimize(tupleExpr, dataset, bindings);
-
+
logger.trace("Optimized query model:\n{}", tupleExpr.toString());
if (queryConf.isInfer()
@@ -354,7 +357,7 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
tupleExpr.visit(new SubPropertyOfVisitor(queryConf, inferenceEngine));
tupleExpr.visit(new SubClassOfVisitor(queryConf, inferenceEngine));
tupleExpr.visit(new SameAsVisitor(queryConf, inferenceEngine));
- } catch (Exception e) {
+ } catch (final Exception e) {
e.printStackTrace();
}
}
@@ -363,7 +366,7 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
// tupleExpr.visit(new FilterTimeIndexVisitor(queryConf));
// tupleExpr.visit(new PartitionFilterTimeIndexVisitor(queryConf));
}
- FilterRangeVisitor rangeVisitor = new FilterRangeVisitor(queryConf);
+ final FilterRangeVisitor rangeVisitor = new FilterRangeVisitor(queryConf);
tupleExpr.visit(rangeVisitor);
tupleExpr.visit(rangeVisitor); //this has to be done twice to get replace the statementpatterns with the right ranges
EvaluationStatistics stats = null;
@@ -382,7 +385,7 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
if (stats instanceof RdfCloudTripleStoreSelectivityEvaluationStatistics) {
- (new QueryJoinSelectOptimizer((RdfCloudTripleStoreSelectivityEvaluationStatistics) stats,
+ (new QueryJoinSelectOptimizer(stats,
selectEvalDAO)).optimize(tupleExpr, dataset, bindings);
} else {
@@ -393,23 +396,23 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
final CloseableIteration<BindingSet, QueryEvaluationException> iter = strategy
.evaluate(tupleExpr, EmptyBindingSet.getInstance());
- CloseableIteration<BindingSet, QueryEvaluationException> iterWrap = new CloseableIteration<BindingSet, QueryEvaluationException>() {
-
+ final CloseableIteration<BindingSet, QueryEvaluationException> iterWrap = new CloseableIteration<BindingSet, QueryEvaluationException>() {
+
@Override
public void remove() throws QueryEvaluationException {
iter.remove();
}
-
+
@Override
public BindingSet next() throws QueryEvaluationException {
return iter.next();
}
-
+
@Override
public boolean hasNext() throws QueryEvaluationException {
return iter.hasNext();
}
-
+
@Override
public void close() throws QueryEvaluationException {
iter.close();
@@ -417,9 +420,9 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
}
};
return iterWrap;
- } catch (QueryEvaluationException e) {
+ } catch (final QueryEvaluationException e) {
throw new SailException(e);
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new SailException(e);
}
}
@@ -434,7 +437,7 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
}
@Override
- protected String getNamespaceInternal(String s) throws SailException {
+ protected String getNamespaceInternal(final String s) throws SailException {
return namespaceManager.getNamespace(s);
}
@@ -446,8 +449,8 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
@Override
protected CloseableIteration<? extends Statement, SailException> getStatementsInternal(
- Resource subject, URI predicate, Value object, boolean flag,
- Resource... contexts) throws SailException {
+ final Resource subject, final URI predicate, final Value object, final boolean flag,
+ final Resource... contexts) throws SailException {
// try {
//have to do this to get the inferred values
//TODO: Will this method reduce performance?
@@ -470,7 +473,7 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
isClosed = true;
try {
evaluate.close();
- } catch (QueryEvaluationException e) {
+ } catch (final QueryEvaluationException e) {
throw new SailException(e);
}
}
@@ -479,7 +482,7 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
public boolean hasNext() throws SailException {
try {
return evaluate.hasNext();
- } catch (QueryEvaluationException e) {
+ } catch (final QueryEvaluationException e) {
throw new SailException(e);
}
}
@@ -491,11 +494,11 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
}
try {
- BindingSet next = evaluate.next();
- Resource bs_subj = (Resource) ((subjVar.hasValue()) ? subjVar.getValue() : next.getBinding(subjVar.getName()).getValue());
- URI bs_pred = (URI) ((predVar.hasValue()) ? predVar.getValue() : next.getBinding(predVar.getName()).getValue());
- Value bs_obj = (objVar.hasValue()) ? objVar.getValue() : (Value) next.getBinding(objVar.getName()).getValue();
- Binding b_cntxt = next.getBinding(cntxtVar.getName());
+ final BindingSet next = evaluate.next();
+ final Resource bs_subj = (Resource) ((subjVar.hasValue()) ? subjVar.getValue() : next.getBinding(subjVar.getName()).getValue());
+ final URI bs_pred = (URI) ((predVar.hasValue()) ? predVar.getValue() : next.getBinding(predVar.getName()).getValue());
+ final Value bs_obj = (objVar.hasValue()) ? objVar.getValue() : (Value) next.getBinding(objVar.getName()).getValue();
+ final Binding b_cntxt = next.getBinding(cntxtVar.getName());
//convert BindingSet to Statement
if (b_cntxt != null) {
@@ -503,7 +506,7 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
} else {
return new StatementImpl(bs_subj, bs_pred, bs_obj);
}
- } catch (QueryEvaluationException e) {
+ } catch (final QueryEvaluationException e) {
throw new SailException(e);
}
}
@@ -512,7 +515,7 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
public void remove() throws SailException {
try {
evaluate.remove();
- } catch (QueryEvaluationException e) {
+ } catch (final QueryEvaluationException e) {
throw new SailException(e);
}
}
@@ -522,7 +525,7 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
// }
}
- protected Var decorateValue(Value val, String name) {
+ protected Var decorateValue(final Value val, final String name) {
if (val == null) {
return new Var(name);
} else {
@@ -531,24 +534,24 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
}
@Override
- protected void removeNamespaceInternal(String s) throws SailException {
+ protected void removeNamespaceInternal(final String s) throws SailException {
namespaceManager.removeNamespace(s);
}
@Override
- protected void removeStatementsInternal(Resource subject, URI predicate,
- Value object, Resource... contexts) throws SailException {
+ protected void removeStatementsInternal(final Resource subject, final URI predicate,
+ final Value object, final Resource... contexts) throws SailException {
if (!(subject instanceof URI)) {
throw new SailException("Subject[" + subject + "] must be URI");
}
try {
if (contexts != null && contexts.length > 0) {
- for (Resource context : contexts) {
+ for (final Resource context : contexts) {
if (!(context instanceof URI)) {
throw new SailException("Context[" + context + "] must be URI");
}
- RyaStatement statement = new RyaStatement(
+ final RyaStatement statement = new RyaStatement(
RdfToRyaConversions.convertResource(subject),
RdfToRyaConversions.convertURI(predicate),
RdfToRyaConversions.convertValue(object),
@@ -557,7 +560,7 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
ryaDAO.delete(statement, conf);
}
} else {
- RyaStatement statement = new RyaStatement(
+ final RyaStatement statement = new RyaStatement(
RdfToRyaConversions.convertResource(subject),
RdfToRyaConversions.convertURI(predicate),
RdfToRyaConversions.convertValue(object),
@@ -565,7 +568,7 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
ryaDAO.delete(statement, conf);
}
- } catch (RyaDAOException e) {
+ } catch (final RyaDAOException e) {
throw new SailException(e);
}
}
@@ -576,13 +579,13 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
}
@Override
- protected void setNamespaceInternal(String s, String s1)
+ protected void setNamespaceInternal(final String s, final String s1)
throws SailException {
namespaceManager.addNamespace(s, s1);
}
@Override
- protected long sizeInternal(Resource... contexts) throws SailException {
+ protected long sizeInternal(final Resource... contexts) throws SailException {
logger.error("Cannot determine size as of yet");
return 0;
@@ -595,32 +598,34 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
public static class StoreTripleSource implements TripleSource {
- private RdfCloudTripleStoreConfiguration conf;
- private RyaDAO<?> ryaDAO;
+ private final RdfCloudTripleStoreConfiguration conf;
+ private final RyaDAO<?> ryaDAO;
- public StoreTripleSource(RdfCloudTripleStoreConfiguration conf, RyaDAO<?> ryaDAO) {
+ public StoreTripleSource(final RdfCloudTripleStoreConfiguration conf, final RyaDAO<?> ryaDAO) {
this.conf = conf;
this.ryaDAO = ryaDAO;
}
- public CloseableIteration<Statement, QueryEvaluationException> getStatements(
- Resource subject, URI predicate, Value object,
- Resource... contexts) throws QueryEvaluationException {
+ @Override
+ public CloseableIteration<Statement, QueryEvaluationException> getStatements(
+ final Resource subject, final URI predicate, final Value object,
+ final Resource... contexts) throws QueryEvaluationException {
return RyaDAOHelper.query(ryaDAO, subject, predicate, object, conf, contexts);
}
public CloseableIteration<? extends Entry<Statement, BindingSet>, QueryEvaluationException> getStatements(
- Collection<Map.Entry<Statement, BindingSet>> statements,
- Resource... contexts) throws QueryEvaluationException {
+ final Collection<Map.Entry<Statement, BindingSet>> statements,
+ final Resource... contexts) throws QueryEvaluationException {
return RyaDAOHelper.query(ryaDAO, statements, conf);
}
- public ValueFactory getValueFactory() {
+ @Override
+ public ValueFactory getValueFactory() {
return RdfCloudTripleStoreConstants.VALUE_FACTORY;
}
}
-
+
public InferenceEngine getInferenceEngine() {
return inferenceEngine;
}
[2/2] incubator-rya git commit: RYA-307 Improved Rya MongoDB ingest of statements ... Closes #181
Posted by mi...@apache.org.
RYA-307 Improved Rya MongoDB ingest of statements ... Closes #181
...through the Sail Layer and Rya DAO by queueing up multiple inserts at a time so that they can be written as a single batch. If none of the statements in the batch have been written after a set time limit, they are flushed out to the datastore. The size of the batch and the time limit are configurable.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/8def4cac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/8def4cac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/8def4cac
Branch: refs/heads/master
Commit: 8def4cacac7d0bb9ca3cc675c53d849261aa8029
Parents: fa2aad5
Author: eric.white <Er...@parsons.com>
Authored: Wed Jul 19 09:08:32 2017 -0400
Committer: Aaron Mihalik <aa...@gmail.com>
Committed: Thu Aug 3 15:45:05 2017 -0400
----------------------------------------------------------------------
.../java/org/apache/rya/api/persist/RyaDAO.java | 7 +
.../org/apache/rya/accumulo/AccumuloRyaDAO.java | 161 ++++++-------
.../rya/mongodb/MongoDBRdfConfiguration.java | 24 ++
.../org/apache/rya/mongodb/MongoDBRyaDAO.java | 63 ++++-
.../rya/mongodb/batch/MongoDbBatchWriter.java | 238 +++++++++++++++++++
.../mongodb/batch/MongoDbBatchWriterConfig.java | 88 +++++++
.../batch/MongoDbBatchWriterException.java | 59 +++++
.../mongodb/batch/MongoDbBatchWriterUtils.java | 82 +++++++
.../batch/collection/CollectionType.java | 43 ++++
.../batch/collection/DbCollectionType.java | 53 +++++
.../batch/collection/MongoCollectionType.java | 52 ++++
.../indexing/mongodb/AbstractMongoIndexer.java | 60 ++++-
.../rya/indexing/mongo/MongoEntityIndexIT.java | 22 +-
.../indexing/mongo/MongoIndexerDeleteIT.java | 16 +-
.../RdfCloudTripleStoreConnection.java | 195 +++++++--------
15 files changed, 965 insertions(+), 198 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/common/rya.api/src/main/java/org/apache/rya/api/persist/RyaDAO.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/persist/RyaDAO.java b/common/rya.api/src/main/java/org/apache/rya/api/persist/RyaDAO.java
index 57aae1b..d83a5e9 100644
--- a/common/rya.api/src/main/java/org/apache/rya/api/persist/RyaDAO.java
+++ b/common/rya.api/src/main/java/org/apache/rya/api/persist/RyaDAO.java
@@ -123,4 +123,11 @@ public interface RyaDAO<C extends RdfCloudTripleStoreConfiguration> extends RyaC
public void purge(RdfCloudTripleStoreConfiguration configuration);
public void dropAndDestroy() throws RyaDAOException;
+
+ /**
+ * Flushes any RyaStatements queued for insertion and writes them to the
+ * datastore.
+ * @throws RyaDAOException
+ */
+ public void flush() throws RyaDAOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java
index bd7d2b3..f1f7c03 100644
--- a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java
@@ -59,12 +59,6 @@ import org.apache.accumulo.core.security.Authorizations;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
-import org.openrdf.model.Namespace;
-
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-
-import info.aduna.iteration.CloseableIteration;
import org.apache.rya.accumulo.experimental.AccumuloIndexer;
import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine;
import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
@@ -76,6 +70,12 @@ import org.apache.rya.api.persist.RyaDAO;
import org.apache.rya.api.persist.RyaDAOException;
import org.apache.rya.api.persist.RyaNamespaceManager;
import org.apache.rya.api.resolver.RyaTripleContext;
+import org.openrdf.model.Namespace;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+
+import info.aduna.iteration.CloseableIteration;
public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaNamespaceManager<AccumuloRdfConfiguration> {
private static final Log logger = LogFactory.getLog(AccumuloRyaDAO.class);
@@ -131,13 +131,13 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
flushEachUpdate = conf.flushEachUpdate();
- TableOperations tableOperations = connector.tableOperations();
+ final TableOperations tableOperations = connector.tableOperations();
AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getSpo());
AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getPo());
AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getOsp());
AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getNs());
- for (AccumuloIndexer index : secondaryIndexers) {
+ for (final AccumuloIndexer index : secondaryIndexers) {
index.setConf(conf);
}
@@ -150,7 +150,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
bw_ns = mt_bw.getBatchWriter(tableLayoutStrategy.getNs());
- for (AccumuloIndexer index : secondaryIndexers) {
+ for (final AccumuloIndexer index : secondaryIndexers) {
index.setConnector(connector);
index.setMultiTableBatchWriter(mt_bw);
index.init();
@@ -161,7 +161,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
checkVersion();
initialized = true;
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new RyaDAOException(e);
}
}
@@ -169,7 +169,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
@Override
public String getVersion() throws RyaDAOException {
String version = null;
- CloseableIteration<RyaStatement, RyaDAOException> versIter = queryEngine.query(new RyaStatement(RTS_SUBJECT_RYA, RTS_VERSION_PREDICATE_RYA, null), conf);
+ final CloseableIteration<RyaStatement, RyaDAOException> versIter = queryEngine.query(new RyaStatement(RTS_SUBJECT_RYA, RTS_VERSION_PREDICATE_RYA, null), conf);
if (versIter.hasNext()) {
version = versIter.next().getObject().getData();
}
@@ -179,43 +179,43 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
}
@Override
- public void add(RyaStatement statement) throws RyaDAOException {
+ public void add(final RyaStatement statement) throws RyaDAOException {
commit(Iterators.singletonIterator(statement));
}
@Override
- public void add(Iterator<RyaStatement> iter) throws RyaDAOException {
+ public void add(final Iterator<RyaStatement> iter) throws RyaDAOException {
commit(iter);
}
@Override
- public void delete(RyaStatement stmt, AccumuloRdfConfiguration aconf) throws RyaDAOException {
+ public void delete(final RyaStatement stmt, final AccumuloRdfConfiguration aconf) throws RyaDAOException {
this.delete(Iterators.singletonIterator(stmt), aconf);
}
@Override
- public void delete(Iterator<RyaStatement> statements, AccumuloRdfConfiguration conf) throws RyaDAOException {
+ public void delete(final Iterator<RyaStatement> statements, final AccumuloRdfConfiguration conf) throws RyaDAOException {
try {
while (statements.hasNext()) {
- RyaStatement stmt = statements.next();
+ final RyaStatement stmt = statements.next();
//query first
- CloseableIteration<RyaStatement, RyaDAOException> query = this.queryEngine.query(stmt, conf);
+ final CloseableIteration<RyaStatement, RyaDAOException> query = this.queryEngine.query(stmt, conf);
while (query.hasNext()) {
deleteSingleRyaStatement(query.next());
}
- for (AccumuloIndexer index : secondaryIndexers) {
+ for (final AccumuloIndexer index : secondaryIndexers) {
index.deleteStatement(stmt);
}
}
if (flushEachUpdate) { mt_bw.flush(); }
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new RyaDAOException(e);
}
}
@Override
- public void dropGraph(AccumuloRdfConfiguration conf, RyaURI... graphs) throws RyaDAOException {
+ public void dropGraph(final AccumuloRdfConfiguration conf, final RyaURI... graphs) throws RyaDAOException {
BatchDeleter bd_spo = null;
BatchDeleter bd_po = null;
BatchDeleter bd_osp = null;
@@ -229,7 +229,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
bd_po.setRanges(Collections.singleton(new Range()));
bd_osp.setRanges(Collections.singleton(new Range()));
- for (RyaURI graph : graphs){
+ for (final RyaURI graph : graphs){
bd_spo.fetchColumnFamily(new Text(graph.getData()));
bd_po.fetchColumnFamily(new Text(graph.getData()));
bd_osp.fetchColumnFamily(new Text(graph.getData()));
@@ -244,7 +244,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
// index.dropGraph(graphs);
// }
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new RyaDAOException(e);
} finally {
if (bd_spo != null) {
@@ -260,34 +260,34 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
}
+ /**
+ * Serializes a delete for a single statement and adds the resulting mutations
+ * to {@code bw_spo}, {@code bw_po} and {@code bw_osp}. Nothing is flushed here,
+ * and secondary indexers are not touched.
+ * @param stmt the exact statement to delete.
+ */
- protected void deleteSingleRyaStatement(RyaStatement stmt) throws IOException, MutationsRejectedException {
- Map<TABLE_LAYOUT, Collection<Mutation>> map = ryaTableMutationsFactory.serializeDelete(stmt);
+ protected void deleteSingleRyaStatement(final RyaStatement stmt) throws IOException, MutationsRejectedException {
+ final Map<TABLE_LAYOUT, Collection<Mutation>> map = ryaTableMutationsFactory.serializeDelete(stmt);
bw_spo.addMutations(map.get(TABLE_LAYOUT.SPO));
bw_po.addMutations(map.get(TABLE_LAYOUT.PO));
bw_osp.addMutations(map.get(TABLE_LAYOUT.OSP));
}
+ /**
+ * Writes each statement to the core tables and to every secondary indexer.
+ * Each statement is serialized into per-layout mutations that are added to
+ * {@code bw_spo}, {@code bw_po} and {@code bw_osp}; the statement is then
+ * passed to each secondary indexer's storeStatement(). When
+ * {@code flushEachUpdate} is set, {@code mt_bw} is flushed once after all
+ * statements have been processed.
+ * @param commitStatements the statements to write.
+ * @throws RyaDAOException wrapping any exception raised while writing.
+ */
- protected void commit(Iterator<RyaStatement> commitStatements) throws RyaDAOException {
+ protected void commit(final Iterator<RyaStatement> commitStatements) throws RyaDAOException {
try {
//TODO: Should have a lock here in case we are adding and committing at the same time
while (commitStatements.hasNext()) {
- RyaStatement stmt = commitStatements.next();
+ final RyaStatement stmt = commitStatements.next();
- Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationsFactory.serialize(stmt);
- Collection<Mutation> spo = mutationMap.get(TABLE_LAYOUT.SPO);
- Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO);
- Collection<Mutation> osp = mutationMap.get(TABLE_LAYOUT.OSP);
+ final Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationsFactory.serialize(stmt);
+ final Collection<Mutation> spo = mutationMap.get(TABLE_LAYOUT.SPO);
+ final Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO);
+ final Collection<Mutation> osp = mutationMap.get(TABLE_LAYOUT.OSP);
bw_spo.addMutations(spo);
bw_po.addMutations(po);
bw_osp.addMutations(osp);
- for (AccumuloIndexer index : secondaryIndexers) {
+ for (final AccumuloIndexer index : secondaryIndexers) {
index.storeStatement(stmt);
}
}
if (flushEachUpdate) { mt_bw.flush(); }
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new RyaDAOException(e);
}
}
@@ -303,57 +303,57 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
mt_bw.flush();
mt_bw.close();
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new RyaDAOException(e);
}
- for(AccumuloIndexer indexer : this.secondaryIndexers) {
+ for(final AccumuloIndexer indexer : this.secondaryIndexers) {
try {
indexer.destroy();
- } catch(Exception e) {
+ } catch(final Exception e) {
logger.warn("Failed to destroy indexer", e);
}
}
}
+ /**
+ * Stores the namespace for a prefix as the value of the
+ * INFO_NAMESPACE_TXT / EMPTY_TEXT cell in the row {@code pfx}, written through
+ * {@code bw_ns}. When {@code flushEachUpdate} is set, {@code mt_bw} is flushed.
+ * @param pfx the namespace prefix (used as the row id).
+ * @param namespace the namespace to store.
+ * @throws RyaDAOException wrapping any exception raised while writing.
+ */
@Override
- public void addNamespace(String pfx, String namespace) throws RyaDAOException {
+ public void addNamespace(final String pfx, final String namespace) throws RyaDAOException {
try {
- Mutation m = new Mutation(new Text(pfx));
+ final Mutation m = new Mutation(new Text(pfx));
+ // NOTE(review): getBytes() uses the platform default charset (and getNamespace() decodes with new String(byte[])); consider StandardCharsets.UTF_8 on both sides.
m.put(INFO_NAMESPACE_TXT, EMPTY_TEXT, new Value(namespace.getBytes()));
bw_ns.addMutation(m);
if (flushEachUpdate) { mt_bw.flush(); }
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new RyaDAOException(e);
}
}
+ /**
+ * Looks up the namespace stored for a prefix. Scans the row {@code pfx} of the
+ * namespace table (tableLayoutStrategy.getNs()) for the
+ * INFO_NAMESPACE_TXT / EMPTY_TEXT column and returns the value of the first
+ * entry found.
+ * @param pfx the namespace prefix to look up.
+ * @return the stored namespace, or {@code null} if no entry exists.
+ * @throws RyaDAOException wrapping any exception raised while scanning.
+ */
@Override
- public String getNamespace(String pfx) throws RyaDAOException {
+ public String getNamespace(final String pfx) throws RyaDAOException {
try {
- Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(),
+ final Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(),
ALL_AUTHORIZATIONS);
scanner.fetchColumn(INFO_NAMESPACE_TXT, EMPTY_TEXT);
scanner.setRange(new Range(new Text(pfx)));
- Iterator<Map.Entry<Key, Value>> iterator = scanner
+ final Iterator<Map.Entry<Key, Value>> iterator = scanner
.iterator();
if (iterator.hasNext()) {
+ // NOTE(review): new String(byte[]) decodes with the platform default charset; must match the encoding used by addNamespace().
return new String(iterator.next().getValue().get());
}
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new RyaDAOException(e);
}
return null;
}
+ /**
+ * Removes the namespace stored for a prefix by writing a delete of the
+ * INFO_NAMESPACE_TXT / EMPTY_TEXT cell in the row {@code pfx} through
+ * {@code bw_ns}. When {@code flushEachUpdate} is set, {@code mt_bw} is flushed.
+ * @param pfx the namespace prefix to remove.
+ * @throws RyaDAOException wrapping any exception raised while writing.
+ */
@Override
- public void removeNamespace(String pfx) throws RyaDAOException {
+ public void removeNamespace(final String pfx) throws RyaDAOException {
try {
- Mutation del = new Mutation(new Text(pfx));
+ final Mutation del = new Mutation(new Text(pfx));
del.putDelete(INFO_NAMESPACE_TXT, EMPTY_TEXT);
bw_ns.addMutation(del);
if (flushEachUpdate) { mt_bw.flush(); }
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new RyaDAOException(e);
}
}
@@ -362,12 +362,12 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
+ /**
+ * Iterates over every namespace entry: scans the namespace table for the
+ * INFO_NAMESPACE_TXT column family and wraps the scanner's iterator in an
+ * AccumuloNamespaceTableIterator.
+ * @throws RyaDAOException wrapping any exception raised while creating the scanner.
+ */
@Override
public CloseableIteration<Namespace, RyaDAOException> iterateNamespace() throws RyaDAOException {
try {
- Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(),
+ final Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(),
ALL_AUTHORIZATIONS);
scanner.fetchColumnFamily(INFO_NAMESPACE_TXT);
- Iterator<Map.Entry<Key, Value>> result = scanner.iterator();
+ final Iterator<Map.Entry<Key, Value>> result = scanner.iterator();
return new AccumuloNamespaceTableIterator(result);
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new RyaDAOException(e);
}
}
@@ -378,21 +378,21 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
}
@Override
- public void purge(RdfCloudTripleStoreConfiguration configuration) {
- for (String tableName : getTables()) {
+ public void purge(final RdfCloudTripleStoreConfiguration configuration) {
+ for (final String tableName : getTables()) {
try {
purge(tableName, configuration.getAuths());
compact(tableName);
- } catch (TableNotFoundException e) {
+ } catch (final TableNotFoundException e) {
logger.error(e.getMessage());
- } catch (MutationsRejectedException e) {
+ } catch (final MutationsRejectedException e) {
logger.error(e.getMessage());
}
}
- for(AccumuloIndexer indexer : this.secondaryIndexers) {
+ for(final AccumuloIndexer indexer : this.secondaryIndexers) {
try {
indexer.purge(configuration);
- } catch(Exception e) {
+ } catch(final Exception e) {
logger.error("Failed to purge indexer", e);
}
}
@@ -400,24 +400,24 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
@Override
public void dropAndDestroy() throws RyaDAOException {
- for (String tableName : getTables()) {
+ for (final String tableName : getTables()) {
try {
drop(tableName);
- } catch (AccumuloSecurityException e) {
+ } catch (final AccumuloSecurityException e) {
logger.error(e.getMessage());
throw new RyaDAOException(e);
- } catch (AccumuloException e) {
+ } catch (final AccumuloException e) {
logger.error(e.getMessage());
throw new RyaDAOException(e);
- } catch (TableNotFoundException e) {
+ } catch (final TableNotFoundException e) {
logger.warn(e.getMessage());
}
}
destroy();
- for(AccumuloIndexer indexer : this.secondaryIndexers) {
+ for(final AccumuloIndexer indexer : this.secondaryIndexers) {
try {
indexer.dropAndDestroy();
- } catch(Exception e) {
+ } catch(final Exception e) {
logger.error("Failed to drop and destroy indexer", e);
}
}
@@ -427,7 +427,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
return connector;
}
+ /**
+ * Sets the connector this DAO uses to create scanners and batch deleters and
+ * to run table operations.
+ * @param connector the connector to use.
+ */
- public void setConnector(Connector connector) {
+ public void setConnector(final Connector connector) {
this.connector = connector;
}
@@ -435,7 +435,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
return batchWriterConfig;
}
+ /**
+ * Sets the {@link BatchWriterConfig} held by this DAO.
+ * NOTE(review): where this config is applied is not visible here (presumably
+ * when the batch writers are created) — confirm.
+ * @param batchWriterConfig the batch writer configuration.
+ */
- public void setBatchWriterConfig(BatchWriterConfig batchWriterConfig) {
+ public void setBatchWriterConfig(final BatchWriterConfig batchWriterConfig) {
this.batchWriterConfig = batchWriterConfig;
}
@@ -449,7 +449,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
}
+ /**
+ * Replaces the configuration held by this DAO.
+ * @param conf the new configuration.
+ */
@Override
- public void setConf(AccumuloRdfConfiguration conf) {
+ public void setConf(final AccumuloRdfConfiguration conf) {
this.conf = conf;
}
@@ -457,7 +457,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
return ryaTableMutationsFactory;
}
+ /**
+ * Sets the factory used to serialize statements into per-table mutations
+ * (by commit(), deleteSingleRyaStatement() and checkVersion()).
+ * @param ryaTableMutationsFactory the mutations factory.
+ */
- public void setRyaTableMutationsFactory(RyaTableMutationsFactory ryaTableMutationsFactory) {
+ public void setRyaTableMutationsFactory(final RyaTableMutationsFactory ryaTableMutationsFactory) {
this.ryaTableMutationsFactory = ryaTableMutationsFactory;
}
@@ -466,21 +466,22 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
return queryEngine;
}
+ /**
+ * Sets the query engine used by delete(Iterator, ...) to find the stored
+ * statements to remove.
+ * @param queryEngine the query engine.
+ */
- public void setQueryEngine(AccumuloRyaQueryEngine queryEngine) {
+ public void setQueryEngine(final AccumuloRyaQueryEngine queryEngine) {
this.queryEngine = queryEngine;
}
+ /**
+ * Flushes {@code mt_bw}, wrapping a MutationsRejectedException in a RyaDAOException.
+ */
+ @Override
public void flush() throws RyaDAOException {
try {
mt_bw.flush();
- } catch (MutationsRejectedException e) {
+ } catch (final MutationsRejectedException e) {
throw new RyaDAOException(e);
}
}
protected String[] getTables() {
// core tables
- List<String> tableNames = Lists.newArrayList(
+ final List<String> tableNames = Lists.newArrayList(
tableLayoutStrategy.getSpo(),
tableLayoutStrategy.getPo(),
tableLayoutStrategy.getOsp(),
@@ -488,17 +489,17 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
tableLayoutStrategy.getEval());
// Additional Tables
- for (AccumuloIndexer index : secondaryIndexers) {
+ for (final AccumuloIndexer index : secondaryIndexers) {
tableNames.add(index.getTableName());
}
return tableNames.toArray(new String[]{});
}
- private void purge(String tableName, String[] auths) throws TableNotFoundException, MutationsRejectedException {
+ private void purge(final String tableName, final String[] auths) throws TableNotFoundException, MutationsRejectedException {
if (tableExists(tableName)) {
logger.info("Purging accumulo table: " + tableName);
- BatchDeleter batchDeleter = createBatchDeleter(tableName, new Authorizations(auths));
+ final BatchDeleter batchDeleter = createBatchDeleter(tableName, new Authorizations(auths));
try {
batchDeleter.setRanges(Collections.singleton(new Range()));
batchDeleter.delete();
@@ -508,31 +509,31 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
}
}
+ /**
+ * Requests a compaction of the whole table (null start and end rows).
+ * Failures are logged, not thrown.
+ * NOTE(review): per Accumulo TableOperations.compact(table, start, end, flush, wait),
+ * the trailing booleans presumably mean flush=true, wait=false — confirm.
+ * @param tableName the table to compact.
+ */
- private void compact(String tableName) {
+ private void compact(final String tableName) {
logger.info("Requesting major compaction for table " + tableName);
try {
connector.tableOperations().compact(tableName, null, null, true, false);
- } catch (Exception e) {
+ } catch (final Exception e) {
logger.error(e.getMessage());
}
}
+ /**
+ * @param tableName the table to check.
+ * @return whether the connector's table operations report that the table exists.
+ */
- private boolean tableExists(String tableName) {
+ private boolean tableExists(final String tableName) {
return getConnector().tableOperations().exists(tableName);
}
+ /**
+ * Creates a batch deleter for a table using the class-level NUM_THREADS,
+ * MAX_MEMORY and MAX_TIME settings. NUM_THREADS is passed twice; presumably
+ * as both the query-thread and write-thread counts — confirm against the
+ * Accumulo Connector.createBatchDeleter signature.
+ * @param tableName the table to delete from.
+ * @param authorizations the authorizations used to scan the table.
+ */
- private BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations) throws TableNotFoundException {
+ private BatchDeleter createBatchDeleter(final String tableName, final Authorizations authorizations) throws TableNotFoundException {
return connector.createBatchDeleter(tableName, authorizations, NUM_THREADS, MAX_MEMORY, MAX_TIME, NUM_THREADS);
}
private void checkVersion() throws RyaDAOException, IOException, MutationsRejectedException {
- String version = getVersion();
+ final String version = getVersion();
if (version == null) {
//adding to core Rya tables but not Indexes
- Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationsFactory.serialize(getVersionRyaStatement());
- Collection<Mutation> spo = mutationMap.get(TABLE_LAYOUT.SPO);
- Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO);
- Collection<Mutation> osp = mutationMap.get(TABLE_LAYOUT.OSP);
+ final Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationsFactory.serialize(getVersionRyaStatement());
+ final Collection<Mutation> spo = mutationMap.get(TABLE_LAYOUT.SPO);
+ final Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO);
+ final Collection<Mutation> osp = mutationMap.get(TABLE_LAYOUT.OSP);
bw_spo.addMutations(spo);
bw_po.addMutations(po);
bw_osp.addMutations(osp);
@@ -544,7 +545,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
return new RyaStatement(RTS_SUBJECT_RYA, RTS_VERSION_PREDICATE_RYA, VERSION_RYA);
}
+ /**
+ * Deletes the table through the connector's table operations.
+ * (The log message still uses the legacy "cloudbase" name.)
+ * @param tableName the table to delete.
+ */
- private void drop(String tableName) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+ private void drop(final String tableName) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
logger.info("Dropping cloudbase table: " + tableName);
connector.tableOperations().delete(tableName);
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java
index 067b682..418a155 100644
--- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java
@@ -40,6 +40,8 @@ public class MongoDBRdfConfiguration extends RdfCloudTripleStoreConfiguration {
public static final String MONGO_USER_PASSWORD = "mongo.db.userpassword";
public static final String CONF_ADDITIONAL_INDEXERS = "ac.additional.indexers";
public static final String USE_MOCK_MONGO = ".useMockInstance";
+ public static final String CONF_FLUSH_EACH_UPDATE = "rya.mongodb.dao.flusheachupdate";
+
private MongoClient mongoClient;
public MongoDBRdfConfiguration() {
@@ -99,6 +101,28 @@ public class MongoDBRdfConfiguration extends RdfCloudTripleStoreConfiguration {
}
/**
+     * Indicates whether every statement handed to the batch writer is flushed
+     * to the datastore immediately.
+     * <p>
+     * Reads the {@link #CONF_FLUSH_EACH_UPDATE} property, which defaults to
+     * {@code true} when it has not been set.
+     * @return {@code true} to write each statement right away; {@code false}
+     * to queue statements and write them once the queue is full or enough
+     * time has passed without a write.
+     */
+    public boolean flushEachUpdate() {
+        final boolean defaultFlushEachUpdate = true;
+        return getBoolean(CONF_FLUSH_EACH_UPDATE, defaultFlushEachUpdate);
+    }
+
+    /**
+     * Controls whether every statement handed to the batch writer is flushed
+     * to the datastore immediately, by setting the
+     * {@link #CONF_FLUSH_EACH_UPDATE} property.
+     * @param flush {@code true} to write each statement right away;
+     * {@code false} to queue statements and write them once the queue is full
+     * or enough time has passed without a write.
+     */
+    public void setFlush(final boolean flush) {
+        final String flushKey = CONF_FLUSH_EACH_UPDATE;
+        setBoolean(flushKey, flush);
+    }
+
+ /**
* @return name of Mongo Collection containing Rya triples
*/
public String getTriplesCollectionName() {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java
index daa8a67..a32651d 100644
--- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java
@@ -37,6 +37,11 @@ import org.apache.rya.api.persist.RyaDAOException;
import org.apache.rya.api.persist.RyaNamespaceManager;
import org.apache.rya.api.persist.index.RyaSecondaryIndexer;
import org.apache.rya.api.persist.query.RyaQueryEngine;
+import org.apache.rya.mongodb.batch.MongoDbBatchWriter;
+import org.apache.rya.mongodb.batch.MongoDbBatchWriterConfig;
+import org.apache.rya.mongodb.batch.MongoDbBatchWriterException;
+import org.apache.rya.mongodb.batch.MongoDbBatchWriterUtils;
+import org.apache.rya.mongodb.batch.collection.DbCollectionType;
import org.apache.rya.mongodb.dao.MongoDBNamespaceManager;
import org.apache.rya.mongodb.dao.MongoDBStorageStrategy;
import org.apache.rya.mongodb.dao.SimpleMongoDBNamespaceManager;
@@ -47,7 +52,6 @@ import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.DuplicateKeyException;
-import com.mongodb.InsertOptions;
import com.mongodb.MongoClient;
/**
@@ -56,6 +60,8 @@ import com.mongodb.MongoClient;
public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{
private static final Logger log = Logger.getLogger(MongoDBRyaDAO.class);
+ private boolean isInitialized = false;
+ private boolean flushEachUpdate = true;
private MongoDBRdfConfiguration conf;
private final MongoClient mongoClient;
private DB db;
@@ -67,6 +73,8 @@ public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{
private List<MongoSecondaryIndex> secondaryIndexers;
private Authorizations auths;
+ private MongoDbBatchWriter<DBObject> mongoDbBatchWriter;
+
/**
* Creates a new instance of {@link MongoDBRyaDAO}.
* @param conf the {@link MongoDBRdfConfiguration}.
@@ -87,6 +95,7 @@ public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{
this.mongoClient = mongoClient;
conf.setMongoClient(mongoClient);
auths = conf.getAuthorizations();
+ flushEachUpdate = conf.flushEachUpdate();
init();
}
@@ -116,6 +125,9 @@ public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{
@Override
public void init() throws RyaDAOException {
+ if (isInitialized) {
+ return;
+ }
secondaryIndexers = conf.getAdditionalIndexers();
for(final MongoSecondaryIndex index: secondaryIndexers) {
index.setConf(conf);
@@ -131,15 +143,34 @@ public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{
for(final MongoSecondaryIndex index: secondaryIndexers) {
index.init();
}
+
+ final MongoDbBatchWriterConfig mongoDbBatchWriterConfig = MongoDbBatchWriterUtils.getMongoDbBatchWriterConfig(conf);
+ mongoDbBatchWriter = new MongoDbBatchWriter<DBObject>(new DbCollectionType(coll), mongoDbBatchWriterConfig);
+ try {
+ mongoDbBatchWriter.start();
+ } catch (final MongoDbBatchWriterException e) {
+ throw new RyaDAOException("Error starting MongoDB batch writer", e);
+ }
+ isInitialized = true;
}
+ /**
+ * @return {@code true} once init() has run to completion, and {@code false}
+ * again after destroy() has been called.
+ */
@Override
public boolean isInitialized() throws RyaDAOException {
- return true;
+ return isInitialized;
}
@Override
public void destroy() throws RyaDAOException {
+ if (!isInitialized) {
+ return;
+ }
+ isInitialized = false;
+ flush();
+ try {
+ mongoDbBatchWriter.shutdown();
+ } catch (final MongoDbBatchWriterException e) {
+ throw new RyaDAOException("Error shutting down MongoDB batch writer", e);
+ }
if (mongoClient != null) {
mongoClient.close();
}
@@ -153,7 +184,15 @@ public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{
try {
final boolean canAdd = DocumentVisibilityUtil.doesUserHaveDocumentAccess(auths, statement.getColumnVisibility());
if (canAdd) {
- coll.insert(storageStrategy.serialize(statement));
+ final DBObject obj = storageStrategy.serialize(statement);
+ try {
+ mongoDbBatchWriter.addObjectToQueue(obj);
+ if (flushEachUpdate) {
+ flush();
+ }
+ } catch (final MongoDbBatchWriterException e) {
+ throw new RyaDAOException("Error adding statement", e);
+ }
for(final RyaSecondaryIndexer index: secondaryIndexers) {
index.storeStatement(statement);
}
@@ -190,7 +229,14 @@ public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{
throw new RyaDAOException("User does not have the required authorizations to add statement");
}
}
- coll.insert(dbInserts, new InsertOptions().continueOnError(true));
+ try {
+ mongoDbBatchWriter.addObjectsToQueue(dbInserts);
+ if (flushEachUpdate) {
+ flush();
+ }
+ } catch (final MongoDbBatchWriterException e) {
+ throw new RyaDAOException("Error adding statements", e);
+ }
}
@Override
@@ -263,4 +309,13 @@ public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{
public void dropAndDestroy() throws RyaDAOException {
+ // Only drops the database: this method does not flush or shut down the batch writer
+ // and does not close the MongoClient. NOTE(review): confirm callers also call destroy().
db.dropDatabase(); // this is dangerous!
}
+
+ @Override
+ public void flush() throws RyaDAOException {
+ try {
+ mongoDbBatchWriter.flush();
+ } catch (final MongoDbBatchWriterException e) {
+ throw new RyaDAOException("Error flushing data.", e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriter.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriter.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriter.java
new file mode 100644
index 0000000..2f52b5c
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriter.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.batch;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.mongodb.batch.collection.CollectionType;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Handles batch writing MongoDB statement objects to the repository. It takes
+ * in a configurable batch size and flush time. If the number of objects placed
+ * in the queue reaches the batch size then the objects are bulk written to the
+ * datastore. Or if no object has been added for the batch flush time while
+ * objects are queued, the queued objects are flushed out and written to the
+ * datastore.
+ * <p>
+ * {@link #start()} must be called before objects are added.
+ * {@link #shutdown()} stops the background threads but does not write objects
+ * that are still queued; call {@link #flush()} first to write them.
+ * @param <T> the type of object that the batch writer's internal collection
+ * type uses.
+ */
+public class MongoDbBatchWriter<T> {
+    private static final Logger log = Logger.getLogger(MongoDbBatchWriter.class);
+
+    private static final int CHECK_QUEUE_INTERVAL_MS = 10;
+
+    private static final ThreadFactory QUEUE_THREAD_FACTORY = new ThreadFactoryBuilder()
+            .setNameFormat("Queue Full Checker Thread - %d")
+            .setDaemon(true)
+            .build();
+
+    // Daemon threads, so a writer that is never shut down cannot keep the JVM alive.
+    private static final ThreadFactory FLUSH_TIMER_THREAD_FACTORY = new ThreadFactoryBuilder()
+            .setNameFormat("Batch Flush Timer Thread - %d")
+            .setDaemon(true)
+            .build();
+
+    private final CollectionType<T> collectionType;
+    private final long batchFlushTimeMs;
+
+    private final ArrayBlockingQueue<T> statementInsertionQueue;
+    private final Runnable flushBatchTask;
+
+    // Guarded by "this": only accessed from the synchronized start(),
+    // shutdown() and resetFlushTimer() methods (and from startFlushTimer(),
+    // which only they call).
+    private ScheduledThreadPoolExecutor scheduledExecutor;
+    private ScheduledFuture<?> flushBatchFuture;
+    private Thread queueFullCheckerThread;
+
+    private final AtomicBoolean isInit = new AtomicBoolean();
+
+    /**
+     * Creates a new instance of {@link MongoDbBatchWriter}.
+     * @param collectionType the {@link CollectionType}. (not {@code null})
+     * @param mongoDbBatchWriterConfig the {@link MongoDbBatchWriterConfig}.
+     * (not {@code null})
+     */
+    public MongoDbBatchWriter(final CollectionType<T> collectionType, final MongoDbBatchWriterConfig mongoDbBatchWriterConfig) {
+        this.collectionType = checkNotNull(collectionType);
+        this.batchFlushTimeMs = checkNotNull(mongoDbBatchWriterConfig).getBatchFlushTimeMs();
+
+        statementInsertionQueue = new ArrayBlockingQueue<>(mongoDbBatchWriterConfig.getBatchSize());
+        flushBatchTask = new BatchFlusher();
+    }
+
+    /**
+     * Task used to flush statements if enough time has passed without an
+     * insertion while there are objects enqueued.
+     */
+    private class BatchFlusher implements Runnable {
+        @Override
+        public void run() {
+            try {
+                if (!statementInsertionQueue.isEmpty()) {
+                    log.trace("Running statement insertion flush task. Too much time has passed without any object insertions so all queued data is being flushed.");
+                    flush();
+                }
+            } catch (final Exception e) {
+                log.error("Error flush out the statements", e);
+            }
+        }
+    }
+
+    /**
+     * Periodically checks whether the queue is full and, if it is, writes the
+     * queued statements. Runs until the writer is shut down.
+     */
+    private class QueueFullChecker implements Runnable {
+        @Override
+        public void run() {
+            try {
+                while (isInit.get()) {
+                    // Check if the queue is full and if it is then insert the
+                    // statements.
+                    if (statementInsertionQueue.remainingCapacity() == 0) {
+                        log.trace("Statement queue is FULL -> going to empty it");
+                        try {
+                            flush();
+                        } catch (final MongoDbBatchWriterException e) {
+                            log.error("Error emptying queue", e);
+                        }
+                    }
+                    Thread.sleep(CHECK_QUEUE_INTERVAL_MS);
+                }
+            } catch (final InterruptedException e) {
+                log.error("Encountered an unexpected error while checking the batch queue.", e);
+                // Preserve the interrupt status for whoever owns this thread.
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Starts the batch writer queue and processes. Calling this on a writer
+     * that is already started does nothing.
+     * @throws MongoDbBatchWriterException if the flush timer cannot be started.
+     */
+    public synchronized void start() throws MongoDbBatchWriterException {
+        if (!isInit.get()) {
+            if (scheduledExecutor == null) {
+                // One core thread: a zero-core ScheduledThreadPoolExecutor is
+                // discouraged by its documentation and busy-spins on JDK 8.
+                scheduledExecutor = new ScheduledThreadPoolExecutor(1, FLUSH_TIMER_THREAD_FACTORY);
+                // Every insertion cancels and reschedules the flush timer, so
+                // drop cancelled tasks right away instead of keeping them in the
+                // work queue until their delay expires.
+                scheduledExecutor.setRemoveOnCancelPolicy(true);
+                scheduledExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+            }
+            if (flushBatchFuture == null) {
+                flushBatchFuture = startFlushTimer();
+            }
+            if (queueFullCheckerThread == null) {
+                queueFullCheckerThread = QUEUE_THREAD_FACTORY.newThread(new QueueFullChecker());
+            }
+            isInit.set(true);
+            queueFullCheckerThread.start();
+        }
+    }
+
+    /**
+     * Stops the batch writer processes and releases the flush timer thread.
+     * Objects still queued are not written; call {@link #flush()} beforehand.
+     * The writer may be started again with {@link #start()}.
+     * @throws MongoDbBatchWriterException declared for API compatibility.
+     */
+    public synchronized void shutdown() throws MongoDbBatchWriterException {
+        isInit.set(false);
+        if (flushBatchFuture != null) {
+            // Do not interrupt a flush that is already writing to the datastore.
+            flushBatchFuture.cancel(false);
+            flushBatchFuture = null;
+        }
+        if (scheduledExecutor != null) {
+            // Lets an in-progress timed flush finish but runs no further tasks.
+            scheduledExecutor.shutdown();
+            scheduledExecutor = null;
+        }
+        if (queueFullCheckerThread != null) {
+            if (queueFullCheckerThread.isAlive()) {
+                try {
+                    queueFullCheckerThread.join(2 * CHECK_QUEUE_INTERVAL_MS);
+                } catch (final InterruptedException e) {
+                    log.error("Error waiting for thread to finish", e);
+                    Thread.currentThread().interrupt();
+                }
+            }
+            // Always drop the reference: a terminated Thread cannot be started
+            // again, so keeping it would make a later start() fail.
+            queueFullCheckerThread = null;
+        }
+    }
+
+    /**
+     * Adds a MongoDB object to the queue which will not be written until one of
+     * the following occur:<br>
+     * <ul>
+     * <li>The queue fills up</li>
+     * <li>The flush time has been reached</li>
+     * <li>A direct call to the {@link MongoDbBatchWriter#flush()} method
+     * has been made</li>
+     * </ul>
+     * If the queue is already full, the queued objects are written from the
+     * calling thread before this object is enqueued, so this method does not
+     * block waiting for the background checker thread to drain the queue.
+     * @param object the object to add to the queue. {@code null} is ignored.
+     * @throws MongoDbBatchWriterException if the writer is not started, or if
+     * the object could not be queued or a required flush failed.
+     */
+    public void addObjectToQueue(final T object) throws MongoDbBatchWriterException {
+        if (object != null) {
+            if (!isInit.get()) {
+                throw new MongoDbBatchWriterException("Batch writer has not been started.");
+            }
+            try {
+                // Place in the queue which will bulk write after the specified
+                // "batchSize" number of items have filled the queue or if more
+                // than "batchFlushTimeMs" milliseconds have passed since the
+                // last insertion.
+                resetFlushTimer();
+                while (!statementInsertionQueue.offer(object)) {
+                    flush();
+                }
+            } catch (final Exception e) {
+                throw new MongoDbBatchWriterException("Error adding object to batch queue.", e);
+            }
+        }
+    }
+
+    /**
+     * Adds a list of MongoDB objects to the queue which will not be written
+     * until one of the following occur:<br>
+     * <ul>
+     * <li>The queue fills up</li>
+     * <li>The flush time has been reached</li>
+     * <li>A direct call to the {@link MongoDbBatchWriter#flush()} method
+     * has been made</li>
+     * </ul>
+     * @param objects a {@link List} of objects to add to the queue.
+     * {@code null} is ignored.
+     * @throws MongoDbBatchWriterException if any object could not be added.
+     */
+    public void addObjectsToQueue(final List<T> objects) throws MongoDbBatchWriterException {
+        if (objects != null) {
+            for (final T object : objects) {
+                addObjectToQueue(object);
+            }
+        }
+    }
+
+    /**
+     * Flushes out statements that are in the queue by draining them and
+     * writing them with a single {@code insertMany} call.
+     * <p>
+     * The drained objects are removed from the queue before the write is
+     * attempted, so they are not re-queued if the write fails.
+     * @throws MongoDbBatchWriterException if the write fails.
+     */
+    public void flush() throws MongoDbBatchWriterException {
+        final List<T> batch = new ArrayList<>();
+        try {
+            statementInsertionQueue.drainTo(batch);
+            if (!batch.isEmpty()) {
+                collectionType.insertMany(batch);
+            }
+        } catch (final Exception e) {
+            throw new MongoDbBatchWriterException("Error flushing statements", e);
+        }
+    }
+
+    /**
+     * Restarts the idle-flush countdown. Does nothing when the writer is not
+     * started (or has been shut down), since there is no timer to reset.
+     */
+    private synchronized void resetFlushTimer() throws MongoDbBatchWriterException {
+        if (scheduledExecutor == null) {
+            return;
+        }
+        if (flushBatchFuture != null) {
+            flushBatchFuture.cancel(false);
+        }
+        flushBatchFuture = startFlushTimer();
+    }
+
+    /**
+     * Schedules a single run of the flush task after {@code batchFlushTimeMs}.
+     * Must be called while holding this object's monitor with a non-null executor.
+     */
+    private ScheduledFuture<?> startFlushTimer() throws MongoDbBatchWriterException {
+        try {
+            return scheduledExecutor.schedule(flushBatchTask, batchFlushTimeMs, TimeUnit.MILLISECONDS);
+        } catch (final Exception e) {
+            throw new MongoDbBatchWriterException("Error starting batch flusher", e);
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterConfig.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterConfig.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterConfig.java
new file mode 100644
index 0000000..cec8b9a
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterConfig.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.batch;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Settings for the MongoDB Batch Writer: how many statements make up a batch
+ * and how long to wait before flushing a batch that has not filled up. Any
+ * setting that is never assigned falls back to its {@code DEFAULT_*} constant.
+ */
+public class MongoDbBatchWriterConfig {
+    /**
+     * The default number of statements to batch write at a time.
+     */
+    public static final int DEFAULT_BATCH_SIZE = 50000;
+
+    /**
+     * The default time to wait in milliseconds to flush all statements out that
+     * are queued for insertion if the queue has not filled up to its capacity
+     * of {@link #DEFAULT_BATCH_SIZE} or the user configured buffer size.
+     */
+    public static final long DEFAULT_BATCH_FLUSH_TIME_MS = 100L;
+
+    /** The user-assigned batch size, or {@code null} to use the default. */
+    private Integer configuredBatchSize = null;
+
+    /** The user-assigned flush time, or {@code null} to use the default. */
+    private Long configuredBatchFlushTimeMs = null;
+
+    /**
+     * Creates a configuration in which every setting uses its default.
+     */
+    public MongoDbBatchWriterConfig() {
+    }
+
+    /**
+     * Returns how many statements are written together in one batch.
+     * @return the assigned batch size, or {@link #DEFAULT_BATCH_SIZE} if none
+     * was assigned.
+     */
+    public int getBatchSize() {
+        if (configuredBatchSize == null) {
+            return DEFAULT_BATCH_SIZE;
+        }
+        return configuredBatchSize;
+    }
+
+    /**
+     * Assigns how many statements are written together in one batch.
+     * @param batchSize the batch size; must be greater than zero.
+     * @return this {@link MongoDbBatchWriterConfig}, for chaining.
+     * @throws IllegalArgumentException if {@code batchSize} is not positive.
+     */
+    public MongoDbBatchWriterConfig setBatchSize(final int batchSize) {
+        if (batchSize <= 0) {
+            throw new IllegalArgumentException("Batch size must be positive.");
+        }
+        configuredBatchSize = batchSize;
+        return this;
+    }
+
+    /**
+     * Returns how many milliseconds to wait before flushing queued statements
+     * when the queue has not filled up.
+     * @return the assigned flush time, or {@link #DEFAULT_BATCH_FLUSH_TIME_MS}
+     * if none was assigned.
+     */
+    public long getBatchFlushTimeMs() {
+        if (configuredBatchFlushTimeMs == null) {
+            return DEFAULT_BATCH_FLUSH_TIME_MS;
+        }
+        return configuredBatchFlushTimeMs;
+    }
+
+    /**
+     * Assigns how many milliseconds to wait before flushing queued statements
+     * when the queue has not filled up.
+     * @param batchFlushTimeMs the flush time in milliseconds; must not be
+     * negative.
+     * @return this {@link MongoDbBatchWriterConfig}, for chaining.
+     * @throws IllegalArgumentException if {@code batchFlushTimeMs} is negative.
+     */
+    public MongoDbBatchWriterConfig setBatchFlushTimeMs(final long batchFlushTimeMs) {
+        if (batchFlushTimeMs < 0) {
+            throw new IllegalArgumentException("Batch flush time must be non-negative.");
+        }
+        configuredBatchFlushTimeMs = batchFlushTimeMs;
+        return this;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterException.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterException.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterException.java
new file mode 100644
index 0000000..d4de156
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterException.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.batch;
+
+/**
+ * An exception to be used when there is a problem running the MongoDB Batch
+ * Writer, such as a failure to start it, queue to it, or flush it.
+ */
+public class MongoDbBatchWriterException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Creates a new instance of {@link MongoDbBatchWriterException}.
+ */
+ public MongoDbBatchWriterException() {
+ super();
+ }
+
+ /**
+ * Creates a new instance of {@link MongoDbBatchWriterException}.
+ * @param message the detail message.
+ */
+ public MongoDbBatchWriterException(final String message) {
+ super(message);
+ }
+
+ /**
+ * Creates a new instance of {@link MongoDbBatchWriterException}.
+ * @param message the detail message.
+ * @param source the {@link Throwable} that caused this exception.
+ */
+ public MongoDbBatchWriterException(final String message, final Throwable source) {
+ super(message, source);
+ }
+
+ /**
+ * Creates a new instance of {@link MongoDbBatchWriterException}.
+ * @param source the {@link Throwable} that caused this exception.
+ */
+ public MongoDbBatchWriterException(final Throwable source) {
+ super(source);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterUtils.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterUtils.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterUtils.java
new file mode 100644
index 0000000..99e8992
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterUtils.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.batch;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Constants and utility methods related to batch writing statements in a MongoDB
+ * Rya repository.
+ */
+public final class MongoDbBatchWriterUtils {
+ /**
+ * Config tag used to specify the number of statements to batch write at a
+ * time.
+ */
+ public static final String BATCH_SIZE_TAG = "rya.mongodb.dao.batchwriter.size";
+
+ /**
+ * Config tag used to specify the time to wait in milliseconds to flush all
+ * statements out that are queued for insertion if the queue has not filled
+ * up to its capacity.
+ */
+ public static final String BATCH_FLUSH_TIME_MS_TAG = "rya.mongodb.dao.batchwriter.flushtime";
+
+ /**
+ * Private constructor to prevent instantiation.
+ */
+ private MongoDbBatchWriterUtils() {
+ }
+
+ /**
+ * Gets the number of statements to batch write at a time.
+ * @param conf the {@link Configuration} to check.
+ * @return the configured value or {@link MongoDbBatchWriterConfig#DEFAULT_BATCH_SIZE}.
+ */
+ public static int getConfigBatchSize(final Configuration conf) {
+ return conf.getInt(BATCH_SIZE_TAG, MongoDbBatchWriterConfig.DEFAULT_BATCH_SIZE);
+ }
+
+ /**
+ * Gets the time to wait in milliseconds to flush all statements out that are
+ * queued for insertion if the queue has not filled up to its capacity.
+ * @param conf the {@link Configuration} to check.
+ * @return the configured value or {@link MongoDbBatchWriterConfig#DEFAULT_BATCH_FLUSH_TIME_MS}.
+ */
+ public static long getConfigBatchFlushTimeMs(final Configuration conf) {
+ return conf.getLong(BATCH_FLUSH_TIME_MS_TAG, MongoDbBatchWriterConfig.DEFAULT_BATCH_FLUSH_TIME_MS);
+ }
+
+ /**
+ * Reads the specified config to create and initialize a {@link MongoDbBatchWriterConfig}.
+ * If no values are found then the default values are used.
+ * @param conf the {@link Configuration} to check.
+ * @return the {@link MongoDbBatchWriterConfig} populated with the configured values of {@code conf}.
+ * @throws IllegalArgumentException if the configured batch size is not positive
+ * or the configured flush time is negative.
+ */
+ public static MongoDbBatchWriterConfig getMongoDbBatchWriterConfig(final Configuration conf) {
+ final int batchSize = getConfigBatchSize(conf);
+ final long batchFlushTimeMs = getConfigBatchFlushTimeMs(conf);
+ final MongoDbBatchWriterConfig mongoDbBatchWriterConfig = new MongoDbBatchWriterConfig();
+ mongoDbBatchWriterConfig.setBatchSize(batchSize);
+ mongoDbBatchWriterConfig.setBatchFlushTimeMs(batchFlushTimeMs);
+ return mongoDbBatchWriterConfig;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/CollectionType.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/CollectionType.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/CollectionType.java
new file mode 100644
index 0000000..9e6d6fb
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/CollectionType.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.batch.collection;
+
+import java.util.List;
+
+/**
+ * Wrapper that hides whether inserts go through the new or the legacy
+ * MongoDB collection API ({@link com.mongodb.client.MongoCollection} or
+ * {@link com.mongodb.DBCollection} respectively), so callers can insert
+ * without depending on either type or on the document class it uses.
+ *
+ * @param <T> the document type accepted by the wrapped collection.
+ */
+public interface CollectionType<T> {
+ /**
+ * Inserts a single item into the wrapped collection.
+ * @param item the item to insert.
+ */
+ void insertOne(T item);
+
+ /**
+ * Inserts every item of the given list into the wrapped collection.
+ * @param items the {@link List} of items to insert.
+ */
+ void insertMany(List<T> items);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/DbCollectionType.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/DbCollectionType.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/DbCollectionType.java
new file mode 100644
index 0000000..ea00693
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/DbCollectionType.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.batch.collection;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.List;
+
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+import com.mongodb.InsertOptions;
+import com.mongodb.WriteConcern;
+
+/**
+ * Provides access to the legacy {@link DBCollection} type; every insert uses {@link WriteConcern#ACKNOWLEDGED}.
+ */
+public class DbCollectionType implements CollectionType<DBObject> {
+ private final DBCollection collection;
+
+ /**
+ * Creates a new instance of {@link DbCollectionType}.
+ * @param collection the {@link DBCollection}. (not {@code null})
+ */
+ public DbCollectionType(final DBCollection collection) {
+ this.collection = checkNotNull(collection);
+ }
+
+ @Override
+ public void insertOne(final DBObject item) {
+ collection.insert(item, WriteConcern.ACKNOWLEDGED);
+ }
+
+ @Override
+ public void insertMany(final List<DBObject> items) { // continueOnError: one failing document does not abort the rest of the batch
+ collection.insert(items, new InsertOptions().continueOnError(true).writeConcern(WriteConcern.ACKNOWLEDGED));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/MongoCollectionType.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/MongoCollectionType.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/MongoCollectionType.java
new file mode 100644
index 0000000..8fb796a
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/MongoCollectionType.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.batch.collection;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.List;
+
+import org.bson.Document;
+
+import com.mongodb.client.MongoCollection;
+
+/**
+ * Provides access to the {@link MongoCollection} type.
+ */
+public class MongoCollectionType implements CollectionType<Document> {
+ private final MongoCollection<Document> collection;
+
+ /**
+ * Creates a new instance of {@link MongoCollectionType}.
+ * @param collection the {@link MongoCollection}. (not {@code null})
+ */
+ public MongoCollectionType(final MongoCollection<Document> collection) {
+ this.collection = checkNotNull(collection);
+ }
+
+ @Override
+ public void insertOne(final Document item) {
+ collection.insertOne(item);
+ }
+
+ @Override
+ public void insertMany(final List<Document> items) { // unordered, like DbCollectionType's continueOnError(true): one bad document does not stop the rest
+ collection.insertMany(items, new com.mongodb.client.model.InsertManyOptions().ordered(false));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java
index f5372d1..69ca274 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java
@@ -33,6 +33,11 @@ import org.apache.rya.mongodb.MongoConnectorFactory;
import org.apache.rya.mongodb.MongoDBRdfConfiguration;
import org.apache.rya.mongodb.MongoDBRyaDAO;
import org.apache.rya.mongodb.MongoSecondaryIndex;
+import org.apache.rya.mongodb.batch.MongoDbBatchWriter;
+import org.apache.rya.mongodb.batch.MongoDbBatchWriterConfig;
+import org.apache.rya.mongodb.batch.MongoDbBatchWriterException;
+import org.apache.rya.mongodb.batch.MongoDbBatchWriterUtils;
+import org.apache.rya.mongodb.batch.collection.DbCollectionType;
import org.openrdf.model.Literal;
import org.openrdf.model.Statement;
import org.openrdf.model.URI;
@@ -46,7 +51,6 @@ import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.QueryBuilder;
import com.mongodb.ServerAddress;
-import com.mongodb.WriteConcern;
import info.aduna.iteration.CloseableIteration;
@@ -58,6 +62,7 @@ public abstract class AbstractMongoIndexer<T extends IndexingMongoDBStorageStrat
private static final Logger LOG = Logger.getLogger(AbstractMongoIndexer.class);
private boolean isInit = false;
+ private boolean flushEachUpdate = true;
protected Configuration conf;
protected MongoDBRyaDAO dao;
protected MongoClient mongoClient;
@@ -68,15 +73,28 @@ public abstract class AbstractMongoIndexer<T extends IndexingMongoDBStorageStrat
protected T storageStrategy;
+ private MongoDbBatchWriter<DBObject> mongoDbBatchWriter;
+
protected void initCore() {
dbName = conf.get(MongoDBRdfConfiguration.MONGO_DB_NAME);
db = this.mongoClient.getDB(dbName);
- collection = db.getCollection(conf.get(MongoDBRdfConfiguration.MONGO_COLLECTION_PREFIX, "rya") + getCollectionName());
+ final String collectionName = conf.get(MongoDBRdfConfiguration.MONGO_COLLECTION_PREFIX, "rya") + getCollectionName();
+ collection = db.getCollection(collectionName);
+
+ flushEachUpdate = ((MongoDBRdfConfiguration)conf).flushEachUpdate();
+
+ final MongoDbBatchWriterConfig mongoDbBatchWriterConfig = MongoDbBatchWriterUtils.getMongoDbBatchWriterConfig(conf);
+ mongoDbBatchWriter = new MongoDbBatchWriter<DBObject>(new DbCollectionType(collection), mongoDbBatchWriterConfig);
+ try {
+ mongoDbBatchWriter.start();
+ } catch (final MongoDbBatchWriterException e) {
+ LOG.error("Error start MongoDB batch writer", e);
+ }
}
@Override
public void setClient(final MongoClient client){
- this.mongoClient = client;
+ this.mongoClient = client; // kept for initCore(), which opens the indexer's database through this client
}
@VisibleForTesting
@@ -96,8 +114,8 @@ public abstract class AbstractMongoIndexer<T extends IndexingMongoDBStorageStrat
public void setConf(final Configuration conf) {
this.conf = conf;
if (!isInit){
- setClient(MongoConnectorFactory.getMongoClient(conf));
- init();
+ setClient(MongoConnectorFactory.getMongoClient(conf));
+ init(); // NOTE(review): isInit is not set here; confirm init() sets it, otherwise every setConf call repeats setClient()/init()
}
}
@@ -108,6 +126,11 @@ public abstract class AbstractMongoIndexer<T extends IndexingMongoDBStorageStrat
@Override
public void flush() throws IOException {
+ try {
+ mongoDbBatchWriter.flush();
+ } catch (final MongoDbBatchWriterException e) {
+ throw new IOException("Error flushing batch writer", e); // surface batch-writer failures through flush()'s declared IOException
+ }
}
@Override
@@ -135,24 +158,43 @@ public abstract class AbstractMongoIndexer<T extends IndexingMongoDBStorageStrat
public void storeStatements(final Collection<RyaStatement> ryaStatements)
throws IOException {
for (final RyaStatement ryaStatement : ryaStatements){
- storeStatement(ryaStatement);
+ storeStatement(ryaStatement, false); // queue only; the whole collection is flushed once below when flushEachUpdate is set
+ }
+ if (flushEachUpdate) {
+ flush();
}
}
@Override
public void storeStatement(final RyaStatement ryaStatement) throws IOException {
+ storeStatement(ryaStatement, flushEachUpdate); // flushes right away only when flushEachUpdate is enabled
+ }
+
+ private void storeStatement(final RyaStatement ryaStatement, final boolean flush) throws IOException {
+ final DBObject obj = prepareStatementForStorage(ryaStatement);
+ try {
+ if (obj != null) { // null: this indexer does not store the statement, so there is nothing to queue
+ mongoDbBatchWriter.addObjectToQueue(obj);
+ }
+ if (flush) { flush(); }
+ } catch (final MongoDbBatchWriterException e) {
+ throw new IOException("Error storing statement", e);
+ }
+ }
+
+ private DBObject prepareStatementForStorage(final RyaStatement ryaStatement) { // returns the document to index, or null when nothing should be stored
try {
final Statement statement = RyaToRdfConversions.convertStatement(ryaStatement);
final boolean isValidPredicate = predicates.isEmpty() || predicates.contains(statement.getPredicate());
if (isValidPredicate && (statement.getObject() instanceof Literal)) {
final DBObject obj = storageStrategy.serialize(ryaStatement);
- if (obj != null) {
- collection.insert(obj, WriteConcern.ACKNOWLEDGED);
- }
+ return obj; // may itself be null if serialize() produced nothing
}
} catch (final IllegalArgumentException e) {
LOG.error("Unable to parse the statement: " + ryaStatement.toString(), e);
}
+
+ return null; // predicate not indexed, object not a Literal, or an IllegalArgumentException was caught
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoEntityIndexIT.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoEntityIndexIT.java b/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoEntityIndexIT.java
index 4ddb2a5..6fac386 100644
--- a/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoEntityIndexIT.java
+++ b/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoEntityIndexIT.java
@@ -21,9 +21,7 @@ package org.apache.rya.indexing.mongo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import java.util.ArrayList;
import java.util.HashSet;
-import java.util.List;
import java.util.Optional;
import java.util.Set;
@@ -36,12 +34,13 @@ import org.apache.rya.indexing.entity.storage.EntityStorage;
import org.apache.rya.indexing.entity.storage.TypeStorage;
import org.apache.rya.indexing.entity.update.mongo.MongoEntityIndexer;
import org.apache.rya.mongodb.MockMongoFactory;
+import org.apache.rya.mongodb.MongoConnectorFactory;
import org.apache.rya.mongodb.MongoDBRdfConfiguration;
import org.apache.rya.sail.config.RyaSailFactory;
import org.bson.Document;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.openrdf.model.Statement;
import org.openrdf.model.URI;
import org.openrdf.model.Value;
import org.openrdf.model.ValueFactory;
@@ -70,7 +69,7 @@ public class MongoEntityIndexIT {
private MongoClient mongoClient;
@Before
- public void setUp() throws Exception{
+ public void setUp() throws Exception {
mongoClient = MockMongoFactory.with(Version.Main.PRODUCTION).newMongoClient();
conf = new MongoDBRdfConfiguration();
conf.set(MongoDBRdfConfiguration.MONGO_DB_NAME, "test");
@@ -91,6 +90,19 @@ public class MongoEntityIndexIT {
indexer.init();
}
+ @After
+ public void tearDown() throws Exception {
+ if (conn != null) { // clear and close users of the Mongo client before closing the client itself
+ conn.clear();
+ }
+ if (indexer != null) {
+ indexer.close();
+ }
+ if (mongoClient != null) {
+ MongoConnectorFactory.closeMongoClient();
+ }
+ }
+
@Test
public void ensureInEntityStore_Test() throws Exception {
setupTypes();
@@ -202,8 +214,6 @@ public class MongoEntityIndexIT {
}
private void addStatements() throws Exception {
- final List<Statement> stmnts = new ArrayList<>();
-
//alice
URI subject = VF.createURI("urn:alice");
URI predicate = VF.createURI("urn:name");
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/mongo/MongoIndexerDeleteIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/mongo/MongoIndexerDeleteIT.java b/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/mongo/MongoIndexerDeleteIT.java
index b533d42..4b66b5b 100644
--- a/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/mongo/MongoIndexerDeleteIT.java
+++ b/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/mongo/MongoIndexerDeleteIT.java
@@ -31,6 +31,8 @@ import org.apache.rya.indexing.TemporalInstantRfc3339;
import org.apache.rya.indexing.accumulo.ConfigUtils;
import org.apache.rya.indexing.mongodb.MongoIndexingConfiguration;
import org.apache.rya.mongodb.MockMongoFactory;
+import org.apache.rya.mongodb.MongoConnectorFactory;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.openrdf.model.Resource;
@@ -81,6 +83,16 @@ public class MongoIndexerDeleteIT {
conn.begin();
}
+ @After
+ public void after() throws Exception {
+ if (conn != null) { // clear the store while the Mongo client is still open
+ conn.clear();
+ }
+ if (client != null) {
+ MongoConnectorFactory.closeMongoClient();
+ }
+ }
+
@Test
public void deleteTest() throws Exception {
populateRya();
@@ -150,14 +162,10 @@ public class MongoIndexerDeleteIT {
uuid = "urn:people";
conn.add(VF.createURI(uuid), RDF.TYPE, person);
conn.add(VF.createURI(uuid), RDFS.LABEL, VF.createLiteral("Alice Palace Hose", VF.createURI("http://www.w3.org/2001/XMLSchema#string")));
-
- uuid = "urn:people";
- conn.add(VF.createURI(uuid), RDF.TYPE, person);
conn.add(VF.createURI(uuid), RDFS.LABEL, VF.createLiteral("Bob Snob Hose", "en"));
// temporal
final TemporalInstant instant = new TemporalInstantRfc3339(1, 2, 3, 4, 5, 6);
- final URI time = VF.createURI("Property:atTime");
conn.add(VF.createURI("foo:time"), VF.createURI("Property:atTime"), VF.createLiteral(instant.toString()));
}