You are viewing a plain text version of this content; the hyperlink to the canonical version was not preserved in this rendering.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/08/19 01:43:52 UTC
incubator-rya git commit: RYA-337 Adding batch queries to MongoDB.
Closes #204
Repository: incubator-rya
Updated Branches:
refs/heads/master ad60aca8d -> 0d80871ff
RYA-337 Adding batch queries to MongoDB. Closes #204
Additionally, simplifying MongoDBQueryEngine
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/0d80871f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/0d80871f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/0d80871f
Branch: refs/heads/master
Commit: 0d80871ff614683fb0b4105a7778b5e56e53051c
Parents: ad60aca
Author: Aaron Mihalik <mi...@alum.mit.edu>
Authored: Tue Aug 8 11:17:37 2017 -0400
Committer: Caleb Meier <ca...@parsons.com>
Committed: Fri Aug 18 17:54:17 2017 -0700
----------------------------------------------------------------------
.../apache/rya/mongodb/MongoDBQueryEngine.java | 120 ++++++-------------
.../NonCloseableRyaStatementCursorIterator.java | 57 ---------
.../RyaStatementBindingSetCursorIterator.java | 115 ++++++++++++------
.../iter/RyaStatementCursorIterable.java | 67 -----------
.../iter/RyaStatementCursorIterator.java | 94 ++++-----------
.../rya/mongodb/MongoDBQueryEngineTest.java | 30 +++++
6 files changed, 165 insertions(+), 318 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/0d80871f/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBQueryEngine.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBQueryEngine.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBQueryEngine.java
index 8932fc4..f1115b1 100644
--- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBQueryEngine.java
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBQueryEngine.java
@@ -21,11 +21,13 @@ package org.apache.rya.mongodb;
import static com.google.common.base.Preconditions.checkNotNull;
import java.io.IOException;
+import java.util.AbstractMap;
import java.util.Collection;
-import java.util.HashSet;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Set;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.persist.RyaDAOException;
@@ -34,19 +36,17 @@ import org.apache.rya.api.persist.query.RyaQuery;
import org.apache.rya.api.persist.query.RyaQueryEngine;
import org.apache.rya.mongodb.dao.MongoDBStorageStrategy;
import org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy;
-import org.apache.rya.mongodb.iter.NonCloseableRyaStatementCursorIterator;
import org.apache.rya.mongodb.iter.RyaStatementBindingSetCursorIterator;
-import org.apache.rya.mongodb.iter.RyaStatementCursorIterable;
import org.apache.rya.mongodb.iter.RyaStatementCursorIterator;
import org.bson.Document;
import org.calrissian.mango.collect.CloseableIterable;
+import org.calrissian.mango.collect.CloseableIterables;
import org.openrdf.query.BindingSet;
+import org.openrdf.query.impl.MapBindingSet;
+import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
-import com.mongodb.DB;
-import com.mongodb.DBCollection;
-import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
@@ -61,13 +61,10 @@ public class MongoDBQueryEngine implements RyaQueryEngine<MongoDBRdfConfiguratio
private MongoDBRdfConfiguration configuration;
private final MongoClient mongoClient;
- private final DBCollection coll;
private final MongoDBStorageStrategy<RyaStatement> strategy;
public MongoDBQueryEngine(final MongoDBRdfConfiguration conf, final MongoClient mongoClient) {
this.mongoClient = checkNotNull(mongoClient);
- final DB db = mongoClient.getDB(conf.get(MongoDBRdfConfiguration.MONGO_DB_NAME));
- coll = db.getCollection(conf.getTriplesCollectionName());
strategy = new SimpleMongoDBStorageStrategy();
}
@@ -86,50 +83,36 @@ public class MongoDBQueryEngine implements RyaQueryEngine<MongoDBRdfConfiguratio
public CloseableIteration<RyaStatement, RyaDAOException> query(
final RyaStatement stmt, MongoDBRdfConfiguration conf)
throws RyaDAOException {
- if (conf == null) {
- conf = configuration;
- }
- final Long maxResults = conf.getLimit();
- final Set<DBObject> queries = new HashSet<DBObject>();
- final DBObject query = strategy.getQuery(stmt);
- queries.add(query);
- final MongoDatabase db = mongoClient.getDatabase(conf.getMongoDBName());
- final MongoCollection<Document> collection = db.getCollection(conf.getTriplesCollectionName());
- final RyaStatementCursorIterator iterator = new RyaStatementCursorIterator(collection, queries, strategy,
- conf.getAuthorizations());
-
- if (maxResults != null) {
- iterator.setMaxResults(maxResults);
- }
- return iterator;
+ Preconditions.checkNotNull(stmt);
+ Preconditions.checkNotNull(conf);
+
+ Entry<RyaStatement, BindingSet> entry = new AbstractMap.SimpleEntry<>(stmt, new MapBindingSet());
+ Collection<Entry<RyaStatement, BindingSet>> collection = Collections.singleton(entry);
+
+ return new RyaStatementCursorIterator(queryWithBindingSet(collection, conf));
}
@Override
public CloseableIteration<? extends Entry<RyaStatement, BindingSet>, RyaDAOException> queryWithBindingSet(
final Collection<Entry<RyaStatement, BindingSet>> stmts,
MongoDBRdfConfiguration conf) throws RyaDAOException {
- if (conf == null) {
- conf = configuration;
- }
- final Long maxResults = conf.getLimit();
- final Multimap<DBObject, BindingSet> rangeMap = HashMultimap.create();
+ Preconditions.checkNotNull(stmts);
+ Preconditions.checkNotNull(conf);
+
+ final Multimap<RyaStatement, BindingSet> rangeMap = HashMultimap.create();
//TODO: cannot span multiple tables here
try {
for (final Map.Entry<RyaStatement, BindingSet> stmtbs : stmts) {
final RyaStatement stmt = stmtbs.getKey();
final BindingSet bs = stmtbs.getValue();
- final DBObject query = strategy.getQuery(stmt);
- rangeMap.put(query, bs);
+ rangeMap.put(stmt, bs);
}
// TODO not sure what to do about regex ranges?
final RyaStatementBindingSetCursorIterator iterator = new RyaStatementBindingSetCursorIterator(
getCollection(conf), rangeMap, strategy, conf.getAuthorizations());
- if (maxResults != null) {
- iterator.setMaxResults(maxResults);
- }
return iterator;
} catch (final Exception e) {
throw new RyaDAOException(e);
@@ -140,72 +123,39 @@ public class MongoDBQueryEngine implements RyaQueryEngine<MongoDBRdfConfiguratio
public CloseableIteration<RyaStatement, RyaDAOException> batchQuery(
final Collection<RyaStatement> stmts, MongoDBRdfConfiguration conf)
throws RyaDAOException {
- if (conf == null) {
- conf = configuration;
- }
- final Long maxResults = conf.getLimit();
- final Set<DBObject> queries = new HashSet<DBObject>();
+ final Map<RyaStatement, BindingSet> queries = new HashMap<>();
- try {
- for (final RyaStatement stmt : stmts) {
- queries.add( strategy.getQuery(stmt));
- }
-
- // TODO not sure what to do about regex ranges?
- final RyaStatementCursorIterator iterator = new RyaStatementCursorIterator(getCollection(conf), queries,
- strategy, configuration.getAuthorizations());
-
- if (maxResults != null) {
- iterator.setMaxResults(maxResults);
- }
- return iterator;
- } catch (final Exception e) {
- throw new RyaDAOException(e);
+ for (final RyaStatement stmt : stmts) {
+ queries.put(stmt, new MapBindingSet());
}
+ return new RyaStatementCursorIterator(queryWithBindingSet(queries.entrySet(), conf));
}
+
@Override
public CloseableIterable<RyaStatement> query(final RyaQuery ryaQuery)
throws RyaDAOException {
- final Set<DBObject> queries = new HashSet<DBObject>();
-
- try {
- queries.add( strategy.getQuery(ryaQuery));
-
- // TODO not sure what to do about regex ranges?
- // TODO this is gross
- final RyaStatementCursorIterable iterator = new RyaStatementCursorIterable(
- new NonCloseableRyaStatementCursorIterator(new RyaStatementCursorIterator(getCollection(getConf()),
- queries, strategy, configuration.getAuthorizations())));
+ Preconditions.checkNotNull(ryaQuery);
- return iterator;
- } catch (final Exception e) {
- throw new RyaDAOException(e);
- }
+ return query(new BatchRyaQuery(Collections.singleton(ryaQuery.getQuery())));
}
+
@Override
public CloseableIterable<RyaStatement> query(final BatchRyaQuery batchRyaQuery)
throws RyaDAOException {
- try {
- final Set<DBObject> queries = new HashSet<DBObject>();
- for (final RyaStatement statement : batchRyaQuery.getQueries()){
- queries.add( strategy.getQuery(statement));
+ Preconditions.checkNotNull(batchRyaQuery);
- }
+ final Map<RyaStatement, BindingSet> queries = new HashMap<>();
- // TODO not sure what to do about regex ranges?
- // TODO this is gross
- final RyaStatementCursorIterable iterator = new RyaStatementCursorIterable(
- new NonCloseableRyaStatementCursorIterator(new RyaStatementCursorIterator(getCollection(getConf()),
- queries, strategy, configuration.getAuthorizations())));
-
- return iterator;
- } catch (final Exception e) {
- throw new RyaDAOException(e);
+ for (final RyaStatement stmt : batchRyaQuery.getQueries()) {
+ queries.put(stmt, new MapBindingSet());
}
+
+ Iterator<RyaStatement> iterator = new RyaStatementCursorIterator(queryWithBindingSet(queries.entrySet(), getConf()));
+ return CloseableIterables.wrap((Iterable<RyaStatement>) () -> iterator);
}
- private MongoCollection getCollection(final MongoDBRdfConfiguration conf) {
+ private MongoCollection<Document> getCollection(final MongoDBRdfConfiguration conf) {
final MongoDatabase db = mongoClient.getDatabase(conf.getMongoDBName());
return db.getCollection(conf.getTriplesCollectionName());
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/0d80871f/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/NonCloseableRyaStatementCursorIterator.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/NonCloseableRyaStatementCursorIterator.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/NonCloseableRyaStatementCursorIterator.java
deleted file mode 100644
index 35dab6d..0000000
--- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/NonCloseableRyaStatementCursorIterator.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package org.apache.rya.mongodb.iter;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import java.util.Iterator;
-
-import org.apache.rya.api.domain.RyaStatement;
-import org.apache.rya.api.persist.RyaDAOException;
-
-public class NonCloseableRyaStatementCursorIterator implements Iterator<RyaStatement> {
-
- RyaStatementCursorIterator iterator;
-
- @Override
- public boolean hasNext() {
- return iterator.hasNext();
- }
-
- @Override
- public RyaStatement next() {
- return iterator.next();
- }
-
- public NonCloseableRyaStatementCursorIterator(
- RyaStatementCursorIterator iterator) {
- this.iterator = iterator;
- }
-
- @Override
- public void remove() {
- try {
- iterator.remove();
- } catch (RyaDAOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/0d80871f/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java
index 18f71d2..de5e8b0 100644
--- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java
@@ -19,21 +19,25 @@
package org.apache.rya.mongodb.iter;
import java.util.ArrayList;
-import java.util.Collection;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
+import java.util.Set;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.log4j.Logger;
import org.apache.rya.api.RdfCloudTripleStoreUtils;
import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaType;
import org.apache.rya.api.persist.RyaDAOException;
import org.apache.rya.mongodb.dao.MongoDBStorageStrategy;
import org.apache.rya.mongodb.document.operators.aggregation.AggregationUtil;
import org.bson.Document;
import org.openrdf.query.BindingSet;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterators;
import com.google.common.collect.Multimap;
import com.mongodb.DBObject;
import com.mongodb.client.AggregateIterable;
@@ -44,20 +48,21 @@ import info.aduna.iteration.CloseableIteration;
public class RyaStatementBindingSetCursorIterator implements CloseableIteration<Entry<RyaStatement, BindingSet>, RyaDAOException> {
private static final Logger log = Logger.getLogger(RyaStatementBindingSetCursorIterator.class);
+
+ private static final int QUERY_BATCH_SIZE = 50;
private final MongoCollection<Document> coll;
- private final Multimap<DBObject, BindingSet> rangeMap;
- private final Iterator<DBObject> queryIterator;
- private Long maxResults;
- private Iterator<Document> resultsIterator;
- private RyaStatement currentStatement;
- private Collection<BindingSet> currentBindingSetCollection;
+ private final Multimap<RyaStatement, BindingSet> rangeMap;
+ private final Multimap<RyaStatement, BindingSet> executedRangeMap = HashMultimap.create();
+ private final Iterator<RyaStatement> queryIterator;
+ private Iterator<Document> batchQueryResultsIterator;
+ private RyaStatement currentResultStatement;
private Iterator<BindingSet> currentBindingSetIterator;
private final MongoDBStorageStrategy<RyaStatement> strategy;
private final Authorizations auths;
public RyaStatementBindingSetCursorIterator(final MongoCollection<Document> coll,
- final Multimap<DBObject, BindingSet> rangeMap, final MongoDBStorageStrategy<RyaStatement> strategy,
+ final Multimap<RyaStatement, BindingSet> rangeMap, final MongoDBStorageStrategy<RyaStatement> strategy,
final Authorizations auths) {
this.coll = coll;
this.rangeMap = rangeMap;
@@ -81,7 +86,7 @@ public class RyaStatementBindingSetCursorIterator implements CloseableIteration<
}
if (currentBindingSetIteratorIsValid()) {
final BindingSet currentBindingSet = currentBindingSetIterator.next();
- return new RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, BindingSet>(currentStatement, currentBindingSet);
+ return new RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, BindingSet>(currentResultStatement, currentBindingSet);
}
return null;
}
@@ -91,47 +96,81 @@ public class RyaStatementBindingSetCursorIterator implements CloseableIteration<
}
private void findNextResult() {
- if (!currentResultCursorIsValid()) {
- findNextValidResultCursor();
+ if (!currentBatchQueryResultCursorIsValid()) {
+ submitBatchQuery();
}
- if (currentResultCursorIsValid()) {
+
+ if (currentBatchQueryResultCursorIsValid()) {
// convert to Rya Statement
- final Document queryResult = resultsIterator.next();
+ final Document queryResult = batchQueryResultsIterator.next();
final DBObject dbo = (DBObject) JSON.parse(queryResult.toJson());
- currentStatement = strategy.deserializeDBObject(dbo);
- currentBindingSetIterator = currentBindingSetCollection.iterator();
+ currentResultStatement = strategy.deserializeDBObject(dbo);
+
+ // Find all of the queries in the executed RangeMap that this result matches
+ // and collect all of those binding sets
+ Set<BindingSet> bsList = new HashSet<>();
+ for (RyaStatement executedQuery : executedRangeMap.keys()) {
+ if (isResultForQuery(executedQuery, currentResultStatement)) {
+ bsList.addAll(executedRangeMap.get(executedQuery));
+ }
+ }
+ currentBindingSetIterator = bsList.iterator();
+ }
+
+ // Handle case of invalid currentResultStatement or no binding sets returned
+ if ((currentBindingSetIterator == null || !currentBindingSetIterator.hasNext()) && (currentBatchQueryResultCursorIsValid() || queryIterator.hasNext())) {
+ findNextResult();
}
}
+
+ private static boolean isResultForQuery(RyaStatement query, RyaStatement result) {
+ return isResult(query.getSubject(), result.getSubject()) &&
+ isResult(query.getPredicate(), result.getPredicate()) &&
+ isResult(query.getObject(), result.getObject()) &&
+ isResult(query.getContext(), result.getContext());
+ }
+
+ private static boolean isResult(RyaType query, RyaType result) {
+ return (query == null) || query.equals(result);
+ }
- private void findNextValidResultCursor() {
- while (queryIterator.hasNext()){
- final DBObject currentQuery = queryIterator.next();
- currentBindingSetCollection = rangeMap.get(currentQuery);
- // Executing redact aggregation to only return documents the user
- // has access to.
- final List<Document> pipeline = new ArrayList<>();
- pipeline.add(new Document("$match", currentQuery));
- pipeline.addAll(AggregationUtil.createRedactPipeline(auths));
- log.debug(pipeline);
-
- final AggregateIterable<Document> aggIter = coll.aggregate(pipeline);
- aggIter.batchSize(1000);
- resultsIterator = aggIter.iterator();
- if (resultsIterator.hasNext()) {
- break;
- }
+ private void submitBatchQuery() {
+ int count = 0;
+ executedRangeMap.clear();
+ final List<Document> pipeline = new ArrayList<>();
+ final List<DBObject> match = new ArrayList<>();
+
+ while (queryIterator.hasNext() && count < QUERY_BATCH_SIZE){
+ count++;
+ RyaStatement query = queryIterator.next();
+ executedRangeMap.putAll(query, rangeMap.get(query));
+ final DBObject currentQuery = strategy.getQuery(query);
+ match.add(currentQuery);
}
- }
- private boolean currentResultCursorIsValid() {
- return (resultsIterator != null) && resultsIterator.hasNext();
+ if (match.size() > 1) {
+ pipeline.add(new Document("$match", new Document("$or", match)));
+ } else if (match.size() == 1) {
+ pipeline.add(new Document("$match", match.get(0)));
+ } else {
+ batchQueryResultsIterator = Iterators.emptyIterator();
+ return;
+ }
+
+ // Executing redact aggregation to only return documents the user has access to.
+ pipeline.addAll(AggregationUtil.createRedactPipeline(auths));
+ log.info(pipeline);
+
+ final AggregateIterable<Document> aggIter = coll.aggregate(pipeline);
+ aggIter.batchSize(1000);
+ batchQueryResultsIterator = aggIter.iterator();
}
-
- public void setMaxResults(final Long maxResults) {
- this.maxResults = maxResults;
+ private boolean currentBatchQueryResultCursorIsValid() {
+ return (batchQueryResultsIterator != null) && batchQueryResultsIterator.hasNext();
}
+
@Override
public void close() throws RyaDAOException {
// TODO don't know what to do here
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/0d80871f/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementCursorIterable.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementCursorIterable.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementCursorIterable.java
deleted file mode 100644
index f9d84b2..0000000
--- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementCursorIterable.java
+++ /dev/null
@@ -1,67 +0,0 @@
-package org.apache.rya.mongodb.iter;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import info.aduna.iteration.CloseableIteration;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.rya.api.RdfCloudTripleStoreUtils;
-import org.apache.rya.api.domain.RyaStatement;
-import org.apache.rya.api.persist.RyaDAOException;
-
-import org.calrissian.mango.collect.CloseableIterable;
-import org.calrissian.mango.collect.CloseableIterator;
-import org.openrdf.query.BindingSet;
-
-import com.mongodb.DBCollection;
-import com.mongodb.DBCursor;
-import com.mongodb.DBObject;
-
-public class RyaStatementCursorIterable implements CloseableIterable<RyaStatement> {
-
-
- private NonCloseableRyaStatementCursorIterator iterator;
-
- public RyaStatementCursorIterable(NonCloseableRyaStatementCursorIterator iterator) {
- this.iterator = iterator;
- }
-
- @Override
- public Iterator<RyaStatement> iterator() {
- // TODO Auto-generated method stub
- return iterator;
- }
-
- @Override
- public void closeQuietly() {
- //TODO don't know what to do here
- }
-
- @Override
- public void close() throws IOException {
- // TODO Auto-generated method stub
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/0d80871f/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementCursorIterator.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementCursorIterator.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementCursorIterator.java
index 1a5eb99..82eed6f 100644
--- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementCursorIterator.java
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementCursorIterator.java
@@ -18,103 +18,55 @@
*/
package org.apache.rya.mongodb.iter;
-import java.util.ArrayList;
import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
+import java.util.Map.Entry;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.log4j.Logger;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.persist.RyaDAOException;
-import org.apache.rya.mongodb.dao.MongoDBStorageStrategy;
-import org.apache.rya.mongodb.document.operators.aggregation.AggregationUtil;
-import org.bson.Document;
+import org.openrdf.query.BindingSet;
-import com.mongodb.DBObject;
-import com.mongodb.client.AggregateIterable;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.util.JSON;
+import com.google.common.base.Throwables;
import info.aduna.iteration.CloseableIteration;
-public class RyaStatementCursorIterator implements CloseableIteration<RyaStatement, RyaDAOException> {
- private static final Logger log = Logger.getLogger(RyaStatementCursorIterator.class);
+public class RyaStatementCursorIterator implements Iterator<RyaStatement>, CloseableIteration<RyaStatement, RyaDAOException> {
+ private final CloseableIteration<? extends Entry<RyaStatement, BindingSet>, RyaDAOException> iterator;
- private final MongoCollection coll;
- private final Iterator<DBObject> queryIterator;
- private Iterator<Document> resultsIterator;
- private final MongoDBStorageStrategy<RyaStatement> strategy;
- private Long maxResults;
- private final Authorizations auths;
-
- public RyaStatementCursorIterator(final MongoCollection<Document> collection, final Set<DBObject> queries,
- final MongoDBStorageStrategy<RyaStatement> strategy, final Authorizations auths) {
- coll = collection;
- queryIterator = queries.iterator();
- this.strategy = strategy;
- this.auths = auths;
+ public RyaStatementCursorIterator(CloseableIteration<? extends Entry<RyaStatement, BindingSet>, RyaDAOException> iterator) {
+ this.iterator = iterator;
}
@Override
public boolean hasNext() {
- if (!currentCursorIsValid()) {
- findNextValidCursor();
+ try {
+ return iterator.hasNext();
+ } catch (RyaDAOException e) {
+ Throwables.propagate(e);
}
- return currentCursorIsValid();
+ return false;
}
@Override
public RyaStatement next() {
- if (!currentCursorIsValid()) {
- findNextValidCursor();
- }
- if (currentCursorIsValid()) {
- // convert to Rya Statement
- final Document queryResult = resultsIterator.next();
- final DBObject dbo = (DBObject) JSON.parse(queryResult.toJson());
- final RyaStatement statement = strategy.deserializeDBObject(dbo);
- return statement;
+ try {
+ return iterator.next().getKey();
+ } catch (RyaDAOException e) {
+ Throwables.propagate(e);
}
return null;
}
- private void findNextValidCursor() {
- while (queryIterator.hasNext()){
- final DBObject currentQuery = queryIterator.next();
-
- // Executing redact aggregation to only return documents the user
- // has access to.
- final List<Document> pipeline = new ArrayList<>();
- pipeline.add(new Document("$match", currentQuery));
- pipeline.addAll(AggregationUtil.createRedactPipeline(auths));
- log.debug(pipeline);
- final AggregateIterable<Document> output = coll.aggregate(pipeline);
- output.batchSize(1000);
-
- resultsIterator = output.iterator();
- if (resultsIterator.hasNext()) {
- break;
- }
- }
- }
-
- private boolean currentCursorIsValid() {
- return (resultsIterator != null) && resultsIterator.hasNext();
- }
-
-
- public void setMaxResults(final Long maxResults) {
- this.maxResults = maxResults;
- }
-
@Override
public void close() throws RyaDAOException {
- // TODO don't know what to do here
+ iterator.close();
}
@Override
- public void remove() throws RyaDAOException {
- next();
+ public void remove() {
+ try {
+ iterator.remove();
+ } catch (RyaDAOException e) {
+ Throwables.propagate(e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/0d80871f/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBQueryEngineTest.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBQueryEngineTest.java b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBQueryEngineTest.java
index d843d22..4187c85 100644
--- a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBQueryEngineTest.java
+++ b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBQueryEngineTest.java
@@ -98,6 +98,36 @@ public class MongoDBQueryEngineTest extends MongoRyaTestBase {
@SuppressWarnings("unchecked")
@Test
+ public void batchbindingSetsQuery() throws Exception {
+ final RyaStatement s1 = getStatement(null, null, "u:b");
+
+ final MapBindingSet bs1 = new MapBindingSet();
+ bs1.addBinding("foo", new URIImpl("u:x"));
+
+ final Map.Entry<RyaStatement, BindingSet> e1 = new RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, BindingSet>(s1, bs1);
+ final Collection<Entry<RyaStatement, BindingSet>> stmts1 = Lists.newArrayList(e1);
+ Assert.assertEquals(1, size(engine.queryWithBindingSet(stmts1, configuration)));
+
+
+ final MapBindingSet bs2 = new MapBindingSet();
+ bs2.addBinding("foo", new URIImpl("u:y"));
+
+ final RyaStatement s2 = getStatement(null, null, "u:c");
+
+ final Map.Entry<RyaStatement, BindingSet> e2 = new RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, BindingSet>(s2, bs2);
+
+ final Collection<Entry<RyaStatement, BindingSet>> stmts2 = Lists.newArrayList(e1, e2);
+ Assert.assertEquals(2, size(engine.queryWithBindingSet(stmts2, configuration)));
+
+
+ final Map.Entry<RyaStatement, BindingSet> e3 = new RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, BindingSet>(s2, bs1);
+ final Map.Entry<RyaStatement, BindingSet> e4 = new RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, BindingSet>(s1, bs2);
+
+ final Collection<Entry<RyaStatement, BindingSet>> stmts3 = Lists.newArrayList(e1, e2, e3, e4);
+ Assert.assertEquals(4, size(engine.queryWithBindingSet(stmts3, configuration)));
+}
+ @SuppressWarnings("unchecked")
+ @Test
public void bindingSetsQuery() throws Exception {
final RyaStatement s = getStatement("u:a", null, null);