You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2018/01/09 21:48:59 UTC
[42/50] [abbrv] incubator-rya git commit: RYA-377 Repackaged the
common Aggregation code into the rya.api.functions project.
RYA-377 Repackaged the common Aggregation code into the rya.api.functions project.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/8363724b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/8363724b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/8363724b
Branch: refs/heads/master
Commit: 8363724b4d684c47fe806a4f364e40134964e6cb
Parents: 1535b46
Author: kchilton2 <ke...@gmail.com>
Authored: Tue Nov 21 16:29:30 2017 -0500
Committer: caleb <ca...@parsons.com>
Committed: Tue Jan 9 15:13:01 2018 -0500
----------------------------------------------------------------------
.../aggregation/AggregationElement.java | 105 +++++
.../aggregation/AggregationFunction.java | 41 ++
.../function/aggregation/AggregationState.java | 120 ++++++
.../function/aggregation/AggregationType.java | 66 +++
.../function/aggregation/AverageFunction.java | 96 +++++
.../api/function/aggregation/AverageState.java | 93 +++++
.../api/function/aggregation/CountFunction.java | 61 +++
.../api/function/aggregation/MaxFunction.java | 63 +++
.../api/function/aggregation/MinFunction.java | 63 +++
.../api/function/aggregation/SumFunction.java | 85 ++++
.../pcj/fluo/app/AggregationResultUpdater.java | 410 +------------------
.../fluo/app/observers/AggregationObserver.java | 4 +-
.../pcj/fluo/app/query/AggregationMetadata.java | 130 +-----
.../pcj/fluo/app/query/FluoQueryColumns.java | 2 +-
.../fluo/app/query/FluoQueryMetadataDAO.java | 11 +-
.../fluo/app/query/SparqlFluoQueryBuilder.java | 255 ++++++------
.../fluo/app/query/FluoQueryMetadataDAOIT.java | 66 +--
17 files changed, 994 insertions(+), 677 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationElement.java
----------------------------------------------------------------------
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationElement.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationElement.java
new file mode 100644
index 0000000..3112059
--- /dev/null
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationElement.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.function.aggregation;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Represents all of the metadata required to perform an Aggregation that is part of a SPARQL query.
+ * <p>
+ * For example, if you have the following in SPARQL:
+ * <pre>
+ * SELECT (avg(?price) as ?avgPrice) {
+ * ...
+ * }
+ * </pre>
+ * You would construct an instance of this object like so:
+ * <pre>
+ * new AggregationElement(AggregationType.AVERAGE, "price", "avgPrice");
+ * </pre>
+ */
+@DefaultAnnotation(NonNull.class)
+public final class AggregationElement implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final AggregationType aggregationType;
+ private final String aggregatedBindingName;
+ private final String resultBindingName;
+
+ /**
+ * Constructs an instance of {@link AggregationElement}. A null argument is rejected by {@code requireNonNull}.
+ *
+ * @param aggregationType - Defines how the binding values will be aggregated. (not null)
+ * @param aggregatedBindingName - The name of the binding whose values are aggregated. This binding must
+ * appear within the child node's emitted binding sets. (not null)
+ * @param resultBindingName - The name of the binding this aggregation's results are written to. This binding
+ * must appear within the AggregationMetadata's variable order. (not null)
+ */
+ public AggregationElement(
+ final AggregationType aggregationType,
+ final String aggregatedBindingName,
+ final String resultBindingName) {
+ this.aggregationType = requireNonNull(aggregationType);
+ this.aggregatedBindingName = requireNonNull(aggregatedBindingName);
+ this.resultBindingName = requireNonNull(resultBindingName);
+ }
+
+ /**
+ * @return Defines how the binding values will be aggregated.
+ */
+ public AggregationType getAggregationType() {
+ return aggregationType;
+ }
+
+ /**
+ * @return The name of the binding whose values are aggregated. This binding must appear within the child node's emitted binding sets.
+ */
+ public String getAggregatedBindingName() {
+ return aggregatedBindingName;
+ }
+
+ /**
+ * @return The name of the binding this aggregation's results are written to. This binding must appear within the AggregationMetadata's variable order.
+ */
+ public String getResultBindingName() {
+ return resultBindingName;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(aggregationType, aggregatedBindingName, resultBindingName);
+ }
+
+ @Override
+ public boolean equals(final Object o ) {
+ if(o instanceof AggregationElement) { // instanceof is false for null, so null compares unequal.
+ final AggregationElement agg = (AggregationElement) o;
+ return Objects.equals(aggregationType, agg.aggregationType) &&
+ Objects.equals(aggregatedBindingName, agg.aggregatedBindingName) &&
+ Objects.equals(resultBindingName, agg.resultBindingName);
+ }
+ return false;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationFunction.java
----------------------------------------------------------------------
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationFunction.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationFunction.java
new file mode 100644
index 0000000..e8c49e7
--- /dev/null
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationFunction.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.function.aggregation;
+
+import org.apache.rya.api.model.VisibilityBindingSet;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A function that folds one child Binding Set into an {@link AggregationState}, updating the state rather than returning a value.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface AggregationFunction {
+
+ /**
+ * Updates an {@link AggregationState} based on the values of a child Binding Set.
+ *
+ * @param aggregation - Defines which function needs to be performed as well as any details required
+ * to do the aggregation work. (not null)
+ * @param state - The state that will be updated. (not null)
+ * @param childBindingSet - The Binding Set whose values will be used to update the state. (not null)
+ */
+ public void update(AggregationElement aggregation, AggregationState state, VisibilityBindingSet childBindingSet);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationState.java
----------------------------------------------------------------------
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationState.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationState.java
new file mode 100644
index 0000000..2551696
--- /dev/null
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationState.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.function.aggregation;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import org.openrdf.query.impl.MapBindingSet;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Keeps track of the information required to update and build the resulting Binding Set for a set of Group By values.
+ */
+@DefaultAnnotation(NonNull.class)
+public final class AggregationState implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ // The visibility equation that encompasses all data the aggregation state is derived from. Replaced via setVisibility.
+ private String visibility;
+
+ // A binding set that holds the current state of the aggregations.
+ private final MapBindingSet bindingSet;
+
+ // A map from result binding name to the state that derived that binding's value.
+ private final Map<String, AverageState> avgStates;
+
+ /**
+ * Constructs an empty {@link AggregationState}: an empty visibility, no bindings and no average states.
+ */
+ public AggregationState() {
+ this.visibility = "";
+ this.bindingSet = new MapBindingSet();
+ this.avgStates = new HashMap<>();
+ }
+
+ /**
+ * Constructs an instance of {@link AggregationState}. The arguments are stored as given, not copied.
+ *
+ * @param visibility - The visibility equation associated with the resulting binding set. (not null)
+ * @param bindingSet - The Binding Set whose values are being updated. It holds the result for a set of
+ * Group By values. (not null)
+ * @param avgStates - If the aggregation is doing an Average, this is a map from result binding name to
+ * average state for that binding. (not null)
+ */
+ public AggregationState(
+ final String visibility,
+ final MapBindingSet bindingSet,
+ final Map<String, AverageState> avgStates) {
+ this.visibility = requireNonNull(visibility);
+ this.bindingSet = requireNonNull(bindingSet);
+ this.avgStates = requireNonNull(avgStates);
+ }
+
+ /**
+ * @return The visibility equation associated with the resulting binding set.
+ */
+ public String getVisibility() {
+ return visibility;
+ }
+
+ /**
+ * @param visibility - The visibility equation associated with the resulting binding set. (not null)
+ */
+ public void setVisibility(final String visibility) {
+ this.visibility = requireNonNull(visibility);
+ }
+
+ /**
+ * @return The Binding Set whose values are being updated. It holds the result for a set of Group By values. The stored instance is returned, not a copy.
+ */
+ public MapBindingSet getBindingSet() {
+ return bindingSet;
+ }
+
+ /**
+ * @return If the aggregation is doing an Average, this is a map from result binding name to
+ * average state for that binding. The stored map is returned, not a copy.
+ */
+ public Map<String, AverageState> getAverageStates() {
+ return avgStates;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(visibility, bindingSet, avgStates);
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if(o instanceof AggregationState) { // instanceof is false for null, so null compares unequal.
+ final AggregationState state = (AggregationState) o;
+ return Objects.equals(visibility, state.visibility) &&
+ Objects.equals(bindingSet, state.bindingSet) &&
+ Objects.equals(avgStates, state.avgStates);
+ }
+ return false;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationType.java
----------------------------------------------------------------------
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationType.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationType.java
new file mode 100644
index 0000000..5383da1
--- /dev/null
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationType.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.function.aggregation;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Optional;
+
+import org.openrdf.query.algebra.AggregateOperator;
+import org.openrdf.query.algebra.Avg;
+import org.openrdf.query.algebra.Count;
+import org.openrdf.query.algebra.Max;
+import org.openrdf.query.algebra.Min;
+import org.openrdf.query.algebra.Sum;
+
+import com.google.common.collect.ImmutableMap;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * The different types of Aggregation functions that an aggregate node may perform. Each type is paired with one query algebra operator class.
+ */
+@DefaultAnnotation(NonNull.class)
+public enum AggregationType {
+ MIN(Min.class),
+ MAX(Max.class),
+ COUNT(Count.class),
+ SUM(Sum.class),
+ AVERAGE(Avg.class);
+
+ private final Class<? extends AggregateOperator> operatorClass; // The algebra operator class this type is paired with.
+
+ private AggregationType(final Class<? extends AggregateOperator> operatorClass) {
+ this.operatorClass = requireNonNull(operatorClass);
+ }
+
+ private static final ImmutableMap<Class<? extends AggregateOperator>, AggregationType> byOperatorClass; // Reverse lookup: operator class -> type.
+ static {
+ final ImmutableMap.Builder<Class<? extends AggregateOperator>, AggregationType> builder = ImmutableMap.builder();
+ for(final AggregationType type : AggregationType.values()) {
+ builder.put(type.operatorClass, type);
+ }
+ byOperatorClass = builder.build();
+ }
+
+ public static Optional<AggregationType> byOperatorClass(final Class<? extends AggregateOperator> operatorClass) { // Empty when no type is mapped to the class.
+ return Optional.ofNullable( byOperatorClass.get(operatorClass) );
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageFunction.java
----------------------------------------------------------------------
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageFunction.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageFunction.java
new file mode 100644
index 0000000..a73d5ac
--- /dev/null
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageFunction.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.function.aggregation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Map;
+
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.openrdf.model.Literal;
+import org.openrdf.model.Value;
+import org.openrdf.model.datatypes.XMLDatatypeUtil;
+import org.openrdf.model.impl.DecimalLiteralImpl;
+import org.openrdf.model.impl.IntegerLiteralImpl;
+import org.openrdf.query.algebra.MathExpr.MathOp;
+import org.openrdf.query.algebra.evaluation.ValueExprEvaluationException;
+import org.openrdf.query.algebra.evaluation.util.MathUtil;
+import org.openrdf.query.impl.MapBindingSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Update the {@link AggregationState}'s average if the child Binding Set contains the binding name
+ * that is being averaged by the {@link AggregationElement}.
+ */
+@DefaultAnnotation(NonNull.class)
+public final class AverageFunction implements AggregationFunction {
+ private static final Logger log = LoggerFactory.getLogger(AverageFunction.class);
+
+ @Override
+ public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
+ checkArgument(aggregation.getAggregationType() == AggregationType.AVERAGE, "The AverageFunction only accepts AVERAGE AggregationElements.");
+
+ // Only update the average if the child contains the binding that we are averaging.
+ final String aggregatedName = aggregation.getAggregatedBindingName();
+ if(childBindingSet.hasBinding(aggregatedName)) {
+ final MapBindingSet result = state.getBindingSet();
+ final String resultName = aggregation.getResultBindingName();
+ final boolean newBinding = !result.hasBinding(resultName);
+
+ // Get the state of the average.
+ final Map<String, AverageState> averageStates = state.getAverageStates();
+ AverageState averageState = newBinding ? new AverageState() : averageStates.get(resultName);
+
+ // Update the state of the average.
+ final Value childValue = childBindingSet.getValue(aggregatedName);
+ if(childValue instanceof Literal) {
+ final Literal childLiteral = (Literal) childValue;
+ if (childLiteral.getDatatype() != null && XMLDatatypeUtil.isNumericDatatype(childLiteral.getDatatype())) {
+ try {
+ // Update the sum.
+ final Literal oldSum = new DecimalLiteralImpl(averageState.getSum());
+ final BigDecimal sum = MathUtil.compute(oldSum, childLiteral, MathOp.PLUS).decimalValue();
+
+ // Update the count.
+ final BigInteger count = averageState.getCount().add( BigInteger.ONE );
+
+ // Update the BindingSet to include the new average.
+ final Literal sumLiteral = new DecimalLiteralImpl(sum);
+ final Literal countLiteral = new IntegerLiteralImpl(count);
+ final Literal average = MathUtil.compute(sumLiteral, countLiteral, MathOp.DIVIDE);
+ result.addBinding(resultName, average);
+
+ // Update the average state that is stored.
+ averageState = new AverageState(sum, count);
+ averageStates.put(resultName, averageState);
+ } catch (final ValueExprEvaluationException e) {
+ log.error("A problem was encountered while updating an Average Aggregation. This binding set will be ignored: " + childBindingSet);
+ return;
+ }
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageState.java
----------------------------------------------------------------------
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageState.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageState.java
new file mode 100644
index 0000000..8917751
--- /dev/null
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageState.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.function.aggregation;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Objects;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * The Sum and Count of the values that are being averaged. The average itself is derived from these values. Both fields are final.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AverageState implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final BigDecimal sum;
+ private final BigInteger count;
+
+ /**
+ * Constructs an instance of {@link AverageState} where the count and sum start at 0.
+ */
+ public AverageState() {
+ sum = BigDecimal.ZERO;
+ count = BigInteger.ZERO;
+ }
+
+ /**
+ * Constructs an instance of {@link AverageState} from an existing sum and count.
+ *
+ * @param sum - The sum of the values that are averaged. (not null)
+ * @param count - The number of values that are averaged. (not null)
+ */
+ public AverageState(final BigDecimal sum, final BigInteger count) {
+ this.sum = requireNonNull(sum);
+ this.count = requireNonNull(count);
+ }
+
+ /**
+ * @return The sum of the values that are averaged.
+ */
+ public BigDecimal getSum() {
+ return sum;
+ }
+
+ /**
+ * @return The number of values that are averaged.
+ */
+ public BigInteger getCount() {
+ return count;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(sum, count);
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if(o instanceof AverageState) { // instanceof is false for null, so null compares unequal.
+ final AverageState state = (AverageState) o;
+ return Objects.equals(sum, state.sum) &&
+ Objects.equals(count, state.count);
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return "Sum: " + sum + " Count: " + count;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/CountFunction.java
----------------------------------------------------------------------
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/CountFunction.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/CountFunction.java
new file mode 100644
index 0000000..7dd5b21
--- /dev/null
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/CountFunction.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.function.aggregation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.math.BigInteger;
+
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.openrdf.model.Literal;
+import org.openrdf.model.impl.IntegerLiteralImpl;
+import org.openrdf.query.impl.MapBindingSet;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Increments the {@link AggregationState}'s count if the child Binding Set contains the binding name
+ * that is being counted by the {@link AggregationElement}. The value bound to that name is not inspected.
+ */
+@DefaultAnnotation(NonNull.class)
+public final class CountFunction implements AggregationFunction {
+ @Override
+ public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
+ checkArgument(aggregation.getAggregationType() == AggregationType.COUNT, "The CountFunction only accepts COUNT AggregationElements.");
+
+ // Only add one to the count if the child contains the binding that we are counting.
+ final String aggregatedName = aggregation.getAggregatedBindingName();
+ if(childBindingSet.hasBinding(aggregatedName)) {
+ final MapBindingSet result = state.getBindingSet();
+ final String resultName = aggregation.getResultBindingName();
+ final boolean newBinding = !result.hasBinding(resultName);
+
+ if(newBinding) {
+ // Initialize the binding with a count of one.
+ result.addBinding(resultName, new IntegerLiteralImpl(BigInteger.ONE));
+ } else {
+ // Update the existing binding by writing a new literal holding the old count plus one.
+ final Literal count = (Literal) result.getValue(resultName); // NOTE(review): assumes the existing value is a Literal - confirm.
+ final BigInteger updatedCount = count.integerValue().add( BigInteger.ONE );
+ result.addBinding(resultName, new IntegerLiteralImpl(updatedCount));
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MaxFunction.java
----------------------------------------------------------------------
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MaxFunction.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MaxFunction.java
new file mode 100644
index 0000000..3295fbb
--- /dev/null
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MaxFunction.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.function.aggregation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.openrdf.model.Value;
+import org.openrdf.query.algebra.evaluation.util.ValueComparator;
+import org.openrdf.query.impl.MapBindingSet;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Updates the {@link AggregationState}'s max if the child Binding Set contains the binding name that is being
+ * maximized by the {@link AggregationElement}. Values are ordered with a {@link ValueComparator}.
+ */
+@DefaultAnnotation(NonNull.class)
+public final class MaxFunction implements AggregationFunction {
+
+ private final ValueComparator compare = new ValueComparator();
+
+ @Override
+ public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
+ checkArgument(aggregation.getAggregationType() == AggregationType.MAX, "The MaxFunction only accepts MAX AggregationElements.");
+
+ // Only update the max if the child contains the binding that we are finding the max value for.
+ final String aggregatedName = aggregation.getAggregatedBindingName();
+ if(childBindingSet.hasBinding(aggregatedName)) {
+ final MapBindingSet result = state.getBindingSet();
+ final String resultName = aggregation.getResultBindingName();
+ final boolean newBinding = !result.hasBinding(resultName);
+
+ Value max;
+ if(newBinding) {
+ max = childBindingSet.getValue(aggregatedName); // The first value seen becomes the max.
+ } else {
+ final Value oldMax = result.getValue(resultName);
+ final Value childMax = childBindingSet.getValue(aggregatedName);
+ max = compare.compare(childMax, oldMax) > 0 ? childMax : oldMax; // On a tie the existing max is kept.
+ }
+
+ result.addBinding(resultName, max);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MinFunction.java
----------------------------------------------------------------------
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MinFunction.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MinFunction.java
new file mode 100644
index 0000000..d6bf751
--- /dev/null
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MinFunction.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.function.aggregation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.openrdf.model.Value;
+import org.openrdf.query.algebra.evaluation.util.ValueComparator;
+import org.openrdf.query.impl.MapBindingSet;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * An {@link AggregationFunction} that keeps the smallest value seen for the aggregated binding in the
+ * {@link AggregationState}; child Binding Sets without the {@link AggregationElement}'s binding are skipped.
+ */
+@DefaultAnnotation(NonNull.class)
+public final class MinFunction implements AggregationFunction {
+
+    private final ValueComparator compare = new ValueComparator();
+
+    @Override
+    public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
+        checkArgument(aggregation.getAggregationType() == AggregationType.MIN, "The MinFunction only accepts MIN AggregationElements.");
+
+        // A child that does not bind the aggregated variable has nothing to contribute to the min.
+        final String sourceName = aggregation.getAggregatedBindingName();
+        if(!childBindingSet.hasBinding(sourceName)) {
+            return;
+        }
+        final MapBindingSet bindings = state.getBindingSet();
+        final String targetName = aggregation.getResultBindingName();
+        final Value candidate = childBindingSet.getValue(sourceName);
+
+        // The first value seen becomes the min; afterwards it is replaced only by a strictly smaller value.
+        Value min = candidate;
+        if(bindings.hasBinding(targetName)) {
+            final Value current = bindings.getValue(targetName);
+            if(compare.compare(candidate, current) >= 0) {
+                min = current;
+            }
+        }
+        bindings.addBinding(targetName, min);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/SumFunction.java
----------------------------------------------------------------------
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/SumFunction.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/SumFunction.java
new file mode 100644
index 0000000..97735f2
--- /dev/null
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/SumFunction.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.function.aggregation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.math.BigInteger;
+
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.openrdf.model.Literal;
+import org.openrdf.model.Value;
+import org.openrdf.model.datatypes.XMLDatatypeUtil;
+import org.openrdf.model.impl.IntegerLiteralImpl;
+import org.openrdf.query.algebra.MathExpr.MathOp;
+import org.openrdf.query.algebra.evaluation.ValueExprEvaluationException;
+import org.openrdf.query.algebra.evaluation.util.MathUtil;
+import org.openrdf.query.impl.MapBindingSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Add to the {@link AggregationState}'s sum if the child Binding Set contains the binding name
+ * that is being summed by the {@link AggregationElement}.
+ * <p>
+ * The sum starts at integer zero. Only child values that are numeric literals are added; any other
+ * value contributes nothing to the sum. If the addition fails, the error is logged together with
+ * its cause and the state is left as it was before the call.
+ */
+@DefaultAnnotation(NonNull.class)
+public final class SumFunction implements AggregationFunction {
+    private static final Logger log = LoggerFactory.getLogger(SumFunction.class);
+
+    @Override
+    public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
+        checkArgument(aggregation.getAggregationType() == AggregationType.SUM, "The SumFunction only accepts SUM AggregationElements.");
+
+        // Only add values to the sum if the child contains the binding that we are summing.
+        final String aggregatedName = aggregation.getAggregatedBindingName();
+        if(childBindingSet.hasBinding(aggregatedName)) {
+            final MapBindingSet result = state.getBindingSet();
+            final String resultName = aggregation.getResultBindingName();
+
+            // Get the starting number for the sum: the stored sum if there is one, otherwise zero.
+            Literal sum = result.hasBinding(resultName) ?
+                    (Literal) result.getValue(resultName) : new IntegerLiteralImpl(BigInteger.ZERO);
+
+            // Add the child binding set's value if it is a numeric literal.
+            final Value childValue = childBindingSet.getValue(aggregatedName);
+            if(childValue instanceof Literal) {
+                final Literal childLiteral = (Literal) childValue;
+                if (childLiteral.getDatatype() != null && XMLDatatypeUtil.isNumericDatatype(childLiteral.getDatatype())) {
+                    try {
+                        sum = MathUtil.compute(sum, childLiteral, MathOp.PLUS);
+                    } catch (final ValueExprEvaluationException e) {
+                        // The exception is passed as the last argument so that its cause is logged too.
+                        log.error("A problem was encountered while updating a Sum Aggregation. This binding set will be ignored: {}", childBindingSet, e);
+                        return;
+                    }
+                }
+            }
+
+            // Update the state to include the new sum.
+            result.addBinding(resultName, sum);
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java
index bb96a6a..4fbaad9 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java
@@ -18,19 +18,12 @@
*/
package org.apache.rya.indexing.pcj.fluo.app;
-import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.serialization.ValidatingObjectInputStream;
@@ -38,22 +31,21 @@ import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
import org.apache.log4j.Logger;
import org.apache.rya.accumulo.utils.VisibilitySimplifier;
+import org.apache.rya.api.function.aggregation.AggregationElement;
+import org.apache.rya.api.function.aggregation.AggregationFunction;
+import org.apache.rya.api.function.aggregation.AggregationState;
+import org.apache.rya.api.function.aggregation.AggregationType;
+import org.apache.rya.api.function.aggregation.AverageFunction;
+import org.apache.rya.api.function.aggregation.AverageState;
+import org.apache.rya.api.function.aggregation.CountFunction;
+import org.apache.rya.api.function.aggregation.MaxFunction;
+import org.apache.rya.api.function.aggregation.MinFunction;
+import org.apache.rya.api.function.aggregation.SumFunction;
import org.apache.rya.api.log.LogUtils;
import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
-import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
-import org.openrdf.model.Literal;
-import org.openrdf.model.Value;
-import org.openrdf.model.datatypes.XMLDatatypeUtil;
-import org.openrdf.model.impl.DecimalLiteralImpl;
-import org.openrdf.model.impl.IntegerLiteralImpl;
-import org.openrdf.query.algebra.MathExpr.MathOp;
-import org.openrdf.query.algebra.evaluation.ValueExprEvaluationException;
-import org.openrdf.query.algebra.evaluation.util.MathUtil;
-import org.openrdf.query.algebra.evaluation.util.ValueComparator;
import org.openrdf.query.impl.MapBindingSet;
import com.google.common.collect.ImmutableMap;
@@ -154,213 +146,6 @@ public class AggregationResultUpdater extends AbstractNodeUpdater {
}
/**
- * A function that updates an {@link AggregationState}.
- */
- public static interface AggregationFunction {
-
- /**
- * Updates an {@link AggregationState} based on the values of a child Binding Set.
- *
- * @param aggregation - Defines which function needs to be performed as well as any details required
- * to do the aggregation work. (not null)
- * @param state - The state that will be updated. (not null)
- * @param childBindingSet - The Binding Set whose values will be used to update the state.
- */
- public void update(AggregationElement aggregation, AggregationState state, VisibilityBindingSet childBindingSet);
- }
-
- /**
- * Increments the {@link AggregationState}'s count if the child Binding Set contains the binding name
- * that is being counted by the {@link AggregationElement}.
- */
- public static final class CountFunction implements AggregationFunction {
- @Override
- public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
- checkArgument(aggregation.getAggregationType() == AggregationType.COUNT, "The CountFunction only accepts COUNT AggregationElements.");
-
- // Only add one to the count if the child contains the binding that we are counting.
- final String aggregatedName = aggregation.getAggregatedBindingName();
- if(childBindingSet.hasBinding(aggregatedName)) {
- final MapBindingSet result = state.getBindingSet();
- final String resultName = aggregation.getResultBindingName();
- final boolean newBinding = !result.hasBinding(resultName);
-
- if(newBinding) {
- // Initialize the binding.
- result.addBinding(resultName, new IntegerLiteralImpl(BigInteger.ONE));
- } else {
- // Update the existing binding.
- final Literal count = (Literal) result.getValue(resultName);
- final BigInteger updatedCount = count.integerValue().add( BigInteger.ONE );
- result.addBinding(resultName, new IntegerLiteralImpl(updatedCount));
- }
- }
- }
- }
-
- /**
- * Add to the {@link AggregationState}'s sum if the child Binding Set contains the binding name
- * that is being summed by the {@link AggregationElement}.
- */
- public static final class SumFunction implements AggregationFunction {
- @Override
- public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
- checkArgument(aggregation.getAggregationType() == AggregationType.SUM, "The SumFunction only accepts SUM AggregationElements.");
-
- // Only add values to the sum if the child contains the binding that we are summing.
- final String aggregatedName = aggregation.getAggregatedBindingName();
- if(childBindingSet.hasBinding(aggregatedName)) {
- final MapBindingSet result = state.getBindingSet();
- final String resultName = aggregation.getResultBindingName();
- final boolean newBinding = !result.hasBinding(resultName);
-
- // Get the starting number for the sum.
- Literal sum;
- if(newBinding) {
- sum = new IntegerLiteralImpl(BigInteger.ZERO);
- } else {
- sum = (Literal) state.getBindingSet().getValue(resultName);
- }
-
- // Add the child binding set's value if it is a numeric literal.
- final Value childValue = childBindingSet.getValue(aggregatedName);
- if(childValue instanceof Literal) {
- final Literal childLiteral = (Literal) childValue;
- if (childLiteral.getDatatype() != null && XMLDatatypeUtil.isNumericDatatype(childLiteral.getDatatype())) {
- try {
- sum = MathUtil.compute(sum, childLiteral, MathOp.PLUS);
- } catch (final ValueExprEvaluationException e) {
- log.error("A problem was encountered while updating a Sum Aggregation. This binding set will be ignored: " + childBindingSet);
- return;
- }
- }
- }
-
- // Update the state to include the new sum.
- result.addBinding(resultName, sum);
- }
- }
- }
-
- /**
- * Update the {@link AggregationState}'s average if the child Binding Set contains the binding name
- * that is being averaged by the {@link AggregationElement}.
- */
- public static final class AverageFunction implements AggregationFunction {
- @Override
- public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
- checkArgument(aggregation.getAggregationType() == AggregationType.AVERAGE, "The AverageFunction only accepts AVERAGE AggregationElements.");
-
- // Only update the average if the child contains the binding that we are averaging.
- final String aggregatedName = aggregation.getAggregatedBindingName();
- if(childBindingSet.hasBinding(aggregatedName)) {
- final MapBindingSet result = state.getBindingSet();
- final String resultName = aggregation.getResultBindingName();
- final boolean newBinding = !result.hasBinding(resultName);
-
- // Get the state of the average.
- final Map<String, AverageState> averageStates = state.getAverageStates();
- AverageState averageState = newBinding ? new AverageState() : averageStates.get(resultName);
-
- // Update the state of the average.
- final Value childValue = childBindingSet.getValue(aggregatedName);
- if(childValue instanceof Literal) {
- final Literal childLiteral = (Literal) childValue;
- if (childLiteral.getDatatype() != null && XMLDatatypeUtil.isNumericDatatype(childLiteral.getDatatype())) {
- try {
- // Update the sum.
- final Literal oldSum = new DecimalLiteralImpl(averageState.getSum());
- final BigDecimal sum = MathUtil.compute(oldSum, childLiteral, MathOp.PLUS).decimalValue();
-
- // Update the count.
- final BigInteger count = averageState.getCount().add( BigInteger.ONE );
-
- // Update the BindingSet to include the new average.
- final Literal sumLiteral = new DecimalLiteralImpl(sum);
- final Literal countLiteral = new IntegerLiteralImpl(count);
- final Literal average = MathUtil.compute(sumLiteral, countLiteral, MathOp.DIVIDE);
- result.addBinding(resultName, average);
-
- // Update the average state that is stored.
- averageState = new AverageState(sum, count);
- averageStates.put(resultName, averageState);
- } catch (final ValueExprEvaluationException e) {
- log.error("A problem was encountered while updating an Average Aggregation. This binding set will be ignored: " + childBindingSet);
- return;
- }
- }
- }
- }
- }
- }
-
- /**
- * Update the {@link AggregationState}'s max if the child binding Set contains the binding name that is being
- * maxed by the {@link AggregationElement}.
- */
- public static final class MaxFunction implements AggregationFunction {
-
- private final ValueComparator compare = new ValueComparator();
-
- @Override
- public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
- checkArgument(aggregation.getAggregationType() == AggregationType.MAX, "The MaxFunction only accepts MAX AggregationElements.");
-
- // Only update the max if the child contains the binding that we are finding the max value for.
- final String aggregatedName = aggregation.getAggregatedBindingName();
- if(childBindingSet.hasBinding(aggregatedName)) {
- final MapBindingSet result = state.getBindingSet();
- final String resultName = aggregation.getResultBindingName();
- final boolean newBinding = !result.hasBinding(resultName);
-
- Value max;
- if(newBinding) {
- max = childBindingSet.getValue(aggregatedName);
- } else {
- final Value oldMax = result.getValue(resultName);
- final Value childMax = childBindingSet.getValue(aggregatedName);
- max = compare.compare(childMax, oldMax) > 0 ? childMax : oldMax;
- }
-
- result.addBinding(resultName, max);
- }
- }
- }
-
- /**
- * Update the {@link AggregationState}'s min if the child binding Set contains the binding name that is being
- * mined by the {@link AggregationElement}.
- */
- public static final class MinFunction implements AggregationFunction {
-
- private final ValueComparator compare = new ValueComparator();
-
- @Override
- public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
- checkArgument(aggregation.getAggregationType() == AggregationType.MIN, "The MinFunction only accepts MIN AggregationElements.");
-
- // Only update the min if the child contains the binding that we are finding the min value for.
- final String aggregatedName = aggregation.getAggregatedBindingName();
- if(childBindingSet.hasBinding(aggregatedName)) {
- final MapBindingSet result = state.getBindingSet();
- final String resultName = aggregation.getResultBindingName();
- final boolean newBinding = !result.hasBinding(resultName);
-
- Value min;
- if(newBinding) {
- min = childBindingSet.getValue(aggregatedName);
- } else {
- final Value oldMin = result.getValue(resultName);
- final Value chidlMin = childBindingSet.getValue(aggregatedName);
- min = compare.compare(chidlMin, oldMin) < 0 ? chidlMin : oldMin;
- }
-
- result.addBinding(resultName, min);
- }
- }
- }
-
- /**
* Reads/Writes instances of {@link AggregationState} to/from bytes.
*/
public static interface AggregationStateSerDe {
@@ -410,18 +195,18 @@ public class AggregationResultUpdater extends AbstractNodeUpdater {
// System.out.println("vois.accept(" + className + ".class, ");};};
) {
// These classes are allowed to be deserialized. Others throw InvalidClassException.
- vois.accept(org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater.AggregationState.class, //
- org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater.AverageState.class, //
- java.util.HashMap.class, //
- java.math.BigInteger.class, //
- java.lang.Number.class, //
- java.math.BigDecimal.class, //
- org.openrdf.query.impl.MapBindingSet.class, //
- java.util.LinkedHashMap.class, //
- org.openrdf.query.impl.BindingImpl.class, //
- org.openrdf.model.impl.URIImpl.class, //
- org.openrdf.model.impl.LiteralImpl.class, //
- org.openrdf.model.impl.DecimalLiteralImpl.class, //
+ vois.accept(AggregationState.class,
+ AverageState.class,
+ java.util.HashMap.class,
+ java.math.BigInteger.class,
+ java.lang.Number.class,
+ java.math.BigDecimal.class,
+ org.openrdf.query.impl.MapBindingSet.class,
+ java.util.LinkedHashMap.class,
+ org.openrdf.query.impl.BindingImpl.class,
+ org.openrdf.model.impl.URIImpl.class,
+ org.openrdf.model.impl.LiteralImpl.class,
+ org.openrdf.model.impl.DecimalLiteralImpl.class,
org.openrdf.model.impl.IntegerLiteralImpl.class);
vois.accept("[B"); // Array of Bytes
final Object o = vois.readObject();
@@ -437,155 +222,4 @@ public class AggregationResultUpdater extends AbstractNodeUpdater {
return state;
}
}
-
- /**
- * Keeps track information required to update and build the resulting Binding Set for a set of Group By values.
- */
- public static final class AggregationState implements Serializable {
- private static final long serialVersionUID = 1L;
-
- // The visibility equation that encompasses all data the aggregation state is derived from.
- private String visibility;
-
- // A binding set that holds the current state of the aggregations.
- private final MapBindingSet bindingSet;
-
- // A map from result binding name to the state that derived that binding's value.
- private final Map<String, AverageState> avgStates;
-
- /**
- * Constructs an instance of {@link AggregationState}.
- */
- public AggregationState() {
- this.visibility = "";
- this.bindingSet = new MapBindingSet();
- this.avgStates = new HashMap<>();
- }
-
- /**
- * Constructs an instance of {@link AggregationState}.
- *
- * @param visibility - The visibility equation associated with the resulting binding set. (not null)
- * @param bindingSet - The Binding Set whose values are being updated. It holds the result for a set of
- * Group By values. (not null)
- * @param avgStates - If the aggregation is doing an Average, this is a map from result binding name to
- * average state for that binding.
- */
- public AggregationState(
- final String visibility,
- final MapBindingSet bindingSet,
- final Map<String, AverageState> avgStates) {
- this.visibility = requireNonNull(visibility);
- this.bindingSet = requireNonNull(bindingSet);
- this.avgStates = requireNonNull(avgStates);
- }
-
- /**
- * @return The visibility equation associated with the resulting binding set.
- */
- public String getVisibility() {
- return visibility;
- }
-
- /**
- * @param visibility - The visibility equation associated with the resulting binding set.
- */
- public void setVisibility(final String visibility) {
- this.visibility = requireNonNull(visibility);
- }
-
- /**
- * @return The Binding Set whose values are being updated. It holds the result for a set of Group By values.
- */
- public MapBindingSet getBindingSet() {
- return bindingSet;
- }
-
- /**
- * @return If the aggregation is doing an Average, this is a map from result binding name to
- * average state for that binding.
- */
- public Map<String, AverageState> getAverageStates() {
- return avgStates;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(visibility, bindingSet, avgStates);
- }
-
- @Override
- public boolean equals(final Object o) {
- if(o instanceof AggregationState) {
- final AggregationState state = (AggregationState) o;
- return Objects.equals(visibility, state.visibility) &&
- Objects.equals(bindingSet, state.bindingSet) &&
- Objects.equals(avgStates, state.avgStates);
- }
- return false;
- }
- }
-
- /**
- * The Sum and Count of the values that are being averaged. The average itself is derived from these values.
- */
- public static class AverageState implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private final BigDecimal sum;
- private final BigInteger count;
-
- /**
- * Constructs an instance of {@link AverageState} where the count and sum start at 0.
- */
- public AverageState() {
- sum = BigDecimal.ZERO;
- count = BigInteger.ZERO;
- }
-
- /**
- * Constructs an instance of {@link AverageState}.
- *
- * @param sum - The sum of the values that are averaged. (not null)
- * @param count - The number of values that are averaged. (not null)
- */
- public AverageState(final BigDecimal sum, final BigInteger count) {
- this.sum = requireNonNull(sum);
- this.count = requireNonNull(count);
- }
-
- /**
- * @return The sum of the values that are averaged.
- */
- public BigDecimal getSum() {
- return sum;
- }
-
- /**
- * @return The number of values that are averaged.
- */
- public BigInteger getCount() {
- return count;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(sum, count);
- }
-
- @Override
- public boolean equals(final Object o) {
- if(o instanceof AverageState) {
- final AverageState state = (AverageState) o;
- return Objects.equals(sum, state.sum) &&
- Objects.equals(count, state.count);
- }
- return false;
- }
-
- @Override
- public String toString() {
- return "Sum: " + sum + " Count: " + count;
- }
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java
index 0271519..e806b15 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java
@@ -23,10 +23,10 @@ import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.AG
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
-import org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater.AggregationState;
+import org.apache.rya.api.function.aggregation.AggregationState;
+import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater.AggregationStateSerDe;
import org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater.ObjectSerializationAggregationStateSerDe;
-import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java
index eaa072f..a839645 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java
@@ -20,23 +20,14 @@ package org.apache.rya.indexing.pcj.fluo.app.query;
import static java.util.Objects.requireNonNull;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
-import java.util.Optional;
+import org.apache.rya.api.function.aggregation.AggregationElement;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
-import org.openrdf.query.algebra.AggregateOperator;
-import org.openrdf.query.algebra.Avg;
-import org.openrdf.query.algebra.Count;
-import org.openrdf.query.algebra.Max;
-import org.openrdf.query.algebra.Min;
-import org.openrdf.query.algebra.Sum;
-
-import com.google.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -50,115 +41,6 @@ import net.jcip.annotations.Immutable;
@DefaultAnnotation(NonNull.class)
public class AggregationMetadata extends CommonNodeMetadata {
- /**
- * The different types of Aggregation functions that an aggregate node may perform.
- */
- public static enum AggregationType {
- MIN(Min.class),
- MAX(Max.class),
- COUNT(Count.class),
- SUM(Sum.class),
- AVERAGE(Avg.class);
-
- private final Class<? extends AggregateOperator> operatorClass;
-
- private AggregationType(final Class<? extends AggregateOperator> operatorClass) {
- this.operatorClass = requireNonNull(operatorClass);
- }
-
- private static final ImmutableMap<Class<? extends AggregateOperator>, AggregationType> byOperatorClass;
- static {
- final ImmutableMap.Builder<Class<? extends AggregateOperator>, AggregationType> builder = ImmutableMap.builder();
- for(final AggregationType type : AggregationType.values()) {
- builder.put(type.operatorClass, type);
- }
- byOperatorClass = builder.build();
- }
-
- public static Optional<AggregationType> byOperatorClass(final Class<? extends AggregateOperator> operatorClass) {
- return Optional.ofNullable( byOperatorClass.get(operatorClass) );
- }
- }
-
- /**
- * Represents all of the metadata require to perform an Aggregation that is part of a SPARQL query.
- * </p>
- * For example, if you have the following in SPARQL:
- * <pre>
- * SELECT (avg(?price) as ?avgPrice) {
- * ...
- * }
- * </pre>
- * You would construct an instance of this object like so:
- * <pre>
- * new AggregationElement(AggregationType.AVERAGE, "price", "avgPrice");
- * </pre>
- */
- @Immutable
- @DefaultAnnotation(NonNull.class)
- public static final class AggregationElement implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private final AggregationType aggregationType;
- private final String aggregatedBindingName;
- private final String resultBindingName;
-
- /**
- * Constructs an instance of {@link AggregationElement}.
- *
- * @param aggregationType - Defines how the binding values will be aggregated. (not null)
- * @param aggregatedBindingName - The name of the binding whose values is aggregated. This binding must
- * appear within the child node's emitted binding sets. (not null)
- * @param resultBindingName - The name of the binding this aggregation's results are written to. This binding
- * must appeared within the AggregationMetadata's variable order. (not null)
- */
- public AggregationElement(
- final AggregationType aggregationType,
- final String aggregatedBindingName,
- final String resultBindingName) {
- this.aggregationType = requireNonNull(aggregationType);
- this.aggregatedBindingName = requireNonNull(aggregatedBindingName);
- this.resultBindingName = requireNonNull(resultBindingName);
- }
-
- /**
- * @return Defines how the binding values will be aggregated.
- */
- public AggregationType getAggregationType() {
- return aggregationType;
- }
-
- /**
- * @return The name of the binding whose values is aggregated. This binding must appear within the child node's emitted binding sets.
- */
- public String getAggregatedBindingName() {
- return aggregatedBindingName;
- }
-
- /**
- * @return The name of the binding this aggregation's results are written to. This binding must appeared within the AggregationMetadata's variable order.
- */
- public String getResultBindingName() {
- return resultBindingName;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(aggregationType, aggregatedBindingName, resultBindingName);
- }
-
- @Override
- public boolean equals(final Object o ) {
- if(o instanceof AggregationElement) {
- final AggregationElement agg = (AggregationElement) o;
- return Objects.equals(aggregationType, agg.aggregationType) &&
- Objects.equals(aggregatedBindingName, agg.aggregatedBindingName) &&
- Objects.equals(resultBindingName, agg.resultBindingName);
- }
- return false;
- }
- }
-
private final String parentNodeId;
private final String childNodeId;
private final Collection<AggregationElement> aggregations;
@@ -308,6 +190,7 @@ public class AggregationMetadata extends CommonNodeMetadata {
/**
* @return This node's Node ID.
*/
+ @Override
public String getNodeId() {
return nodeId;
}
@@ -321,10 +204,11 @@ public class AggregationMetadata extends CommonNodeMetadata {
this.varOrder = varOrder;
return this;
}
-
+
/**
* @return the variable order of binding sets that are emitted by this node.
*/
+ @Override
public VariableOrder getVariableOrder() {
return varOrder;
}
@@ -337,7 +221,7 @@ public class AggregationMetadata extends CommonNodeMetadata {
this.parentNodeId = parentNodeId;
return this;
}
-
+
public String getParentNodeId() {
return parentNodeId;
}
@@ -350,7 +234,7 @@ public class AggregationMetadata extends CommonNodeMetadata {
this.childNodeId = childNodeId;
return this;
}
-
+
public String getChildNodeId() {
return childNodeId;
}
@@ -375,7 +259,7 @@ public class AggregationMetadata extends CommonNodeMetadata {
this.groupByVariables = groupByVariables;
return this;
}
-
+
/**
* @return variable order that defines how data is grouped for the aggregation function
*/
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
index 9d1c4fc..e043671 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
@@ -25,8 +25,8 @@ import java.util.List;
import org.apache.fluo.api.data.Column;
import org.apache.rya.api.client.CreatePCJ.QueryType;
+import org.apache.rya.api.function.aggregation.AggregationState;
import org.apache.rya.api.model.VisibilityBindingSet;
-import org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater.AggregationState;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
index ba75a56..c193ef7 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
@@ -37,11 +37,12 @@ import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
import org.apache.rya.api.client.CreatePCJ.QueryType;
+import org.apache.rya.api.function.aggregation.AggregationElement;
+import org.apache.rya.api.function.aggregation.AggregationType;
import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph;
import org.apache.rya.indexing.pcj.fluo.app.ConstructGraphSerializer;
import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
-import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
@@ -567,10 +568,10 @@ public class FluoQueryMetadataDAO {
// System.out.println("vois.accept(" + className + ".class, ");};};
) {
// These classes are allowed to be deserialized. Others throw InvalidClassException.
- vois.accept(java.util.ArrayList.class, //
- java.lang.Enum.class, //
- org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement.class, //
- org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType.class);
+ vois.accept(java.util.ArrayList.class,
+ java.lang.Enum.class,
+ AggregationElement.class,
+ AggregationType.class);
final Object object = vois.readObject();
if (!(object instanceof Collection<?>)) {
throw new InvalidClassException("Object read was not of type Collection. It was: " + object.getClass());