You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2018/01/11 14:52:02 UTC
[3/6] incubator-rya git commit: RYA-416 A new query node type to
represent a MongoDB aggregation pipeline whose results can be converted to
binding sets,
and tools for optionally transforming some SPARQL expressions into such a
node. Closes #254.
RYA-416 A new query node type to represent a MongoDB aggregation pipeline whose results can be converted to binding sets, and tools for optionally transforming some SPARQL expressions into such a node. Closes #254.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/d5ebb731
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/d5ebb731
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/d5ebb731
Branch: refs/heads/master
Commit: d5ebb7315509edbc212c7481f8abe76ee0eec934
Parents: fff50c4
Author: Jesse Hatfield <je...@parsons.com>
Authored: Thu Dec 21 17:32:47 2017 -0500
Committer: caleb <ca...@parsons.com>
Committed: Thu Jan 11 09:50:03 2018 -0500
----------------------------------------------------------------------
dao/mongodb.rya/pom.xml | 5 +
.../AbstractMongoDBRdfConfigurationBuilder.java | 16 +
.../rya/mongodb/MongoDBRdfConfiguration.java | 40 +-
.../AggregationPipelineQueryNode.java | 856 +++++++++++++++++++
.../AggregationPipelineQueryOptimizer.java | 73 ++
.../aggregation/PipelineResultIteration.java | 135 +++
.../SparqlToPipelineTransformVisitor.java | 196 +++++
.../dao/SimpleMongoDBStorageStrategy.java | 21 +-
.../AggregationPipelineQueryNodeTest.java | 331 +++++++
.../mongodb/aggregation/PipelineQueryIT.java | 421 +++++++++
.../PipelineResultIterationTest.java | 152 ++++
.../SparqlToPipelineTransformVisitorTest.java | 207 +++++
12 files changed, 2446 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d5ebb731/dao/mongodb.rya/pom.xml
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/pom.xml b/dao/mongodb.rya/pom.xml
index 0803aa8..0afac81 100644
--- a/dao/mongodb.rya/pom.xml
+++ b/dao/mongodb.rya/pom.xml
@@ -86,5 +86,10 @@ Tests will fail with the following error when using 32bit JVM on either Linux or
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d5ebb731/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/AbstractMongoDBRdfConfigurationBuilder.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/AbstractMongoDBRdfConfigurationBuilder.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/AbstractMongoDBRdfConfigurationBuilder.java
index bb14a39..369f7a0 100644
--- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/AbstractMongoDBRdfConfigurationBuilder.java
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/AbstractMongoDBRdfConfigurationBuilder.java
@@ -43,6 +43,7 @@ public abstract class AbstractMongoDBRdfConfigurationBuilder<B extends AbstractM
protected static final String DEFAULT_MONGO_PORT = "27017";
private String mongoCollectionPrefix = "rya_";
private String mongoDBName = "rya";
+ private boolean usePipeline = false;
protected static final String MONGO_USER = "mongo.user";
protected static final String MONGO_PASSWORD = "mongo.password";
@@ -142,6 +143,20 @@ public abstract class AbstractMongoDBRdfConfigurationBuilder<B extends AbstractM
}
/**
+ * Enable or disable an optimization that executes queries, to the extent
+ * possible, using the MongoDB aggregation pipeline. Defaults to false.
+ * If true, replaces a query tree or subtree with a single node representing
+ * a series of pipeline steps. Transformation may not be supported for all
+ * query algebra expressions; these expressions are left unchanged and the
+ * optimization is attempted on their child subtrees.
+ * @param usePipeline whether to use aggregation pipeline optimization.
+ * @return the builder object returned by {@code confBuilder()}.
+ */
+ public B setUseAggregationPipeline(boolean usePipeline) {
+ // Only recorded on the builder here; the value is handed to the
+ // configuration via conf.setUseAggregationPipeline(usePipeline) elsewhere
+ // in this class.
+ this.usePipeline = usePipeline;
+ return confBuilder();
+ }
+
+ /**
* @return extension of {@link MongoDBRdfConfiguration} with specified parameters set
*/
@Override
@@ -171,6 +186,7 @@ public abstract class AbstractMongoDBRdfConfigurationBuilder<B extends AbstractM
conf.setTablePrefix(mongoCollectionPrefix);
conf.setMongoHostname(host);
conf.setMongoPort(port);
+ conf.setUseAggregationPipeline(usePipeline);
return conf;
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d5ebb731/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java
index 835ed27..44dc851 100644
--- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java
@@ -20,11 +20,14 @@ package org.apache.rya.mongodb;
import static java.util.Objects.requireNonNull;
+import java.util.List;
import java.util.Properties;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.conf.Configuration;
import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.rya.mongodb.aggregation.AggregationPipelineQueryOptimizer;
+import org.openrdf.query.algebra.evaluation.QueryOptimizer;
import edu.umd.cs.findbugs.annotations.Nullable;
@@ -51,6 +54,8 @@ public class MongoDBRdfConfiguration extends RdfCloudTripleStoreConfiguration {
public static final String CONF_ADDITIONAL_INDEXERS = "ac.additional.indexers";
public static final String MONGO_GEO_MAXDISTANCE = "mongo.geo.maxdist";
+ public static final String USE_AGGREGATION_PIPELINE = "rya.mongodb.query.pipeline";
+
/**
* Constructs an empty instance of {@link MongoDBRdfConfiguration}.
*/
@@ -251,4 +256,37 @@ public class MongoDBRdfConfiguration extends RdfCloudTripleStoreConfiguration {
public void setFlush(final boolean flush){
setBoolean(CONF_FLUSH_EACH_UPDATE, flush);
}
-}
\ No newline at end of file
+
+ /**
+ * Whether aggregation pipeline optimization is enabled. Reads the
+ * {@link #USE_AGGREGATION_PIPELINE} property, supplying {@code false} as
+ * the default argument to the lookup.
+ * @return true if queries will be evaluated using MongoDB aggregation.
+ */
+ public boolean getUseAggregationPipeline() {
+ return getBoolean(USE_AGGREGATION_PIPELINE, false);
+ }
+
+ /**
+ * Enable or disable an optimization that executes queries, to the extent
+ * possible, using the MongoDB aggregation pipeline. Replaces a query tree
+ * or subtree with a single node representing a series of pipeline steps.
+ * Transformation may not be supported for all query algebra expressions;
+ * these expressions are left unchanged and the optimization is attempted
+ * on their child subtrees.
+ * <p>
+ * The flag is stored under the {@link #USE_AGGREGATION_PIPELINE} property,
+ * the same key read by {@link #getUseAggregationPipeline()}.
+ * @param value whether to use aggregation pipeline optimization.
+ */
+ public void setUseAggregationPipeline(boolean value) {
+ setBoolean(USE_AGGREGATION_PIPELINE, value);
+ }
+
+ @Override
+ public List<Class<QueryOptimizer>> getOptimizers() {
+ List<Class<QueryOptimizer>> optimizers = super.getOptimizers();
+ if (getUseAggregationPipeline()) {
+ Class<?> cl = AggregationPipelineQueryOptimizer.class;
+ @SuppressWarnings("unchecked")
+ Class<QueryOptimizer> optCl = (Class<QueryOptimizer>) cl;
+ optimizers.add(optCl);
+ }
+ return optimizers;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d5ebb731/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java
new file mode 100644
index 0000000..7a84f5d
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java
@@ -0,0 +1,856 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.aggregation;
+
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.CONTEXT;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.DOCUMENT_VISIBILITY;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT_HASH;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT_TYPE;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.PREDICATE;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.PREDICATE_HASH;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.STATEMENT_METADATA;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.SUBJECT;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.SUBJECT_HASH;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.TIMESTAMP;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.function.Function;
+
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.domain.StatementMetadata;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.mongodb.MongoDbRdfConstants;
+import org.apache.rya.mongodb.dao.MongoDBStorageStrategy;
+import org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy;
+import org.apache.rya.mongodb.document.operators.query.ConditionalOperators;
+import org.apache.rya.mongodb.document.visibility.DocumentVisibilityAdapter;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.openrdf.model.Literal;
+import org.openrdf.model.Resource;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.Compare;
+import org.openrdf.query.algebra.ExtensionElem;
+import org.openrdf.query.algebra.ProjectionElem;
+import org.openrdf.query.algebra.ProjectionElemList;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.ValueConstant;
+import org.openrdf.query.algebra.ValueExpr;
+import org.openrdf.query.algebra.Var;
+import org.openrdf.query.algebra.evaluation.impl.ExternalSet;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBObject;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.Aggregates;
+import com.mongodb.client.model.BsonField;
+import com.mongodb.client.model.Filters;
+import com.mongodb.client.model.Projections;
+
+import info.aduna.iteration.CloseableIteration;
+
+/**
+ * Represents a portion of a query tree as a MongoDB aggregation pipeline. Should
+ * be built bottom-up: start with a statement pattern implemented as a $match
+ * step, then add steps to the pipeline to handle higher levels of the query
+ * tree. Methods are provided to add certain supported query operations to the
+ * end of the internal pipeline. In some cases, specific arguments may be
+ * unsupported, in which case the pipeline is unchanged and the method returns
+ * false.
+ */
+public class AggregationPipelineQueryNode extends ExternalSet {
+ /**
+ * An aggregation result corresponding to a solution should map this key
+ * to an object which itself maps variable names to variable values.
+ */
+ static final String VALUES = "<VALUES>";
+
+ /**
+ * An aggregation result corresponding to a solution should map this key
+ * to an object which itself maps variable names to the corresponding hashes
+ * of their values.
+ */
+ static final String HASHES = "<HASHES>";
+
+ /**
+ * An aggregation result corresponding to a solution should map this key
+ * to an object which itself maps variable names to their datatypes, if any.
+ */
+ static final String TYPES = "<TYPES>";
+
+ private static final String LEVEL = "derivation_level";
+ private static final String[] FIELDS = { VALUES, HASHES, TYPES, LEVEL, TIMESTAMP };
+
+ private static final String JOINED_TRIPLE = "<JOINED_TRIPLE>";
+ private static final String FIELDS_MATCH = "<JOIN_FIELDS_MATCH>";
+
+ private static final MongoDBStorageStrategy<RyaStatement> strategy = new SimpleMongoDBStorageStrategy();
+
+ private static final Bson DEFAULT_TYPE = new Document("$literal", XMLSchema.ANYURI.stringValue());
+ private static final Bson DEFAULT_CONTEXT = new Document("$literal", "");
+ private static final Bson DEFAULT_DV = DocumentVisibilityAdapter.toDBObject(MongoDbRdfConstants.EMPTY_DV);
+ private static final Bson DEFAULT_METADATA = new Document("$literal",
+ StatementMetadata.EMPTY_METADATA.toString());
+
+ /**
+  * Tests whether a name can be used as-is for a field in the pipeline's
+  * documents. A name is rejected when it is null, is exactly "_id", or
+  * contains a '.' or '$' character.
+  * @param name the candidate field name; may be null.
+  * @return true if none of the rejection conditions apply.
+  */
+ private static boolean isValidFieldName(String name) {
+     if (name == null || name.equals("_id")) {
+         return false;
+     }
+     return name.indexOf('.') < 0 && name.indexOf('$') < 0;
+ }
+
+ /**
+ * For a given statement pattern, represents a mapping from query variables
+ * to their corresponding parts of matching triples. If necessary, also
+ * substitute variable names including invalid characters with temporary
+ * replacements, while producing a map back to the original names.
+ * <p>
+ * The replacement-to-original map is supplied by the caller and is mutated
+ * in place whenever a new replacement name has to be generated.
+ */
+ private static class StatementVarMapping {
+ // (possibly sanitized) variable name -> triple field holding its value
+ private final Map<String, String> varToTripleValue = new HashMap<>();
+ // variable name -> triple field holding the hash of its value; only
+ // populated for subject, predicate and object variables
+ private final Map<String, String> varToTripleHash = new HashMap<>();
+ // variable name -> triple field holding its datatype; only populated for
+ // object variables
+ private final Map<String, String> varToTripleType = new HashMap<>();
+ // replacement (internal) name -> original variable name
+ private final BiMap<String, String> varToOriginalName;
+
+ /** @return the triple field holding the variable's value, or null if unmapped. */
+ String valueField(String varName) {
+ return varToTripleValue.get(varName);
+ }
+ /** @return the triple field holding the variable's hash, or null if unmapped. */
+ String hashField(String varName) {
+ return varToTripleHash.get(varName);
+ }
+ /** @return the triple field holding the variable's datatype, or null if unmapped. */
+ String typeField(String varName) {
+ return varToTripleType.get(varName);
+ }
+
+ /**
+ * @return the (possibly sanitized) names of all variables in the pattern;
+ * this is the key set of the internal value map, not a copy.
+ */
+ Set<String> varNames() {
+ return varToTripleValue.keySet();
+ }
+
+ /**
+ * Return the replacement name registered for an original name, creating
+ * and registering a new "field-" + random UUID replacement if none exists.
+ */
+ private String replace(String original) {
+ if (varToOriginalName.containsValue(original)) {
+ return varToOriginalName.inverse().get(original);
+ }
+ else {
+ String replacement = "field-" + UUID.randomUUID();
+ varToOriginalName.put(replacement, original);
+ return replacement;
+ }
+ }
+
+ /**
+ * Return the name to use internally for a variable: an already registered
+ * replacement if there is one, a newly generated replacement if the name
+ * is non-null and fails isValidFieldName, or otherwise the name unchanged.
+ */
+ private String sanitize(String name) {
+ if (varToOriginalName.containsValue(name)) {
+ return varToOriginalName.inverse().get(name);
+ }
+ else if (name != null && !isValidFieldName(name)) {
+ return replace(name);
+ }
+ return name;
+ }
+
+ /**
+ * Build the mapping for one statement pattern. Only positions whose Var
+ * is non-null and has no value are recorded.
+ * @param sp the statement pattern to inspect
+ * @param varToOriginalName shared replacement-to-original name map; kept
+ * by reference and updated when a name must be replaced
+ */
+ StatementVarMapping(StatementPattern sp, BiMap<String, String> varToOriginalName) {
+ this.varToOriginalName = varToOriginalName;
+ if (sp.getSubjectVar() != null && !sp.getSubjectVar().hasValue()) {
+ String name = sanitize(sp.getSubjectVar().getName());
+ varToTripleValue.put(name, SUBJECT);
+ varToTripleHash.put(name, SUBJECT_HASH);
+ }
+ if (sp.getPredicateVar() != null && !sp.getPredicateVar().hasValue()) {
+ String name = sanitize(sp.getPredicateVar().getName());
+ varToTripleValue.put(name, PREDICATE);
+ varToTripleHash.put(name, PREDICATE_HASH);
+ }
+ if (sp.getObjectVar() != null && !sp.getObjectVar().hasValue()) {
+ String name = sanitize(sp.getObjectVar().getName());
+ varToTripleValue.put(name, OBJECT);
+ varToTripleHash.put(name, OBJECT_HASH);
+ varToTripleType.put(name, OBJECT_TYPE);
+ }
+ // Context variables get a value field only: no hash or type mapping.
+ if (sp.getContextVar() != null && !sp.getContextVar().hasValue()) {
+ String name = sanitize(sp.getContextVar().getName());
+ varToTripleValue.put(name, CONTEXT);
+ }
+ }
+
+ /**
+ * Build a projection with no extra variables, referring to each triple
+ * field as "$" followed by the field name.
+ */
+ Bson getProjectExpression() {
+ return getProjectExpression(new LinkedList<>(), str -> "$" + str);
+ }
+
+ /**
+ * Build the projection that arranges this pattern's variables under the
+ * VALUES, HASHES and (when non-empty) TYPES sub-documents, drops _id, and
+ * computes LEVEL and TIMESTAMP as a $max.
+ * @param alsoInclude variable names appended to all three sub-documents
+ * with the value 1, after this pattern's own variables
+ * @param getFieldExpr turns a triple field name into the expression used
+ * to reference it
+ * @return the combined projection specification
+ */
+ Bson getProjectExpression(Iterable<String> alsoInclude,
+ Function<String, String> getFieldExpr) {
+ Document values = new Document();
+ Document hashes = new Document();
+ Document types = new Document();
+ for (String varName : varNames()) {
+ values.append(varName, getFieldExpr.apply(valueField(varName)));
+ if (varToTripleHash.containsKey(varName)) {
+ hashes.append(varName, getFieldExpr.apply(hashField(varName)));
+ }
+ if (varToTripleType.containsKey(varName)) {
+ types.append(varName, getFieldExpr.apply(typeField(varName)));
+ }
+ }
+ // Appended after the loop above, so a name present in both places ends
+ // up mapped to 1 rather than to the field expression.
+ for (String varName : alsoInclude) {
+ values.append(varName, 1);
+ hashes.append(varName, 1);
+ types.append(varName, 1);
+ }
+ List<Bson> fields = new LinkedList<>();
+ fields.add(Projections.excludeId());
+ fields.add(Projections.computed(VALUES, values));
+ fields.add(Projections.computed(HASHES, hashes));
+ if (!types.isEmpty()) {
+ fields.add(Projections.computed(TYPES, types));
+ }
+ // $max over the top-level field, the field as seen through getFieldExpr,
+ // and the constant 0.
+ fields.add(Projections.computed(LEVEL, new Document("$max",
+ Arrays.asList("$" + LEVEL, getFieldExpr.apply(LEVEL), 0))));
+ fields.add(Projections.computed(TIMESTAMP, new Document("$max",
+ Arrays.asList("$" + TIMESTAMP, getFieldExpr.apply(TIMESTAMP), 0))));
+ return Projections.fields(fields);
+ }
+ }
+
+ /**
+  * Given a StatementPattern, generate an object representing the arguments
+  * to a "$match" command that will find matching triples.
+  * @param sp The StatementPattern to search for
+  * @param path If given, specify the field that should be matched against
+  *  the statement pattern, using an ordered list of field names for a nested
+  *  field. E.g. to match records { "x": { "y": <statement pattern> } }, pass
+  *  "x" followed by "y".
+  * @return The argument of a "$match" query
+  */
+ private static BasicDBObject getMatchExpression(StatementPattern sp, String ... path) {
+     // Convert each position that carries a usable constant; positions left
+     // null are passed to the RyaStatement as null.
+     RyaURI subject = null;
+     final Var subjectVar = sp.getSubjectVar();
+     if (subjectVar != null && subjectVar.getValue() instanceof Resource) {
+         subject = RdfToRyaConversions.convertResource((Resource) subjectVar.getValue());
+     }
+     RyaURI predicate = null;
+     final Var predicateVar = sp.getPredicateVar();
+     if (predicateVar != null && predicateVar.getValue() instanceof URI) {
+         predicate = RdfToRyaConversions.convertURI((URI) predicateVar.getValue());
+     }
+     RyaType object = null;
+     final Var objectVar = sp.getObjectVar();
+     if (objectVar != null && objectVar.getValue() != null) {
+         object = RdfToRyaConversions.convertValue(objectVar.getValue());
+     }
+     RyaURI context = null;
+     final Var contextVar = sp.getContextVar();
+     if (contextVar != null && contextVar.getValue() instanceof URI) {
+         context = RdfToRyaConversions.convertURI((URI) contextVar.getValue());
+     }
+     final DBObject query = strategy.getQuery(new RyaStatement(subject, predicate, object, context));
+     if (path.length > 0) {
+         // Re-key every entry as "<p1>.<p2>...<pn>.<key>". Iterate over a
+         // snapshot of the keys because the object is modified in the loop.
+         final String prefix = String.join(".", path) + ".";
+         for (String key : new HashSet<>(query.keySet())) {
+             final Object value = query.removeField(key);
+             query.put(prefix + key, value);
+         }
+     }
+     return (BasicDBObject) query;
+ }
+
+ /** @return "$" + VALUES + "." + varName: a reference to a variable's entry under VALUES. */
+ private static String valueFieldExpr(String varName) {
+ return "$" + VALUES + "." + varName;
+ }
+ /** @return "$" + HASHES + "." + varName: a reference to a variable's entry under HASHES. */
+ private static String hashFieldExpr(String varName) {
+ return "$" + HASHES + "." + varName;
+ }
+ /** @return "$" + TYPES + "." + varName: a reference to a variable's entry under TYPES. */
+ private static String typeFieldExpr(String varName) {
+ return "$" + TYPES + "." + varName;
+ }
+ /** @return "$" + JOINED_TRIPLE + "." + triplePart: a reference to one field of the joined triple. */
+ private static String joinFieldExpr(String triplePart) {
+ return "$" + JOINED_TRIPLE + "." + triplePart;
+ }
+
+ /**
+ * Get an object representing the value field of some value expression, or
+ * return null if the expression isn't supported.
+ */
+ private Object valueFieldExpr(ValueExpr expr) {
+ if (expr instanceof Var) {
+ return valueFieldExpr(((Var) expr).getName());
+ }
+ else if (expr instanceof ValueConstant) {
+ return new Document("$literal", ((ValueConstant) expr).getValue().stringValue());
+ }
+ else {
+ return null;
+ }
+ }
+
+ private final List<Bson> pipeline;
+ private final MongoCollection<Document> collection;
+ private final Set<String> assuredBindingNames;
+ private final Set<String> bindingNames;
+ private final BiMap<String, String> varToOriginalName;
+
+ private String replace(String original) {
+ if (varToOriginalName.containsValue(original)) {
+ return varToOriginalName.inverse().get(original);
+ }
+ else {
+ String replacement = "field-" + UUID.randomUUID();
+ varToOriginalName.put(replacement, original);
+ return replacement;
+ }
+ }
+
+ /**
+ * Create a pipeline query node based on a StatementPattern.
+ * @param collection The collection of triples to query.
+ * @param baseSP The leaf node in the query tree.
+ */
+ public AggregationPipelineQueryNode(MongoCollection<Document> collection, StatementPattern baseSP) {
+ this.collection = Preconditions.checkNotNull(collection);
+ Preconditions.checkNotNull(baseSP);
+ this.varToOriginalName = HashBiMap.create();
+ StatementVarMapping mapping = new StatementVarMapping(baseSP, varToOriginalName);
+ this.assuredBindingNames = new HashSet<>(mapping.varNames());
+ this.bindingNames = new HashSet<>(mapping.varNames());
+ this.pipeline = new LinkedList<>();
+ this.pipeline.add(Aggregates.match(getMatchExpression(baseSP)));
+ this.pipeline.add(Aggregates.project(mapping.getProjectExpression()));
+ }
+
+ /**
+ * Create a node directly from its constituent parts. Each argument is
+ * null-checked and then stored by reference; no defensive copies are made.
+ * @param collection The collection of triples to query.
+ * @param pipeline The aggregation steps making up this node.
+ * @param assuredBindingNames Internal names of variables recorded as assured.
+ * @param bindingNames Internal names of all variables recorded for the node.
+ * @param varToOriginalName Map from internal replacement names to the
+ * original variable names.
+ */
+ AggregationPipelineQueryNode(MongoCollection<Document> collection,
+ List<Bson> pipeline, Set<String> assuredBindingNames,
+ Set<String> bindingNames, BiMap<String, String> varToOriginalName) {
+ this.collection = Preconditions.checkNotNull(collection);
+ this.pipeline = Preconditions.checkNotNull(pipeline);
+ this.assuredBindingNames = Preconditions.checkNotNull(assuredBindingNames);
+ this.bindingNames = Preconditions.checkNotNull(bindingNames);
+ this.varToOriginalName = Preconditions.checkNotNull(varToOriginalName);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o instanceof AggregationPipelineQueryNode) {
+ AggregationPipelineQueryNode other = (AggregationPipelineQueryNode) o;
+ if (this.collection.equals(other.collection)
+ && this.assuredBindingNames.equals(other.assuredBindingNames)
+ && this.bindingNames.equals(other.bindingNames)
+ && this.varToOriginalName.equals(other.varToOriginalName)
+ && this.pipeline.size() == other.pipeline.size()) {
+ // Check pipeline steps for equality -- underlying types don't
+ // have well-behaved equals methods, so check for equivalent
+ // string representations.
+ for (int i = 0; i < this.pipeline.size(); i++) {
+ Bson doc1 = this.pipeline.get(i);
+ Bson doc2 = other.pipeline.get(i);
+ if (!doc1.toString().equals(doc2.toString())) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+  * Hash code consistent with {@link #equals(Object)}. Pipeline steps
+  * contribute through their string representations, because that is how
+  * {@code equals} compares them; hashing the step objects themselves could
+  * give two equal nodes different hash codes.
+  */
+ @Override
+ public int hashCode() {
+     final List<String> stepStrings = new LinkedList<>();
+     for (Bson step : pipeline) {
+         stepStrings.add(step.toString());
+     }
+     return Objects.hashCode(collection, stepStrings, assuredBindingNames,
+             bindingNames, varToOriginalName);
+ }
+
+ /**
+ * Run the current pipeline as an aggregation on the collection and wrap
+ * the result in a {@link PipelineResultIteration}, which also receives the
+ * replacement-to-original name map and the given bindings.
+ * @param bindings handed to the PipelineResultIteration unchanged; how they
+ * are applied is determined by that class (not visible here).
+ * @return an iteration over the pipeline's results.
+ */
+ @Override
+ public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bindings)
+ throws QueryEvaluationException {
+ return new PipelineResultIteration(collection.aggregate(pipeline), varToOriginalName, bindings);
+ }
+
+ /**
+  * @return a new set holding the assured binding names, with every internal
+  *  replacement name translated back to the original variable name.
+  */
+ @Override
+ public Set<String> getAssuredBindingNames() {
+     final Set<String> originalNames = new HashSet<>();
+     assuredBindingNames.forEach(internal ->
+             originalNames.add(varToOriginalName.getOrDefault(internal, internal)));
+     return originalNames;
+ }
+
+ /**
+  * @return a new set holding all binding names, with every internal
+  *  replacement name translated back to the original variable name.
+  */
+ @Override
+ public Set<String> getBindingNames() {
+     final Set<String> originalNames = new HashSet<>();
+     bindingNames.forEach(internal ->
+             originalNames.add(varToOriginalName.getOrDefault(internal, internal)));
+     return originalNames;
+ }
+
+ /**
+  * Copy this node. The pipeline list, both binding-name sets and the name
+  * map are copied into new containers; the collection reference and the
+  * individual pipeline step objects are shared with the original.
+  */
+ @Override
+ public AggregationPipelineQueryNode clone() {
+     final List<Bson> pipelineCopy = new LinkedList<>(pipeline);
+     final Set<String> assuredCopy = new HashSet<>(assuredBindingNames);
+     final Set<String> bindingCopy = new HashSet<>(bindingNames);
+     final BiMap<String, String> nameMapCopy = HashBiMap.create(varToOriginalName);
+     return new AggregationPipelineQueryNode(collection, pipelineCopy,
+             assuredCopy, bindingCopy, nameMapCopy);
+ }
+
+ /**
+  * Describe this node: a header line listing the assured binding names,
+  * followed in square brackets by any remaining (non-assured) binding
+  * names, then one line per pipeline step.
+  * @return the multi-line description.
+  */
+ @Override
+ public String getSignature() {
+     final Set<String> assured = getAssuredBindingNames();
+     // getBindingNames() builds a new set on every call, so it can be
+     // trimmed in place below without touching this node's state.
+     final Set<String> optional = getBindingNames();
+     final boolean hasOptional = optional.size() > assured.size();
+     final StringBuilder sb = new StringBuilder("AggregationPipelineQueryNode (binds: ");
+     sb.append(String.join(", ", assured));
+     if (hasOptional) {
+         optional.removeAll(assured);
+         sb.append(" [")
+             .append(String.join(", ", optional))
+             .append("]");
+     }
+     sb.append(")\n");
+     for (Bson step : pipeline) {
+         sb.append(step.toString()).append("\n");
+     }
+     return sb.toString();
+ }
+
+ /**
+ * Get the internal list of aggregation pipeline steps. Note that documents
+ * resulting from this pipeline will be structured using an internal
+ * intermediate representation. For documents representing triples, see
+ * {@link #getTriplePipeline}, and for query solutions, see
+ * {@link #evaluate}.
+ * <p>
+ * The list returned is the node's own list, not a copy, so changes made
+ * through it affect this node.
+ * @return The current internal pipeline.
+ */
+ List<Bson> getPipeline() {
+ return pipeline;
+ }
+
+ /**
+ * Add a join with an individual {@link StatementPattern} to the pipeline.
+ * @param sp The statement pattern to join with
+ * @return true if the join was successfully added to the pipeline.
+ */
+ public boolean joinWith(StatementPattern sp) {
+ Preconditions.checkNotNull(sp);
+ // 1. Determine shared variables and new variables
+ StatementVarMapping spMap = new StatementVarMapping(sp, varToOriginalName);
+ NavigableSet<String> sharedVars = new ConcurrentSkipListSet<>(spMap.varNames());
+ sharedVars.retainAll(assuredBindingNames);
+ // 2. Join on one shared variable
+ String joinKey = sharedVars.pollFirst();
+ String collectionName = collection.getNamespace().getCollectionName();
+ Bson join;
+ if (joinKey == null) {
+ return false;
+ }
+ else {
+ join = Aggregates.lookup(collectionName,
+ HASHES + "." + joinKey,
+ spMap.hashField(joinKey),
+ JOINED_TRIPLE);
+ }
+ pipeline.add(join);
+ // 3. Unwind the joined triples so each document represents a binding
+ // set (solution) from the base branch and a triple that may match.
+ pipeline.add(Aggregates.unwind("$" + JOINED_TRIPLE));
+ // 4. (Optional) If there are any shared variables that weren't used as
+ // the join key, project all existing fields plus a new field that
+ // tests the equality of those shared variables.
+ BasicDBObject matchOpts = getMatchExpression(sp, JOINED_TRIPLE);
+ if (!sharedVars.isEmpty()) {
+ List<Bson> eqTests = new LinkedList<>();
+ for (String varName : sharedVars) {
+ String oldField = valueFieldExpr(varName);
+ String newField = joinFieldExpr(spMap.valueField(varName));
+ Bson eqTest = new Document("$eq", Arrays.asList(oldField, newField));
+ eqTests.add(eqTest);
+ }
+ Bson eqProjectOpts = Projections.fields(
+ Projections.computed(FIELDS_MATCH, Filters.and(eqTests)),
+ Projections.include(JOINED_TRIPLE, VALUES, HASHES, TYPES, LEVEL, TIMESTAMP));
+ pipeline.add(Aggregates.project(eqProjectOpts));
+ matchOpts.put(FIELDS_MATCH, true);
+ }
+ // 5. Filter for solutions whose triples match the joined statement
+ // pattern, and, if applicable, whose additional shared variables
+ // match the current solution.
+ pipeline.add(Aggregates.match(matchOpts));
+ // 6. Project the results to include variables from the new SP (with
+ // appropriate renaming) and variables referenced only in the base
+ // pipeline (with previous names).
+ Bson finalProjectOpts = new StatementVarMapping(sp, varToOriginalName)
+ .getProjectExpression(assuredBindingNames,
+ str -> joinFieldExpr(str));
+ assuredBindingNames.addAll(spMap.varNames());
+ bindingNames.addAll(spMap.varNames());
+ pipeline.add(Aggregates.project(finalProjectOpts));
+ return true;
+ }
+
+ /**
+ * Add a SPARQL projection or multi-projection operation to the pipeline.
+ * The number of documents produced by the pipeline after this operation
+ * will be the number of documents entering this stage (the number of
+ * intermediate results) multiplied by the number of
+ * {@link ProjectionElemList}s supplied here.
+ * <p>
+ * On success, the node's binding names become the union of the target
+ * names over all projections, and its assured binding names become their
+ * intersection.
+ * @param projections One or more projections, i.e. mappings from the result
+ * at this stage of the query into a set of variables.
+ * @return true if the projection(s) were added to the pipeline; false,
+ * with the pipeline unchanged, if {@code projections} is null or empty.
+ */
+ public boolean project(Iterable<ProjectionElemList> projections) {
+ if (projections == null || !projections.iterator().hasNext()) {
+ return false;
+ }
+ // One projection document per ProjectionElemList, in iteration order.
+ List<Bson> projectOpts = new LinkedList<>();
+ Set<String> bindingNamesUnion = new HashSet<>();
+ // Stays null until the first projection has been processed.
+ Set<String> bindingNamesIntersection = null;
+ for (ProjectionElemList projection : projections) {
+ Document valueDoc = new Document();
+ Document hashDoc = new Document();
+ Document typeDoc = new Document();
+ Set<String> projectionBindingNames = new HashSet<>();
+ for (ProjectionElem elem : projection.getElements()) {
+ String to = elem.getTargetName();
+ // If the 'to' name is invalid, replace it internally
+ if (!isValidFieldName(to)) {
+ to = replace(to);
+ }
+ String from = elem.getSourceName();
+ // If the 'from' name is invalid, use the internal substitute
+ if (varToOriginalName.containsValue(from)) {
+ from = varToOriginalName.inverse().get(from);
+ }
+ projectionBindingNames.add(to);
+ // Same name on both sides: keep the existing entry (value 1).
+ // Otherwise define the target by referencing the source's fields.
+ if (to.equals(from)) {
+ valueDoc.append(to, 1);
+ hashDoc.append(to, 1);
+ typeDoc.append(to, 1);
+ }
+ else {
+ valueDoc.append(to, valueFieldExpr(from));
+ hashDoc.append(to, hashFieldExpr(from));
+ typeDoc.append(to, typeFieldExpr(from));
+ }
+ }
+ bindingNamesUnion.addAll(projectionBindingNames);
+ if (bindingNamesIntersection == null) {
+ bindingNamesIntersection = new HashSet<>(projectionBindingNames);
+ }
+ else {
+ bindingNamesIntersection.retainAll(projectionBindingNames);
+ }
+ projectOpts.add(new Document()
+ .append(VALUES, valueDoc)
+ .append(HASHES, hashDoc)
+ .append(TYPES, typeDoc)
+ .append(LEVEL, "$" + LEVEL)
+ .append(TIMESTAMP, "$" + TIMESTAMP));
+ }
+ // A single projection becomes one $project step.
+ if (projectOpts.size() == 1) {
+ pipeline.add(Aggregates.project(projectOpts.get(0)));
+ }
+ else {
+ // Several projections: compute them all into one list field, unwind
+ // that list so each entry gets its own document, then lift the entry's
+ // VALUES/HASHES/TYPES back to the top level.
+ String listKey = "PROJECTIONS";
+ Bson projectIndividual = Projections.fields(
+ Projections.computed(VALUES, "$" + listKey + "." + VALUES),
+ Projections.computed(HASHES, "$" + listKey + "." + HASHES),
+ Projections.computed(TYPES, "$" + listKey + "." + TYPES),
+ Projections.include(LEVEL),
+ Projections.include(TIMESTAMP));
+ pipeline.add(Aggregates.project(Projections.computed(listKey, projectOpts)));
+ pipeline.add(Aggregates.unwind("$" + listKey));
+ pipeline.add(Aggregates.project(projectIndividual));
+ }
+ // Replace the previous binding names with those produced here.
+ assuredBindingNames.clear();
+ bindingNames.clear();
+ assuredBindingNames.addAll(bindingNamesIntersection);
+ bindingNames.addAll(bindingNamesUnion);
+ return true;
+ }
+
+ /**
+  * Add a SPARQL extension to the pipeline, if possible. An extension adds
+  * some number of variables to the result. Adds a "$project" step to the
+  * pipeline, but differs from the SPARQL project operation in that
+  * 1) pre-existing variables are always kept, and 2) values of new variables
+  * are defined by expressions, which may be more complex than simply
+  * variable names. Only two kinds of expression are supported: a reference
+  * to another variable ({@link Var}) and a constant ({@link ValueConstant}).
+  * If any other expression type is used in the extension, the pipeline will
+  * remain unchanged and this method will return false.
+  * @param extensionElements A list of new variables and their expressions
+  * @return True if the extension was successfully converted into a pipeline
+  *  step, false otherwise.
+  */
+ public boolean extend(Iterable<ExtensionElem> extensionElements) {
+     List<Bson> valueFields = new LinkedList<>();
+     List<Bson> hashFields = new LinkedList<>();
+     List<Bson> typeFields = new LinkedList<>();
+     // Every variable that may already be bound is carried over unchanged.
+     for (String varName : bindingNames) {
+         valueFields.add(Projections.include(varName));
+         hashFields.add(Projections.include(varName));
+         typeFields.add(Projections.include(varName));
+     }
+     Set<String> newVarNames = new HashSet<>();
+     for (ExtensionElem elem : extensionElements) {
+         String name = elem.getName();
+         if (!isValidFieldName(name)) {
+             // If the field name is invalid, replace it internally
+             name = replace(name);
+         }
+         // We can only handle certain kinds of value expressions; return
+         // failure for any others.
+         ValueExpr expr = elem.getExpr();
+         final Object valueField;
+         final Object hashField;
+         final Object typeField;
+         if (expr instanceof Var) {
+             String varName = ((Var) expr).getName();
+             // A bare "$name" path is resolved from the root of the
+             // document, but bound variables are nested under the VALUES,
+             // HASHES and TYPES sub-documents. Build the references with
+             // the same helpers the projection step uses for renaming.
+             valueField = valueFieldExpr(varName);
+             hashField = hashFieldExpr(varName);
+             typeField = typeFieldExpr(varName);
+         }
+         else if (expr instanceof ValueConstant) {
+             Value val = ((ValueConstant) expr).getValue();
+             valueField = new Document("$literal", val.stringValue());
+             hashField = new Document("$literal", SimpleMongoDBStorageStrategy.hash(val.stringValue()));
+             if (val instanceof Literal) {
+                 typeField = new Document("$literal", ((Literal) val).getDatatype().stringValue());
+             }
+             else {
+                 // Non-literal constants get no entry in the type document.
+                 typeField = null;
+             }
+         }
+         else {
+             // if not understood, return failure; nothing has been added to
+             // the pipeline or to the binding name sets at this point
+             return false;
+         }
+         valueFields.add(Projections.computed(name, valueField));
+         hashFields.add(Projections.computed(name, hashField));
+         if (typeField != null) {
+             typeFields.add(Projections.computed(name, typeField));
+         }
+         newVarNames.add(name);
+     }
+     assuredBindingNames.addAll(newVarNames);
+     bindingNames.addAll(newVarNames);
+     Bson projectOpts = Projections.fields(
+             Projections.computed(VALUES, Projections.fields(valueFields)),
+             Projections.computed(HASHES, Projections.fields(hashFields)),
+             Projections.computed(TYPES, Projections.fields(typeFields)),
+             Projections.include(LEVEL),
+             Projections.include(TIMESTAMP));
+     pipeline.add(Aggregates.project(projectOpts));
+     return true;
+ }
+
+ /**
+  * Add a SPARQL filter to the pipeline, if possible. A filter eliminates
+  * results that don't satisfy a given condition. Not all conditional
+  * expressions are supported. If unsupported expressions are used in the
+  * filter, the pipeline will remain unchanged and this method will return
+  * false. Currently only supports binary {@link Compare} conditions among
+  * variables and/or literals.
+  * <p>
+  * On success three stages are appended: a "$project" that evaluates the
+  * comparison into a temporary "FILTER" field, a "$match" that keeps only
+  * documents where that field is true, and a "$project" that drops the
+  * temporary field again.
+  * @param condition The filter condition
+  * @return True if the filter was successfully converted into a pipeline
+  *  step, false otherwise.
+  */
+ public boolean filter(ValueExpr condition) {
+     if (!(condition instanceof Compare)) {
+         return false;
+     }
+     Compare compare = (Compare) condition;
+     Compare.CompareOp operator = compare.getOperator();
+     Object leftArg = valueFieldExpr(compare.getLeftArg());
+     Object rightArg = valueFieldExpr(compare.getRightArg());
+     if (leftArg == null || rightArg == null) {
+         // unsupported value expression, can't convert filter
+         return false;
+     }
+     final String opFunc;
+     switch (operator) {
+     case EQ:
+         opFunc = "$eq";
+         break;
+     case NE:
+         opFunc = "$ne";
+         break;
+     case LT:
+         opFunc = "$lt";
+         break;
+     case LE:
+         // MongoDB spells "less than or equal" as $lte; "$le" is not an
+         // aggregation operator and would make the pipeline invalid.
+         opFunc = "$lte";
+         break;
+     case GT:
+         opFunc = "$gt";
+         break;
+     case GE:
+         // Likewise "greater than or equal" is $gte, not "$ge".
+         opFunc = "$gte";
+         break;
+     default:
+         // unrecognized comparison operator, can't convert filter
+         return false;
+     }
+     Document compareDoc = new Document(opFunc, Arrays.asList(leftArg, rightArg));
+     pipeline.add(Aggregates.project(Projections.fields(
+             Projections.computed("FILTER", compareDoc),
+             Projections.include(VALUES, HASHES, TYPES, LEVEL, TIMESTAMP))));
+     pipeline.add(Aggregates.match(new Document("FILTER", true)));
+     pipeline.add(Aggregates.project(Projections.fields(
+             Projections.include(VALUES, HASHES, TYPES, LEVEL, TIMESTAMP))));
+     return true;
+ }
+
+ /**
+ * Add a $group step to filter out redundant solutions.
+ * @return True if the distinct operation was successfully appended.
+ */
+ public boolean distinct() {
+ List<String> key = new LinkedList<>();
+ for (String varName : bindingNames) {
+ key.add(hashFieldExpr(varName));
+ }
+ List<BsonField> reduceOps = new LinkedList<>();
+ for (String field : FIELDS) {
+ reduceOps.add(new BsonField(field, new Document("$first", "$" + field)));
+ }
+ pipeline.add(Aggregates.group(new Document("$concat", key), reduceOps));
+ return true;
+ }
+
+ /**
+ * Add a step to the end of the current pipeline which prunes the results
+ * according to the recorded derivation level of their sources. At least one
+ * triple that was used to construct the result must have a derivation level
+ * at least as high as the parameter, indicating that it was derived via
+ * that many steps from the original data. (A value of zero is equivalent to
+ * input data that was not derived at all.) Use in conjunction with
+ * getTriplePipeline (which sets source level for generated triples) to
+ * avoid repeatedly deriving the same results.
+ * @param requiredLevel Required derivation depth. Reject a solution to the
+ * query if all of the triples involved in producing that solution have a
+ * lower derivation depth than this. If zero, does nothing.
+ */
+ public void requireSourceDerivationDepth(int requiredLevel) {
+ if (requiredLevel > 0) {
+ pipeline.add(Aggregates.match(new Document(LEVEL,
+ new Document("$gte", requiredLevel))));
+ }
+ }
+
+ /**
+ * Add a step to the end of the current pipeline which prunes the results
+ * according to the timestamps of their sources. At least one triple that
+ * was used to construct the result must have a timestamp at least as
+ * recent as the parameter. Use in iterative applications to avoid deriving
+ * solutions that would have been generated in an earlier iteration.
+ * @param t Minimum required timestamp. Reject a solution to the query if
+ * all of the triples involved in producing that solution have an earlier
+ * timestamp than this.
+ */
+ public void requireSourceTimestamp(long t) {
+ pipeline.add(Aggregates.match(new Document(TIMESTAMP,
+ new Document("$gte", t))));
+ }
+
+ /**
+  * Given that the current state of the pipeline produces data that can be
+  * interpreted as triples, build a copy of the pipeline with an extra
+  * project step that maps each result from the intermediate result
+  * structure to a structure that can be stored in the triple store. Does
+  * not modify the internal pipeline, which will still produce intermediate
+  * results suitable for query evaluation.
+  * @param timestamp Attach this timestamp to the resulting triples.
+  * @param requireNew If true, add an additional step to check constructed
+  *  triples against existing triples and only include new ones in the
+  *  result. Adds a potentially expensive $lookup step.
+  * @return A new list containing the current pipeline stages followed by
+  *  the triple-producing stages.
+  * @throws IllegalStateException if the results produced by the current
+  *  pipeline do not have variable names allowing them to be interpreted as
+  *  triples (i.e. "subject", "predicate", and "object").
+  */
+ public List<Bson> getTriplePipeline(long timestamp, boolean requireNew) {
+     final boolean producesTriples = assuredBindingNames.contains(SUBJECT)
+             && assuredBindingNames.contains(PREDICATE)
+             && assuredBindingNames.contains(OBJECT);
+     if (!producesTriples) {
+         throw new IllegalStateException("Current pipeline does not produce "
+                 + "records that can be converted into triples.\n"
+                 + "Required variable names: <" + SUBJECT + ", " + PREDICATE
+                 + ", " + OBJECT + ">\nCurrent variable names: "
+                 + assuredBindingNames);
+     }
+     // Work on a copy so the query pipeline itself is left as it was.
+     List<Bson> result = new LinkedList<>(pipeline);
+
+     // Stage 1: reshape the intermediate solution into a stored-triple document.
+     Document literalTimestamp = new Document("$literal", timestamp);
+     Document incrementedLevel = new Document("$add", Arrays.asList("$" + LEVEL, 1));
+     List<Bson> tripleFields = new LinkedList<>();
+     tripleFields.add(Projections.computed(SUBJECT, valueFieldExpr(SUBJECT)));
+     tripleFields.add(Projections.computed(SUBJECT_HASH, hashFieldExpr(SUBJECT)));
+     tripleFields.add(Projections.computed(PREDICATE, valueFieldExpr(PREDICATE)));
+     tripleFields.add(Projections.computed(PREDICATE_HASH, hashFieldExpr(PREDICATE)));
+     tripleFields.add(Projections.computed(OBJECT, valueFieldExpr(OBJECT)));
+     tripleFields.add(Projections.computed(OBJECT_HASH, hashFieldExpr(OBJECT)));
+     // Fall back to the default type when the object has no recorded type.
+     tripleFields.add(Projections.computed(OBJECT_TYPE,
+             ConditionalOperators.ifNull(typeFieldExpr(OBJECT), DEFAULT_TYPE)));
+     tripleFields.add(Projections.computed(CONTEXT, DEFAULT_CONTEXT));
+     tripleFields.add(Projections.computed(STATEMENT_METADATA, DEFAULT_METADATA));
+     tripleFields.add(DEFAULT_DV);
+     tripleFields.add(Projections.computed(TIMESTAMP, literalTimestamp));
+     tripleFields.add(Projections.computed(LEVEL, incrementedLevel));
+     result.add(Aggregates.project(Projections.fields(tripleFields)));
+
+     if (!requireNew) {
+         return result;
+     }
+
+     // Stage 2 (optional): prune any triples that already exist in the data
+     // store. Candidates are joined to stored triples on the subject hash;
+     // a joined triple is redundant when its predicate and object hashes
+     // match as well.
+     final String storeName = collection.getNamespace().getCollectionName();
+     final String redundantCount = "REDUNDANT";
+     Bson tripleFieldsOnly = Projections.include(SUBJECT, SUBJECT_HASH,
+             PREDICATE, PREDICATE_HASH, OBJECT, OBJECT_HASH,
+             OBJECT_TYPE, CONTEXT, STATEMENT_METADATA,
+             DOCUMENT_VISIBILITY, TIMESTAMP, LEVEL);
+     List<Bson> sameTripleTests = new LinkedList<>();
+     sameTripleTests.add(new Document("$eq",
+             Arrays.asList("$$this." + PREDICATE_HASH, "$" + PREDICATE_HASH)));
+     sameTripleTests.add(new Document("$eq",
+             Arrays.asList("$$this." + OBJECT_HASH, "$" + OBJECT_HASH)));
+     Document filterSpec = new Document("input", "$" + JOINED_TRIPLE)
+             .append("as", "this")
+             .append("cond", new Document("$and", sameTripleTests));
+     Bson matchingStored = new Document("$filter", filterSpec);
+     result.add(Aggregates.lookup(storeName, SUBJECT_HASH, SUBJECT_HASH, JOINED_TRIPLE));
+     result.add(Aggregates.project(Projections.fields(tripleFieldsOnly,
+             Projections.computed(redundantCount, new Document("$size", matchingStored)))));
+     // Keep only candidates with no matching stored triple ...
+     result.add(Aggregates.match(Filters.eq(redundantCount, 0)));
+     // ... and drop the temporary counter again.
+     result.add(Aggregates.project(Projections.fields(tripleFieldsOnly)));
+     return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d5ebb731/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryOptimizer.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryOptimizer.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryOptimizer.java
new file mode 100644
index 0000000..fb1f558
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryOptimizer.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.aggregation;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.rya.mongodb.StatefulMongoDBRdfConfiguration;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.Dataset;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.evaluation.QueryOptimizer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * MongoDB-specific query optimizer that replaces part or all of a SPARQL query
+ * tree with a MongoDB aggregation pipeline.
+ * <p>
+ * Transforms query trees using {@link SparqlToPipelineTransformVisitor}. If
+ * possible, this visitor will replace portions of the query tree, or the entire
+ * query, with an equivalent aggregation pipeline (contained in an
+ * {@link AggregationPipelineQueryNode}), thereby allowing query logic to be
+ * evaluated by the MongoDB server rather than by the client.
+ */
+public class AggregationPipelineQueryOptimizer implements QueryOptimizer, Configurable {
+ private StatefulMongoDBRdfConfiguration conf;
+ private Logger logger = LoggerFactory.getLogger(getClass());
+
+ @Override
+ public void optimize(TupleExpr tupleExpr, Dataset dataset, BindingSet bindings) {
+ SparqlToPipelineTransformVisitor pipelineVisitor = new SparqlToPipelineTransformVisitor(conf);
+ try {
+ tupleExpr.visit(pipelineVisitor);
+ } catch (Exception e) {
+ logger.error("Error attempting to transform query using the aggregation pipeline", e);
+ }
+ }
+
+ /**
+ * @throws IllegalArgumentException if conf is not a {@link StatefulMongoDBRdfConfiguration}.
+ */
+ @Override
+ public void setConf(Configuration conf) {
+ Preconditions.checkNotNull(conf);
+ Preconditions.checkArgument(conf instanceof StatefulMongoDBRdfConfiguration,
+ "Expected an instance of %s; received %s",
+ StatefulMongoDBRdfConfiguration.class.getName(), conf.getClass().getName());
+ this.conf = (StatefulMongoDBRdfConfiguration) conf;
+ }
+
+ @Override
+ public StatefulMongoDBRdfConfiguration getConf() {
+ return conf;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d5ebb731/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/PipelineResultIteration.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/PipelineResultIteration.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/PipelineResultIteration.java
new file mode 100644
index 0000000..c533efc
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/PipelineResultIteration.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.aggregation;
+
+import java.util.Map;
+
+import org.bson.Document;
+import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.Binding;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+import com.google.common.base.Preconditions;
+import com.mongodb.client.AggregateIterable;
+import com.mongodb.client.MongoCursor;
+
+import info.aduna.iteration.CloseableIteration;
+
+/**
+ * An iterator that converts the documents resulting from an
+ * {@link AggregationPipelineQueryNode} into {@link BindingSet}s.
+ * <p>
+ * Each result document is merged with a fixed partial solution supplied at
+ * construction time; documents that conflict with that partial solution are
+ * silently skipped.
+ */
+public class PipelineResultIteration implements CloseableIteration<BindingSet, QueryEvaluationException> {
+    private static final int BATCH_SIZE = 1000;
+    private static final ValueFactory VF = ValueFactoryImpl.getInstance();
+
+    private final MongoCursor<Document> cursor;
+    private final Map<String, String> varToOriginalName;
+    private final BindingSet bindings;
+    // The next solution to hand out, or null if none has been looked up yet.
+    private BindingSet nextSolution = null;
+
+    /**
+     * Constructor.
+     * @param aggIter Iterator of documents in AggregationPipelineQueryNode's
+     *  intermediate solution representation.
+     * @param varToOriginalName A mapping from field names in the pipeline
+     *  result documents to equivalent variable names in the original query.
+     *  Where an entry does not exist for a field, the field name and variable
+     *  name are assumed to be the same.
+     * @param bindings A partial solution. May be empty.
+     * @throws NullPointerException if any argument is null.
+     */
+    public PipelineResultIteration(AggregateIterable<Document> aggIter,
+            Map<String, String> varToOriginalName,
+            BindingSet bindings) {
+        this.varToOriginalName = Preconditions.checkNotNull(varToOriginalName);
+        this.bindings = Preconditions.checkNotNull(bindings);
+        Preconditions.checkNotNull(aggIter);
+        aggIter.batchSize(BATCH_SIZE);
+        this.cursor = aggIter.iterator();
+    }
+
+    /**
+     * Advance the cursor until a document yields a compatible solution or the
+     * cursor is exhausted. Does nothing if a solution is already pending.
+     */
+    private void lookahead() {
+        while (nextSolution == null && cursor.hasNext()) {
+            nextSolution = docToBindingSet(cursor.next());
+        }
+    }
+
+    @Override
+    public boolean hasNext() throws QueryEvaluationException {
+        lookahead();
+        return nextSolution != null;
+    }
+
+    /**
+     * @throws java.util.NoSuchElementException if there are no more solutions.
+     */
+    @Override
+    public BindingSet next() throws QueryEvaluationException {
+        lookahead();
+        if (nextSolution == null) {
+            // Signal exhaustion explicitly rather than handing the caller a
+            // null that would only fail later, far from the cause.
+            throw new java.util.NoSuchElementException("No more results in the aggregation pipeline output");
+        }
+        BindingSet solution = nextSolution;
+        nextSolution = null;
+        return solution;
+    }
+
+    /**
+     * @throws UnsupportedOperationException always.
+     */
+    @Override
+    public void remove() throws QueryEvaluationException {
+        throw new UnsupportedOperationException("remove() undefined for query result iteration");
+    }
+
+    @Override
+    public void close() throws QueryEvaluationException {
+        cursor.close();
+    }
+
+    /**
+     * Convert one pipeline result document into a solution that extends the
+     * partial solution given at construction time.
+     * @param result A document in the intermediate solution representation.
+     * @return The combined solution, or null if the document binds a variable
+     *  to a value different from the one in the partial solution.
+     */
+    private QueryBindingSet docToBindingSet(Document result) {
+        QueryBindingSet bindingSet = new QueryBindingSet(bindings);
+        Document valueSet = result.get(AggregationPipelineQueryNode.VALUES, Document.class);
+        Document typeSet = result.get(AggregationPipelineQueryNode.TYPES, Document.class);
+        if (valueSet == null) {
+            return bindingSet;
+        }
+        for (Map.Entry<String, Object> entry : valueSet.entrySet()) {
+            Object rawValue = entry.getValue();
+            if (rawValue == null) {
+                // A null value carries no binding; treat the variable as
+                // unbound instead of failing on toString().
+                continue;
+            }
+            String fieldName = entry.getKey();
+            String valueString = rawValue.toString();
+            String typeString = typeSet == null ? null : typeSet.getString(fieldName);
+            String varName = varToOriginalName.getOrDefault(fieldName, fieldName);
+            Value varValue;
+            // A value with no recorded type, or of type xsd:anyURI, is a URI;
+            // anything else is a literal of the recorded datatype.
+            if (typeString == null || typeString.equals(XMLSchema.ANYURI.stringValue())) {
+                varValue = VF.createURI(valueString);
+            }
+            else {
+                varValue = VF.createLiteral(valueString, VF.createURI(typeString));
+            }
+            Binding existingBinding = bindingSet.getBinding(varName);
+            // If this variable is not already bound, add it.
+            if (existingBinding == null) {
+                bindingSet.addBinding(varName, varValue);
+            }
+            // If it's bound to something else, the solutions are incompatible.
+            else if (!existingBinding.getValue().equals(varValue)) {
+                return null;
+            }
+        }
+        return bindingSet;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d5ebb731/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitor.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitor.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitor.java
new file mode 100644
index 0000000..b7f5a67
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitor.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.aggregation;
+
+import java.util.Arrays;
+
+import org.apache.rya.mongodb.StatefulMongoDBRdfConfiguration;
+import org.bson.Document;
+import org.openrdf.query.algebra.Distinct;
+import org.openrdf.query.algebra.Extension;
+import org.openrdf.query.algebra.Filter;
+import org.openrdf.query.algebra.Join;
+import org.openrdf.query.algebra.MultiProjection;
+import org.openrdf.query.algebra.Projection;
+import org.openrdf.query.algebra.Reduced;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+
+import com.google.common.base.Preconditions;
+import com.mongodb.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+
+/**
+ * Visitor that transforms a SPARQL query tree by replacing as much of the tree
+ * as possible with one or more {@code AggregationPipelineQueryNode}s.
+ * <p>
+ * Each {@link AggregationPipelineQueryNode} contains a MongoDB aggregation
+ * pipeline which is equivalent to the replaced portion of the original query.
+ * Evaluating this node executes the pipeline and converts the results into
+ * query solutions. If only part of the query was transformed, the remaining
+ * query logic (higher up in the query tree) can be applied to those
+ * intermediate solutions as normal.
+ * <p>
+ * In general, processes the tree in bottom-up order: A leaf node
+ * ({@link StatementPattern}) is replaced with a pipeline that matches the
+ * corresponding statements. Then, if the parent node's semantics are supported
+ * by the visitor, stages are appended to the pipeline and the subtree at the
+ * parent node is replaced with the extended pipeline. This continues up the
+ * tree until reaching a node that cannot be transformed, in which case that
+ * node's child is now a single {@code AggregationPipelineQueryNode} (a leaf
+ * node) instead of the previous subtree, or until the entire tree has been
+ * subsumed into a single pipeline node.
+ * <p>
+ * Nodes which are transformed into pipeline stages:
+ * <p><ul>
+ * <li>A {@code StatementPattern} node forms the beginning of each pipeline.
+ * <li>Single-argument operations {@link Projection}, {@link MultiProjection},
+ * {@link Extension}, {@link Distinct}, and {@link Reduced} will be transformed
+ * into pipeline stages whenever the child {@code TupleExpr} represents a
+ * pipeline.
+ * <li>A {@link Filter} operation will be appended to the pipeline when its
+ * child {@code TupleExpr} represents a pipeline and the filter condition is a
+ * type of {@code ValueExpr} understood by {@code AggregationPipelineQueryNode}.
+ * <li>A {@link Join} operation will be appended to the pipeline when one child
+ * is a {@code StatementPattern} and the other is an
+ * {@code AggregationPipelineQueryNode}.
+ * </ul>
+ * <p>
+ * A single-argument node is only replaced when it has a parent node, so the
+ * root of a query tree is never replaced by this visitor.
+ */
+public class SparqlToPipelineTransformVisitor extends QueryModelVisitorBase<Exception> {
+    private final MongoCollection<Document> inputCollection;
+
+    /**
+     * Instantiate a visitor directly from a {@link MongoCollection}.
+     * @param inputCollection Stores triples.
+     * @throws NullPointerException if the collection is null.
+     */
+    public SparqlToPipelineTransformVisitor(MongoCollection<Document> inputCollection) {
+        this.inputCollection = Preconditions.checkNotNull(inputCollection);
+    }
+
+    /**
+     * Instantiate a visitor from a {@link StatefulMongoDBRdfConfiguration}.
+     * @param conf Contains database connection information.
+     * @throws NullPointerException if the configuration is null.
+     */
+    public SparqlToPipelineTransformVisitor(StatefulMongoDBRdfConfiguration conf) {
+        Preconditions.checkNotNull(conf);
+        MongoClient client = conf.getMongoClient();
+        MongoDatabase database = client.getDatabase(conf.getMongoDBName());
+        this.inputCollection = database.getCollection(conf.getTriplesCollectionName());
+    }
+
+    /**
+     * Replace a statement pattern with a new pipeline node built from it.
+     */
+    @Override
+    public void meet(StatementPattern sp) {
+        AggregationPipelineQueryNode replacement = new AggregationPipelineQueryNode(inputCollection, sp);
+        sp.replaceWith(replacement);
+    }
+
+    /**
+     * If one side of the join is a statement pattern, visit the other side
+     * and, should it become a pipeline node, try to append the join to that
+     * pipeline and replace the join with it. If neither side is a statement
+     * pattern, just visit both children.
+     */
+    @Override
+    public void meet(Join join) throws Exception {
+        final boolean rightIsPattern = join.getRightArg() instanceof StatementPattern;
+        if (!rightIsPattern && !(join.getLeftArg() instanceof StatementPattern)) {
+            // No statement pattern to join with; try to replace smaller subtrees.
+            join.visitChildren(this);
+            return;
+        }
+        AggregationPipelineQueryNode pipelineSide = null;
+        StatementPattern patternSide = null;
+        if (rightIsPattern) {
+            join.getLeftArg().visit(this);
+            // Re-read the argument: visiting may have replaced it.
+            if (join.getLeftArg() instanceof AggregationPipelineQueryNode) {
+                pipelineSide = (AggregationPipelineQueryNode) join.getLeftArg();
+                patternSide = (StatementPattern) join.getRightArg();
+            }
+        }
+        else {
+            join.getRightArg().visit(this);
+            if (join.getRightArg() instanceof AggregationPipelineQueryNode) {
+                pipelineSide = (AggregationPipelineQueryNode) join.getRightArg();
+                patternSide = (StatementPattern) join.getLeftArg();
+            }
+        }
+        if (pipelineSide == null || patternSide == null) {
+            return;
+        }
+        // The join is now between a pipeline node and a statement pattern:
+        // add the join step at the end of the pipeline and, if that worked,
+        // replace this node with the extended pipeline node.
+        if (pipelineSide.joinWith(patternSide)) {
+            join.replaceWith(pipelineSide);
+        }
+    }
+
+    /**
+     * Fold a projection into the child pipeline when the pipeline accepts it.
+     */
+    @Override
+    public void meet(Projection projectionNode) throws Exception {
+        projectionNode.visitChildren(this);
+        if (!(projectionNode.getArg() instanceof AggregationPipelineQueryNode)
+                || projectionNode.getParentNode() == null) {
+            return;
+        }
+        AggregationPipelineQueryNode child = (AggregationPipelineQueryNode) projectionNode.getArg();
+        if (child.project(Arrays.asList(projectionNode.getProjectionElemList()))) {
+            projectionNode.replaceWith(child);
+        }
+    }
+
+    /**
+     * Fold a multi-projection into the child pipeline when the pipeline
+     * accepts it.
+     */
+    @Override
+    public void meet(MultiProjection projectionNode) throws Exception {
+        projectionNode.visitChildren(this);
+        if (!(projectionNode.getArg() instanceof AggregationPipelineQueryNode)
+                || projectionNode.getParentNode() == null) {
+            return;
+        }
+        AggregationPipelineQueryNode child = (AggregationPipelineQueryNode) projectionNode.getArg();
+        if (child.project(projectionNode.getProjections())) {
+            projectionNode.replaceWith(child);
+        }
+    }
+
+    /**
+     * Fold an extension into the child pipeline when the pipeline accepts it.
+     */
+    @Override
+    public void meet(Extension extensionNode) throws Exception {
+        extensionNode.visitChildren(this);
+        if (!(extensionNode.getArg() instanceof AggregationPipelineQueryNode)
+                || extensionNode.getParentNode() == null) {
+            return;
+        }
+        AggregationPipelineQueryNode child = (AggregationPipelineQueryNode) extensionNode.getArg();
+        if (child.extend(extensionNode.getElements())) {
+            extensionNode.replaceWith(child);
+        }
+    }
+
+    /**
+     * Drop a REDUCED node sitting on top of a pipeline; no stage is added to
+     * the pipeline for it.
+     */
+    @Override
+    public void meet(Reduced reducedNode) throws Exception {
+        reducedNode.visitChildren(this);
+        final boolean childIsPipeline = reducedNode.getArg() instanceof AggregationPipelineQueryNode;
+        if (childIsPipeline && reducedNode.getParentNode() != null) {
+            reducedNode.replaceWith(reducedNode.getArg());
+        }
+    }
+
+    /**
+     * Append a distinct step to the child pipeline and replace the DISTINCT
+     * node with it.
+     */
+    @Override
+    public void meet(Distinct distinctNode) throws Exception {
+        distinctNode.visitChildren(this);
+        if (!(distinctNode.getArg() instanceof AggregationPipelineQueryNode)
+                || distinctNode.getParentNode() == null) {
+            return;
+        }
+        AggregationPipelineQueryNode child = (AggregationPipelineQueryNode) distinctNode.getArg();
+        child.distinct();
+        distinctNode.replaceWith(child);
+    }
+
+    /**
+     * Fold a filter into the child pipeline when the pipeline understands the
+     * filter condition.
+     */
+    @Override
+    public void meet(Filter filterNode) throws Exception {
+        filterNode.visitChildren(this);
+        if (!(filterNode.getArg() instanceof AggregationPipelineQueryNode)
+                || filterNode.getParentNode() == null) {
+            return;
+        }
+        AggregationPipelineQueryNode child = (AggregationPipelineQueryNode) filterNode.getArg();
+        if (child.filter(filterNode.getCondition())) {
+            filterNode.replaceWith(child);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d5ebb731/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/SimpleMongoDBStorageStrategy.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/SimpleMongoDBStorageStrategy.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/SimpleMongoDBStorageStrategy.java
index db33181..ecad9c6 100644
--- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/SimpleMongoDBStorageStrategy.java
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/SimpleMongoDBStorageStrategy.java
@@ -63,6 +63,15 @@ public class SimpleMongoDBStorageStrategy implements MongoDBStorageStrategy<RyaS
public static final String STATEMENT_METADATA = "statementMetadata";
public static final String DOCUMENT_VISIBILITY = "documentVisibility";
+ /**
+ * Generate the hash that will be used to index and retrieve a given value.
+ * @param value A value to be stored or accessed (e.g. a URI or literal).
+ * @return the hash associated with that value in MongoDB.
+ */
+ public static String hash(String value) {
+ return DigestUtils.sha256Hex(value);
+ }
+
protected ValueFactoryImpl factory = new ValueFactoryImpl();
@Override
@@ -91,14 +100,14 @@ public class SimpleMongoDBStorageStrategy implements MongoDBStorageStrategy<RyaS
final RyaURI context = stmt.getContext();
final BasicDBObject query = new BasicDBObject();
if (subject != null){
- query.append(SUBJECT_HASH, DigestUtils.sha256Hex(subject.getData()));
+ query.append(SUBJECT_HASH, hash(subject.getData()));
}
if (object != null){
- query.append(OBJECT_HASH, DigestUtils.sha256Hex(object.getData()));
+ query.append(OBJECT_HASH, hash(object.getData()));
query.append(OBJECT_TYPE, object.getDataType().toString());
}
if (predicate != null){
- query.append(PREDICATE_HASH, DigestUtils.sha256Hex(predicate.getData()));
+ query.append(PREDICATE_HASH, hash(predicate.getData()));
}
if (context != null){
query.append(CONTEXT, context.getData());
@@ -179,11 +188,11 @@ public class SimpleMongoDBStorageStrategy implements MongoDBStorageStrategy<RyaS
final BasicDBObject dvObject = DocumentVisibilityAdapter.toDBObject(statement.getColumnVisibility());
final BasicDBObject doc = new BasicDBObject(ID, new String(Hex.encodeHex(bytes)))
.append(SUBJECT, statement.getSubject().getData())
- .append(SUBJECT_HASH, DigestUtils.sha256Hex(statement.getSubject().getData()))
+ .append(SUBJECT_HASH, hash(statement.getSubject().getData()))
.append(PREDICATE, statement.getPredicate().getData())
- .append(PREDICATE_HASH, DigestUtils.sha256Hex(statement.getPredicate().getData()))
+ .append(PREDICATE_HASH, hash(statement.getPredicate().getData()))
.append(OBJECT, statement.getObject().getData())
- .append(OBJECT_HASH, DigestUtils.sha256Hex(statement.getObject().getData()))
+ .append(OBJECT_HASH, hash(statement.getObject().getData()))
.append(OBJECT_TYPE, statement.getObject().getDataType().toString())
.append(CONTEXT, context)
.append(STATEMENT_METADATA, statement.getMetadata().toString())
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d5ebb731/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNodeTest.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNodeTest.java b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNodeTest.java
new file mode 100644
index 0000000..1e056c4
--- /dev/null
+++ b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNodeTest.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.aggregation;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.bson.Document;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.openrdf.model.URI;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.RDF;
+import org.openrdf.query.algebra.Compare;
+import org.openrdf.query.algebra.ExtensionElem;
+import org.openrdf.query.algebra.IsLiteral;
+import org.openrdf.query.algebra.Not;
+import org.openrdf.query.algebra.ProjectionElem;
+import org.openrdf.query.algebra.ProjectionElemList;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.ValueConstant;
+import org.openrdf.query.algebra.Var;
+
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.Sets;
+import com.mongodb.MongoNamespace;
+import com.mongodb.client.MongoCollection;
+
+/**
+ * Unit tests for {@link AggregationPipelineQueryNode}, run against a Mockito
+ * mock of {@link MongoCollection}. Each test applies one operation to a node
+ * and then checks the binding names the node reports and the number of
+ * stages in its pipeline.
+ */
+public class AggregationPipelineQueryNodeTest {
+    private static final ValueFactory VF = ValueFactoryImpl.getInstance();
+
+    private static final String LUBM = "urn:lubm";
+    private static final URI UNDERGRAD = VF.createURI(LUBM, "UndergraduateStudent");
+    private static final URI TAKES = VF.createURI(LUBM, "takesCourse");
+
+    private MongoCollection<Document> collection;
+
+    /**
+     * Wraps a URI in a {@link Var} that is named after the URI's string form
+     * and carries the URI as its value.
+     */
+    private static Var constant(URI value) {
+        return new Var(value.stringValue(), value);
+    }
+
+    /**
+     * Creates a new statement pattern whose predicate is the constant
+     * {@link #TAKES} and whose subject and object are variables.
+     *
+     * @param subjectVar name of the subject variable
+     * @param objectVar name of the object variable
+     * @return a newly allocated pattern; no instance is shared between calls
+     */
+    private static StatementPattern takesCoursePattern(String subjectVar, String objectVar) {
+        return new StatementPattern(new Var(subjectVar), constant(TAKES), new Var(objectVar));
+    }
+
+    /**
+     * Builds the starting node shared by most tests: the mocked collection,
+     * an empty pipeline, assured binding names {x, y}, binding names
+     * {x, y, opt}, and an empty {@link HashBiMap}.
+     *
+     * @return a newly constructed node; every call yields an independent instance
+     */
+    private AggregationPipelineQueryNode newBaseNode() {
+        return new AggregationPipelineQueryNode(
+                collection,
+                new LinkedList<>(),
+                Sets.newHashSet("x", "y"),
+                Sets.newHashSet("x", "y", "opt"),
+                HashBiMap.create());
+    }
+
+    /**
+     * Replaces the collection with a fresh mock whose namespace is
+     * "db"."collection" before every test.
+     */
+    @Before
+    @SuppressWarnings("unchecked")
+    public void setUp() {
+        collection = Mockito.mock(MongoCollection.class);
+        Mockito.when(collection.getNamespace()).thenReturn(new MongoNamespace("db", "collection"));
+    }
+
+    /**
+     * Identically constructed nodes must be equal with equal hash codes;
+     * nodes built with different arguments must not be equal; and equality
+     * must track the joins subsequently applied to each node.
+     */
+    @Test
+    public void testEquals() {
+        final AggregationPipelineQueryNode first = newBaseNode();
+        final AggregationPipelineQueryNode second = newBaseNode();
+        Assert.assertEquals(first, second);
+        Assert.assertEquals(first.hashCode(), second.hashCode());
+        // Differs from the base node only in that "opt" is absent from the larger name set
+        final AggregationPipelineQueryNode fewerNames = new AggregationPipelineQueryNode(
+                collection,
+                new LinkedList<>(),
+                Sets.newHashSet("x", "y"),
+                Sets.newHashSet("x", "y"),
+                HashBiMap.create());
+        // Differs from the base node only in having a one-stage pipeline
+        final AggregationPipelineQueryNode extraStage = new AggregationPipelineQueryNode(
+                collection,
+                Arrays.asList(new Document()),
+                Sets.newHashSet("x", "y"),
+                Sets.newHashSet("x", "y", "opt"),
+                HashBiMap.create());
+        // Differs from the base node in both the pipeline and the mapping
+        final HashBiMap<String, String> mapping = HashBiMap.create();
+        mapping.put("field-x", "x");
+        final AggregationPipelineQueryNode extraStageAndMapping = new AggregationPipelineQueryNode(
+                collection,
+                Arrays.asList(new Document()),
+                Sets.newHashSet("x", "y"),
+                Sets.newHashSet("x", "y", "opt"),
+                mapping);
+        Assert.assertNotEquals(fewerNames, first);
+        Assert.assertNotEquals(extraStage, first);
+        Assert.assertNotEquals(extraStageAndMapping, first);
+        // The same join applied to both nodes keeps them equal...
+        first.joinWith(takesCoursePattern("x", "c"));
+        second.joinWith(takesCoursePattern("x", "c"));
+        Assert.assertEquals(first, second);
+        // ...and one more join on only one of them breaks equality
+        second.joinWith(takesCoursePattern("x", "c"));
+        Assert.assertNotEquals(first, second);
+    }
+
+    /**
+     * A clone must start out equal to its source, and the two pipelines must
+     * be independent: adding a stage to one makes the nodes unequal until
+     * the same stage is added to the other.
+     */
+    @Test
+    public void testClone() {
+        final AggregationPipelineQueryNode base = newBaseNode();
+        final AggregationPipelineQueryNode copy = base.clone();
+        Assert.assertEquals(base, copy);
+        copy.getPipeline().add(new Document("$project", new Document()));
+        Assert.assertNotEquals(base, copy);
+        base.getPipeline().add(new Document("$project", new Document()));
+        Assert.assertEquals(base, copy);
+    }
+
+    /**
+     * Constructing a node from a statement pattern must expose exactly the
+     * pattern's variable names as both binding names and assured binding
+     * names, and must produce a two-stage pipeline in every case.
+     */
+    @Test
+    public void testStatementPattern() throws Exception {
+        // Subject, predicate and object are all variables
+        final StatementPattern allVariables = new StatementPattern(new Var("s"), new Var("p"), new Var("o"));
+        final AggregationPipelineQueryNode variableNode = new AggregationPipelineQueryNode(collection, allVariables);
+        Assert.assertEquals(Sets.newHashSet("s", "p", "o"), variableNode.getBindingNames());
+        Assert.assertEquals(Sets.newHashSet("s", "p", "o"), variableNode.getAssuredBindingNames());
+        Assert.assertEquals(2, variableNode.getPipeline().size());
+        // Subject, predicate and object are all constants
+        final StatementPattern allConstants = new StatementPattern(
+                constant(VF.createURI("urn:Alice")), constant(RDF.TYPE), constant(UNDERGRAD));
+        final AggregationPipelineQueryNode constantNode = new AggregationPipelineQueryNode(collection, allConstants);
+        Assert.assertEquals(Sets.newHashSet(), constantNode.getBindingNames());
+        Assert.assertEquals(Sets.newHashSet(), constantNode.getAssuredBindingNames());
+        Assert.assertEquals(2, constantNode.getPipeline().size());
+        // Variable subject with constant predicate and object
+        final StatementPattern mixed = new StatementPattern(new Var("student"), constant(RDF.TYPE), constant(UNDERGRAD));
+        final AggregationPipelineQueryNode mixedNode = new AggregationPipelineQueryNode(collection, mixed);
+        Assert.assertEquals(Sets.newHashSet("student"), mixedNode.getBindingNames());
+        Assert.assertEquals(Sets.newHashSet("student"), mixedNode.getAssuredBindingNames());
+        Assert.assertEquals(2, mixedNode.getPipeline().size());
+    }
+
+    /**
+     * Joining with a statement pattern must succeed, add the pattern's new
+     * variables to both name sets, and extend the pipeline: to 4 stages when
+     * one variable is shared, and to 5 when two are shared.
+     */
+    @Test
+    public void testJoin() throws Exception {
+        final AggregationPipelineQueryNode base = newBaseNode();
+        // Only "x" is shared with the existing bindings
+        final AggregationPipelineQueryNode oneShared = base.clone();
+        Assert.assertTrue(oneShared.joinWith(takesCoursePattern("x", "c")));
+        Assert.assertEquals(Sets.newHashSet("x", "y", "c", "opt"), oneShared.getBindingNames());
+        Assert.assertEquals(Sets.newHashSet("x", "y", "c"), oneShared.getAssuredBindingNames());
+        Assert.assertEquals(4, oneShared.getPipeline().size());
+        // Both "x" and "y" are shared with the existing bindings
+        final AggregationPipelineQueryNode twoShared = base.clone();
+        Assert.assertTrue(twoShared.joinWith(takesCoursePattern("x", "y")));
+        Assert.assertEquals(Sets.newHashSet("x", "y", "opt"), twoShared.getBindingNames());
+        Assert.assertEquals(Sets.newHashSet("x", "y"), twoShared.getAssuredBindingNames());
+        Assert.assertEquals(5, twoShared.getPipeline().size());
+    }
+
+    /**
+     * Covers three projection cases: a single projection list (1 stage), a
+     * multi-projection of three lists (3 stages), and an empty list of
+     * projections, which must be rejected and leave the node unchanged.
+     */
+    @Test
+    public void testProject() {
+        final AggregationPipelineQueryNode base = newBaseNode();
+        // Single projection: x is renamed to z, y keeps its name
+        final ProjectionElemList renameAndKeep = new ProjectionElemList();
+        renameAndKeep.addElement(new ProjectionElem("x", "z"));
+        renameAndKeep.addElement(new ProjectionElem("y", "y"));
+        final List<ProjectionElemList> single = Arrays.asList(renameAndKeep);
+        final AggregationPipelineQueryNode singleNode = base.clone();
+        Assert.assertTrue(singleNode.project(single));
+        Assert.assertEquals(1, singleNode.getPipeline().size());
+        Assert.assertEquals(Sets.newHashSet("z", "y"), singleNode.getAssuredBindingNames());
+        Assert.assertEquals(Sets.newHashSet("z", "y"), singleNode.getBindingNames());
+        // Multi-projection: only "solution" appears in all three lists
+        final ProjectionElemList fromX = new ProjectionElemList();
+        fromX.addElement(new ProjectionElem("x", "solution"));
+        final ProjectionElemList fromY = new ProjectionElemList();
+        fromY.addElement(new ProjectionElem("y", "solution"));
+        final ProjectionElemList combined = new ProjectionElemList();
+        combined.addElement(new ProjectionElem("x", "x"));
+        combined.addElement(new ProjectionElem("x", "solution"));
+        combined.addElement(new ProjectionElem("y", "y"));
+        final List<ProjectionElemList> multiple = Arrays.asList(fromX, fromY, combined);
+        final AggregationPipelineQueryNode multiNode = base.clone();
+        Assert.assertTrue(multiNode.project(multiple));
+        Assert.assertEquals(3, multiNode.getPipeline().size());
+        Assert.assertEquals(Sets.newHashSet("solution"), multiNode.getAssuredBindingNames());
+        Assert.assertEquals(Sets.newHashSet("x", "y", "solution"), multiNode.getBindingNames());
+        // No projections at all: the call reports failure and changes nothing
+        final AggregationPipelineQueryNode untouched = base.clone();
+        Assert.assertFalse(untouched.project(Arrays.asList()));
+        Assert.assertEquals(base, untouched);
+    }
+
+    /**
+     * Extending with variables and a value constant must succeed with one new
+     * stage and add the new names to both name sets; extending with a
+     * {@link Not} expression must be rejected and leave the node unchanged.
+     */
+    @Test
+    public void testExtend() {
+        final AggregationPipelineQueryNode base = newBaseNode();
+        // Two variable references and one constant value
+        final List<ExtensionElem> supported = Arrays.asList(
+                new ExtensionElem(new Var("x"), "subject"),
+                new ExtensionElem(new ValueConstant(RDF.TYPE), "predicate"),
+                new ExtensionElem(new Var("y"), "object"));
+        final AggregationPipelineQueryNode extended = base.clone();
+        Assert.assertTrue(extended.extend(supported));
+        Assert.assertEquals(1, extended.getPipeline().size());
+        Assert.assertEquals(Sets.newHashSet("x", "y", "subject", "predicate", "object"),
+                extended.getAssuredBindingNames());
+        Assert.assertEquals(Sets.newHashSet("x", "y", "subject", "predicate", "object", "opt"),
+                extended.getBindingNames());
+        // One element is a Not expression, which this test expects to be refused
+        final List<ExtensionElem> unsupported = Arrays.asList(
+                new ExtensionElem(new Var("x"), "subject"),
+                new ExtensionElem(new Not(new ValueConstant(VF.createLiteral(true))), "notTrue"));
+        final AggregationPipelineQueryNode untouched = base.clone();
+        Assert.assertFalse(untouched.extend(unsupported));
+        Assert.assertEquals(base, untouched);
+    }
+
+    /**
+     * Applying distinct must succeed, leave both name sets as they were, and
+     * add exactly one stage to the pipeline.
+     */
+    @Test
+    public void testDistinct() {
+        final AggregationPipelineQueryNode node = newBaseNode().clone();
+        Assert.assertTrue(node.distinct());
+        Assert.assertEquals(Sets.newHashSet("x", "y", "opt"), node.getBindingNames());
+        Assert.assertEquals(Sets.newHashSet("x", "y"), node.getAssuredBindingNames());
+        Assert.assertEquals(1, node.getPipeline().size());
+    }
+
+    /**
+     * An equality comparison between two variables must be accepted and add
+     * three stages; an {@link IsLiteral} condition must be rejected and add
+     * none. The name sets are unchanged in both cases.
+     */
+    @Test
+    public void testFilter() {
+        final AggregationPipelineQueryNode base = newBaseNode();
+        // Accepted condition: ?x = ?y
+        final AggregationPipelineQueryNode filtered = base.clone();
+        Assert.assertTrue(filtered.filter(new Compare(new Var("x"), new Var("y"), Compare.CompareOp.EQ)));
+        Assert.assertEquals(Sets.newHashSet("x", "y", "opt"), filtered.getBindingNames());
+        Assert.assertEquals(Sets.newHashSet("x", "y"), filtered.getAssuredBindingNames());
+        Assert.assertEquals(3, filtered.getPipeline().size());
+        // Rejected condition: isLiteral(?opt)
+        final AggregationPipelineQueryNode untouched = base.clone();
+        Assert.assertFalse(untouched.filter(new IsLiteral(new Var("opt"))));
+        Assert.assertEquals(Sets.newHashSet("x", "y", "opt"), untouched.getBindingNames());
+        Assert.assertEquals(Sets.newHashSet("x", "y"), untouched.getAssuredBindingNames());
+        Assert.assertEquals(0, untouched.getPipeline().size());
+    }
+
+    /**
+     * Requiring a source derivation depth of 3 must add one stage, while a
+     * depth of 0 must add none. The name sets are unchanged in both cases.
+     */
+    @Test
+    public void testRequireSourceDerivationLevel() throws Exception {
+        final AggregationPipelineQueryNode base = newBaseNode();
+        // Depth greater than zero
+        final AggregationPipelineQueryNode constrained = base.clone();
+        constrained.requireSourceDerivationDepth(3);
+        Assert.assertEquals(Sets.newHashSet("x", "y", "opt"), constrained.getBindingNames());
+        Assert.assertEquals(Sets.newHashSet("x", "y"), constrained.getAssuredBindingNames());
+        Assert.assertEquals(1, constrained.getPipeline().size());
+        // Depth of zero: the pipeline stays empty
+        final AggregationPipelineQueryNode unconstrained = base.clone();
+        unconstrained.requireSourceDerivationDepth(0);
+        Assert.assertEquals(Sets.newHashSet("x", "y", "opt"), unconstrained.getBindingNames());
+        Assert.assertEquals(Sets.newHashSet("x", "y"), unconstrained.getAssuredBindingNames());
+        Assert.assertEquals(0, unconstrained.getPipeline().size());
+    }
+
+    /**
+     * Requiring a source timestamp must add one stage to the pipeline and
+     * leave both name sets unchanged.
+     */
+    @Test
+    public void testRequireSourceTimestamp() {
+        final AggregationPipelineQueryNode node = newBaseNode().clone();
+        // The current wall-clock time is used as the required timestamp
+        node.requireSourceTimestamp(System.currentTimeMillis());
+        Assert.assertEquals(Sets.newHashSet("x", "y", "opt"), node.getBindingNames());
+        Assert.assertEquals(Sets.newHashSet("x", "y"), node.getAssuredBindingNames());
+        Assert.assertEquals(1, node.getPipeline().size());
+    }
+}