You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2018/01/09 21:48:59 UTC

[42/50] [abbrv] incubator-rya git commit: RYA-377 Repackaged the common Aggregation code into the rya.api.functions project.

RYA-377 Repackaged the common Aggregation code into the rya.api.functions project.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/8363724b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/8363724b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/8363724b

Branch: refs/heads/master
Commit: 8363724b4d684c47fe806a4f364e40134964e6cb
Parents: 1535b46
Author: kchilton2 <ke...@gmail.com>
Authored: Tue Nov 21 16:29:30 2017 -0500
Committer: caleb <ca...@parsons.com>
Committed: Tue Jan 9 15:13:01 2018 -0500

----------------------------------------------------------------------
 .../aggregation/AggregationElement.java         | 105 +++++
 .../aggregation/AggregationFunction.java        |  41 ++
 .../function/aggregation/AggregationState.java  | 120 ++++++
 .../function/aggregation/AggregationType.java   |  66 +++
 .../function/aggregation/AverageFunction.java   |  96 +++++
 .../api/function/aggregation/AverageState.java  |  93 +++++
 .../api/function/aggregation/CountFunction.java |  61 +++
 .../api/function/aggregation/MaxFunction.java   |  63 +++
 .../api/function/aggregation/MinFunction.java   |  63 +++
 .../api/function/aggregation/SumFunction.java   |  85 ++++
 .../pcj/fluo/app/AggregationResultUpdater.java  | 410 +------------------
 .../fluo/app/observers/AggregationObserver.java |   4 +-
 .../pcj/fluo/app/query/AggregationMetadata.java | 130 +-----
 .../pcj/fluo/app/query/FluoQueryColumns.java    |   2 +-
 .../fluo/app/query/FluoQueryMetadataDAO.java    |  11 +-
 .../fluo/app/query/SparqlFluoQueryBuilder.java  | 255 ++++++------
 .../fluo/app/query/FluoQueryMetadataDAOIT.java  |  66 +--
 17 files changed, 994 insertions(+), 677 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationElement.java
----------------------------------------------------------------------
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationElement.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationElement.java
new file mode 100644
index 0000000..3112059
--- /dev/null
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationElement.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.function.aggregation;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Represents all of the metadata required to perform an Aggregation that is part of a SPARQL query.
+ * <p>
+ * For example, if you have the following in SPARQL:
+ * <pre>
+ * SELECT (avg(?price) as ?avgPrice) {
+ *     ...
+ * }
+ * </pre>
+ * You would construct an instance of this object like so:
+ * <pre>
+ * new AggregationElement(AggregationType.AVERAGE, "price", "avgPrice");
+ * </pre>
+ */
+@DefaultAnnotation(NonNull.class)
+public final class AggregationElement implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final AggregationType aggregationType;
+    private final String aggregatedBindingName;
+    private final String resultBindingName;
+
+    /**
+     * Constructs an instance of {@link AggregationElement}.
+     *
+     * @param aggregationType - Defines how the binding values will be aggregated. (not null)
+     * @param aggregatedBindingName - The name of the binding whose values are aggregated. This binding must
+     *   appear within the child node's emitted binding sets. (not null)
+     * @param resultBindingName - The name of the binding this aggregation's results are written to. This binding
+     *   must appear within the AggregationMetadata's variable order. (not null)
+     */
+    public AggregationElement(
+            final AggregationType aggregationType,
+            final String aggregatedBindingName,
+            final String resultBindingName) {
+        this.aggregationType = requireNonNull(aggregationType);
+        this.aggregatedBindingName = requireNonNull(aggregatedBindingName);
+        this.resultBindingName = requireNonNull(resultBindingName);
+    }
+
+    /**
+     * @return Defines how the binding values will be aggregated.
+     */
+    public AggregationType getAggregationType() {
+        return aggregationType;
+    }
+
+    /**
+     * @return The name of the binding whose values are aggregated. This binding must appear within the child node's emitted binding sets.
+     */
+    public String getAggregatedBindingName() {
+        return aggregatedBindingName;
+    }
+
+    /**
+     * @return The name of the binding this aggregation's results are written to. This binding must appear within the AggregationMetadata's variable order.
+     */
+    public String getResultBindingName() {
+        return resultBindingName;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(aggregationType, aggregatedBindingName, resultBindingName);
+    }
+
+    @Override
+    public boolean equals(final Object o ) {
+        if(o instanceof AggregationElement) {
+            final AggregationElement agg = (AggregationElement) o;
+            return Objects.equals(aggregationType, agg.aggregationType) &&
+                    Objects.equals(aggregatedBindingName, agg.aggregatedBindingName) &&
+                    Objects.equals(resultBindingName, agg.resultBindingName);
+        }
+        return false;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationFunction.java
----------------------------------------------------------------------
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationFunction.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationFunction.java
new file mode 100644
index 0000000..e8c49e7
--- /dev/null
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationFunction.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.function.aggregation;
+
+import org.apache.rya.api.model.VisibilityBindingSet;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A function that updates an {@link AggregationState}.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface AggregationFunction {
+
+    /**
+     * Updates an {@link AggregationState} based on the values of a child Binding Set.
+     *
+     * @param aggregation - Defines which function needs to be performed as well as any details required
+     *   to do the aggregation work. (not null)
+     * @param state - The state that will be updated. (not null)
+     * @param childBindingSet - The Binding Set whose values will be used to update the state. (not null)
+     */
+    public void update(AggregationElement aggregation, AggregationState state, VisibilityBindingSet childBindingSet);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationState.java
----------------------------------------------------------------------
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationState.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationState.java
new file mode 100644
index 0000000..2551696
--- /dev/null
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationState.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.function.aggregation;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import org.openrdf.query.impl.MapBindingSet;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Keeps track of the information required to update and build the resulting Binding Set for a set of Group By values.
+ */
+@DefaultAnnotation(NonNull.class)
+public final class AggregationState implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    // The visibility equation that encompasses all data the aggregation state is derived from.
+    private String visibility;
+
+    // A binding set that holds the current state of the aggregations.
+    private final MapBindingSet bindingSet;
+
+    // A map from result binding name to the state that derived that binding's value.
+    private final Map<String, AverageState> avgStates;
+
+    /**
+     * Constructs an instance of {@link AggregationState}.
+     */
+    public AggregationState() {
+        this.visibility = "";
+        this.bindingSet = new MapBindingSet();
+        this.avgStates = new HashMap<>();
+    }
+
+    /**
+     * Constructs an instance of {@link AggregationState}.
+     *
+     * @param visibility - The visibility equation associated with the resulting binding set. (not null)
+     * @param bindingSet - The Binding Set whose values are being updated. It holds the result for a set of
+     *   Group By values. (not null)
+     * @param avgStates - If the aggregation is doing an Average, this is a map from result binding name to
+     *   average state for that binding. (not null)
+     */
+    public AggregationState(
+            final String visibility,
+            final MapBindingSet bindingSet,
+            final Map<String, AverageState> avgStates) {
+        this.visibility = requireNonNull(visibility);
+        this.bindingSet = requireNonNull(bindingSet);
+        this.avgStates = requireNonNull(avgStates);
+    }
+
+    /**
+     * @return The visibility equation associated with the resulting binding set.
+     */
+    public String getVisibility() {
+        return visibility;
+    }
+
+    /**
+     * @param visibility - The visibility equation associated with the resulting binding set. (not null)
+     */
+    public void setVisibility(final String visibility) {
+        this.visibility = requireNonNull(visibility);
+    }
+
+    /**
+     * @return The Binding Set whose values are being updated. It holds the result for a set of Group By values.
+     */
+    public MapBindingSet getBindingSet() {
+        return bindingSet;
+    }
+
+    /**
+     * @return If the aggregation is doing an Average, this is a map from result binding name to
+     *   average state for that binding.
+     */
+    public Map<String, AverageState> getAverageStates() {
+        return avgStates;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(visibility, bindingSet, avgStates);
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if(o instanceof AggregationState) {
+            final AggregationState state = (AggregationState) o;
+            return Objects.equals(visibility, state.visibility) &&
+                    Objects.equals(bindingSet, state.bindingSet) &&
+                    Objects.equals(avgStates, state.avgStates);
+        }
+        return false;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationType.java
----------------------------------------------------------------------
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationType.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationType.java
new file mode 100644
index 0000000..5383da1
--- /dev/null
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationType.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.function.aggregation;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Optional;
+
+import org.openrdf.query.algebra.AggregateOperator;
+import org.openrdf.query.algebra.Avg;
+import org.openrdf.query.algebra.Count;
+import org.openrdf.query.algebra.Max;
+import org.openrdf.query.algebra.Min;
+import org.openrdf.query.algebra.Sum;
+
+import com.google.common.collect.ImmutableMap;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * The different types of Aggregation functions that an aggregate node may perform.
+ */
+@DefaultAnnotation(NonNull.class)
+public enum AggregationType {
+    MIN(Min.class),
+    MAX(Max.class),
+    COUNT(Count.class),
+    SUM(Sum.class),
+    AVERAGE(Avg.class);
+
+    private final Class<? extends AggregateOperator> operatorClass;
+
+    private AggregationType(final Class<? extends AggregateOperator> operatorClass) {
+        this.operatorClass = requireNonNull(operatorClass);
+    }
+
+    private static final ImmutableMap<Class<? extends AggregateOperator>, AggregationType> byOperatorClass;
+    static {
+        final ImmutableMap.Builder<Class<? extends AggregateOperator>, AggregationType> builder = ImmutableMap.builder();
+        for(final AggregationType type : AggregationType.values()) {
+            builder.put(type.operatorClass, type);
+        }
+        byOperatorClass = builder.build();
+    }
+
+    public static Optional<AggregationType> byOperatorClass(final Class<? extends AggregateOperator> operatorClass) {
+        return Optional.ofNullable( byOperatorClass.get(operatorClass) );
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageFunction.java
----------------------------------------------------------------------
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageFunction.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageFunction.java
new file mode 100644
index 0000000..a73d5ac
--- /dev/null
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageFunction.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.function.aggregation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Map;
+
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.openrdf.model.Literal;
+import org.openrdf.model.Value;
+import org.openrdf.model.datatypes.XMLDatatypeUtil;
+import org.openrdf.model.impl.DecimalLiteralImpl;
+import org.openrdf.model.impl.IntegerLiteralImpl;
+import org.openrdf.query.algebra.MathExpr.MathOp;
+import org.openrdf.query.algebra.evaluation.ValueExprEvaluationException;
+import org.openrdf.query.algebra.evaluation.util.MathUtil;
+import org.openrdf.query.impl.MapBindingSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Update the {@link AggregationState}'s average if the child Binding Set contains the binding name
+ * that is being averaged by the {@link AggregationElement}.
+ */
+@DefaultAnnotation(NonNull.class)
+public final class AverageFunction implements AggregationFunction {
+    private static final Logger log = LoggerFactory.getLogger(AverageFunction.class);
+
+    @Override
+    public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
+        checkArgument(aggregation.getAggregationType() == AggregationType.AVERAGE, "The AverageFunction only accepts AVERAGE AggregationElements.");
+
+        // Only update the average if the child contains the binding that we are averaging.
+        final String aggregatedName = aggregation.getAggregatedBindingName();
+        if(childBindingSet.hasBinding(aggregatedName)) {
+            final MapBindingSet result = state.getBindingSet();
+            final String resultName = aggregation.getResultBindingName();
+            final boolean newBinding = !result.hasBinding(resultName);
+
+            // Start from a zeroed sum/count when the result binding does not exist yet, otherwise reuse the stored one.
+            final Map<String, AverageState> averageStates = state.getAverageStates();
+            AverageState averageState = newBinding ? new AverageState() : averageStates.get(resultName);
+
+            // Update the state of the average. Values that are not numeric literals are skipped without any change.
+            final Value childValue = childBindingSet.getValue(aggregatedName);
+            if(childValue instanceof Literal) {
+                final Literal childLiteral = (Literal) childValue;
+                if (childLiteral.getDatatype() != null && XMLDatatypeUtil.isNumericDatatype(childLiteral.getDatatype())) {
+                    try {
+                        // Update the sum.
+                        final Literal oldSum = new DecimalLiteralImpl(averageState.getSum());
+                        final BigDecimal sum = MathUtil.compute(oldSum, childLiteral, MathOp.PLUS).decimalValue();
+
+                        // Update the count.
+                        final BigInteger count = averageState.getCount().add( BigInteger.ONE );
+
+                        // Update the BindingSet to include the new average.
+                        final Literal sumLiteral = new DecimalLiteralImpl(sum);
+                        final Literal countLiteral = new IntegerLiteralImpl(count);
+                        final Literal average = MathUtil.compute(sumLiteral, countLiteral, MathOp.DIVIDE);
+                        result.addBinding(resultName, average);
+
+                        // Update the average state that is stored.
+                        averageState = new AverageState(sum, count);
+                        averageStates.put(resultName, averageState);
+                    } catch (final ValueExprEvaluationException e) {
+                        log.error("A problem was encountered while updating an Average Aggregation. This binding set will be ignored: " + childBindingSet, e);
+                        return;
+                    }
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageState.java
----------------------------------------------------------------------
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageState.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageState.java
new file mode 100644
index 0000000..8917751
--- /dev/null
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageState.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.function.aggregation;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Objects;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * The Sum and Count of the values that are being averaged. The average itself is derived from these values.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AverageState implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final BigDecimal sum;
+    private final BigInteger count;
+
+    /**
+     * Constructs an instance of {@link AverageState} where the count and sum start at 0.
+     */
+    public AverageState() {
+        sum = BigDecimal.ZERO;
+        count = BigInteger.ZERO;
+    }
+
+    /**
+     * Constructs an instance of {@link AverageState}.
+     *
+     * @param sum - The sum of the values that are averaged. (not null)
+     * @param count - The number of values that are averaged. (not null)
+     */
+    public AverageState(final BigDecimal sum, final BigInteger count) {
+        this.sum = requireNonNull(sum);
+        this.count = requireNonNull(count);
+    }
+
+    /**
+     * @return The sum of the values that are averaged.
+     */
+    public BigDecimal getSum() {
+        return sum;
+    }
+
+    /**
+     * @return The number of values that are averaged.
+     */
+    public BigInteger getCount() {
+        return count;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(sum, count);
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if(o instanceof AverageState) {
+            final AverageState state = (AverageState) o;
+            return Objects.equals(sum, state.sum) &&
+                    Objects.equals(count, state.count);
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return "Sum: " + sum + " Count: " + count;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/CountFunction.java
----------------------------------------------------------------------
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/CountFunction.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/CountFunction.java
new file mode 100644
index 0000000..7dd5b21
--- /dev/null
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/CountFunction.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.function.aggregation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.math.BigInteger;
+
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.openrdf.model.Literal;
+import org.openrdf.model.impl.IntegerLiteralImpl;
+import org.openrdf.query.impl.MapBindingSet;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Increments the {@link AggregationState}'s count if the child Binding Set contains the binding name
+ * that is being counted by the {@link AggregationElement}.
+ */
+@DefaultAnnotation(NonNull.class)
+public final class CountFunction implements AggregationFunction {
+    @Override
+    public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
+        checkArgument(aggregation.getAggregationType() == AggregationType.COUNT, "The CountFunction only accepts COUNT AggregationElements.");
+
+        // Only add one to the count if the child contains the binding that we are counting.
+        final String aggregatedName = aggregation.getAggregatedBindingName();
+        if(childBindingSet.hasBinding(aggregatedName)) {
+            final MapBindingSet result = state.getBindingSet();
+            final String resultName = aggregation.getResultBindingName();
+            final boolean newBinding = !result.hasBinding(resultName);
+
+            if(newBinding) {
+                // Initialize the binding.
+                result.addBinding(resultName, new IntegerLiteralImpl(BigInteger.ONE));
+            } else {
+                // Update the existing binding.
+                final Literal count = (Literal) result.getValue(resultName);
+                final BigInteger updatedCount = count.integerValue().add( BigInteger.ONE );
+                result.addBinding(resultName, new IntegerLiteralImpl(updatedCount));
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MaxFunction.java
----------------------------------------------------------------------
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MaxFunction.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MaxFunction.java
new file mode 100644
index 0000000..3295fbb
--- /dev/null
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MaxFunction.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.function.aggregation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.openrdf.model.Value;
+import org.openrdf.query.algebra.evaluation.util.ValueComparator;
+import org.openrdf.query.impl.MapBindingSet;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Update the {@link AggregationState}'s max if the child Binding Set contains the binding name whose
+ * maximum value is being computed for the {@link AggregationElement}.
+ */
+@DefaultAnnotation(NonNull.class)
+public final class MaxFunction implements AggregationFunction {
+
+    private final ValueComparator compare = new ValueComparator();
+
+    @Override
+    public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
+        checkArgument(aggregation.getAggregationType() == AggregationType.MAX, "The MaxFunction only accepts MAX AggregationElements.");
+
+        // Only update the max if the child contains the binding that we are finding the max value for.
+        final String aggregatedName = aggregation.getAggregatedBindingName();
+        if(childBindingSet.hasBinding(aggregatedName)) {
+            final MapBindingSet result = state.getBindingSet();
+            final String resultName = aggregation.getResultBindingName();
+            final boolean newBinding = !result.hasBinding(resultName);
+
+            Value max;
+            if(newBinding) {
+                max = childBindingSet.getValue(aggregatedName);
+            } else {
+                final Value oldMax = result.getValue(resultName);
+                final Value childMax = childBindingSet.getValue(aggregatedName);
+                max = compare.compare(childMax, oldMax) > 0 ? childMax : oldMax;
+            }
+
+            result.addBinding(resultName, max);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MinFunction.java
----------------------------------------------------------------------
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MinFunction.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MinFunction.java
new file mode 100644
index 0000000..d6bf751
--- /dev/null
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MinFunction.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.function.aggregation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.openrdf.model.Value;
+import org.openrdf.query.algebra.evaluation.util.ValueComparator;
+import org.openrdf.query.impl.MapBindingSet;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Update the {@link AggregationState}'s min if the child Binding Set contains the binding name whose
+ * minimum value is being computed for the {@link AggregationElement}.
+ */
+@DefaultAnnotation(NonNull.class)
+public final class MinFunction implements AggregationFunction {
+
+    private final ValueComparator compare = new ValueComparator();
+
+    @Override
+    public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
+        checkArgument(aggregation.getAggregationType() == AggregationType.MIN, "The MinFunction only accepts MIN AggregationElements.");
+
+        // Only update the min if the child contains the binding that we are finding the min value for.
+        final String aggregatedName = aggregation.getAggregatedBindingName();
+        if(childBindingSet.hasBinding(aggregatedName)) {
+            final MapBindingSet result = state.getBindingSet();
+            final String resultName = aggregation.getResultBindingName();
+            final boolean newBinding = !result.hasBinding(resultName);
+
+            Value min;
+            if(newBinding) {
+                min = childBindingSet.getValue(aggregatedName);
+            } else {
+                final Value oldMin = result.getValue(resultName);
+                final Value chidlMin = childBindingSet.getValue(aggregatedName);
+                min = compare.compare(chidlMin, oldMin) < 0 ? chidlMin : oldMin;
+            }
+
+            result.addBinding(resultName, min);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/SumFunction.java
----------------------------------------------------------------------
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/SumFunction.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/SumFunction.java
new file mode 100644
index 0000000..97735f2
--- /dev/null
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/SumFunction.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.function.aggregation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.math.BigInteger;
+
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.openrdf.model.Literal;
+import org.openrdf.model.Value;
+import org.openrdf.model.datatypes.XMLDatatypeUtil;
+import org.openrdf.model.impl.IntegerLiteralImpl;
+import org.openrdf.query.algebra.MathExpr.MathOp;
+import org.openrdf.query.algebra.evaluation.ValueExprEvaluationException;
+import org.openrdf.query.algebra.evaluation.util.MathUtil;
+import org.openrdf.query.impl.MapBindingSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Add to the {@link AggregationState}'s sum if the child Binding Set contains the binding name
+ * that is being summed by the {@link AggregationElement}.
+ */
+@DefaultAnnotation(NonNull.class)
+public final class SumFunction implements AggregationFunction {
+    private static final Logger log = LoggerFactory.getLogger(SumFunction.class);
+
+    @Override
+    public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
+        checkArgument(aggregation.getAggregationType() == AggregationType.SUM, "The SumFunction only accepts SUM AggregationElements.");
+
+        // Only add values to the sum if the child contains the binding that we are summing.
+        final String aggregatedName = aggregation.getAggregatedBindingName();
+        if(childBindingSet.hasBinding(aggregatedName)) {
+            final MapBindingSet result = state.getBindingSet();
+            final String resultName = aggregation.getResultBindingName();
+            final boolean newBinding = !result.hasBinding(resultName);
+
+            // Get the starting number for the sum.
+            Literal sum;
+            if(newBinding) {
+                sum = new IntegerLiteralImpl(BigInteger.ZERO);
+            } else {
+                sum = (Literal) state.getBindingSet().getValue(resultName);
+            }
+
+            // Add the child binding set's value if it is a numeric literal.
+            final Value childValue = childBindingSet.getValue(aggregatedName);
+            if(childValue instanceof Literal) {
+                final Literal childLiteral = (Literal) childValue;
+                if (childLiteral.getDatatype() != null && XMLDatatypeUtil.isNumericDatatype(childLiteral.getDatatype())) {
+                    try {
+                        sum = MathUtil.compute(sum, childLiteral, MathOp.PLUS);
+                    } catch (final ValueExprEvaluationException e) {
+                        log.error("A problem was encountered while updating a Sum Aggregation. This binding set will be ignored: " + childBindingSet);
+                        return;
+                    }
+                }
+            }
+
+            // Update the state to include the new sum.
+            result.addBinding(resultName, sum);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java
index bb96a6a..4fbaad9 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java
@@ -18,19 +18,12 @@
  */
 package org.apache.rya.indexing.pcj.fluo.app;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 
 import org.apache.commons.io.serialization.ValidatingObjectInputStream;
@@ -38,22 +31,21 @@ import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.log4j.Logger;
 import org.apache.rya.accumulo.utils.VisibilitySimplifier;
+import org.apache.rya.api.function.aggregation.AggregationElement;
+import org.apache.rya.api.function.aggregation.AggregationFunction;
+import org.apache.rya.api.function.aggregation.AggregationState;
+import org.apache.rya.api.function.aggregation.AggregationType;
+import org.apache.rya.api.function.aggregation.AverageFunction;
+import org.apache.rya.api.function.aggregation.AverageState;
+import org.apache.rya.api.function.aggregation.CountFunction;
+import org.apache.rya.api.function.aggregation.MaxFunction;
+import org.apache.rya.api.function.aggregation.MinFunction;
+import org.apache.rya.api.function.aggregation.SumFunction;
 import org.apache.rya.api.log.LogUtils;
 import org.apache.rya.api.model.VisibilityBindingSet;
 import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
-import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
-import org.openrdf.model.Literal;
-import org.openrdf.model.Value;
-import org.openrdf.model.datatypes.XMLDatatypeUtil;
-import org.openrdf.model.impl.DecimalLiteralImpl;
-import org.openrdf.model.impl.IntegerLiteralImpl;
-import org.openrdf.query.algebra.MathExpr.MathOp;
-import org.openrdf.query.algebra.evaluation.ValueExprEvaluationException;
-import org.openrdf.query.algebra.evaluation.util.MathUtil;
-import org.openrdf.query.algebra.evaluation.util.ValueComparator;
 import org.openrdf.query.impl.MapBindingSet;
 
 import com.google.common.collect.ImmutableMap;
@@ -154,213 +146,6 @@ public class AggregationResultUpdater extends AbstractNodeUpdater {
     }
 
     /**
-     * A function that updates an {@link AggregationState}.
-     */
-    public static interface AggregationFunction {
-
-        /**
-         * Updates an {@link AggregationState} based on the values of a child Binding Set.
-         *
-         * @param aggregation - Defines which function needs to be performed as well as any details required
-         *   to do the aggregation work. (not null)
-         * @param state - The state that will be updated. (not null)
-         * @param childBindingSet - The Binding Set whose values will be used to update the state.
-         */
-        public void update(AggregationElement aggregation, AggregationState state, VisibilityBindingSet childBindingSet);
-    }
-
-    /**
-     * Increments the {@link AggregationState}'s count if the child Binding Set contains the binding name
-     * that is being counted by the {@link AggregationElement}.
-     */
-    public static final class CountFunction implements AggregationFunction {
-        @Override
-        public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
-            checkArgument(aggregation.getAggregationType() == AggregationType.COUNT, "The CountFunction only accepts COUNT AggregationElements.");
-
-            // Only add one to the count if the child contains the binding that we are counting.
-            final String aggregatedName = aggregation.getAggregatedBindingName();
-            if(childBindingSet.hasBinding(aggregatedName)) {
-                final MapBindingSet result = state.getBindingSet();
-                final String resultName = aggregation.getResultBindingName();
-                final boolean newBinding = !result.hasBinding(resultName);
-
-                if(newBinding) {
-                    // Initialize the binding.
-                    result.addBinding(resultName, new IntegerLiteralImpl(BigInteger.ONE));
-                } else {
-                    // Update the existing binding.
-                    final Literal count = (Literal) result.getValue(resultName);
-                    final BigInteger updatedCount = count.integerValue().add( BigInteger.ONE );
-                    result.addBinding(resultName, new IntegerLiteralImpl(updatedCount));
-                }
-            }
-        }
-    }
-
-    /**
-     * Add to the {@link AggregationState}'s sum if the child Binding Set contains the binding name
-     * that is being summed by the {@link AggregationElement}.
-     */
-    public static final class SumFunction implements AggregationFunction {
-        @Override
-        public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
-            checkArgument(aggregation.getAggregationType() == AggregationType.SUM, "The SumFunction only accepts SUM AggregationElements.");
-
-            // Only add values to the sum if the child contains the binding that we are summing.
-            final String aggregatedName = aggregation.getAggregatedBindingName();
-            if(childBindingSet.hasBinding(aggregatedName)) {
-                final MapBindingSet result = state.getBindingSet();
-                final String resultName = aggregation.getResultBindingName();
-                final boolean newBinding = !result.hasBinding(resultName);
-
-                // Get the starting number for the sum.
-                Literal sum;
-                if(newBinding) {
-                    sum = new IntegerLiteralImpl(BigInteger.ZERO);
-                } else {
-                    sum = (Literal) state.getBindingSet().getValue(resultName);
-                }
-
-                // Add the child binding set's value if it is a numeric literal.
-                final Value childValue = childBindingSet.getValue(aggregatedName);
-                if(childValue instanceof Literal) {
-                    final Literal childLiteral = (Literal) childValue;
-                    if (childLiteral.getDatatype() != null && XMLDatatypeUtil.isNumericDatatype(childLiteral.getDatatype())) {
-                        try {
-                            sum = MathUtil.compute(sum, childLiteral, MathOp.PLUS);
-                        } catch (final ValueExprEvaluationException e) {
-                            log.error("A problem was encountered while updating a Sum Aggregation. This binding set will be ignored: " + childBindingSet);
-                            return;
-                        }
-                    }
-                }
-
-                // Update the state to include the new sum.
-                result.addBinding(resultName, sum);
-            }
-        }
-    }
-
-    /**
-     * Update the {@link AggregationState}'s average if the child Binding Set contains the binding name
-     * that is being averaged by the {@link AggregationElement}.
-     */
-    public static final class AverageFunction implements AggregationFunction {
-        @Override
-        public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
-            checkArgument(aggregation.getAggregationType() == AggregationType.AVERAGE, "The AverageFunction only accepts AVERAGE AggregationElements.");
-
-            // Only update the average if the child contains the binding that we are averaging.
-            final String aggregatedName = aggregation.getAggregatedBindingName();
-            if(childBindingSet.hasBinding(aggregatedName)) {
-                final MapBindingSet result = state.getBindingSet();
-                final String resultName = aggregation.getResultBindingName();
-                final boolean newBinding = !result.hasBinding(resultName);
-
-                // Get the state of the average.
-                final Map<String, AverageState> averageStates = state.getAverageStates();
-                AverageState averageState = newBinding ? new AverageState() : averageStates.get(resultName);
-
-                // Update the state of the average.
-                final Value childValue = childBindingSet.getValue(aggregatedName);
-                if(childValue instanceof Literal) {
-                    final Literal childLiteral = (Literal) childValue;
-                    if (childLiteral.getDatatype() != null && XMLDatatypeUtil.isNumericDatatype(childLiteral.getDatatype())) {
-                        try {
-                            // Update the sum.
-                            final Literal oldSum = new DecimalLiteralImpl(averageState.getSum());
-                            final BigDecimal sum = MathUtil.compute(oldSum, childLiteral, MathOp.PLUS).decimalValue();
-
-                            // Update the count.
-                            final BigInteger count = averageState.getCount().add( BigInteger.ONE );
-
-                            // Update the BindingSet to include the new average.
-                            final Literal sumLiteral = new DecimalLiteralImpl(sum);
-                            final Literal countLiteral = new IntegerLiteralImpl(count);
-                            final Literal average = MathUtil.compute(sumLiteral, countLiteral, MathOp.DIVIDE);
-                            result.addBinding(resultName, average);
-
-                            // Update the average state that is stored.
-                            averageState = new AverageState(sum, count);
-                            averageStates.put(resultName, averageState);
-                        } catch (final ValueExprEvaluationException e) {
-                            log.error("A problem was encountered while updating an Average Aggregation. This binding set will be ignored: " + childBindingSet);
-                            return;
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * Update the {@link AggregationState}'s max if the child binding Set contains the binding name that is being
-     * maxed by the {@link AggregationElement}.
-     */
-    public static final class MaxFunction implements AggregationFunction {
-
-        private final ValueComparator compare = new ValueComparator();
-
-        @Override
-        public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
-            checkArgument(aggregation.getAggregationType() == AggregationType.MAX, "The MaxFunction only accepts MAX AggregationElements.");
-
-            // Only update the max if the child contains the binding that we are finding the max value for.
-            final String aggregatedName = aggregation.getAggregatedBindingName();
-            if(childBindingSet.hasBinding(aggregatedName)) {
-                final MapBindingSet result = state.getBindingSet();
-                final String resultName = aggregation.getResultBindingName();
-                final boolean newBinding = !result.hasBinding(resultName);
-
-                Value max;
-                if(newBinding) {
-                    max = childBindingSet.getValue(aggregatedName);
-                } else {
-                    final Value oldMax = result.getValue(resultName);
-                    final Value childMax = childBindingSet.getValue(aggregatedName);
-                    max = compare.compare(childMax, oldMax) > 0 ? childMax : oldMax;
-                }
-
-                result.addBinding(resultName, max);
-            }
-        }
-    }
-
-    /**
-     * Update the {@link AggregationState}'s min if the child binding Set contains the binding name that is being
-     * mined by the {@link AggregationElement}.
-     */
-    public static final class MinFunction implements AggregationFunction {
-
-        private final ValueComparator compare = new ValueComparator();
-
-        @Override
-        public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
-            checkArgument(aggregation.getAggregationType() == AggregationType.MIN, "The MinFunction only accepts MIN AggregationElements.");
-
-            // Only update the min if the child contains the binding that we are finding the min value for.
-            final String aggregatedName = aggregation.getAggregatedBindingName();
-            if(childBindingSet.hasBinding(aggregatedName)) {
-                final MapBindingSet result = state.getBindingSet();
-                final String resultName = aggregation.getResultBindingName();
-                final boolean newBinding = !result.hasBinding(resultName);
-
-                Value min;
-                if(newBinding) {
-                    min = childBindingSet.getValue(aggregatedName);
-                } else {
-                    final Value oldMin = result.getValue(resultName);
-                    final Value chidlMin = childBindingSet.getValue(aggregatedName);
-                    min = compare.compare(chidlMin, oldMin) < 0 ? chidlMin : oldMin;
-                }
-
-                result.addBinding(resultName, min);
-            }
-        }
-    }
-
-    /**
      * Reads/Writes instances of {@link AggregationState} to/from bytes.
      */
     public static interface AggregationStateSerDe {
@@ -410,18 +195,18 @@ public class AggregationResultUpdater extends AbstractNodeUpdater {
             // System.out.println("vois.accept(" + className + ".class, ");};};
                         ) {
                 // These classes are allowed to be deserialized. Others throw InvalidClassException.
-                vois.accept(org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater.AggregationState.class, //
-                                org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater.AverageState.class, //
-                                java.util.HashMap.class, //
-                                java.math.BigInteger.class, //
-                                java.lang.Number.class, //
-                                java.math.BigDecimal.class, //
-                                org.openrdf.query.impl.MapBindingSet.class, //
-                                java.util.LinkedHashMap.class, //
-                                org.openrdf.query.impl.BindingImpl.class, //
-                                org.openrdf.model.impl.URIImpl.class, //
-                                org.openrdf.model.impl.LiteralImpl.class, //
-                                org.openrdf.model.impl.DecimalLiteralImpl.class, //
+                vois.accept(AggregationState.class,
+                                AverageState.class,
+                                java.util.HashMap.class,
+                                java.math.BigInteger.class,
+                                java.lang.Number.class,
+                                java.math.BigDecimal.class,
+                                org.openrdf.query.impl.MapBindingSet.class,
+                                java.util.LinkedHashMap.class,
+                                org.openrdf.query.impl.BindingImpl.class,
+                                org.openrdf.model.impl.URIImpl.class,
+                                org.openrdf.model.impl.LiteralImpl.class,
+                                org.openrdf.model.impl.DecimalLiteralImpl.class,
                                 org.openrdf.model.impl.IntegerLiteralImpl.class);
                 vois.accept("[B"); // Array of Bytes
                 final Object o = vois.readObject();
@@ -437,155 +222,4 @@ public class AggregationResultUpdater extends AbstractNodeUpdater {
             return state;
         }
     }
-
-    /**
-     * Keeps track information required to update and build the resulting Binding Set for a set of Group By values.
-     */
-    public static final class AggregationState implements Serializable {
-        private static final long serialVersionUID = 1L;
-
-        // The visibility equation that encompasses all data the aggregation state is derived from.
-        private String visibility;
-
-        // A binding set that holds the current state of the aggregations.
-        private final MapBindingSet bindingSet;
-
-        // A map from result binding name to the state that derived that binding's value.
-        private final Map<String, AverageState> avgStates;
-
-        /**
-         * Constructs an instance of {@link AggregationState}.
-         */
-        public AggregationState() {
-            this.visibility = "";
-            this.bindingSet = new MapBindingSet();
-            this.avgStates = new HashMap<>();
-        }
-
-        /**
-         * Constructs an instance of {@link AggregationState}.
-         *
-         * @param visibility - The visibility equation associated with the resulting binding set. (not null)
-         * @param bindingSet - The Binding Set whose values are being updated. It holds the result for a set of
-         *   Group By values. (not null)
-         * @param avgStates - If the aggregation is doing an Average, this is a map from result binding name to
-         *   average state for that binding.
-         */
-        public AggregationState(
-                final String visibility,
-                final MapBindingSet bindingSet,
-                final Map<String, AverageState> avgStates) {
-            this.visibility = requireNonNull(visibility);
-            this.bindingSet = requireNonNull(bindingSet);
-            this.avgStates = requireNonNull(avgStates);
-        }
-
-        /**
-         * @return The visibility equation associated with the resulting binding set.
-         */
-        public String getVisibility() {
-            return visibility;
-        }
-
-        /**
-         * @param visibility - The visibility equation associated with the resulting binding set.
-         */
-        public void setVisibility(final String visibility) {
-            this.visibility = requireNonNull(visibility);
-        }
-
-        /**
-         * @return The Binding Set whose values are being updated. It holds the result for a set of Group By values.
-         */
-        public MapBindingSet getBindingSet() {
-            return bindingSet;
-        }
-
-        /**
-         * @return If the aggregation is doing an Average, this is a map from result binding name to
-         *   average state for that binding.
-         */
-        public Map<String, AverageState> getAverageStates() {
-            return avgStates;
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(visibility, bindingSet, avgStates);
-        }
-
-        @Override
-        public boolean equals(final Object o) {
-            if(o instanceof AggregationState) {
-                final AggregationState state = (AggregationState) o;
-                return Objects.equals(visibility, state.visibility) &&
-                        Objects.equals(bindingSet, state.bindingSet) &&
-                        Objects.equals(avgStates, state.avgStates);
-            }
-            return false;
-        }
-    }
-
-    /**
-     * The Sum and Count of the values that are being averaged. The average itself is derived from these values.
-     */
-    public static class AverageState implements Serializable {
-        private static final long serialVersionUID = 1L;
-
-        private final BigDecimal sum;
-        private final BigInteger count;
-
-        /**
-         * Constructs an instance of {@link AverageState} where the count and sum start at 0.
-         */
-        public AverageState() {
-            sum = BigDecimal.ZERO;
-            count = BigInteger.ZERO;
-        }
-
-        /**
-         * Constructs an instance of {@link AverageState}.
-         *
-         * @param sum - The sum of the values that are averaged. (not null)
-         * @param count - The number of values that are averaged. (not null)
-         */
-        public AverageState(final BigDecimal sum, final BigInteger count) {
-            this.sum = requireNonNull(sum);
-            this.count = requireNonNull(count);
-        }
-
-        /**
-         * @return The sum of the values that are averaged.
-         */
-        public BigDecimal getSum() {
-            return sum;
-        }
-
-        /**
-         * @return The number of values that are averaged.
-         */
-        public BigInteger getCount() {
-            return count;
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(sum, count);
-        }
-
-        @Override
-        public boolean equals(final Object o) {
-            if(o instanceof AverageState) {
-                final AverageState state = (AverageState) o;
-                return Objects.equals(sum, state.sum) &&
-                        Objects.equals(count, state.count);
-            }
-            return false;
-        }
-
-        @Override
-        public String toString() {
-            return "Sum: " + sum + " Count: " + count;
-        }
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java
index 0271519..e806b15 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java
@@ -23,10 +23,10 @@ import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.AG
 
 import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.data.Bytes;
-import org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater.AggregationState;
+import org.apache.rya.api.function.aggregation.AggregationState;
+import org.apache.rya.api.model.VisibilityBindingSet;
 import org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater.AggregationStateSerDe;
 import org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater.ObjectSerializationAggregationStateSerDe;
-import org.apache.rya.api.model.VisibilityBindingSet;
 import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
 import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java
index eaa072f..a839645 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java
@@ -20,23 +20,14 @@ package org.apache.rya.indexing.pcj.fluo.app.query;
 
 import static java.util.Objects.requireNonNull;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
-import java.util.Optional;
 
+import org.apache.rya.api.function.aggregation.AggregationElement;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
-import org.openrdf.query.algebra.AggregateOperator;
-import org.openrdf.query.algebra.Avg;
-import org.openrdf.query.algebra.Count;
-import org.openrdf.query.algebra.Max;
-import org.openrdf.query.algebra.Min;
-import org.openrdf.query.algebra.Sum;
-
-import com.google.common.collect.ImmutableMap;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
@@ -50,115 +41,6 @@ import net.jcip.annotations.Immutable;
 @DefaultAnnotation(NonNull.class)
 public class AggregationMetadata extends CommonNodeMetadata {
 
-    /**
-     * The different types of Aggregation functions that an aggregate node may perform.
-     */
-    public static enum AggregationType {
-        MIN(Min.class),
-        MAX(Max.class),
-        COUNT(Count.class),
-        SUM(Sum.class),
-        AVERAGE(Avg.class);
-
-        private final Class<? extends AggregateOperator> operatorClass;
-
-        private AggregationType(final Class<? extends AggregateOperator> operatorClass) {
-            this.operatorClass = requireNonNull(operatorClass);
-        }
-
-        private static final ImmutableMap<Class<? extends AggregateOperator>, AggregationType> byOperatorClass;
-        static {
-            final ImmutableMap.Builder<Class<? extends AggregateOperator>, AggregationType> builder = ImmutableMap.builder();
-            for(final AggregationType type : AggregationType.values()) {
-                builder.put(type.operatorClass, type);
-            }
-            byOperatorClass = builder.build();
-        }
-
-        public static Optional<AggregationType> byOperatorClass(final Class<? extends AggregateOperator> operatorClass) {
-            return Optional.ofNullable( byOperatorClass.get(operatorClass) );
-        }
-    }
-
-    /**
-     * Represents all of the metadata require to perform an Aggregation that is part of a SPARQL query.
-     * </p>
-     * For example, if you have the following in SPARQL:
-     * <pre>
-     * SELECT (avg(?price) as ?avgPrice) {
-     *     ...
-     * }
-     * </pre>
-     * You would construct an instance of this object like so:
-     * <pre>
-     * new AggregationElement(AggregationType.AVERAGE, "price", "avgPrice");
-     * </pre>
-     */
-    @Immutable
-    @DefaultAnnotation(NonNull.class)
-    public static final class AggregationElement implements Serializable {
-        private static final long serialVersionUID = 1L;
-
-        private final AggregationType aggregationType;
-        private final String aggregatedBindingName;
-        private final String resultBindingName;
-
-        /**
-         * Constructs an instance of {@link AggregationElement}.
-         *
-         * @param aggregationType - Defines how the binding values will be aggregated. (not null)
-         * @param aggregatedBindingName - The name of the binding whose values is aggregated. This binding must
-         *   appear within the child node's emitted binding sets. (not null)
-         * @param resultBindingName - The name of the binding this aggregation's results are written to. This binding
-         *   must appeared within the AggregationMetadata's variable order. (not null)
-         */
-        public AggregationElement(
-                final AggregationType aggregationType,
-                final String aggregatedBindingName,
-                final String resultBindingName) {
-            this.aggregationType = requireNonNull(aggregationType);
-            this.aggregatedBindingName = requireNonNull(aggregatedBindingName);
-            this.resultBindingName = requireNonNull(resultBindingName);
-        }
-
-        /**
-         * @return Defines how the binding values will be aggregated.
-         */
-        public AggregationType getAggregationType() {
-            return aggregationType;
-        }
-
-        /**
-         * @return The name of the binding whose values is aggregated. This binding must appear within the child node's emitted binding sets.
-         */
-        public String getAggregatedBindingName() {
-            return aggregatedBindingName;
-        }
-
-        /**
-         * @return The name of the binding this aggregation's results are written to. This binding must appeared within the AggregationMetadata's variable order.
-         */
-        public String getResultBindingName() {
-            return resultBindingName;
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(aggregationType, aggregatedBindingName, resultBindingName);
-        }
-
-        @Override
-        public boolean equals(final Object o ) {
-            if(o instanceof AggregationElement) {
-                final AggregationElement agg = (AggregationElement) o;
-                return Objects.equals(aggregationType, agg.aggregationType) &&
-                        Objects.equals(aggregatedBindingName, agg.aggregatedBindingName) &&
-                        Objects.equals(resultBindingName, agg.resultBindingName);
-            }
-            return false;
-        }
-    }
-
     private final String parentNodeId;
     private final String childNodeId;
     private final Collection<AggregationElement> aggregations;
@@ -308,6 +190,7 @@ public class AggregationMetadata extends CommonNodeMetadata {
         /**
          * @return This node's Node ID.
          */
+        @Override
         public String getNodeId() {
             return nodeId;
         }
@@ -321,10 +204,11 @@ public class AggregationMetadata extends CommonNodeMetadata {
             this.varOrder = varOrder;
             return this;
         }
-        
+
         /**
          * @return the variable order of binding sets that are emitted by this node.
          */
+        @Override
         public VariableOrder getVariableOrder() {
             return varOrder;
         }
@@ -337,7 +221,7 @@ public class AggregationMetadata extends CommonNodeMetadata {
             this.parentNodeId = parentNodeId;
             return this;
         }
-       
+
         public String getParentNodeId() {
             return parentNodeId;
         }
@@ -350,7 +234,7 @@ public class AggregationMetadata extends CommonNodeMetadata {
             this.childNodeId = childNodeId;
             return this;
         }
-        
+
         public String getChildNodeId() {
             return childNodeId;
         }
@@ -375,7 +259,7 @@ public class AggregationMetadata extends CommonNodeMetadata {
             this.groupByVariables = groupByVariables;
             return this;
         }
-        
+
         /**
          * @return variable order that defines how data is grouped for the aggregation function
          */

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
index 9d1c4fc..e043671 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
@@ -25,8 +25,8 @@ import java.util.List;
 
 import org.apache.fluo.api.data.Column;
 import org.apache.rya.api.client.CreatePCJ.QueryType;
+import org.apache.rya.api.function.aggregation.AggregationState;
 import org.apache.rya.api.model.VisibilityBindingSet;
-import org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater.AggregationState;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
index ba75a56..c193ef7 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
@@ -37,11 +37,12 @@ import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
 import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
 import org.apache.rya.api.client.CreatePCJ.QueryType;
+import org.apache.rya.api.function.aggregation.AggregationElement;
+import org.apache.rya.api.function.aggregation.AggregationType;
 import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph;
 import org.apache.rya.indexing.pcj.fluo.app.ConstructGraphSerializer;
 import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
-import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 
@@ -567,10 +568,10 @@ public class FluoQueryMetadataDAO {
         // System.out.println("vois.accept(" + className + ".class, ");};};
         ) {
             // These classes are allowed to be deserialized. Others throw InvalidClassException.
-            vois.accept(java.util.ArrayList.class, //
-                            java.lang.Enum.class, //
-                            org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement.class, //
-                            org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType.class);
+            vois.accept(java.util.ArrayList.class,
+                            java.lang.Enum.class,
+                            AggregationElement.class,
+                            AggregationType.class);
             final Object object = vois.readObject();
             if (!(object instanceof Collection<?>)) {
                 throw new InvalidClassException("Object read was not of type Collection. It was: " + object.getClass());