Posted to commits@rya.apache.org by ca...@apache.org on 2018/01/11 14:52:02 UTC

[3/6] incubator-rya git commit: RYA-416 A new query node type to represent a MongoDB aggregation pipeline whose results can be converted to binding sets, and tools for optionally transforming some SPARQL expressions into such a node. Closes #254.

RYA-416 A new query node type to represent a MongoDB aggregation pipeline whose results can be converted to binding sets, and tools for optionally transforming some SPARQL expressions into such a node. Closes #254.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/d5ebb731
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/d5ebb731
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/d5ebb731

Branch: refs/heads/master
Commit: d5ebb7315509edbc212c7481f8abe76ee0eec934
Parents: fff50c4
Author: Jesse Hatfield <je...@parsons.com>
Authored: Thu Dec 21 17:32:47 2017 -0500
Committer: caleb <ca...@parsons.com>
Committed: Thu Jan 11 09:50:03 2018 -0500

----------------------------------------------------------------------
 dao/mongodb.rya/pom.xml                         |   5 +
 .../AbstractMongoDBRdfConfigurationBuilder.java |  16 +
 .../rya/mongodb/MongoDBRdfConfiguration.java    |  40 +-
 .../AggregationPipelineQueryNode.java           | 856 +++++++++++++++++++
 .../AggregationPipelineQueryOptimizer.java      |  73 ++
 .../aggregation/PipelineResultIteration.java    | 135 +++
 .../SparqlToPipelineTransformVisitor.java       | 196 +++++
 .../dao/SimpleMongoDBStorageStrategy.java       |  21 +-
 .../AggregationPipelineQueryNodeTest.java       | 331 +++++++
 .../mongodb/aggregation/PipelineQueryIT.java    | 421 +++++++++
 .../PipelineResultIterationTest.java            | 152 ++++
 .../SparqlToPipelineTransformVisitorTest.java   | 207 +++++
 12 files changed, 2446 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d5ebb731/dao/mongodb.rya/pom.xml
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/pom.xml b/dao/mongodb.rya/pom.xml
index 0803aa8..0afac81 100644
--- a/dao/mongodb.rya/pom.xml
+++ b/dao/mongodb.rya/pom.xml
@@ -86,5 +86,10 @@ Tests will fail with the following error when using 32bit JVM on either Linux or
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d5ebb731/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/AbstractMongoDBRdfConfigurationBuilder.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/AbstractMongoDBRdfConfigurationBuilder.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/AbstractMongoDBRdfConfigurationBuilder.java
index bb14a39..369f7a0 100644
--- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/AbstractMongoDBRdfConfigurationBuilder.java
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/AbstractMongoDBRdfConfigurationBuilder.java
@@ -43,6 +43,7 @@ public abstract class AbstractMongoDBRdfConfigurationBuilder<B extends AbstractM
     protected static final String DEFAULT_MONGO_PORT = "27017";
     private String mongoCollectionPrefix = "rya_";
     private String mongoDBName = "rya";
+    private boolean usePipeline = false;
 
     protected static final String MONGO_USER = "mongo.user";
     protected static final String MONGO_PASSWORD = "mongo.password";
@@ -142,6 +143,20 @@ public abstract class AbstractMongoDBRdfConfigurationBuilder<B extends AbstractM
     }
 
     /**
+     * Enable or disable an optimization that executes queries, to the extent
+     * possible, using the MongoDB aggregation pipeline. Defaults to false.
+     * If enabled, the optimizer replaces a query tree or subtree with a single
+     * node representing a series of pipeline steps. Not all query algebra
+     * expressions can be transformed; unsupported expressions are left
+     * unchanged and the optimization is attempted on their child subtrees.
+     * @param usePipeline whether to use aggregation pipeline optimization.
+     */
+    public B setUseAggregationPipeline(boolean usePipeline) {
+        this.usePipeline = usePipeline;
+        return confBuilder();
+    }
+
+    /**
      * @return extension of {@link MongoDBRdfConfiguration} with specified parameters set
      */
     @Override
@@ -171,6 +186,7 @@ public abstract class AbstractMongoDBRdfConfigurationBuilder<B extends AbstractM
         conf.setTablePrefix(mongoCollectionPrefix);
         conf.setMongoHostname(host);
         conf.setMongoPort(port);
+        conf.setUseAggregationPipeline(usePipeline);
 
         return conf;
     }

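For reference, the new builder option slots into the existing fluent API. A minimal sketch follows; getBuilder(), the surrounding setters, and build() follow the builder pattern shown above but are assumptions here, since the only new public method in this diff is setUseAggregationPipeline(boolean):

    MongoDBRdfConfiguration conf = MongoDBRdfConfiguration.getBuilder()
            .setUseAggregationPipeline(true)  // new option; defaults to false
            .build();
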
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d5ebb731/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java
index 835ed27..44dc851 100644
--- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java
@@ -20,11 +20,14 @@ package org.apache.rya.mongodb;
 
 import static java.util.Objects.requireNonNull;
 
+import java.util.List;
 import java.util.Properties;
 
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.rya.mongodb.aggregation.AggregationPipelineQueryOptimizer;
+import org.openrdf.query.algebra.evaluation.QueryOptimizer;
 
 import edu.umd.cs.findbugs.annotations.Nullable;
 
@@ -51,6 +54,8 @@ public class MongoDBRdfConfiguration extends RdfCloudTripleStoreConfiguration {
     public static final String CONF_ADDITIONAL_INDEXERS = "ac.additional.indexers";
     public static final String MONGO_GEO_MAXDISTANCE = "mongo.geo.maxdist";
 
+    public static final String USE_AGGREGATION_PIPELINE = "rya.mongodb.query.pipeline";
+
     /**
      * Constructs an empty instance of {@link MongoDBRdfConfiguration}.
      */
@@ -251,4 +256,37 @@ public class MongoDBRdfConfiguration extends RdfCloudTripleStoreConfiguration {
     public void setFlush(final boolean flush){
         setBoolean(CONF_FLUSH_EACH_UPDATE, flush);
     }
-}
\ No newline at end of file
+
+    /**
+     * Whether aggregation pipeline optimization is enabled.
+     * @return true if queries will be evaluated using MongoDB aggregation.
+     */
+    public boolean getUseAggregationPipeline() {
+        return getBoolean(USE_AGGREGATION_PIPELINE, false);
+    }
+
+    /**
+     * Enable or disable an optimization that executes queries, to the extent
+     * possible, using the MongoDB aggregation pipeline. Replaces a query tree
+     * or subtree with a single node representing a series of pipeline steps.
+     * Transformation may not be supported for all query algebra expressions;
+     * these expressions are left unchanged and the optimization is attempted
+     * on their child subtrees.
+     * @param value whether to use aggregation pipeline optimization.
+     */
+    public void setUseAggregationPipeline(boolean value) {
+        setBoolean(USE_AGGREGATION_PIPELINE, value);
+    }
+
+    @Override
+    public List<Class<QueryOptimizer>> getOptimizers() {
+        List<Class<QueryOptimizer>> optimizers = super.getOptimizers();
+        if (getUseAggregationPipeline()) {
+            Class<?> cl = AggregationPipelineQueryOptimizer.class;
+            @SuppressWarnings("unchecked")
+            Class<QueryOptimizer> optCl = (Class<QueryOptimizer>) cl;
+            optimizers.add(optCl);
+        }
+        return optimizers;
+    }
+}

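Taken together, the new getter/setter pair and the getOptimizers() override mean the feature can be toggled with a single call. A minimal sketch using only methods added or shown in this diff:

    MongoDBRdfConfiguration conf = new MongoDBRdfConfiguration();
    conf.setUseAggregationPipeline(true);  // sets "rya.mongodb.query.pipeline"
    // getOptimizers() now appends AggregationPipelineQueryOptimizer, so query
    // engines that consult the configuration will apply the pipeline transform.
    List<Class<QueryOptimizer>> optimizers = conf.getOptimizers();
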
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d5ebb731/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java
new file mode 100644
index 0000000..7a84f5d
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java
@@ -0,0 +1,856 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.aggregation;
+
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.CONTEXT;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.DOCUMENT_VISIBILITY;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT_HASH;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT_TYPE;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.PREDICATE;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.PREDICATE_HASH;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.STATEMENT_METADATA;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.SUBJECT;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.SUBJECT_HASH;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.TIMESTAMP;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.function.Function;
+
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.domain.StatementMetadata;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.mongodb.MongoDbRdfConstants;
+import org.apache.rya.mongodb.dao.MongoDBStorageStrategy;
+import org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy;
+import org.apache.rya.mongodb.document.operators.query.ConditionalOperators;
+import org.apache.rya.mongodb.document.visibility.DocumentVisibilityAdapter;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.openrdf.model.Literal;
+import org.openrdf.model.Resource;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.Compare;
+import org.openrdf.query.algebra.ExtensionElem;
+import org.openrdf.query.algebra.ProjectionElem;
+import org.openrdf.query.algebra.ProjectionElemList;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.ValueConstant;
+import org.openrdf.query.algebra.ValueExpr;
+import org.openrdf.query.algebra.Var;
+import org.openrdf.query.algebra.evaluation.impl.ExternalSet;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBObject;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.Aggregates;
+import com.mongodb.client.model.BsonField;
+import com.mongodb.client.model.Filters;
+import com.mongodb.client.model.Projections;
+
+import info.aduna.iteration.CloseableIteration;
+
+/**
+ * Represents a portion of a query tree as a MongoDB aggregation pipeline. Should
+ * be built bottom-up: start with a statement pattern implemented as a $match
+ * step, then add steps to the pipeline to handle higher levels of the query
+ * tree. Methods are provided to add certain supported query operations to the
+ * end of the internal pipeline. In some cases, specific arguments may be
+ * unsupported, in which case the pipeline is unchanged and the method returns
+ * false.
+ */
+public class AggregationPipelineQueryNode extends ExternalSet {
+    /**
+     * An aggregation result corresponding to a solution should map this key
+     * to an object which itself maps variable names to variable values.
+     */
+    static final String VALUES = "<VALUES>";
+
+    /**
+     * An aggregation result corresponding to a solution should map this key
+     * to an object which itself maps variable names to the corresponding hashes
+     * of their values.
+     */
+    static final String HASHES = "<HASHES>";
+
+    /**
+     * An aggregation result corresponding to a solution should map this key
+     * to an object which itself maps variable names to their datatypes, if any.
+     */
+    static final String TYPES = "<TYPES>";
+
+    private static final String LEVEL = "derivation_level";
+    private static final String[] FIELDS = { VALUES, HASHES, TYPES, LEVEL, TIMESTAMP };
+
+    private static final String JOINED_TRIPLE = "<JOINED_TRIPLE>";
+    private static final String FIELDS_MATCH = "<JOIN_FIELDS_MATCH>";
+
+    private static final MongoDBStorageStrategy<RyaStatement> strategy = new SimpleMongoDBStorageStrategy();
+
+    private static final Bson DEFAULT_TYPE = new Document("$literal", XMLSchema.ANYURI.stringValue());
+    private static final Bson DEFAULT_CONTEXT = new Document("$literal", "");
+    private static final Bson DEFAULT_DV = DocumentVisibilityAdapter.toDBObject(MongoDbRdfConstants.EMPTY_DV);
+    private static final Bson DEFAULT_METADATA = new Document("$literal",
+            StatementMetadata.EMPTY_METADATA.toString());
+
+    private static boolean isValidFieldName(String name) {
+        return !(name == null || name.contains(".") || name.contains("$")
+                || name.equals("_id"));
+    }
+
+    /**
+     * For a given statement pattern, represents a mapping from query variables
+     * to their corresponding parts of matching triples. If necessary, also
+     * substitutes variable names containing invalid characters with temporary
+     * replacements, while maintaining a map back to the original names.
+     */
+    private static class StatementVarMapping {
+        private final Map<String, String> varToTripleValue = new HashMap<>();
+        private final Map<String, String> varToTripleHash = new HashMap<>();
+        private final Map<String, String> varToTripleType = new HashMap<>();
+        private final BiMap<String, String> varToOriginalName;
+
+        String valueField(String varName) {
+            return varToTripleValue.get(varName);
+        }
+        String hashField(String varName) {
+            return varToTripleHash.get(varName);
+        }
+        String typeField(String varName) {
+            return varToTripleType.get(varName);
+        }
+
+        Set<String> varNames() {
+            return varToTripleValue.keySet();
+        }
+
+        private String replace(String original) {
+            if (varToOriginalName.containsValue(original)) {
+                return varToOriginalName.inverse().get(original);
+            }
+            else {
+                String replacement = "field-" + UUID.randomUUID();
+                varToOriginalName.put(replacement, original);
+                return replacement;
+            }
+        }
+
+        private String sanitize(String name) {
+            if (varToOriginalName.containsValue(name)) {
+                return varToOriginalName.inverse().get(name);
+            }
+            else if (name != null && !isValidFieldName(name)) {
+                return replace(name);
+            }
+            return name;
+        }
+
+        StatementVarMapping(StatementPattern sp, BiMap<String, String> varToOriginalName) {
+            this.varToOriginalName = varToOriginalName;
+            if (sp.getSubjectVar() != null && !sp.getSubjectVar().hasValue()) {
+                String name = sanitize(sp.getSubjectVar().getName());
+                varToTripleValue.put(name, SUBJECT);
+                varToTripleHash.put(name, SUBJECT_HASH);
+            }
+            if (sp.getPredicateVar() != null && !sp.getPredicateVar().hasValue()) {
+                String name = sanitize(sp.getPredicateVar().getName());
+                varToTripleValue.put(name, PREDICATE);
+                varToTripleHash.put(name, PREDICATE_HASH);
+            }
+            if (sp.getObjectVar() != null && !sp.getObjectVar().hasValue()) {
+                String name = sanitize(sp.getObjectVar().getName());
+                varToTripleValue.put(name, OBJECT);
+                varToTripleHash.put(name, OBJECT_HASH);
+                varToTripleType.put(name, OBJECT_TYPE);
+            }
+            if (sp.getContextVar() != null && !sp.getContextVar().hasValue()) {
+                String name = sanitize(sp.getContextVar().getName());
+                varToTripleValue.put(name, CONTEXT);
+            }
+        }
+
+        Bson getProjectExpression() {
+            return getProjectExpression(new LinkedList<>(), str -> "$" + str);
+        }
+
+        Bson getProjectExpression(Iterable<String> alsoInclude,
+                Function<String, String> getFieldExpr) {
+            Document values = new Document();
+            Document hashes = new Document();
+            Document types = new Document();
+            for (String varName : varNames()) {
+                values.append(varName, getFieldExpr.apply(valueField(varName)));
+                if (varToTripleHash.containsKey(varName)) {
+                    hashes.append(varName, getFieldExpr.apply(hashField(varName)));
+                }
+                if (varToTripleType.containsKey(varName)) {
+                    types.append(varName, getFieldExpr.apply(typeField(varName)));
+                }
+            }
+            for (String varName : alsoInclude) {
+                values.append(varName, 1);
+                hashes.append(varName, 1);
+                types.append(varName, 1);
+            }
+            List<Bson> fields = new LinkedList<>();
+            fields.add(Projections.excludeId());
+            fields.add(Projections.computed(VALUES, values));
+            fields.add(Projections.computed(HASHES, hashes));
+            if (!types.isEmpty()) {
+                fields.add(Projections.computed(TYPES, types));
+            }
+            fields.add(Projections.computed(LEVEL, new Document("$max",
+                    Arrays.asList("$" + LEVEL, getFieldExpr.apply(LEVEL), 0))));
+            fields.add(Projections.computed(TIMESTAMP, new Document("$max",
+                    Arrays.asList("$" + TIMESTAMP, getFieldExpr.apply(TIMESTAMP), 0))));
+            return Projections.fields(fields);
+        }
+    }
+
+    /**
+     * Given a StatementPattern, generate an object representing the arguments
+     * to a "$match" command that will find matching triples.
+     * @param sp The StatementPattern to search for
+     * @param path If given, specify the field that should be matched against
+     *  the statement pattern, using an ordered list of field names for a nested
+     *  field. E.g. to match records { "x": { "y": <statement pattern> } }, pass
+     *  "x" followed by "y".
+     * @return The argument of a "$match" query
+     */
+    private static BasicDBObject getMatchExpression(StatementPattern sp, String ... path) {
+        final Var subjVar = sp.getSubjectVar();
+        final Var predVar = sp.getPredicateVar();
+        final Var objVar = sp.getObjectVar();
+        final Var contextVar = sp.getContextVar();
+        RyaURI s = null;
+        RyaURI p = null;
+        RyaType o = null;
+        RyaURI c = null;
+        if (subjVar != null && subjVar.getValue() instanceof Resource) {
+            s = RdfToRyaConversions.convertResource((Resource) subjVar.getValue());
+        }
+        if (predVar != null && predVar.getValue() instanceof URI) {
+            p = RdfToRyaConversions.convertURI((URI) predVar.getValue());
+        }
+        if (objVar != null && objVar.getValue() != null) {
+            o = RdfToRyaConversions.convertValue(objVar.getValue());
+        }
+        if (contextVar != null && contextVar.getValue() instanceof URI) {
+            c = RdfToRyaConversions.convertURI((URI) contextVar.getValue());
+        }
+        RyaStatement rs = new RyaStatement(s, p, o, c);
+        DBObject obj = strategy.getQuery(rs);
+        // Add path prefix, if given
+        if (path.length > 0) {
+            StringBuilder sb = new StringBuilder();
+            for (String str : path) {
+                sb.append(str).append(".");
+            }
+            String prefix = sb.toString();
+            Set<String> originalKeys = new HashSet<>(obj.keySet());
+            originalKeys.forEach(key -> {
+                Object value = obj.removeField(key);
+                obj.put(prefix + key, value);
+            });
+        }
+        return (BasicDBObject) obj;
+    }
+
+    private static String valueFieldExpr(String varName) {
+        return "$" + VALUES + "." + varName;
+    }
+    private static String hashFieldExpr(String varName) {
+        return "$" + HASHES + "." + varName;
+    }
+    private static String typeFieldExpr(String varName) {
+        return "$" + TYPES + "." + varName;
+    }
+    private static String joinFieldExpr(String triplePart) {
+        return "$" + JOINED_TRIPLE + "." + triplePart;
+    }
+
+    /**
+     * Get an object representing the value field of some value expression, or
+     * return null if the expression isn't supported.
+     */
+    private Object valueFieldExpr(ValueExpr expr) {
+        if (expr instanceof Var) {
+            return valueFieldExpr(((Var) expr).getName());
+        }
+        else if (expr instanceof ValueConstant) {
+            return new Document("$literal", ((ValueConstant) expr).getValue().stringValue());
+        }
+        else {
+            return null;
+        }
+    }
+
+    private final List<Bson> pipeline;
+    private final MongoCollection<Document> collection;
+    private final Set<String> assuredBindingNames;
+    private final Set<String> bindingNames;
+    private final BiMap<String, String> varToOriginalName;
+
+    private String replace(String original) {
+        if (varToOriginalName.containsValue(original)) {
+            return varToOriginalName.inverse().get(original);
+        }
+        else {
+            String replacement = "field-" + UUID.randomUUID();
+            varToOriginalName.put(replacement, original);
+            return replacement;
+        }
+    }
+
+    /**
+     * Create a pipeline query node based on a StatementPattern.
+     * @param collection The collection of triples to query.
+     * @param baseSP The leaf node in the query tree.
+     */
+    public AggregationPipelineQueryNode(MongoCollection<Document> collection, StatementPattern baseSP) {
+        this.collection = Preconditions.checkNotNull(collection);
+        Preconditions.checkNotNull(baseSP);
+        this.varToOriginalName = HashBiMap.create();
+        StatementVarMapping mapping = new StatementVarMapping(baseSP, varToOriginalName);
+        this.assuredBindingNames = new HashSet<>(mapping.varNames());
+        this.bindingNames = new HashSet<>(mapping.varNames());
+        this.pipeline = new LinkedList<>();
+        this.pipeline.add(Aggregates.match(getMatchExpression(baseSP)));
+        this.pipeline.add(Aggregates.project(mapping.getProjectExpression()));
+    }
+
+    AggregationPipelineQueryNode(MongoCollection<Document> collection,
+            List<Bson> pipeline, Set<String> assuredBindingNames,
+            Set<String> bindingNames, BiMap<String, String> varToOriginalName) {
+        this.collection = Preconditions.checkNotNull(collection);
+        this.pipeline = Preconditions.checkNotNull(pipeline);
+        this.assuredBindingNames = Preconditions.checkNotNull(assuredBindingNames);
+        this.bindingNames = Preconditions.checkNotNull(bindingNames);
+        this.varToOriginalName = Preconditions.checkNotNull(varToOriginalName);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o instanceof AggregationPipelineQueryNode) {
+            AggregationPipelineQueryNode other = (AggregationPipelineQueryNode) o;
+            if (this.collection.equals(other.collection)
+                    && this.assuredBindingNames.equals(other.assuredBindingNames)
+                    && this.bindingNames.equals(other.bindingNames)
+                    && this.varToOriginalName.equals(other.varToOriginalName)
+                    && this.pipeline.size() == other.pipeline.size()) {
+                // Check pipeline steps for equality -- underlying types don't
+                // have well-behaved equals methods, so check for equivalent
+                // string representations.
+                for (int i = 0; i < this.pipeline.size(); i++) {
+                    Bson doc1 = this.pipeline.get(i);
+                    Bson doc2 = other.pipeline.get(i);
+                    if (!doc1.toString().equals(doc2.toString())) {
+                        return false;
+                    }
+                }
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(collection, pipeline, assuredBindingNames,
+                bindingNames, varToOriginalName);
+    }
+
+    @Override
+    public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bindings)
+            throws QueryEvaluationException {
+        return new PipelineResultIteration(collection.aggregate(pipeline), varToOriginalName, bindings);
+    }
+
+    @Override
+    public Set<String> getAssuredBindingNames() {
+        Set<String> names = new HashSet<>();
+        for (String name : assuredBindingNames) {
+            names.add(varToOriginalName.getOrDefault(name, name));
+        }
+        return names;
+    }
+
+    @Override
+    public Set<String> getBindingNames() {
+        Set<String> names = new HashSet<>();
+        for (String name : bindingNames) {
+            names.add(varToOriginalName.getOrDefault(name, name));
+        }
+        return names;
+    }
+
+    @Override
+    public AggregationPipelineQueryNode clone() {
+        return new AggregationPipelineQueryNode(collection,
+                new LinkedList<>(pipeline),
+                new HashSet<>(assuredBindingNames),
+                new HashSet<>(bindingNames),
+                HashBiMap.create(varToOriginalName));
+    }
+
+    @Override
+    public String getSignature() {
+        super.getSignature();
+        Set<String> assured = getAssuredBindingNames();
+        Set<String> any = getBindingNames();
+        StringBuilder sb = new StringBuilder("AggregationPipelineQueryNode (binds: ");
+        sb.append(String.join(", ", assured));
+        if (any.size() > assured.size()) {
+            Set<String> optionalBindingNames = any;
+            optionalBindingNames.removeAll(assured);
+            sb.append(" [")
+                .append(String.join(", ", optionalBindingNames))
+                .append("]");
+        }
+        sb.append(")\n");
+        for (Bson doc : pipeline) {
+            sb.append(doc.toString()).append("\n");
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Get the internal list of aggregation pipeline steps. Note that documents
+     * resulting from this pipeline will be structured using an internal
+     * intermediate representation. For documents representing triples, see
+     * {@link #getTriplePipeline}, and for query solutions, see
+     * {@link #evaluate}.
+     * @return The current internal pipeline.
+     */
+    List<Bson> getPipeline() {
+        return pipeline;
+    }
+
+    /**
+     * Add a join with an individual {@link StatementPattern} to the pipeline.
+     * @param sp The statement pattern to join with
+     * @return true if the join was successfully added to the pipeline.
+     */
+    public boolean joinWith(StatementPattern sp) {
+        Preconditions.checkNotNull(sp);
+        // 1. Determine shared variables and new variables
+        StatementVarMapping spMap = new StatementVarMapping(sp, varToOriginalName);
+        NavigableSet<String> sharedVars = new ConcurrentSkipListSet<>(spMap.varNames());
+        sharedVars.retainAll(assuredBindingNames);
+        // 2. Join on one shared variable
+        String joinKey = sharedVars.pollFirst();
+        String collectionName = collection.getNamespace().getCollectionName();
+        Bson join;
+        if (joinKey == null) {
+            return false;
+        }
+        else {
+            join = Aggregates.lookup(collectionName,
+                    HASHES + "." + joinKey,
+                    spMap.hashField(joinKey),
+                    JOINED_TRIPLE);
+        }
+        pipeline.add(join);
+        // 3. Unwind the joined triples so each document represents a binding
+        //   set (solution) from the base branch and a triple that may match.
+        pipeline.add(Aggregates.unwind("$" + JOINED_TRIPLE));
+        // 4. (Optional) If there are any shared variables that weren't used as
+        //   the join key, project all existing fields plus a new field that
+        //   tests the equality of those shared variables.
+        BasicDBObject matchOpts = getMatchExpression(sp, JOINED_TRIPLE);
+        if (!sharedVars.isEmpty()) {
+            List<Bson> eqTests = new LinkedList<>();
+            for (String varName : sharedVars) {
+                String oldField = valueFieldExpr(varName);
+                String newField = joinFieldExpr(spMap.valueField(varName));
+                Bson eqTest = new Document("$eq", Arrays.asList(oldField, newField));
+                eqTests.add(eqTest);
+            }
+            Bson eqProjectOpts = Projections.fields(
+                    Projections.computed(FIELDS_MATCH, Filters.and(eqTests)),
+                    Projections.include(JOINED_TRIPLE, VALUES, HASHES, TYPES, LEVEL, TIMESTAMP));
+            pipeline.add(Aggregates.project(eqProjectOpts));
+            matchOpts.put(FIELDS_MATCH, true);
+        }
+        // 5. Filter for solutions whose triples match the joined statement
+        //  pattern, and, if applicable, whose additional shared variables
+        //  match the current solution.
+        pipeline.add(Aggregates.match(matchOpts));
+        // 6. Project the results to include variables from the new SP (with
+        // appropriate renaming) and variables referenced only in the base
+        // pipeline (with previous names).
+        Bson finalProjectOpts = new StatementVarMapping(sp, varToOriginalName)
+                .getProjectExpression(assuredBindingNames,
+                        str -> joinFieldExpr(str));
+        assuredBindingNames.addAll(spMap.varNames());
+        bindingNames.addAll(spMap.varNames());
+        pipeline.add(Aggregates.project(finalProjectOpts));
+        return true;
+    }
+
+    /**
+     * Add a SPARQL projection or multi-projection operation to the pipeline.
+     * The number of documents produced by the pipeline after this operation
+     * will be the number of documents entering this stage (the number of
+     * intermediate results) multiplied by the number of
+     * {@link ProjectionElemList}s supplied here.
+     * @param projections One or more projections, i.e. mappings from the result
+     *  at this stage of the query into a set of variables.
+     * @return true if the projection(s) were added to the pipeline.
+     */
+    public boolean project(Iterable<ProjectionElemList> projections) {
+        if (projections == null || !projections.iterator().hasNext()) {
+            return false;
+        }
+        List<Bson> projectOpts = new LinkedList<>();
+        Set<String> bindingNamesUnion = new HashSet<>();
+        Set<String> bindingNamesIntersection = null;
+        for (ProjectionElemList projection : projections) {
+            Document valueDoc = new Document();
+            Document hashDoc = new Document();
+            Document typeDoc = new Document();
+            Set<String> projectionBindingNames = new HashSet<>();
+            for (ProjectionElem elem : projection.getElements()) {
+                String to = elem.getTargetName();
+                // If the 'to' name is invalid, replace it internally
+                if (!isValidFieldName(to)) {
+                    to = replace(to);
+                }
+                String from = elem.getSourceName();
+                // If the 'from' name is invalid, use the internal substitute
+                if (varToOriginalName.containsValue(from)) {
+                    from = varToOriginalName.inverse().get(from);
+                }
+                projectionBindingNames.add(to);
+                if (to.equals(from)) {
+                    valueDoc.append(to, 1);
+                    hashDoc.append(to, 1);
+                    typeDoc.append(to, 1);
+                }
+                else {
+                    valueDoc.append(to, valueFieldExpr(from));
+                    hashDoc.append(to, hashFieldExpr(from));
+                    typeDoc.append(to, typeFieldExpr(from));
+                }
+            }
+            bindingNamesUnion.addAll(projectionBindingNames);
+            if (bindingNamesIntersection == null) {
+                bindingNamesIntersection = new HashSet<>(projectionBindingNames);
+            }
+            else {
+                bindingNamesIntersection.retainAll(projectionBindingNames);
+            }
+            projectOpts.add(new Document()
+                    .append(VALUES, valueDoc)
+                    .append(HASHES, hashDoc)
+                    .append(TYPES, typeDoc)
+                    .append(LEVEL, "$" + LEVEL)
+                    .append(TIMESTAMP, "$" + TIMESTAMP));
+        }
+        if (projectOpts.size() == 1) {
+            pipeline.add(Aggregates.project(projectOpts.get(0)));
+        }
+        else {
+            String listKey = "PROJECTIONS";
+            Bson projectIndividual = Projections.fields(
+                    Projections.computed(VALUES, "$" + listKey + "." + VALUES),
+                    Projections.computed(HASHES, "$" + listKey + "." + HASHES),
+                    Projections.computed(TYPES, "$" + listKey + "." + TYPES),
+                    Projections.include(LEVEL),
+                    Projections.include(TIMESTAMP));
+            pipeline.add(Aggregates.project(Projections.computed(listKey, projectOpts)));
+            pipeline.add(Aggregates.unwind("$" + listKey));
+            pipeline.add(Aggregates.project(projectIndividual));
+        }
+        assuredBindingNames.clear();
+        bindingNames.clear();
+        assuredBindingNames.addAll(bindingNamesIntersection);
+        bindingNames.addAll(bindingNamesUnion);
+        return true;
+    }
+
+    /**
+     * Add a SPARQL extension to the pipeline, if possible. An extension adds
+     * some number of variables to the result. Adds a "$project" step to the
+     * pipeline, but differs from the SPARQL project operation in that
+     * 1) pre-existing variables are always kept, and 2) values of new variables
+     * are defined by expressions, which may be more complex than simply
+     * variable names. Not all expressions are supported. If unsupported
+     * expression types are used in the extension, the pipeline will remain
+     * unchanged and this method will return false.
+     * @param extensionElements A list of new variables and their expressions
+     * @return True if the extension was successfully converted into a pipeline
+     *  step, false otherwise.
+     */
+    public boolean extend(Iterable<ExtensionElem> extensionElements) {
+        List<Bson> valueFields = new LinkedList<>();
+        List<Bson> hashFields = new LinkedList<>();
+        List<Bson> typeFields = new LinkedList<>();
+        for (String varName : bindingNames) {
+            valueFields.add(Projections.include(varName));
+            hashFields.add(Projections.include(varName));
+            typeFields.add(Projections.include(varName));
+        }
+        Set<String> newVarNames = new HashSet<>();
+        for (ExtensionElem elem : extensionElements) {
+            String name = elem.getName();
+            if (!isValidFieldName(name)) {
+                // If the field name is invalid, replace it internally
+                name = replace(name);
+            }
+            // We can only handle certain kinds of value expressions; return
+            // failure for any others.
+            ValueExpr expr = elem.getExpr();
+            final Object valueField;
+            final Object hashField;
+            final Object typeField;
+            if (expr instanceof Var) {
+                String varName = ((Var) expr).getName();
+                valueField = "$" + varName;
+                hashField = "$" + varName;
+                typeField = "$" + varName;
+            }
+            else if (expr instanceof ValueConstant) {
+                Value val = ((ValueConstant) expr).getValue();
+                valueField = new Document("$literal", val.stringValue());
+                hashField = new Document("$literal", SimpleMongoDBStorageStrategy.hash(val.stringValue()));
+                if (val instanceof Literal) {
+                    typeField = new Document("$literal", ((Literal) val).getDatatype().stringValue());
+                }
+                else {
+                    typeField = null;
+                }
+            }
+            else {
+                // if not understood, return failure
+                return false;
+            }
+            valueFields.add(Projections.computed(name, valueField));
+            hashFields.add(Projections.computed(name, hashField));
+            if (typeField != null) {
+                typeFields.add(Projections.computed(name, typeField));
+            }
+            newVarNames.add(name);
+        }
+        assuredBindingNames.addAll(newVarNames);
+        bindingNames.addAll(newVarNames);
+        Bson projectOpts = Projections.fields(
+                Projections.computed(VALUES, Projections.fields(valueFields)),
+                Projections.computed(HASHES, Projections.fields(hashFields)),
+                Projections.computed(TYPES, Projections.fields(typeFields)),
+                Projections.include(LEVEL),
+                Projections.include(TIMESTAMP));
+        pipeline.add(Aggregates.project(projectOpts));
+        return true;
+    }
+
+    /**
+     * Add a SPARQL filter to the pipeline, if possible. A filter eliminates
+     * results that don't satisfy a given condition. Not all conditional
+     * expressions are supported. If unsupported expressions are used in the
+     * filter, the pipeline will remain unchanged and this method will return
+     * false. Currently only supports binary {@link Compare} conditions among
+     * variables and/or literals.
+     * @param condition The filter condition
+     * @return True if the filter was successfully converted into a pipeline
+     *  step, false otherwise.
+     */
+    public boolean filter(ValueExpr condition) {
+        if (condition instanceof Compare) {
+            Compare compare = (Compare) condition;
+            Compare.CompareOp operator = compare.getOperator();
+            Object leftArg = valueFieldExpr(compare.getLeftArg());
+            Object rightArg = valueFieldExpr(compare.getRightArg());
+            if (leftArg == null || rightArg == null) {
+                // unsupported value expression, can't convert filter
+                return false;
+            }
+            final String opFunc;
+            switch (operator) {
+            case EQ:
+                opFunc = "$eq";
+                break;
+            case NE:
+                opFunc = "$ne";
+                break;
+            case LT:
+                opFunc = "$lt";
+                break;
+            case LE:
+                opFunc = "$le";
+                break;
+            case GT:
+                opFunc = "$gt";
+                break;
+            case GE:
+                opFunc = "$ge";
+                break;
+            default:
+                // unrecognized comparison operator, can't convert filter
+                return false;
+            }
+            Document compareDoc = new Document(opFunc, Arrays.asList(leftArg, rightArg));
+            pipeline.add(Aggregates.project(Projections.fields(
+                    Projections.computed("FILTER", compareDoc),
+                    Projections.include(VALUES, HASHES, TYPES, LEVEL, TIMESTAMP))));
+            pipeline.add(Aggregates.match(new Document("FILTER", true)));
+            pipeline.add(Aggregates.project(Projections.fields(
+                    Projections.include(VALUES, HASHES, TYPES, LEVEL, TIMESTAMP))));
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Add a $group step to filter out redundant solutions.
+     * @return True if the distinct operation was successfully appended.
+     */
+    public boolean distinct() {
+        List<String> key = new LinkedList<>();
+        for (String varName : bindingNames) {
+            key.add(hashFieldExpr(varName));
+        }
+        List<BsonField> reduceOps = new LinkedList<>();
+        for (String field : FIELDS) {
+            reduceOps.add(new BsonField(field, new Document("$first", "$" + field)));
+        }
+        pipeline.add(Aggregates.group(new Document("$concat", key), reduceOps));
+        return true;
+    }
+
+    /**
+     * Add a step to the end of the current pipeline which prunes the results
+     * according to the recorded derivation level of their sources. At least one
+     * triple that was used to construct the result must have a derivation level
+     * at least as high as the parameter, indicating that it was derived via
+     * that many steps from the original data. (A value of zero is equivalent to
+     * input data that was not derived at all.) Use in conjunction with
+     * getTriplePipeline (which sets source level for generated triples) to
+     * avoid repeatedly deriving the same results.
+     * @param requiredLevel Required derivation depth. Reject a solution to the
+     *  query if all of the triples involved in producing that solution have a
+     *  lower derivation depth than this. If zero, does nothing.
+     */
+    public void requireSourceDerivationDepth(int requiredLevel) {
+        if (requiredLevel > 0) {
+            pipeline.add(Aggregates.match(new Document(LEVEL,
+                    new Document("$gte", requiredLevel))));
+        }
+    }
+
+    /**
+     * Add a step to the end of the current pipeline which prunes the results
+     * according to the timestamps of their sources. At least one triple that
+     * was used to construct the result must have a timestamp at least as
+     * recent as the parameter. Use in iterative applications to avoid deriving
+     * solutions that would have been generated in an earlier iteration.
+     * @param t Minimum required timestamp. Reject a solution to the query if
+     *  all of the triples involved in producing that solution have an earlier
+     *  timestamp than this.
+     */
+    public void requireSourceTimestamp(long t) {
+        pipeline.add(Aggregates.match(new Document(TIMESTAMP,
+                new Document("$gte", t))));
+    }
+
+    /**
+     * Given that the current state of the pipeline produces data that can be
+     * interpreted as triples, add a project step to map each result from the
+     * intermediate result structure to a structure that can be stored in the
+     * triple store. Does not modify the internal pipeline, which will still
+     * produce intermediate results suitable for query evaluation.
+     * @param timestamp Attach this timestamp to the resulting triples.
+     * @param requireNew If true, add an additional step to check constructed
+     *  triples against existing triples and only include new ones in the
+     *  result. Adds a potentially expensive $lookup step.
+     * @throws IllegalStateException if the results produced by the current
+     *  pipeline do not have variable names allowing them to be interpreted as
+     *  triples (i.e. "subject", "predicate", and "object").
+     */
+    public List<Bson> getTriplePipeline(long timestamp, boolean requireNew) {
+        if (!assuredBindingNames.contains(SUBJECT)
+                || !assuredBindingNames.contains(PREDICATE)
+                || !assuredBindingNames.contains(OBJECT)) {
+            throw new IllegalStateException("Current pipeline does not produce "
+                    + "records that can be converted into triples.\n"
+                    + "Required variable names: <" + SUBJECT + ", " + PREDICATE
+                    + ", " + OBJECT + ">\nCurrent variable names: "
+                    + assuredBindingNames);
+        }
+        List<Bson> triplePipeline = new LinkedList<>(pipeline);
+        List<Bson> fields = new LinkedList<>();
+        fields.add(Projections.computed(SUBJECT, valueFieldExpr(SUBJECT)));
+        fields.add(Projections.computed(SUBJECT_HASH, hashFieldExpr(SUBJECT)));
+        fields.add(Projections.computed(PREDICATE, valueFieldExpr(PREDICATE)));
+        fields.add(Projections.computed(PREDICATE_HASH, hashFieldExpr(PREDICATE)));
+        fields.add(Projections.computed(OBJECT, valueFieldExpr(OBJECT)));
+        fields.add(Projections.computed(OBJECT_HASH, hashFieldExpr(OBJECT)));
+        fields.add(Projections.computed(OBJECT_TYPE,
+                ConditionalOperators.ifNull(typeFieldExpr(OBJECT), DEFAULT_TYPE)));
+        fields.add(Projections.computed(CONTEXT, DEFAULT_CONTEXT));
+        fields.add(Projections.computed(STATEMENT_METADATA, DEFAULT_METADATA));
+        fields.add(DEFAULT_DV);
+        fields.add(Projections.computed(TIMESTAMP, new Document("$literal", timestamp)));
+        fields.add(Projections.computed(LEVEL, new Document("$add", Arrays.asList("$" + LEVEL, 1))));
+        triplePipeline.add(Aggregates.project(Projections.fields(fields)));
+        if (requireNew) {
+            // Prune any triples that already exist in the data store
+            String collectionName = collection.getNamespace().getCollectionName();
+            Bson includeAll = Projections.include(SUBJECT, SUBJECT_HASH,
+                    PREDICATE, PREDICATE_HASH, OBJECT, OBJECT_HASH,
+                    OBJECT_TYPE, CONTEXT, STATEMENT_METADATA,
+                    DOCUMENT_VISIBILITY, TIMESTAMP, LEVEL);
+            List<Bson> eqTests = new LinkedList<>();
+            eqTests.add(new Document("$eq", Arrays.asList("$$this." + PREDICATE_HASH, "$" + PREDICATE_HASH)));
+            eqTests.add(new Document("$eq", Arrays.asList("$$this." + OBJECT_HASH, "$" + OBJECT_HASH)));
+            Bson redundantFilter = new Document("$filter", new Document("input", "$" + JOINED_TRIPLE)
+                    .append("as", "this").append("cond", new Document("$and", eqTests)));
+            triplePipeline.add(Aggregates.lookup(collectionName, SUBJECT_HASH,
+                    SUBJECT_HASH, JOINED_TRIPLE));
+            String numRedundant = "REDUNDANT";
+            triplePipeline.add(Aggregates.project(Projections.fields(includeAll,
+                    Projections.computed(numRedundant, new Document("$size", redundantFilter)))));
+            triplePipeline.add(Aggregates.match(Filters.eq(numRedundant, 0)));
+            triplePipeline.add(Aggregates.project(Projections.fields(includeAll)));
+        }
+        return triplePipeline;
+    }
+}

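To illustrate the bottom-up construction described in the class Javadoc above, here is a hypothetical two-pattern join. The collection and predicate names, the mongoClient handle, and the variable names are illustrative assumptions; the constructor, joinWith, distinct, and evaluate calls are the API added by this file:

    ValueFactory vf = ValueFactoryImpl.getInstance();
    MongoCollection<Document> triples = mongoClient.getDatabase("rya")
            .getCollection("rya_triples");           // illustrative names
    // ?person <urn:worksAt> ?org . ?org <urn:locatedIn> ?city
    StatementPattern sp1 = new StatementPattern(new Var("person"),
            new Var("p1", vf.createURI("urn:worksAt")), new Var("org"));
    StatementPattern sp2 = new StatementPattern(new Var("org"),
            new Var("p2", vf.createURI("urn:locatedIn")), new Var("city"));
    AggregationPipelineQueryNode node = new AggregationPipelineQueryNode(triples, sp1);
    if (node.joinWith(sp2)) {      // false would mean the join can't be expressed
        node.distinct();           // append a $group stage to drop duplicates
        CloseableIteration<BindingSet, QueryEvaluationException> results =
                node.evaluate(new EmptyBindingSet());
    }
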
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d5ebb731/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryOptimizer.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryOptimizer.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryOptimizer.java
new file mode 100644
index 0000000..fb1f558
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryOptimizer.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.aggregation;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.rya.mongodb.StatefulMongoDBRdfConfiguration;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.Dataset;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.evaluation.QueryOptimizer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * MongoDB-specific query optimizer that replaces part or all of a SPARQL query
+ * tree with a MongoDB aggregation pipeline.
+ * <p>
+ * Transforms query trees using {@link SparqlToPipelineTransformVisitor}. If
+ * possible, this visitor will replace portions of the query tree, or the entire
+ * query, with an equivalent aggregation pipeline (contained in an
+ * {@link AggregationPipelineQueryNode}), thereby allowing query logic to be
+ * evaluated by the MongoDB server rather than by the client.
+ */
+public class AggregationPipelineQueryOptimizer implements QueryOptimizer, Configurable {
+    private StatefulMongoDBRdfConfiguration conf;
+    private Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Override
+    public void optimize(TupleExpr tupleExpr, Dataset dataset, BindingSet bindings) {
+        SparqlToPipelineTransformVisitor pipelineVisitor = new SparqlToPipelineTransformVisitor(conf);
+        try {
+            tupleExpr.visit(pipelineVisitor);
+        } catch (Exception e) {
+            logger.error("Error attempting to transform query using the aggregation pipeline", e);
+        }
+    }
+
+    /**
+     * @throws IllegalArgumentException if conf is not a {@link StatefulMongoDBRdfConfiguration}.
+     */
+    @Override
+    public void setConf(Configuration conf) {
+        Preconditions.checkNotNull(conf);
+        Preconditions.checkArgument(conf instanceof StatefulMongoDBRdfConfiguration,
+                "Expected an instance of %s; received %s",
+                StatefulMongoDBRdfConfiguration.class.getName(), conf.getClass().getName());
+        this.conf = (StatefulMongoDBRdfConfiguration) conf;
+    }
+
+    @Override
+    public StatefulMongoDBRdfConfiguration getConf() {
+        return conf;
+    }
+}

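In isolation, the optimizer can be exercised against a parsed query along these lines. This is a sketch assuming a StatefulMongoDBRdfConfiguration instance (statefulConf) is already available; the parser classes are standard OpenRDF/Sesame, not part of this change:

    SPARQLParser parser = new SPARQLParser();
    ParsedQuery parsed = parser.parseQuery(
            "SELECT * WHERE { ?s ?p ?o . ?o ?p2 ?x }", null);
    AggregationPipelineQueryOptimizer optimizer = new AggregationPipelineQueryOptimizer();
    optimizer.setConf(statefulConf);   // rejects any other Configuration type
    optimizer.optimize(parsed.getTupleExpr(), null, new EmptyBindingSet());
    // Where the transform succeeded, subtrees of the tuple expression have been
    // replaced with AggregationPipelineQueryNode instances.
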
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d5ebb731/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/PipelineResultIteration.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/PipelineResultIteration.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/PipelineResultIteration.java
new file mode 100644
index 0000000..c533efc
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/PipelineResultIteration.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.aggregation;
+
+import java.util.Map;
+
+import org.bson.Document;
+import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.Binding;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+import com.google.common.base.Preconditions;
+import com.mongodb.client.AggregateIterable;
+import com.mongodb.client.MongoCursor;
+
+import info.aduna.iteration.CloseableIteration;
+
+/**
+ * An iterator that converts the documents resulting from an
+ * {@link AggregationPipelineQueryNode} into {@link BindingSet}s.
+ */
+public class PipelineResultIteration implements CloseableIteration<BindingSet, QueryEvaluationException> {
+    private static final int BATCH_SIZE = 1000;
+    private static final ValueFactory VF = ValueFactoryImpl.getInstance();
+
+    private final MongoCursor<Document> cursor;
+    private final Map<String, String> varToOriginalName;
+    private final BindingSet bindings;
+    private BindingSet nextSolution = null;
+
+    /**
+     * Constructor.
+     * @param aggIter Iterator of documents in AggregationPipelineQueryNode's
+     *  intermediate solution representation.
+     * @param varToOriginalName A mapping from field names in the pipeline
+     *  result documents to equivalent variable names in the original query.
+     *  Where an entry does not exist for a field, the field name and variable
+     *  name are assumed to be the same.
+     * @param bindings A partial solution. May be empty.
+     */
+    public PipelineResultIteration(AggregateIterable<Document> aggIter,
+            Map<String, String> varToOriginalName,
+            BindingSet bindings) {
+        this.varToOriginalName = Preconditions.checkNotNull(varToOriginalName);
+        this.bindings = Preconditions.checkNotNull(bindings);
+        Preconditions.checkNotNull(aggIter);
+        aggIter.batchSize(BATCH_SIZE);
+        this.cursor = aggIter.iterator();
+    }
+
+    private void lookahead() {
+        while (nextSolution == null && cursor.hasNext()) {
+            nextSolution = docToBindingSet(cursor.next());
+        }
+    }
+
+    @Override
+    public boolean hasNext() throws QueryEvaluationException {
+        lookahead();
+        return nextSolution != null;
+    }
+
+    @Override
+    public BindingSet next() throws QueryEvaluationException {
+        lookahead();
+        if (nextSolution == null) {
+            // Iteration contract: signal exhaustion rather than returning null.
+            throw new NoSuchElementException();
+        }
+        final BindingSet solution = nextSolution;
+        nextSolution = null;
+        return solution;
+    }
+
+    /**
+     * @throws UnsupportedOperationException always.
+     */
+    @Override
+    public void remove() throws QueryEvaluationException {
+        throw new UnsupportedOperationException("remove() undefined for query result iteration");
+    }
+
+    @Override
+    public void close() throws QueryEvaluationException {
+        cursor.close();
+    }
+
+    private QueryBindingSet docToBindingSet(Document result) {
+        QueryBindingSet bindingSet = new QueryBindingSet(bindings);
+        Document valueSet = result.get(AggregationPipelineQueryNode.VALUES, Document.class);
+        Document typeSet = result.get(AggregationPipelineQueryNode.TYPES, Document.class);
+        if (valueSet != null) {
+            for (Map.Entry<String, Object> entry : valueSet.entrySet()) {
+                String fieldName = entry.getKey();
+                String valueString = entry.getValue().toString();
+                String typeString = typeSet == null ? null : typeSet.getString(fieldName);
+                String varName = varToOriginalName.getOrDefault(fieldName, fieldName);
+                Value varValue;
+                if (typeString == null || typeString.equals(XMLSchema.ANYURI.stringValue())) {
+                    varValue = VF.createURI(valueString);
+                }
+                else {
+                    varValue = VF.createLiteral(valueString, VF.createURI(typeString));
+                }
+                Binding existingBinding = bindingSet.getBinding(varName);
+                // If this variable is not already bound, add it.
+                if (existingBinding == null) {
+                    bindingSet.addBinding(varName, varValue);
+                }
+                // If it's bound to something else, the solutions are incompatible.
+                else if (!existingBinding.getValue().equals(varValue)) {
+                    return null;
+                }
+            }
+        }
+        return bindingSet;
+    }
+}
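
A minimal sketch of consuming the iteration, assuming an
AggregateIterable<Document> named "aggIter" obtained by executing a pipeline
node and a field-to-variable map named "varToOriginalName" (both names are
illustrative):

    CloseableIteration<BindingSet, QueryEvaluationException> results =
            new PipelineResultIteration(aggIter, varToOriginalName, new QueryBindingSet());
    try {
        while (results.hasNext()) {
            // Each solution merges the pipeline's bindings with the partial
            // solution supplied to the constructor.
            System.out.println(results.next());
        }
    } finally {
        results.close(); // releases the underlying MongoCursor
    }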

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d5ebb731/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitor.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitor.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitor.java
new file mode 100644
index 0000000..b7f5a67
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitor.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.aggregation;
+
+import java.util.Arrays;
+
+import org.apache.rya.mongodb.StatefulMongoDBRdfConfiguration;
+import org.bson.Document;
+import org.openrdf.query.algebra.Distinct;
+import org.openrdf.query.algebra.Extension;
+import org.openrdf.query.algebra.Filter;
+import org.openrdf.query.algebra.Join;
+import org.openrdf.query.algebra.MultiProjection;
+import org.openrdf.query.algebra.Projection;
+import org.openrdf.query.algebra.Reduced;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+
+import com.google.common.base.Preconditions;
+import com.mongodb.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+
+/**
+ * Visitor that transforms a SPARQL query tree by replacing as much of the tree
+ * as possible with one or more {@code AggregationPipelineQueryNode}s.
+ * <p>
+ * Each {@link AggregationPipelineQueryNode} contains a MongoDB aggregation
+ * pipeline which is equivalent to the replaced portion of the original query.
+ * Evaluating this node executes the pipeline and converts the results into
+ * query solutions. If only part of the query was transformed, the remaining
+ * query logic (higher up in the query tree) can be applied to those
+ * intermediate solutions as normal.
+ * <p>
+ * In general, the visitor processes the tree in bottom-up order: a leaf node
+ * ({@link StatementPattern}) is replaced with a pipeline that matches the
+ * corresponding statements. Then, if the parent node's semantics are supported
+ * by the visitor, stages are appended to the pipeline and the subtree at the
+ * parent node is replaced with the extended pipeline. This continues up the
+ * tree until reaching a node that cannot be transformed, in which case that
+ * node's child is now a single {@code AggregationPipelineQueryNode} (a leaf
+ * node) instead of the previous subtree, or until the entire tree has been
+ * subsumed into a single pipeline node.
+ * <p>
+ * Nodes which are transformed into pipeline stages:
+ * <ul>
+ * <li>A {@code StatementPattern} node forms the beginning of each pipeline.
+ * <li>Single-argument operations {@link Projection}, {@link MultiProjection},
+ * {@link Extension}, {@link Distinct}, and {@link Reduced} will be transformed
+ * into pipeline stages whenever the child {@code TupleExpr} represents a
+ * pipeline.
+ * <li>A {@link Filter} operation will be appended to the pipeline when its
+ * child {@code TupleExpr} represents a pipeline and the filter condition is a
+ * type of {@code ValueExpr} understood by {@code AggregationPipelineQueryNode}.
+ * <li>A {@link Join} operation will be appended to the pipeline when one child
+ * is a {@code StatementPattern} and the other is an
+ * {@code AggregationPipelineQueryNode}.
+ * </ul>
+ */
+public class SparqlToPipelineTransformVisitor extends QueryModelVisitorBase<Exception> {
+    private final MongoCollection<Document> inputCollection;
+
+    /**
+     * Instantiate a visitor directly from a {@link MongoCollection}.
+     * @param inputCollection Stores triples.
+     */
+    public SparqlToPipelineTransformVisitor(MongoCollection<Document> inputCollection) {
+        this.inputCollection = Preconditions.checkNotNull(inputCollection);
+    }
+
+    /**
+     * Instantiate a visitor from a {@link StatefulMongoDBRdfConfiguration}.
+     * @param conf Contains database connection information.
+     */
+    public SparqlToPipelineTransformVisitor(StatefulMongoDBRdfConfiguration conf) {
+        Preconditions.checkNotNull(conf);
+        MongoClient mongo = conf.getMongoClient();
+        MongoDatabase db = mongo.getDatabase(conf.getMongoDBName());
+        this.inputCollection = db.getCollection(conf.getTriplesCollectionName());
+    }
+
+    @Override
+    public void meet(StatementPattern sp) {
+        sp.replaceWith(new AggregationPipelineQueryNode(inputCollection, sp));
+    }
+
+    @Override
+    public void meet(Join join) throws Exception {
+        // If one branch is a single statement pattern, then try replacing the
+        // other with a pipeline.
+        AggregationPipelineQueryNode pipelineNode = null;
+        StatementPattern joinWithSP = null;
+        if (join.getRightArg() instanceof StatementPattern) {
+            join.getLeftArg().visit(this);
+            if (join.getLeftArg() instanceof AggregationPipelineQueryNode) {
+                pipelineNode = (AggregationPipelineQueryNode) join.getLeftArg();
+                joinWithSP = (StatementPattern) join.getRightArg();
+            }
+        }
+        else if (join.getLeftArg() instanceof StatementPattern) {
+            join.getRightArg().visit(this);
+            if (join.getRightArg() instanceof AggregationPipelineQueryNode) {
+                pipelineNode = (AggregationPipelineQueryNode) join.getRightArg();
+                joinWithSP = (StatementPattern) join.getLeftArg();
+            }
+        }
+        else {
+            // Otherwise, visit both children to try to replace smaller subtrees.
+            join.visitChildren(this);
+        }
+        // If this is now a join between a pipeline node and a statement
+        // pattern, add the join step at the end of the pipeline, and replace
+        // this node with the extended pipeline node.
+        if (pipelineNode != null && joinWithSP != null && pipelineNode.joinWith(joinWithSP)) {
+            join.replaceWith(pipelineNode);
+        }
+    }
+
+    @Override
+    public void meet(Projection projectionNode) throws Exception {
+        projectionNode.visitChildren(this);
+        if (projectionNode.getArg() instanceof AggregationPipelineQueryNode && projectionNode.getParentNode() != null) {
+            AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) projectionNode.getArg();
+            if (pipelineNode.project(Arrays.asList(projectionNode.getProjectionElemList()))) {
+                projectionNode.replaceWith(pipelineNode);
+            }
+        }
+    }
+
+    @Override
+    public void meet(MultiProjection projectionNode) throws Exception {
+        projectionNode.visitChildren(this);
+        if (projectionNode.getArg() instanceof AggregationPipelineQueryNode && projectionNode.getParentNode() != null) {
+            AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) projectionNode.getArg();
+            if (pipelineNode.project(projectionNode.getProjections())) {
+                projectionNode.replaceWith(pipelineNode);
+            }
+        }
+    }
+
+    @Override
+    public void meet(Extension extensionNode) throws Exception {
+        extensionNode.visitChildren(this);
+        if (extensionNode.getArg() instanceof AggregationPipelineQueryNode && extensionNode.getParentNode() != null) {
+            AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) extensionNode.getArg();
+            if (pipelineNode.extend(extensionNode.getElements())) {
+                extensionNode.replaceWith(pipelineNode);
+            }
+        }
+    }
+
+    @Override
+    public void meet(Reduced reducedNode) throws Exception {
+        reducedNode.visitChildren(this);
+        if (reducedNode.getArg() instanceof AggregationPipelineQueryNode && reducedNode.getParentNode() != null) {
+            reducedNode.replaceWith(reducedNode.getArg());
+        }
+    }
+
+    @Override
+    public void meet(Distinct distinctNode) throws Exception {
+        distinctNode.visitChildren(this);
+        if (distinctNode.getArg() instanceof AggregationPipelineQueryNode && distinctNode.getParentNode() != null) {
+            AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) distinctNode.getArg();
+            pipelineNode.distinct();
+            distinctNode.replaceWith(pipelineNode);
+        }
+    }
+
+    @Override
+    public void meet(Filter filterNode) throws Exception {
+        filterNode.visitChildren(this);
+        if (filterNode.getArg() instanceof AggregationPipelineQueryNode && filterNode.getParentNode() != null) {
+            AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) filterNode.getArg();
+            if (pipelineNode.filter(filterNode.getCondition())) {
+                filterNode.replaceWith(pipelineNode);
+            }
+        }
+    }
+}
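
A minimal sketch of driving the visitor over a parsed query, assuming a
MongoCollection<Document> named "triples" that stores Rya statements (the
collection name and query are illustrative; SPARQLParser and ParsedQuery come
from org.openrdf.query.parser, and the enclosing method must declare
"throws Exception" because the visitor does):

    SPARQLParser parser = new SPARQLParser();
    ParsedQuery parsed = parser.parseQuery(
            "SELECT ?course WHERE { ?s <urn:lubm#takesCourse> ?course }", null);
    TupleExpr tree = parsed.getTupleExpr();
    tree.visit(new SparqlToPipelineTransformVisitor(triples));
    // Supported subtrees are now single AggregationPipelineQueryNode leaves;
    // anything the visitor could not transform is left for normal evaluation.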

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d5ebb731/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/SimpleMongoDBStorageStrategy.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/SimpleMongoDBStorageStrategy.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/SimpleMongoDBStorageStrategy.java
index db33181..ecad9c6 100644
--- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/SimpleMongoDBStorageStrategy.java
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/SimpleMongoDBStorageStrategy.java
@@ -63,6 +63,15 @@ public class SimpleMongoDBStorageStrategy implements MongoDBStorageStrategy<RyaS
     public static final String STATEMENT_METADATA = "statementMetadata";
     public static final String DOCUMENT_VISIBILITY = "documentVisibility";
 
+    /**
+     * Generate the hash used to index and retrieve a given value.
+     * @param value A value to be stored or accessed (e.g. a URI or literal).
+     * @return the SHA-256 hex digest used to index that value in MongoDB.
+     */
+    public static String hash(String value) {
+        return DigestUtils.sha256Hex(value);
+    }
+
     protected ValueFactoryImpl factory = new ValueFactoryImpl();
 
     @Override
@@ -91,14 +100,14 @@ public class SimpleMongoDBStorageStrategy implements MongoDBStorageStrategy<RyaS
         final RyaURI context = stmt.getContext();
         final BasicDBObject query = new BasicDBObject();
         if (subject != null){
-            query.append(SUBJECT_HASH, DigestUtils.sha256Hex(subject.getData()));
+            query.append(SUBJECT_HASH, hash(subject.getData()));
         }
         if (object != null){
-            query.append(OBJECT_HASH, DigestUtils.sha256Hex(object.getData()));
+            query.append(OBJECT_HASH, hash(object.getData()));
             query.append(OBJECT_TYPE, object.getDataType().toString());
         }
         if (predicate != null){
-            query.append(PREDICATE_HASH, DigestUtils.sha256Hex(predicate.getData()));
+            query.append(PREDICATE_HASH, hash(predicate.getData()));
         }
         if (context != null){
             query.append(CONTEXT, context.getData());
@@ -179,11 +188,11 @@ public class SimpleMongoDBStorageStrategy implements MongoDBStorageStrategy<RyaS
         final BasicDBObject dvObject = DocumentVisibilityAdapter.toDBObject(statement.getColumnVisibility());
         final BasicDBObject doc = new BasicDBObject(ID, new String(Hex.encodeHex(bytes)))
         .append(SUBJECT, statement.getSubject().getData())
-        .append(SUBJECT_HASH, DigestUtils.sha256Hex(statement.getSubject().getData()))
+        .append(SUBJECT_HASH, hash(statement.getSubject().getData()))
         .append(PREDICATE, statement.getPredicate().getData())
-        .append(PREDICATE_HASH, DigestUtils.sha256Hex(statement.getPredicate().getData()))
+        .append(PREDICATE_HASH, hash(statement.getPredicate().getData()))
         .append(OBJECT, statement.getObject().getData())
-        .append(OBJECT_HASH, DigestUtils.sha256Hex(statement.getObject().getData()))
+        .append(OBJECT_HASH, hash(statement.getObject().getData()))
         .append(OBJECT_TYPE, statement.getObject().getDataType().toString())
         .append(CONTEXT, context)
         .append(STATEMENT_METADATA, statement.getMetadata().toString())
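
The hash(String) helper is exposed so other components (such as the
aggregation pipeline code above) can build lookup keys consistent with the
*_HASH fields written here. A sketch of an ad hoc lookup by subject, assuming
SUBJECT_HASH is public like the other field-name constants in this class:

    // Match on the indexed hash of the subject rather than the full string.
    BasicDBObject bySubject = new BasicDBObject(
            SimpleMongoDBStorageStrategy.SUBJECT_HASH,
            SimpleMongoDBStorageStrategy.hash("urn:Alice"));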

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d5ebb731/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNodeTest.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNodeTest.java b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNodeTest.java
new file mode 100644
index 0000000..1e056c4
--- /dev/null
+++ b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNodeTest.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.aggregation;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.bson.Document;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.openrdf.model.URI;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.RDF;
+import org.openrdf.query.algebra.Compare;
+import org.openrdf.query.algebra.ExtensionElem;
+import org.openrdf.query.algebra.IsLiteral;
+import org.openrdf.query.algebra.Not;
+import org.openrdf.query.algebra.ProjectionElem;
+import org.openrdf.query.algebra.ProjectionElemList;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.ValueConstant;
+import org.openrdf.query.algebra.Var;
+
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.Sets;
+import com.mongodb.MongoNamespace;
+import com.mongodb.client.MongoCollection;
+
+public class AggregationPipelineQueryNodeTest {
+    private static final ValueFactory VF = ValueFactoryImpl.getInstance();
+
+    private static final String LUBM = "urn:lubm";
+    private static final URI UNDERGRAD = VF.createURI(LUBM, "UndergraduateStudent");
+    private static final URI TAKES = VF.createURI(LUBM, "takesCourse");
+
+    private static Var constant(URI value) {
+        return new Var(value.stringValue(), value);
+    }
+
+    private MongoCollection<Document> collection;
+
+    @Before
+    @SuppressWarnings("unchecked")
+    public void setUp() {
+        collection = Mockito.mock(MongoCollection.class);
+        Mockito.when(collection.getNamespace()).thenReturn(new MongoNamespace("db", "collection"));
+    }
+
+    @Test
+    public void testEquals() {
+        final AggregationPipelineQueryNode node1 = new AggregationPipelineQueryNode(
+                collection,
+                new LinkedList<>(),
+                Sets.newHashSet("x", "y"),
+                Sets.newHashSet("x", "y", "opt"),
+                HashBiMap.create());
+        final AggregationPipelineQueryNode node2 = new AggregationPipelineQueryNode(
+                collection,
+                new LinkedList<>(),
+                Sets.newHashSet("x", "y"),
+                Sets.newHashSet("x", "y", "opt"),
+                HashBiMap.create());
+        Assert.assertEquals(node1, node2);
+        Assert.assertEquals(node1.hashCode(), node2.hashCode());
+        final AggregationPipelineQueryNode diff1 = new AggregationPipelineQueryNode(
+                collection,
+                new LinkedList<>(),
+                Sets.newHashSet("x", "y"),
+                Sets.newHashSet("x", "y"),
+                HashBiMap.create());
+        final AggregationPipelineQueryNode diff2 = new AggregationPipelineQueryNode(
+                collection,
+                Arrays.asList(new Document()),
+                Sets.newHashSet("x", "y"),
+                Sets.newHashSet("x", "y", "opt"),
+                HashBiMap.create());
+        HashBiMap<String, String> varMapping = HashBiMap.create();
+        varMapping.put("field-x", "x");
+        final AggregationPipelineQueryNode diff3 = new AggregationPipelineQueryNode(
+                collection,
+                Arrays.asList(new Document()),
+                Sets.newHashSet("x", "y"),
+                Sets.newHashSet("x", "y", "opt"),
+                varMapping);
+        Assert.assertNotEquals(diff1, node1);
+        Assert.assertNotEquals(diff2, node1);
+        Assert.assertNotEquals(diff3, node1);
+        node1.joinWith(new StatementPattern(new Var("x"), constant(TAKES), new Var("c")));
+        node2.joinWith(new StatementPattern(new Var("x"), constant(TAKES), new Var("c")));
+        Assert.assertEquals(node1, node2);
+        node2.joinWith(new StatementPattern(new Var("x"), constant(TAKES), new Var("c")));
+        Assert.assertNotEquals(node1, node2);
+    }
+
+    @Test
+    public void testClone() {
+        final AggregationPipelineQueryNode base = new AggregationPipelineQueryNode(
+                collection,
+                new LinkedList<>(),
+                Sets.newHashSet("x", "y"),
+                Sets.newHashSet("x", "y", "opt"),
+                HashBiMap.create());
+        final AggregationPipelineQueryNode copy = base.clone();
+        Assert.assertEquals(base, copy);
+        copy.getPipeline().add(new Document("$project", new Document()));
+        Assert.assertNotEquals(base, copy);
+        base.getPipeline().add(new Document("$project", new Document()));
+        Assert.assertEquals(base, copy);
+    }
+
+    @Test
+    public void testStatementPattern() throws Exception {
+        // All variables
+        StatementPattern sp = new StatementPattern(new Var("s"), new Var("p"), new Var("o"));
+        AggregationPipelineQueryNode node = new AggregationPipelineQueryNode(collection, sp);
+        Assert.assertEquals(Sets.newHashSet("s", "p", "o"), node.getBindingNames());
+        Assert.assertEquals(Sets.newHashSet("s", "p", "o"), node.getAssuredBindingNames());
+        Assert.assertEquals(2, node.getPipeline().size());
+        // All constants
+        sp = new StatementPattern(constant(VF.createURI("urn:Alice")), constant(RDF.TYPE), constant(UNDERGRAD));
+        node = new AggregationPipelineQueryNode(collection, sp);
+        Assert.assertEquals(Sets.newHashSet(), node.getBindingNames());
+        Assert.assertEquals(Sets.newHashSet(), node.getAssuredBindingNames());
+        Assert.assertEquals(2, node.getPipeline().size());
+        // Mixture
+        sp = new StatementPattern(new Var("student"), constant(RDF.TYPE), constant(UNDERGRAD));
+        node = new AggregationPipelineQueryNode(collection, sp);
+        Assert.assertEquals(Sets.newHashSet("student"), node.getBindingNames());
+        Assert.assertEquals(Sets.newHashSet("student"), node.getAssuredBindingNames());
+        Assert.assertEquals(2, node.getPipeline().size());
+    }
+
+    @Test
+    public void testJoin() throws Exception {
+        final AggregationPipelineQueryNode base = new AggregationPipelineQueryNode(
+                collection,
+                new LinkedList<>(),
+                Sets.newHashSet("x", "y"),
+                Sets.newHashSet("x", "y", "opt"),
+                HashBiMap.create());
+        // Join on one shared variable
+        AggregationPipelineQueryNode node = base.clone();
+        boolean success = node.joinWith(new StatementPattern(new Var("x"), constant(TAKES), new Var("c")));
+        Assert.assertTrue(success);
+        Assert.assertEquals(Sets.newHashSet("x", "y", "c", "opt"), node.getBindingNames());
+        Assert.assertEquals(Sets.newHashSet("x", "y", "c"), node.getAssuredBindingNames());
+        Assert.assertEquals(4, node.getPipeline().size());
+        // Join on multiple shared variables
+        node = base.clone();
+        success = node.joinWith(new StatementPattern(new Var("x"), constant(TAKES), new Var("y")));
+        Assert.assertTrue(success);
+        Assert.assertEquals(Sets.newHashSet("x", "y", "opt"), node.getBindingNames());
+        Assert.assertEquals(Sets.newHashSet("x", "y"), node.getAssuredBindingNames());
+        Assert.assertEquals(5, node.getPipeline().size());
+    }
+
+    @Test
+    public void testProject() {
+        final AggregationPipelineQueryNode base = new AggregationPipelineQueryNode(
+                collection,
+                new LinkedList<>(),
+                Sets.newHashSet("x", "y"),
+                Sets.newHashSet("x", "y", "opt"),
+                HashBiMap.create());
+        // Add a single projection
+        ProjectionElemList singleProjection = new ProjectionElemList();
+        singleProjection.addElement(new ProjectionElem("x", "z"));
+        singleProjection.addElement(new ProjectionElem("y", "y"));
+        List<ProjectionElemList> projections = Arrays.asList(singleProjection);
+        AggregationPipelineQueryNode node = base.clone();
+        boolean success = node.project(projections);
+        Assert.assertTrue(success);
+        Assert.assertEquals(1, node.getPipeline().size());
+        Assert.assertEquals(Sets.newHashSet("z", "y"),
+                node.getAssuredBindingNames());
+        Assert.assertEquals(Sets.newHashSet("z", "y"),
+                node.getBindingNames());
+        // Add a multi-projection
+        ProjectionElemList p1 = new ProjectionElemList();
+        p1.addElement(new ProjectionElem("x", "solution"));
+        ProjectionElemList p2 = new ProjectionElemList();
+        p2.addElement(new ProjectionElem("y", "solution"));
+        ProjectionElemList p3 = new ProjectionElemList();
+        p3.addElement(new ProjectionElem("x", "x"));
+        p3.addElement(new ProjectionElem("x", "solution"));
+        p3.addElement(new ProjectionElem("y", "y"));
+        projections = Arrays.asList(p1, p2, p3);
+        node = base.clone();
+        success = node.project(projections);
+        Assert.assertTrue(success);
+        Assert.assertEquals(3, node.getPipeline().size());
+        Assert.assertEquals(Sets.newHashSet("solution"),
+                node.getAssuredBindingNames());
+        Assert.assertEquals(Sets.newHashSet("x", "y", "solution"),
+                node.getBindingNames());
+        // Add no projections
+        node = base.clone();
+        success = node.project(Arrays.asList());
+        Assert.assertFalse(success);
+        Assert.assertEquals(base, node);
+    }
+
+    @Test
+    public void testExtend() {
+        final AggregationPipelineQueryNode base = new AggregationPipelineQueryNode(
+                collection,
+                new LinkedList<>(),
+                Sets.newHashSet("x", "y"),
+                Sets.newHashSet("x", "y", "opt"),
+                HashBiMap.create());
+        // Extend with a mix of variables and constants
+        List<ExtensionElem> extensionElements = Arrays.asList(
+                new ExtensionElem(new Var("x"), "subject"),
+                new ExtensionElem(new ValueConstant(RDF.TYPE), "predicate"),
+                new ExtensionElem(new Var("y"), "object"));
+        AggregationPipelineQueryNode node = base.clone();
+        boolean success = node.extend(extensionElements);
+        Assert.assertTrue(success);
+        Assert.assertEquals(1, node.getPipeline().size());
+        Assert.assertEquals(Sets.newHashSet("x", "y", "subject", "predicate", "object"),
+                node.getAssuredBindingNames());
+        Assert.assertEquals(Sets.newHashSet("x", "y", "subject", "predicate", "object", "opt"),
+                node.getBindingNames());
+        // Attempt to extend with an unsupported expression
+        extensionElements = Arrays.asList(
+                new ExtensionElem(new Var("x"), "subject"),
+                new ExtensionElem(new Not(new ValueConstant(VF.createLiteral(true))), "notTrue"));
+        node = base.clone();
+        success = node.extend(extensionElements);
+        Assert.assertFalse(success);
+        Assert.assertEquals(base, node);
+    }
+
+    @Test
+    public void testDistinct() {
+        final AggregationPipelineQueryNode base = new AggregationPipelineQueryNode(
+                collection,
+                new LinkedList<>(),
+                Sets.newHashSet("x", "y"),
+                Sets.newHashSet("x", "y", "opt"),
+                HashBiMap.create());
+        AggregationPipelineQueryNode node = base.clone();
+        boolean success = node.distinct();
+        Assert.assertTrue(success);
+        Assert.assertEquals(Sets.newHashSet("x", "y", "opt"), node.getBindingNames());
+        Assert.assertEquals(Sets.newHashSet("x", "y"), node.getAssuredBindingNames());
+        Assert.assertEquals(1, node.getPipeline().size());
+    }
+
+    @Test
+    public void testFilter() {
+        final AggregationPipelineQueryNode base = new AggregationPipelineQueryNode(
+                collection,
+                new LinkedList<>(),
+                Sets.newHashSet("x", "y"),
+                Sets.newHashSet("x", "y", "opt"),
+                HashBiMap.create());
+        // Apply a filter with a supported condition
+        AggregationPipelineQueryNode node = base.clone();
+        boolean success = node.filter(new Compare(new Var("x"), new Var("y"), Compare.CompareOp.EQ));
+        Assert.assertTrue(success);
+        Assert.assertEquals(Sets.newHashSet("x", "y", "opt"), node.getBindingNames());
+        Assert.assertEquals(Sets.newHashSet("x", "y"), node.getAssuredBindingNames());
+        Assert.assertEquals(3, node.getPipeline().size());
+        // Attempt a filter with an unsupported condition
+        node = base.clone();
+        success = node.filter(new IsLiteral(new Var("opt")));
+        Assert.assertFalse(success);
+        Assert.assertEquals(Sets.newHashSet("x", "y", "opt"), node.getBindingNames());
+        Assert.assertEquals(Sets.newHashSet("x", "y"), node.getAssuredBindingNames());
+        Assert.assertEquals(0, node.getPipeline().size());
+    }
+
+    @Test
+    public void testRequireSourceDerivationDepth() throws Exception {
+        final AggregationPipelineQueryNode base = new AggregationPipelineQueryNode(
+                collection,
+                new LinkedList<>(),
+                Sets.newHashSet("x", "y"),
+                Sets.newHashSet("x", "y", "opt"),
+                HashBiMap.create());
+        // Require a derivation depth greater than zero (adds a stage)
+        AggregationPipelineQueryNode node = base.clone();
+        node.requireSourceDerivationDepth(3);
+        Assert.assertEquals(Sets.newHashSet("x", "y", "opt"), node.getBindingNames());
+        Assert.assertEquals(Sets.newHashSet("x", "y"), node.getAssuredBindingNames());
+        Assert.assertEquals(1, node.getPipeline().size());
+        // Require a derivation depth of zero (no effect)
+        node = base.clone();
+        node.requireSourceDerivationDepth(0);
+        Assert.assertEquals(Sets.newHashSet("x", "y", "opt"), node.getBindingNames());
+        Assert.assertEquals(Sets.newHashSet("x", "y"), node.getAssuredBindingNames());
+        Assert.assertEquals(0, node.getPipeline().size());
+    }
+
+    @Test
+    public void testRequireSourceTimestamp() {
+        final AggregationPipelineQueryNode base = new AggregationPipelineQueryNode(
+                collection,
+                new LinkedList<>(),
+                Sets.newHashSet("x", "y"),
+                Sets.newHashSet("x", "y", "opt"),
+                HashBiMap.create());
+        // Require a minimum source timestamp (adds a stage)
+        AggregationPipelineQueryNode node = base.clone();
+        node.requireSourceTimestamp(System.currentTimeMillis());
+        Assert.assertEquals(Sets.newHashSet("x", "y", "opt"), node.getBindingNames());
+        Assert.assertEquals(Sets.newHashSet("x", "y"), node.getAssuredBindingNames());
+        Assert.assertEquals(1, node.getPipeline().size());
+    }
+}
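
A note on the stage-count assertions above: each StatementPattern is expected
to compile to exactly two stages, presumably a $match on the pattern's
constant terms followed by a $project into the intermediate document form
(the exact stage contents are internal to AggregationPipelineQueryNode). A
quick way to inspect what a node builds, reusing names from the test class:

    StatementPattern sp = new StatementPattern(
            new Var("student"), constant(RDF.TYPE), constant(UNDERGRAD));
    AggregationPipelineQueryNode node = new AggregationPipelineQueryNode(collection, sp);
    node.getPipeline().forEach(System.out::println);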