Posted to commits@rya.apache.org by ca...@apache.org on 2018/01/09 21:48:46 UTC

[29/50] [abbrv] incubator-rya git commit: RYA-377 Implement the Aggregation Processor for Rya Streams.

RYA-377 Implement the Aggregation Processor for Rya Streams.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/95df37a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/95df37a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/95df37a3

Branch: refs/heads/master
Commit: 95df37a30ff6e4f2e70a863872e1087a447de47f
Parents: 8363724
Author: kchilton2 <ke...@gmail.com>
Authored: Tue Nov 21 16:53:38 2017 -0500
Committer: caleb <ca...@parsons.com>
Committed: Tue Jan 9 15:13:01 2018 -0500

----------------------------------------------------------------------
 .../aggregation/AggregationStateStore.java      |  49 ++
 .../aggregation/AggregationsEvaluator.java      | 175 +++++++
 .../AggregationProcessorSupplier.java           | 158 +++++++
 .../KeyValueAggregationStateStore.java          | 104 +++++
 .../streams/kafka/topology/TopologyFactory.java |  24 +-
 .../aggregation/AggregationProcessorIT.java     | 457 +++++++++++++++++++
 6 files changed, 962 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/95df37a3/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationStateStore.java
----------------------------------------------------------------------
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationStateStore.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationStateStore.java
new file mode 100644
index 0000000..d37f4c7
--- /dev/null
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationStateStore.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.function.aggregation;
+
+import java.util.Optional;
+
+import org.apache.rya.api.model.VisibilityBindingSet;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Provides a mechanism for storing and updating the {@link AggregationState} used by an {@link AggregationsEvaluator}.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface AggregationStateStore {
+
+    /**
+     * Stores a state. If this value updates a previously stored state, then it will overwrite the old value
+     * with the new one.
+     *
+     * @param state - The state that will be stored. (not null)
+     */
+    public void store(AggregationState state);
+
+    /**
+     * Get the {@link AggregationState} that may be updated using the provided binding set.
+     *
+     * @param bs - A binding set that defines which state to fetch. (not null)
+     * @return The {@link AggregationState} that is updated by the binding set, if one has been stored.
+     */
+    public Optional<AggregationState> get(VisibilityBindingSet bs);
+}
\ No newline at end of file
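
A minimal in-memory implementation of this interface (not part of this commit; the class name and the
caller-supplied key function are hypothetical) might look like the following sketch. It simply keeps the
latest AggregationState for each group key in a HashMap:

    package org.apache.rya.api.function.aggregation;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Optional;
    import java.util.function.Function;

    import org.apache.rya.api.model.VisibilityBindingSet;

    // Hypothetical in-memory AggregationStateStore sketch; useful as a reference for the contract.
    public class InMemoryAggregationStateStore implements AggregationStateStore {

        private final Map<String, AggregationState> states = new HashMap<>();
        private final Function<VisibilityBindingSet, String> keyFunction;

        // keyFunction derives the group key from a binding set's group by values.
        public InMemoryAggregationStateStore(final Function<VisibilityBindingSet, String> keyFunction) {
            this.keyFunction = keyFunction;
        }

        @Override
        public void store(final AggregationState state) {
            // Overwrites any state that was previously stored for the same group key.
            final VisibilityBindingSet bs = new VisibilityBindingSet(state.getBindingSet(), state.getVisibility());
            states.put(keyFunction.apply(bs), state);
        }

        @Override
        public Optional<AggregationState> get(final VisibilityBindingSet bs) {
            return Optional.ofNullable(states.get(keyFunction.apply(bs)));
        }
    }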

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/95df37a3/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationsEvaluator.java
----------------------------------------------------------------------
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationsEvaluator.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationsEvaluator.java
new file mode 100644
index 0000000..2aa716f
--- /dev/null
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationsEvaluator.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.function.aggregation;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.visibility.VisibilitySimplifier;
+import org.openrdf.query.algebra.AggregateOperator;
+import org.openrdf.query.algebra.Group;
+import org.openrdf.query.algebra.GroupElem;
+import org.openrdf.query.algebra.Var;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.google.common.collect.ImmutableMap;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A stateful evaluator that processes aggregation functions over variables that are grouped together.
+ * <p>
+ * The following aggregation functions are supported:
+ * <ul>
+ *   <li>Count</li>
+ *   <li>Sum</li>
+ *   <li>Average</li>
+ *   <li>Min</li>
+ *   <li>Max</li>
+ * </ul>
+ * <p>
+ * The persistence of the aggregation's state is determined by the provided {@link AggregationStateStore}.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AggregationsEvaluator {
+
+    private static final ImmutableMap<AggregationType, AggregationFunction> FUNCTIONS;
+    static {
+        final ImmutableMap.Builder<AggregationType, AggregationFunction> builder = ImmutableMap.builder();
+        builder.put(AggregationType.COUNT, new CountFunction());
+        builder.put(AggregationType.SUM, new SumFunction());
+        builder.put(AggregationType.AVERAGE, new AverageFunction());
+        builder.put(AggregationType.MIN, new MinFunction());
+        builder.put(AggregationType.MAX, new MaxFunction());
+        FUNCTIONS = builder.build();
+    }
+
+    private final AggregationStateStore aggStateStore;
+    private final Collection<AggregationElement> aggregations;
+    private final List<String> groupByVars;
+
+    /**
+     * Constructs an instance of {@link AggregationsEvaluator}.
+     *
+     * @param aggStateStore - The mechanism for storing aggregation state. (not null)
+     * @param aggregations - The aggregation functions that will be computed. (not null)
+     * @param groupByVars - The names of the bindings whose values are used to group aggregation results. (not null)
+     */
+    public AggregationsEvaluator(
+            final AggregationStateStore aggStateStore,
+            final Collection<AggregationElement> aggregations,
+            final List<String> groupByVars) {
+        this.aggStateStore = requireNonNull(aggStateStore);
+        this.aggregations = requireNonNull(aggregations);
+        this.groupByVars = requireNonNull(groupByVars);
+    }
+
+    /**
+     * Make an instance of {@link AggregationsEvaluator} based on a {@link Group} node.
+     *
+     * @param aggStateStore - The mechanism for storing aggregation state. (not null)
+     * @param aggNode - Defines which aggregation functions need to be performed. (not null)
+     * @param groupByVars - The names of the bindings whose values are used to group aggregation results. (not null)
+     * @return The evaluator that handles the node's aggregations.
+     */
+    public static AggregationsEvaluator make(final AggregationStateStore aggStateStore, final Group aggNode, final List<String> groupByVars) {
+        requireNonNull(aggStateStore);
+        requireNonNull(aggNode);
+        requireNonNull(groupByVars);
+
+        // The aggregations that need to be performed are the Group Elements.
+        final List<AggregationElement> aggregations = new ArrayList<>();
+        for(final GroupElem groupElem : aggNode.getGroupElements()) {
+            // Figure out the type of the aggregation.
+            final AggregateOperator operator = groupElem.getOperator();
+            final Optional<AggregationType> type = AggregationType.byOperatorClass( operator.getClass() );
+
+            // If the type is one we support, create the AggregationElement.
+            if(type.isPresent()) {
+                final String resultBindingName = groupElem.getName();
+
+                final AtomicReference<String> aggregatedBindingName = new AtomicReference<>();
+                groupElem.visitChildren(new QueryModelVisitorBase<RuntimeException>() {
+                    @Override
+                    public void meet(final Var node) {
+                        aggregatedBindingName.set( node.getName() );
+                    }
+                });
+
+                aggregations.add( new AggregationElement(type.get(), aggregatedBindingName.get(), resultBindingName) );
+            }
+        }
+
+        return new AggregationsEvaluator(aggStateStore, aggregations, groupByVars);
+    }
+
+    /**
+     * Update the aggregation values using information found within {@code newBs}.
+     *
+     * @param newBs - A binding set whose values need to be incorporated within the aggregations. (not null)
+     * @return A binding set containing the updated aggregation values.
+     */
+    public VisibilityBindingSet update(final VisibilityBindingSet newBs) {
+        requireNonNull(newBs);
+
+        // Load the old state if one was previously stored; otherwise initialize the state.
+        final AggregationState state = aggStateStore.get(newBs).orElseGet(() -> {
+            // Initialize a new state.
+            final AggregationState newState = new AggregationState();
+
+            // If we have group by bindings, their values need to be added to the state's binding set.
+            final MapBindingSet bindingSet = newState.getBindingSet();
+            for(final String groupByVar : groupByVars) {
+                bindingSet.addBinding( newBs.getBinding(groupByVar) );
+            }
+
+            return newState;
+        });
+
+        // Update the visibilities of the result binding set based on the new result's visibilities.
+        final String oldVisibility = state.getVisibility();
+        final String updateVisibilities = VisibilitySimplifier.unionAndSimplify(oldVisibility, newBs.getVisibility());
+        state.setVisibility(updateVisibilities);
+
+        // Update the Aggregation State with each Aggregation function included within this group.
+        for(final AggregationElement aggregation : aggregations) {
+            final AggregationType type = aggregation.getAggregationType();
+            final AggregationFunction function = FUNCTIONS.get(type);
+            if(function == null) {
+                throw new RuntimeException("Unrecognized aggregation function: " + type);
+            }
+
+            function.update(aggregation, state, newBs);
+        }
+
+        // Store the updated state. This will write on top of any old state that was present for the Group By values.
+        aggStateStore.store(state);
+
+        // Return the updated binding set from the updated state.
+        return new VisibilityBindingSet(state.getBindingSet(), state.getVisibility());
+    }
+}
\ No newline at end of file
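
As a usage sketch (not part of this commit), the evaluator can also be driven outside of Kafka Streams:
build it with a state store, one AggregationElement per aggregation, and the group by variable names, then
feed it binding sets. The InMemoryAggregationStateStore below is the hypothetical class sketched earlier;
everything else comes from this commit or the integration tests further down.

    import java.util.Arrays;

    import org.apache.rya.api.function.aggregation.AggregationElement;
    import org.apache.rya.api.function.aggregation.AggregationType;
    import org.apache.rya.api.function.aggregation.AggregationsEvaluator;
    import org.apache.rya.api.model.VisibilityBindingSet;
    import org.openrdf.model.ValueFactory;
    import org.openrdf.model.impl.ValueFactoryImpl;
    import org.openrdf.query.impl.MapBindingSet;

    public class CountEvaluatorExample {
        public static void main(final String[] args) {
            final ValueFactory vf = new ValueFactoryImpl();

            // COUNT(?book) grouped by ?person, with the result bound to ?bookCount.
            final AggregationsEvaluator evaluator = new AggregationsEvaluator(
                    new InMemoryAggregationStateStore(bs -> bs.getBinding("person").getValue().toString()),
                    Arrays.asList(new AggregationElement(AggregationType.COUNT, "book", "bookCount")),
                    Arrays.asList("person"));

            // Each update returns the group's new aggregation values and the unioned visibility.
            final MapBindingSet bs = new MapBindingSet();
            bs.addBinding("person", vf.createURI("urn:Alice"));
            bs.addBinding("book", vf.createLiteral("Book 1"));
            final VisibilityBindingSet result = evaluator.update(new VisibilityBindingSet(bs, "a"));
            System.out.println(result); // bookCount = 1 for urn:Alice
        }
    }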

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/95df37a3/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorSupplier.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorSupplier.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorSupplier.java
new file mode 100644
index 0000000..c8e1049
--- /dev/null
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorSupplier.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.aggregation;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.rya.api.function.aggregation.AggregationState;
+import org.apache.rya.api.function.aggregation.AggregationStateStore;
+import org.apache.rya.api.function.aggregation.AggregationsEvaluator;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.streams.kafka.processors.ProcessorResult;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.ResultType;
+import org.apache.rya.streams.kafka.processors.ProcessorResultFactory;
+import org.apache.rya.streams.kafka.processors.RyaStreamsProcessor;
+import org.apache.rya.streams.kafka.processors.RyaStreamsProcessorSupplier;
+import org.openrdf.query.algebra.Group;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.internal.Lists;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Supplies {@link AggregationProcessor} instances.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AggregationProcessorSupplier extends RyaStreamsProcessorSupplier {
+
+    private final String stateStoreName;
+    private final Group aggNode;
+
+    /**
+     * Constructs an instance of {@link AggregationProcessorSupplier}.
+     *
+     * @param stateStoreName - The name of the state store the processor will use. (not null)
+     * @param aggNode - Defines which aggregations will be performed by the processor. (not null)
+     * @param resultFactory - The factory that the supplied processors will use to create results. (not null)
+     */
+    public AggregationProcessorSupplier(
+            final String stateStoreName,
+            final Group aggNode,
+            final ProcessorResultFactory resultFactory) {
+        super(resultFactory);
+        this.stateStoreName = requireNonNull(stateStoreName);
+        this.aggNode = requireNonNull(aggNode);
+    }
+
+    @Override
+    public Processor<Object, ProcessorResult> get() {
+        return new AggregationProcessor(stateStoreName, aggNode, super.getResultFactory());
+    }
+
+    /**
+     * Evaluates a {@link Group} node that contains one or more aggregations. Each aggregation will have a binding
+     * within the resulting binding sets that contains the aggregation value.
+     *
+     * @see AggregationsEvaluator
+     */
+    @DefaultAnnotation(NonNull.class)
+    public static class AggregationProcessor extends RyaStreamsProcessor {
+        private static final Logger log = LoggerFactory.getLogger(AggregationProcessor.class);
+
+        private final String stateStoreName;
+        private final Group aggNode;
+
+        private ProcessorContext context;
+        private AggregationStateStore aggStateStore;
+        private AggregationsEvaluator evaluator;
+
+        /**
+         * Constructs an instance of {@link AggregationProcessor}.
+         *
+         * @param stateStoreName - The name of the Kafka Streams state store that this processor will use. (not null)
+         * @param aggNode - The group by node that configures how the aggregations will be performed. (not null)
+         * @param resultFactory - The factory that will format this processor's final results for the downstream
+         *   processor. (not null)
+         */
+        public AggregationProcessor(
+                final String stateStoreName,
+                final Group aggNode,
+                final ProcessorResultFactory resultFactory) {
+            super(resultFactory);
+            this.stateStoreName = requireNonNull(stateStoreName);
+            this.aggNode = requireNonNull(aggNode);
+        }
+
+        @Override
+        public void init(final ProcessorContext context) {
+            this.context = context;
+
+            // Sort the group by vars so that they will be written to the state store in the same order every time.
+            final List<String> groupByVars = Lists.newArrayList(aggNode.getGroupBindingNames());
+            groupByVars.sort(Comparator.naturalOrder());
+
+            // Get a reference to the state store that keeps track of aggregation state.
+            final KeyValueStore<String, AggregationState> stateStore =
+                    (KeyValueStore<String, AggregationState>) context.getStateStore( stateStoreName );
+            aggStateStore = new KeyValueAggregationStateStore(stateStore, groupByVars);
+
+            // Create the aggregation evaluator.
+            evaluator = AggregationsEvaluator.make(aggStateStore, aggNode, groupByVars);
+        }
+
+        @Override
+        public void process(final Object key, final ProcessorResult value) {
+            // Aggregations can only be unary.
+            if (value.getType() != ResultType.UNARY) {
+                throw new RuntimeException("The ProcessorResult to be processed must be Unary.");
+            }
+
+            // Log the binding set that has been input.
+            log.debug("\nINPUT:\nBinding Set: {}", value.getUnary().getResult());
+
+            // Update the aggregations values.
+            final VisibilityBindingSet resultBs = evaluator.update(value.getUnary().getResult());
+
+            // Log the binding set that will be output.
+            log.debug("\nOUTPUT:\nBinding Set: {}", resultBs);
+
+            // Forward the updated aggregation binding set to the downstream processors.
+            context.forward(key, super.getResultFactory().make(resultBs));
+        }
+
+        @Override
+        public void punctuate(final long timestamp) {
+            // Do nothing.
+        }
+
+        @Override
+        public void close() {
+            // Do nothing.
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/95df37a3/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/aggregation/KeyValueAggregationStateStore.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/aggregation/KeyValueAggregationStateStore.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/aggregation/KeyValueAggregationStateStore.java
new file mode 100644
index 0000000..3300590
--- /dev/null
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/aggregation/KeyValueAggregationStateStore.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.aggregation;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.rya.api.function.aggregation.AggregationState;
+import org.apache.rya.api.function.aggregation.AggregationStateStore;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.openrdf.query.BindingSet;
+
+import com.google.common.base.Joiner;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link KeyValueStore} implementation of {@link AggregationStateStore}.
+ * <p>
+ * This is a key/value store, so we need to store the {@link AggregationState} for each set of group by values
+ * using a key that is composed of those values. We use the following pattern to accomplish this:
+ * <pre>
+ * [groupByVar1 value],[groupByVar2 value],...,[groupByVarN value]
+ * </pre>
+ */
+@DefaultAnnotation(NonNull.class)
+public class KeyValueAggregationStateStore implements AggregationStateStore {
+
+    private final KeyValueStore<String, AggregationState> store;
+    private final List<String> groupByVars;
+
+    /**
+     * Constructs an instance of {@link KeyValueAggregationStateStore}.
+     *
+     * @param store - The state store that will be used. (not null)
+     * @param groupByVars - An ordered list of group by variable names. (not null)
+     */
+    public KeyValueAggregationStateStore(
+            final KeyValueStore<String, AggregationState> store,
+            final List<String> groupByVars) {
+        this.store = requireNonNull(store);
+        this.groupByVars = requireNonNull(groupByVars);
+    }
+
+    @Override
+    public void store(final AggregationState state) {
+        requireNonNull(state);
+
+        // Aggregations group their states by their group by variables, so the key is the resulting binding
+        // set's values for the group by variables.
+        final String key = makeCommaDelimitedValues(groupByVars, state.getBindingSet());
+        store.put(key, state);
+    }
+
+    @Override
+    public Optional<AggregationState> get(final VisibilityBindingSet bs) {
+        requireNonNull(bs);
+
+        final String key = makeCommaDelimitedValues(groupByVars, bs);
+        return Optional.ofNullable(store.get(key));
+    }
+
+    /**
+     * A utility function that helps construct the keys used by {@link KeyValueAggregationStateStore}.
+     *
+     * @param vars - Which variables within the binding set to use for the key's values. (not null)
+     * @param bindingSet - The binding set the key is being constructed from. (not null)
+     * @return A comma delimited list of the binding set's values for {@code vars}.
+     */
+    private static String makeCommaDelimitedValues(final List<String> vars, final BindingSet bindingSet) {
+        requireNonNull(vars);
+        requireNonNull(bindingSet);
+
+        // Make an ordered list of the binding set's values for the group by variables.
+        final List<String> values = new ArrayList<>();
+        for(final String var : vars) {
+            values.add( bindingSet.hasBinding(var) ? bindingSet.getBinding(var).getValue().toString() : "" );
+        }
+
+        // Return a comma delimited list of those values.
+        return Joiner.on(",").join(values);
+    }
+}
\ No newline at end of file
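
A quick, hypothetical illustration of the key pattern described above: for the ordered group by variables
[business, employee] and a binding set where ?business = urn:TacoJoint and ?employee = urn:Alice, the
comma-delimited key is "urn:TacoJoint,urn:Alice". The snippet mirrors the private makeCommaDelimitedValues(...) helper:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.openrdf.model.ValueFactory;
    import org.openrdf.model.impl.ValueFactoryImpl;
    import org.openrdf.query.impl.MapBindingSet;

    import com.google.common.base.Joiner;

    public class AggregationKeyExample {
        public static void main(final String[] args) {
            final ValueFactory vf = new ValueFactoryImpl();
            final MapBindingSet bs = new MapBindingSet();
            bs.addBinding("business", vf.createURI("urn:TacoJoint"));
            bs.addBinding("employee", vf.createURI("urn:Alice"));

            // Unbound group by variables contribute an empty value, just like the store's helper.
            final List<String> values = new ArrayList<>();
            for (final String var : Arrays.asList("business", "employee")) {
                values.add(bs.hasBinding(var) ? bs.getBinding(var).getValue().toString() : "");
            }
            System.out.println(Joiner.on(",").join(values)); // prints: urn:TacoJoint,urn:Alice
        }
    }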

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/95df37a3/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java
index 426b041..4046e23 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java
@@ -51,6 +51,7 @@ import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult;
 import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side;
 import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult;
 import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier;
+import org.apache.rya.streams.kafka.processors.aggregation.AggregationProcessorSupplier;
 import org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier;
 import org.apache.rya.streams.kafka.processors.join.JoinProcessorSupplier;
 import org.apache.rya.streams.kafka.processors.output.BindingSetOutputFormatterSupplier;
@@ -65,6 +66,7 @@ import org.openrdf.query.MalformedQueryException;
 import org.openrdf.query.algebra.BinaryTupleOperator;
 import org.openrdf.query.algebra.Extension;
 import org.openrdf.query.algebra.Filter;
+import org.openrdf.query.algebra.Group;
 import org.openrdf.query.algebra.Join;
 import org.openrdf.query.algebra.LeftJoin;
 import org.openrdf.query.algebra.MultiProjection;
@@ -94,6 +96,7 @@ public class TopologyFactory implements TopologyBuilderFactory {
     private static final String JOIN_PREFIX = "JOIN_";
     private static final String PROJECTION_PREFIX = "PROJECTION_";
     private static final String FILTER_PREFIX = "FILTER_";
+    private static final String AGGREGATION_PREFIX = "AGGREGATION_";
     private static final String SINK = "SINK";
 
     private List<ProcessorEntry> processorEntryList;
@@ -141,14 +144,15 @@ public class TopologyFactory implements TopologyBuilderFactory {
                 builder.addProcessor(entry.getID(), entry.getSupplier(), parentIDs);
             }
 
-            if (entry.getNode() instanceof Join || entry.getNode() instanceof LeftJoin) {
+            // Add a state store for any node type that requires one.
+            if (entry.getNode() instanceof Join || entry.getNode() instanceof LeftJoin || entry.getNode() instanceof Group) {
                 // Add a state store for the join processor.
                 final StateStoreSupplier joinStoreSupplier =
                         Stores.create( entry.getID() )
-                        .withStringKeys()
-                        .withValues(new VisibilityBindingSetSerde())
-                        .persistent()
-                        .build();
+                            .withStringKeys()
+                            .withValues(new VisibilityBindingSetSerde())
+                            .persistent()
+                            .build();
                 builder.addStateStore(joinStoreSupplier, entry.getID());
             }
         }
@@ -459,6 +463,16 @@ public class TopologyFactory implements TopologyBuilderFactory {
             super.meet(node);
         }
 
+        @Override
+        public void meet(final Group node) throws TopologyBuilderException {
+            final String id = AGGREGATION_PREFIX + UUID.randomUUID();
+            final Optional<Side> side = getSide(node);
+            final AggregationProcessorSupplier supplier = new AggregationProcessorSupplier(id, node, (result) -> getResult(side, result));
+            entries.add( new ProcessorEntry(node, id, side, supplier, Lists.newArrayList(node.getArg())) );
+            idMap.put(node, id);
+            super.meet(node);
+        }
+
         /**
          * Gets the {@link Side} the current node in the visitor is on relative to the provided node.
          * @param node - The node used to determine the side of the current visitor node.
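
For context (a sketch, not part of this commit), the Group node the new meet(...) method handles is what a
SPARQL GROUP BY query parses into. The following snippet uses the standard openrdf parser to show where that
node appears for the count query used by the integration test below:

    import org.openrdf.query.QueryLanguage;
    import org.openrdf.query.algebra.Group;
    import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
    import org.openrdf.query.parser.ParsedQuery;
    import org.openrdf.query.parser.QueryParserUtil;

    public class GroupNodeExample {
        public static void main(final String[] args) throws Exception {
            final String sparql =
                    "SELECT ?person (count(?book) as ?bookCount) " +
                    "WHERE { ?person <urn:hasBook> ?book } " +
                    "GROUP BY ?person";

            final ParsedQuery parsed = QueryParserUtil.parseQuery(QueryLanguage.SPARQL, sparql, null);
            parsed.getTupleExpr().visit(new QueryModelVisitorBase<RuntimeException>() {
                @Override
                public void meet(final Group node) {
                    // TopologyFactory's visitor registers an AggregationProcessorSupplier for this node.
                    System.out.println("Group by bindings: " + node.getGroupBindingNames());
                }
            });
        }
    }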

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/95df37a3/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
new file mode 100644
index 0000000..ccf5c0c
--- /dev/null
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.aggregation;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.function.projection.RandomUUIDFactory;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
+import org.apache.rya.streams.kafka.processors.aggregation.AggregationProcessorSupplier.AggregationProcessor;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.impl.MapBindingSet;
+
+/**
+ * Integration tests for {@link AggregationProcessor}.
+ */
+public class AggregationProcessorIT {
+
+    @Rule
+    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(false);
+
+    @Test
+    public void count() throws Exception {
+        // A query that figures out how many books each person has.
+        final String sparql =
+                "SELECT ?person (count(?book) as ?bookCount) " +
+                "WHERE { " +
+                    "?person <urn:hasBook> ?book " +
+                "} GROUP BY ?person";
+
+        // Create the statements that will be input into the query.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasBook"), vf.createLiteral("Book 1")), "a"));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hasBook"), vf.createLiteral("Book 1")), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasBook"), vf.createLiteral("Book 2")), "b"));
+
+        // Make the expected results.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("bookCount", vf.createLiteral("1", XMLSchema.INTEGER));
+        expected.add(new VisibilityBindingSet(bs, "a"));
+
+        bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Bob"));
+        bs.addBinding("bookCount", vf.createLiteral("1", XMLSchema.INTEGER));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("bookCount", vf.createLiteral("2", XMLSchema.INTEGER));
+        expected.add(new VisibilityBindingSet(bs, "a&b"));
+
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        // Run the test.
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+    }
+
+    @Test
+    public void sum() throws Exception {
+        // A query that figures out how much food each person has.
+        final String sparql =
+                "SELECT ?person (sum(?foodCount) as ?totalFood) " +
+                "WHERE { " +
+                    "?person <urn:hasFoodType> ?food . " +
+                    "?food <urn:count> ?foodCount . " +
+                "} GROUP BY ?person";
+
+        // Create the statements that will be input into the query.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasFoodType"), vf.createURI("urn:corn")), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasFoodType"), vf.createURI("urn:apple")), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:corn"), vf.createURI("urn:count"), vf.createLiteral(4)), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:apple"), vf.createURI("urn:count"), vf.createLiteral(3)), ""));
+
+        // Make the expected results.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("totalFood", vf.createLiteral("4", XMLSchema.INTEGER));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("totalFood", vf.createLiteral("7", XMLSchema.INTEGER));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        // Run the test.
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+    }
+
+    @Test
+    public void average() throws Exception {
+        // A query that figures out the average age across all people.
+        final String sparql =
+                "SELECT (avg(?age) as ?avgAge) " +
+                "WHERE { " +
+                    "?person <urn:age> ?age " +
+                "}";
+
+        // Create the statements that will be input into the query.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(3)), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(7)), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:age"), vf.createLiteral(2)), ""));
+
+        // Make the expected results.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("avgAge", vf.createLiteral("3", XMLSchema.DECIMAL));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        bs = new MapBindingSet();
+        bs.addBinding("avgAge", vf.createLiteral("5", XMLSchema.DECIMAL));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        bs = new MapBindingSet();
+        bs.addBinding("avgAge", vf.createLiteral("4", XMLSchema.DECIMAL));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        // Run the test.
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+    }
+
+    @Test
+    public void min() throws Exception {
+        // A query that figures out what the youngest age is across all people.
+        final String sparql =
+                "SELECT (min(?age) as ?youngest) " +
+                "WHERE { " +
+                    "?person <urn:age> ?age " +
+                "}";
+
+        // Create the statements that will be input into the query.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(13)), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(14)), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:age"), vf.createLiteral(7)), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:David"), vf.createURI("urn:age"), vf.createLiteral(5)), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:age"), vf.createLiteral(25)), ""));
+
+        // Make the expected results.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("youngest", vf.createLiteral(13));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        bs = new MapBindingSet();
+        bs.addBinding("youngest", vf.createLiteral(7));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        bs = new MapBindingSet();
+        bs.addBinding("youngest", vf.createLiteral(5));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        // Run the test.
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+    }
+
+    @Test
+    public void max() throws Exception {
+        // A query that figures out what the oldest age is across all people.
+        final String sparql =
+                "SELECT (max(?age) as ?oldest) " +
+                "WHERE { " +
+                    "?person <urn:age> ?age " +
+                "}";
+
+        // Create the statements that will be input into the query.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(13)), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(14)), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:age"), vf.createLiteral(7)), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:David"), vf.createURI("urn:age"), vf.createLiteral(5)), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:age"), vf.createLiteral(25)), ""));
+
+        // Make the expected results.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("oldest", vf.createLiteral(13));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        bs = new MapBindingSet();
+        bs.addBinding("oldest", vf.createLiteral(14));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        bs = new MapBindingSet();
+        bs.addBinding("oldest", vf.createLiteral(25));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        // Run the test.
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+    }
+
+    @Test
+    public void multipleGroupByVars() throws Exception {
+        // A query that contains more than one group by variable.
+        final String sparql =
+                "SELECT ?business ?employee (sum(?hours) AS ?totalHours) " +
+                "WHERE {" +
+                    "?employee <urn:worksAt> ?business . " +
+                    "?business <urn:hasTimecardId> ?timecardId . " +
+                    "?employee <urn:hasTimecardId> ?timecardId . " +
+                    "?timecardId <urn:hours> ?hours . " +
+                "} GROUP BY ?business ?employee";
+
+        // Create the statements that will be input into the query.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoJoint")), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:TacoJoint"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard1")), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard1")), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:timecard1"), vf.createURI("urn:hours"), vf.createLiteral(40)), ""));
+
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:TacoJoint"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard2")), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard2")), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:timecard2"), vf.createURI("urn:hours"), vf.createLiteral(25)), ""));
+
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoJoint")), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:TacoJoint"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard3")), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard3")), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:timecard3"), vf.createURI("urn:hours"), vf.createLiteral(28)), ""));
+
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:TacoJoint"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard4")), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard4")), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:timecard4"), vf.createURI("urn:hours"), vf.createLiteral(28)), ""));
+
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:worksAt"), vf.createURI("urn:CoffeeShop")), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:CoffeeShop"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard5")), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard5")), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:timecard5"), vf.createURI("urn:hours"), vf.createLiteral(12)), ""));
+
+        // Make the expected results.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("business", vf.createURI("urn:TacoJoint"));
+        bs.addBinding("employee", vf.createURI("urn:Alice"));
+        bs.addBinding("totalHours", vf.createLiteral("40", XMLSchema.INTEGER));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        bs = new MapBindingSet();
+        bs.addBinding("business", vf.createURI("urn:TacoJoint"));
+        bs.addBinding("employee", vf.createURI("urn:Alice"));
+        bs.addBinding("totalHours", vf.createLiteral("65", XMLSchema.INTEGER));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        bs = new MapBindingSet();
+        bs.addBinding("business", vf.createURI("urn:TacoJoint"));
+        bs.addBinding("employee", vf.createURI("urn:Bob"));
+        bs.addBinding("totalHours", vf.createLiteral("28", XMLSchema.INTEGER));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        bs = new MapBindingSet();
+        bs.addBinding("business", vf.createURI("urn:TacoJoint"));
+        bs.addBinding("employee", vf.createURI("urn:Bob"));
+        bs.addBinding("totalHours", vf.createLiteral("56", XMLSchema.INTEGER));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        bs = new MapBindingSet();
+        bs.addBinding("business", vf.createURI("urn:CoffeeShop"));
+        bs.addBinding("employee", vf.createURI("urn:Alice"));
+        bs.addBinding("totalHours", vf.createLiteral("12", XMLSchema.INTEGER));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        // Run the test.
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 4000, statements, expected, VisibilityBindingSetDeserializer.class);
+    }
+
+    @Test
+    public void multipleAggregations() throws Exception {
+        // A query that figures out what the youngest and oldest ages are across all people.
+        final String sparql =
+                "SELECT (min(?age) as ?youngest) (max(?age) as ?oldest) " +
+                "WHERE { " +
+                    "?person <urn:age> ?age " +
+                "}";
+
+        // Create the statements that will be input into the query.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(13)), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(14)), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:age"), vf.createLiteral(7)), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:David"), vf.createURI("urn:age"), vf.createLiteral(5)), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:age"), vf.createLiteral(25)), ""));
+
+        // Make the expected results.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("youngest", vf.createLiteral(13));
+        bs.addBinding("oldest", vf.createLiteral(13));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        bs = new MapBindingSet();
+        bs.addBinding("youngest", vf.createLiteral(13));
+        bs.addBinding("oldest", vf.createLiteral(14));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        bs = new MapBindingSet();
+        bs.addBinding("youngest", vf.createLiteral(7));
+        bs.addBinding("oldest", vf.createLiteral(14));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        bs = new MapBindingSet();
+        bs.addBinding("youngest", vf.createLiteral(5));
+        bs.addBinding("oldest", vf.createLiteral(14));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        bs = new MapBindingSet();
+        bs.addBinding("youngest", vf.createLiteral(5));
+        bs.addBinding("oldest", vf.createLiteral(25));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        // Run the test.
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+    }
+}
\ No newline at end of file