You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 20:23:32 UTC
[31/51] [partial] Initial commit
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/BaseAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/BaseAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/BaseAggregator.java
new file mode 100644
index 0000000..4c21a04
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/BaseAggregator.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+
+import org.apache.phoenix.expression.BaseTerminalExpression;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.util.SizedUtil;
+
+/**
+ * Base class for Aggregator implementations
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public abstract class BaseAggregator extends BaseTerminalExpression implements Aggregator {
+
+ protected final ColumnModifier columnModifier;
+
+ public BaseAggregator(ColumnModifier columnModifier) {
+ this.columnModifier = columnModifier;
+ }
+
+ @Override
+ public boolean isNullable() {
+ return true;
+ }
+
+ @Override
+ public int getSize() {
+ return SizedUtil.OBJECT_SIZE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/BaseDecimalStddevAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/BaseDecimalStddevAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/BaseDecimalStddevAggregator.java
new file mode 100644
index 0000000..ff12a1d
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/BaseDecimalStddevAggregator.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.math.*;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Pair;
+
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.expression.ColumnExpression;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.*;
+import org.apache.phoenix.util.BigDecimalUtil.Operation;
+
+/**
+ *
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public abstract class BaseDecimalStddevAggregator extends DistinctValueWithCountClientAggregator {
+
+ private BigDecimal cachedResult = null;
+ private int colPrecision;
+ private int colScale;
+
+ public BaseDecimalStddevAggregator(List<Expression> exps, ColumnModifier columnModifier) {
+ super(columnModifier);
+ ColumnExpression stdDevColExp = (ColumnExpression)exps.get(0);
+ this.colPrecision = stdDevColExp.getMaxLength();
+ this.colScale = stdDevColExp.getScale();
+ }
+
+ @Override
+ protected int getBufferLength() {
+ return PDataType.DECIMAL.getByteSize();
+ }
+
+ @Override
+ public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+ if (cachedResult == null) {
+ BigDecimal ssd = sumSquaredDeviation();
+ ssd = ssd.divide(new BigDecimal(getDataPointsCount()), PDataType.DEFAULT_MATH_CONTEXT);
+ // Calculate the precision for the stddev result.
+ // There are totalCount #Decimal values for which we are calculating the stddev
+ // The resultant precision depends on precision and scale of all these values. (See
+ // BigDecimalUtil.getResultPrecisionScale)
+ // As of now we are not using the actual precision and scale of individual values but just using the table
+ // column's max length(precision) and scale for each of the values.
+ int resultPrecision = colPrecision;
+ for (int i = 1; i < this.totalCount; i++) {
+ // Max precision that we can support is 38 See PDataType.MAX_PRECISION
+ if (resultPrecision >= PDataType.MAX_PRECISION) break;
+ Pair<Integer, Integer> precisionScale = BigDecimalUtil.getResultPrecisionScale(this.colPrecision,
+ this.colScale, this.colPrecision, this.colScale, Operation.OTHERS);
+ resultPrecision = precisionScale.getFirst();
+ }
+ cachedResult = new BigDecimal(Math.sqrt(ssd.doubleValue()), new MathContext(resultPrecision,
+ RoundingMode.HALF_UP));
+ cachedResult.setScale(this.colScale, RoundingMode.HALF_UP);
+ }
+ if (buffer == null) {
+ initBuffer();
+ }
+ buffer = PDataType.DECIMAL.toBytes(cachedResult);
+ ptr.set(buffer);
+ return true;
+ }
+
+ protected abstract long getDataPointsCount();
+
+ private BigDecimal sumSquaredDeviation() {
+ BigDecimal m = mean();
+ BigDecimal result = BigDecimal.ZERO;
+ for (Entry<ImmutableBytesPtr, Integer> entry : valueVsCount.entrySet()) {
+ BigDecimal colValue = (BigDecimal)PDataType.DECIMAL.toObject(entry.getKey());
+ BigDecimal delta = colValue.subtract(m);
+ result = result.add(delta.multiply(delta).multiply(new BigDecimal(entry.getValue())));
+ }
+ return result;
+ }
+
+ private BigDecimal mean() {
+ BigDecimal sum = BigDecimal.ZERO;
+ for (Entry<ImmutableBytesPtr, Integer> entry : valueVsCount.entrySet()) {
+ BigDecimal colValue = (BigDecimal)PDataType.DECIMAL.toObject(entry.getKey());
+ sum = sum.add(colValue.multiply(new BigDecimal(entry.getValue())));
+ }
+ return sum.divide(new BigDecimal(totalCount), PDataType.DEFAULT_MATH_CONTEXT);
+ }
+
+ @Override
+ public void reset() {
+ super.reset();
+ this.cachedResult = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/BaseStddevAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/BaseStddevAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/BaseStddevAggregator.java
new file mode 100644
index 0000000..89f6fca
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/BaseStddevAggregator.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ *
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public abstract class BaseStddevAggregator extends DistinctValueWithCountClientAggregator {
+
+ protected Expression stdDevColExp;
+ private BigDecimal cachedResult = null;
+
+ public BaseStddevAggregator(List<Expression> exps, ColumnModifier columnModifier) {
+ super(columnModifier);
+ this.stdDevColExp = exps.get(0);
+ }
+
+ @Override
+ protected int getBufferLength() {
+ return PDataType.DECIMAL.getByteSize();
+ }
+
+ @Override
+ public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+ if (cachedResult == null) {
+ double ssd = sumSquaredDeviation();
+ double result = Math.sqrt(ssd / getDataPointsCount());
+ cachedResult = new BigDecimal(result);
+ }
+ if (buffer == null) {
+ initBuffer();
+ }
+ buffer = PDataType.DECIMAL.toBytes(cachedResult);
+ ptr.set(buffer);
+ return true;
+ }
+
+ protected abstract long getDataPointsCount();
+
+ private double sumSquaredDeviation() {
+ double m = mean();
+ double result = 0.0;
+ for (Entry<ImmutableBytesPtr, Integer> entry : valueVsCount.entrySet()) {
+ double colValue = (Double)PDataType.DOUBLE.toObject(entry.getKey(), this.stdDevColExp.getDataType());
+ double delta = colValue - m;
+ result += (delta * delta) * entry.getValue();
+ }
+ return result;
+ }
+
+ private double mean() {
+ double sum = 0.0;
+ for (Entry<ImmutableBytesPtr, Integer> entry : valueVsCount.entrySet()) {
+ double colValue = (Double)PDataType.DOUBLE.toObject(entry.getKey(), this.stdDevColExp.getDataType());
+ sum += colValue * entry.getValue();
+ }
+ return sum / totalCount;
+ }
+
+ @Override
+ public void reset() {
+ super.reset();
+ this.cachedResult = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/ClientAggregators.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/ClientAggregators.java b/src/main/java/org/apache/phoenix/expression/aggregator/ClientAggregators.java
new file mode 100644
index 0000000..0ac5baf
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/ClientAggregators.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.util.List;
+
+import org.apache.phoenix.expression.function.SingleAggregateFunction;
+import org.apache.phoenix.schema.ValueBitSet;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.TupleUtil;
+
+
+
+/**
+ *
+ * Aggregators that execute on the client-side
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class ClientAggregators extends Aggregators {
+ private final ValueBitSet tempValueSet;
+
+ private static Aggregator[] getAggregators(List<SingleAggregateFunction> aggFuncs) {
+ Aggregator[] aggregators = new Aggregator[aggFuncs.size()];
+ for (int i = 0; i < aggregators.length; i++) {
+ aggregators[i] = aggFuncs.get(i).getAggregator();
+ }
+ return aggregators;
+ }
+
+ public ClientAggregators(List<SingleAggregateFunction> functions, int minNullableIndex) {
+ super(functions.toArray(new SingleAggregateFunction[functions.size()]), getAggregators(functions), minNullableIndex);
+ this.tempValueSet = ValueBitSet.newInstance(schema);
+ }
+
+ @Override
+ public void aggregate(Aggregator[] aggregators, Tuple result) {
+ TupleUtil.getAggregateValue(result, ptr);
+ tempValueSet.clear();
+ tempValueSet.or(ptr);
+
+ int i = 0, maxOffset = ptr.getOffset() + ptr.getLength();
+ Boolean hasValue;
+ schema.iterator(ptr);
+ while ((hasValue=schema.next(ptr, i, maxOffset, tempValueSet)) != null) {
+ if (hasValue) {
+ aggregators[i].aggregate(result, ptr);
+ }
+ i++;
+ }
+ }
+
+ @Override
+ public Aggregator[] newAggregators() {
+ Aggregator[] aggregators = new Aggregator[functions.length];
+ for (int i = 0; i < functions.length; i++) {
+ aggregators[i] = functions[i].newClientAggregator();
+ }
+ return aggregators;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/CountAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/CountAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/CountAggregator.java
new file mode 100644
index 0000000..47494ee
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/CountAggregator.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.SizedUtil;
+
+
+/**
+ *
+ * Aggregator for COUNT aggregations
+ * @author jtaylor
+ * @since 0.1
+ */
+public class CountAggregator extends BaseAggregator {
+ private long count = 0;
+ private byte[] buffer = null;
+
+ public CountAggregator() {
+ super(null);
+ }
+
+ @Override
+ public void aggregate(Tuple tuple, ImmutableBytesWritable ptr) {
+ count++;
+ }
+
+ @Override
+ public boolean isNullable() {
+ return false;
+ }
+
+ @Override
+ public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+ if (buffer == null) {
+ buffer = new byte[getDataType().getByteSize()];
+ }
+ getDataType().getCodec().encodeLong(count, buffer, 0);
+ ptr.set(buffer);
+ return true;
+ }
+
+ @Override
+ public final PDataType getDataType() {
+ return PDataType.LONG;
+ }
+
+ @Override
+ public void reset() {
+ count = 0;
+ buffer = null;
+ super.reset();
+ }
+
+ @Override
+ public String toString() {
+ return "COUNT [count=" + count + "]";
+ }
+
+ @Override
+ public int getSize() {
+ return super.getSize() + SizedUtil.LONG_SIZE + SizedUtil.ARRAY_SIZE + getDataType().getByteSize();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/DecimalStddevPopAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/DecimalStddevPopAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/DecimalStddevPopAggregator.java
new file mode 100644
index 0000000..c707057
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/DecimalStddevPopAggregator.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.util.List;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.ColumnModifier;
+
+/**
+ * Client side Aggregator for STDDEV_POP aggregations for DECIMAL data type.
+ *
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public class DecimalStddevPopAggregator extends BaseDecimalStddevAggregator {
+
+ public DecimalStddevPopAggregator(List<Expression> exps, ColumnModifier columnModifier) {
+ super(exps, columnModifier);
+ }
+
+ @Override
+ protected long getDataPointsCount() {
+ return totalCount;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/DecimalStddevSampAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/DecimalStddevSampAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/DecimalStddevSampAggregator.java
new file mode 100644
index 0000000..0aa1928
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/DecimalStddevSampAggregator.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.util.List;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.ColumnModifier;
+
+/**
+ * Client side Aggregator for STDDEV_SAMP aggregations for DECIMAL data type.
+ *
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public class DecimalStddevSampAggregator extends BaseDecimalStddevAggregator {
+
+ public DecimalStddevSampAggregator(List<Expression> exps, ColumnModifier columnModifier) {
+ super(exps, columnModifier);
+ }
+
+ @Override
+ protected long getDataPointsCount() {
+ return totalCount - 1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/DecimalSumAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/DecimalSumAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/DecimalSumAggregator.java
new file mode 100644
index 0000000..bd15969
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/DecimalSumAggregator.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.math.BigDecimal;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.SizedUtil;
+
+
+/**
+ *
+ * Aggregator that sums BigDecimal values
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class DecimalSumAggregator extends BaseAggregator {
+ private BigDecimal sum = BigDecimal.ZERO;
+ private byte[] sumBuffer;
+
+ public DecimalSumAggregator(ColumnModifier columnModifier) {
+ super(columnModifier);
+ }
+
+ private PDataType getInputDataType() {
+ return PDataType.DECIMAL;
+ }
+
+ @Override
+ public void aggregate(Tuple tuple, ImmutableBytesWritable ptr) {
+ BigDecimal value = (BigDecimal)getDataType().toObject(ptr, getInputDataType(), columnModifier);
+ sum = sum.add(value);
+ if (sumBuffer == null) {
+ sumBuffer = new byte[getDataType().getByteSize()];
+ }
+ }
+
+ @Override
+ public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+ if (sumBuffer == null) {
+ return false;
+ }
+ int len = getDataType().toBytes(sum, sumBuffer, 0);
+ ptr.set(sumBuffer, 0, len);
+ return true;
+ }
+
+ @Override
+ public final PDataType getDataType() {
+ return PDataType.DECIMAL;
+ }
+
+ @Override
+ public void reset() {
+ sum = BigDecimal.ZERO;
+ sumBuffer = null;
+ super.reset();
+ }
+
+ @Override
+ public String toString() {
+ return "DECIMAL SUM [sum=" + sum + "]";
+ }
+
+ @Override
+ public int getSize() {
+ return super.getSize() + SizedUtil.BIG_DECIMAL_SIZE + SizedUtil.ARRAY_SIZE + getDataType().getByteSize();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/DistinctCountClientAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/DistinctCountClientAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/DistinctCountClientAggregator.java
new file mode 100644
index 0000000..d221e91
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/DistinctCountClientAggregator.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * Client side Aggregator for DISTINCT COUNT aggregations
+ *
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public class DistinctCountClientAggregator extends DistinctValueWithCountClientAggregator {
+
+ public DistinctCountClientAggregator(ColumnModifier columnModifier) {
+ super(columnModifier);
+ }
+
+ @Override
+ public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+ if (buffer == null) {
+ initBuffer();
+ }
+ long value = this.valueVsCount.size();
+ buffer = PDataType.LONG.toBytes(value);
+ ptr.set(buffer);
+ return true;
+ }
+
+ @Override
+ protected int getBufferLength() {
+ return PDataType.LONG.getByteSize();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java
new file mode 100644
index 0000000..0032b6b
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * Client side Aggregator which will aggregate data and find distinct values with number of occurrences for each.
+ *
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public abstract class DistinctValueWithCountClientAggregator extends BaseAggregator {
+ protected Map<ImmutableBytesPtr, Integer> valueVsCount = new HashMap<ImmutableBytesPtr, Integer>();
+ protected byte[] buffer;
+ protected long totalCount = 0L;
+
+ public DistinctValueWithCountClientAggregator(ColumnModifier columnModifier) {
+ super(columnModifier);
+ }
+
+ @Override
+ public void aggregate(Tuple tuple, ImmutableBytesWritable ptr) {
+ InputStream is = new ByteArrayInputStream(ptr.get(), ptr.getOffset() + 1, ptr.getLength() - 1);
+ try {
+ if (Bytes.equals(ptr.get(), ptr.getOffset(), 1, DistinctValueWithCountServerAggregator.COMPRESS_MARKER, 0,
+ 1)) {
+ InputStream decompressionStream = DistinctValueWithCountServerAggregator.COMPRESS_ALGO
+ .createDecompressionStream(is,
+ DistinctValueWithCountServerAggregator.COMPRESS_ALGO.getDecompressor(), 0);
+ is = decompressionStream;
+ }
+ DataInputStream in = new DataInputStream(is);
+ int mapSize = WritableUtils.readVInt(in);
+ for (int i = 0; i < mapSize; i++) {
+ int keyLen = WritableUtils.readVInt(in);
+ byte[] keyBytes = new byte[keyLen];
+ in.read(keyBytes, 0, keyLen);
+ ImmutableBytesPtr key = new ImmutableBytesPtr(keyBytes);
+ int value = WritableUtils.readVInt(in);
+ Integer curCount = valueVsCount.get(key);
+ if (curCount == null) {
+ valueVsCount.put(key, value);
+ } else {
+ valueVsCount.put(key, curCount + value);
+ }
+ totalCount += value;
+ }
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe); // Impossible as we're using a ByteArrayInputStream
+ }
+ if (buffer == null) {
+ initBuffer();
+ }
+ }
+
+ protected abstract int getBufferLength();
+
+ protected void initBuffer() {
+ buffer = new byte[getBufferLength()];
+ }
+
+ @Override
+ public boolean isNullable() {
+ return false;
+ }
+
+ @Override
+ public PDataType getDataType() {
+ return PDataType.VARBINARY;
+ }
+
+ @Override
+ public void reset() {
+ valueVsCount = new HashMap<ImmutableBytesPtr, Integer>();
+ buffer = null;
+ totalCount = 0L;
+ super.reset();
+ }
+
+ protected Map<Object, Integer> getSortedValueVsCount(final boolean ascending, final PDataType type) {
+ // To sort the valueVsCount
+ Comparator<Object> comparator = new Comparator<Object>() {
+ @Override
+ public int compare(Object o1, Object o2) {
+ if (ascending) {
+ return type.compareTo(o1, o2);
+ }
+ return type.compareTo(o2, o1);
+ }
+ };
+ Map<Object, Integer> sorted = new TreeMap<Object, Integer>(comparator);
+ for (Entry<ImmutableBytesPtr, Integer> entry : valueVsCount.entrySet()) {
+ sorted.put(type.toObject(entry.getKey(), columnModifier), entry.getValue());
+ }
+ return sorted;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
new file mode 100644
index 0000000..16b697e
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStream;
+import java.util.*;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.*;
+
+/**
+ * Server side Aggregator which will aggregate data and find distinct values with number of occurrences for each.
+ *
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public class DistinctValueWithCountServerAggregator extends BaseAggregator {
+ private static final Logger LOG = LoggerFactory.getLogger(DistinctValueWithCountServerAggregator.class);
+ public static final int DEFAULT_ESTIMATED_DISTINCT_VALUES = 10000;
+ public static final byte[] COMPRESS_MARKER = new byte[] { (byte)1 };
+ public static final Algorithm COMPRESS_ALGO = Compression.Algorithm.SNAPPY;
+
+ private int compressThreshold;
+ private byte[] buffer = null;
+ private Map<ImmutableBytesPtr, Integer> valueVsCount = new HashMap<ImmutableBytesPtr, Integer>();
+
+ public DistinctValueWithCountServerAggregator(Configuration conf) {
+ super(null);
+ compressThreshold = conf.getInt(QueryServices.DISTINCT_VALUE_COMPRESS_THRESHOLD_ATTRIB,
+ QueryServicesOptions.DEFAULT_DISTINCT_VALUE_COMPRESS_THRESHOLD);
+ }
+
+ @Override
+ public void aggregate(Tuple tuple, ImmutableBytesWritable ptr) {
+ ImmutableBytesPtr key = new ImmutableBytesPtr(ptr.get(), ptr.getOffset(), ptr.getLength());
+ Integer count = this.valueVsCount.get(key);
+ if (count == null) {
+ this.valueVsCount.put(key, 1);
+ } else {
+ this.valueVsCount.put(key, ++count);
+ }
+ }
+
+ @Override
+ public boolean isNullable() {
+ return false;
+ }
+
+ @Override
+ public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+ // This serializes the Map. The format is as follows
+ // Map size(VInt ie. 1 to 5 bytes) +
+ // ( key length [VInt ie. 1 to 5 bytes] + key bytes + value [VInt ie. 1 to 5 bytes] )*
+ int serializationSize = countMapSerializationSize();
+ buffer = new byte[serializationSize];
+ int offset = 1;
+ offset += ByteUtil.vintToBytes(buffer, offset, this.valueVsCount.size());
+ for (Entry<ImmutableBytesPtr, Integer> entry : this.valueVsCount.entrySet()) {
+ ImmutableBytesPtr key = entry.getKey();
+ offset += ByteUtil.vintToBytes(buffer, offset, key.getLength());
+ System.arraycopy(key.get(), key.getOffset(), buffer, offset, key.getLength());
+ offset += key.getLength();
+ offset += ByteUtil.vintToBytes(buffer, offset, entry.getValue().intValue());
+ }
+ if (serializationSize > compressThreshold) {
+ // The size for the map serialization is above the threshold. We will do the Snappy compression here.
+ ByteArrayOutputStream compressedByteStream = new ByteArrayOutputStream();
+ try {
+ compressedByteStream.write(COMPRESS_MARKER);
+ OutputStream compressionStream = COMPRESS_ALGO.createCompressionStream(compressedByteStream,
+ COMPRESS_ALGO.getCompressor(), 0);
+ compressionStream.write(buffer, 1, buffer.length - 1);
+ compressionStream.flush();
+ ptr.set(compressedByteStream.toByteArray(), 0, compressedByteStream.size());
+ return true;
+ } catch (Exception e) {
+ LOG.error("Exception while Snappy compression of data.", e);
+ }
+ }
+ ptr.set(buffer, 0, offset);
+ return true;
+ }
+
+ // The #bytes required to serialize the count map.
+ // Here let us assume to use 4 bytes for each of the int items. Normally it will consume lesser
+ // bytes as we will use vints.
+ // TODO Do we need to consider 5 as the number of bytes for each of the int field? Else there is
+ // a chance of ArrayIndexOutOfBoundsException when all the int fields are having very large
+ // values. Will that ever occur?
+ private int countMapSerializationSize() {
+ int size = Bytes.SIZEOF_INT;// Write the number of entries in the Map
+ for (ImmutableBytesPtr key : this.valueVsCount.keySet()) {
+ // Add up the key and key's lengths (Int) and the value
+ size += key.getLength() + Bytes.SIZEOF_INT + Bytes.SIZEOF_INT;
+ }
+ return size;
+ }
+
+ // The heap size which will be taken by the count map.
+ private int countMapHeapSize() {
+ int size = 0;
+ if (this.valueVsCount.size() > 0) {
+ for (ImmutableBytesPtr key : this.valueVsCount.keySet()) {
+ size += SizedUtil.MAP_ENTRY_SIZE + // entry
+ Bytes.SIZEOF_INT + // key size
+ key.getLength() + SizedUtil.ARRAY_SIZE; // value size
+ }
+ } else {
+ // Initially when the getSize() is called, we dont have any entries in the map so as to
+ // tell the exact heap need. Let us approximate the #entries
+ SizedUtil.sizeOfMap(DEFAULT_ESTIMATED_DISTINCT_VALUES,
+ SizedUtil.IMMUTABLE_BYTES_PTR_SIZE, Bytes.SIZEOF_INT);
+ }
+ return size;
+ }
+
+ @Override
+ public final PDataType getDataType() {
+ return PDataType.VARBINARY;
+ }
+
+ @Override
+ public void reset() {
+ valueVsCount = new HashMap<ImmutableBytesPtr, Integer>();
+ buffer = null;
+ super.reset();
+ }
+
+ @Override
+ public String toString() {
+ return "DISTINCT VALUE vs COUNT";
+ }
+
+ @Override
+ public int getSize() {
+ // TODO make this size correct.??
+ // This size is being called initially at the begin of the scanner open. At that time we any
+ // way can not tell the exact size of the Map. The Aggregators get size from all Aggregator
+ // and stores in a variable for future use. This size of the Aggregators is being used in
+ // Grouped unordered scan. Do we need some changes there in that calculation?
+ return super.getSize() + SizedUtil.ARRAY_SIZE + countMapHeapSize();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/DoubleSumAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/DoubleSumAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/DoubleSumAggregator.java
new file mode 100644
index 0000000..883a1ba
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/DoubleSumAggregator.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.SizedUtil;
+
+public class DoubleSumAggregator extends BaseAggregator {
+
+ private double sum = 0;
+ private byte[] buffer;
+
+ public DoubleSumAggregator(ColumnModifier columnModifier) {
+ super(columnModifier);
+ }
+
+ protected PDataType getInputDataType() {
+ return PDataType.DOUBLE;
+ }
+
+ private void initBuffer() {
+ buffer = new byte[getDataType().getByteSize()];
+ }
+
+ @Override
+ public void aggregate(Tuple tuple, ImmutableBytesWritable ptr) {
+ double value = getInputDataType().getCodec().decodeDouble(ptr, columnModifier);
+ sum += value;
+ if (buffer == null) {
+ initBuffer();
+ }
+ }
+
+ @Override
+ public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+ if (buffer == null) {
+ if (isNullable()) {
+ return false;
+ }
+ initBuffer();
+ }
+ ptr.set(buffer);
+ getDataType().getCodec().encodeDouble(sum, ptr);
+ return true;
+ }
+
+ @Override
+ public PDataType getDataType() {
+ return PDataType.DOUBLE;
+ }
+
+ @Override
+ public String toString() {
+ return "SUM [sum=" + sum + "]";
+ }
+
+ @Override
+ public void reset() {
+ sum = 0;
+ buffer = null;
+ super.reset();
+ }
+
+ @Override
+ public int getSize() {
+ return super.getSize() + SizedUtil.LONG_SIZE + SizedUtil.ARRAY_SIZE + getDataType().getByteSize();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/IntSumAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/IntSumAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/IntSumAggregator.java
new file mode 100644
index 0000000..e96a993
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/IntSumAggregator.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+
+/**
+ *
+ * Aggregator that sums integer values
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class IntSumAggregator extends NumberSumAggregator {
+
+ public IntSumAggregator(ColumnModifier columnModifier) {
+ super(columnModifier);
+ }
+
+ @Override
+ protected PDataType getInputDataType() {
+ return PDataType.INTEGER;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/LongSumAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/LongSumAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/LongSumAggregator.java
new file mode 100644
index 0000000..bfdadc9
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/LongSumAggregator.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+
+/**
+ *
+ * Aggregator that sums long values
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class LongSumAggregator extends NumberSumAggregator {
+
+ public LongSumAggregator(ColumnModifier columnModifier) {
+ super(columnModifier);
+ }
+
+ @Override
+ protected PDataType getInputDataType() {
+ return PDataType.LONG;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/MaxAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/MaxAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/MaxAggregator.java
new file mode 100644
index 0000000..890e14a
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/MaxAggregator.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.phoenix.schema.ColumnModifier;
+
+
+/**
+ * Aggregator that finds the max of values. Inverse of {@link MinAggregator}.
+ *
+ * @author syyang
+ * @since 0.1
+ */
+abstract public class MaxAggregator extends MinAggregator {
+
+ public MaxAggregator(ColumnModifier columnModifier) {
+ super(columnModifier);
+ }
+
+ @Override
+ protected boolean keepFirst(ImmutableBytesWritable ibw1, ImmutableBytesWritable ibw2) {
+ return !super.keepFirst(ibw1, ibw2);
+ }
+
+ @Override
+ public String toString() {
+ return "MAX [value=" + Bytes.toStringBinary(value.get(),value.getOffset(),value.getLength()) + "]";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/MinAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/MinAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/MinAggregator.java
new file mode 100644
index 0000000..8954de2
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/MinAggregator.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.SizedUtil;
+
+
+/**
+ * Aggregator that finds the min of values. Inverse of {@link MaxAggregator}.
+ *
+ * @author syyang
+ * @since 0.1
+ */
+abstract public class MinAggregator extends BaseAggregator {
+ /** Used to store the accumulate the results of the MIN function */
+ protected final ImmutableBytesWritable value = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
+
+ public MinAggregator(ColumnModifier columnModifier) {
+ super(columnModifier);
+ }
+
+ @Override
+ public void reset() {
+ value.set(ByteUtil.EMPTY_BYTE_ARRAY);
+ super.reset();
+ }
+
+ @Override
+ public int getSize() {
+ return super.getSize() + /*value*/ SizedUtil.IMMUTABLE_BYTES_WRITABLE_SIZE;
+ }
+
+ /**
+ * Compares two bytes writables, and returns true if the first one should be
+ * kept, and false otherwise. For the MIN function, this method will return
+ * true if the first bytes writable is less than the second.
+ *
+ * @param ibw1 the first bytes writable
+ * @param ibw2 the second bytes writable
+ * @return true if the first bytes writable should be kept
+ */
+ protected boolean keepFirst(ImmutableBytesWritable ibw1, ImmutableBytesWritable ibw2) {
+ return 0 >= getDataType().compareTo(ibw1, columnModifier, ibw2, columnModifier, getDataType());
+ }
+
+ private boolean isNull() {
+ return value.get() == ByteUtil.EMPTY_BYTE_ARRAY;
+ }
+
+ @Override
+ public void aggregate(Tuple tuple, ImmutableBytesWritable ptr) {
+ if (isNull()) {
+ value.set(ptr.get(), ptr.getOffset(), ptr.getLength());
+ } else {
+ if (!keepFirst(value, ptr)) {
+ // replace the value with the new value
+ value.set(ptr.get(), ptr.getOffset(), ptr.getLength());
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "MIN [value=" + Bytes.toStringBinary(value.get(),value.getOffset(),value.getLength()) + "]";
+ }
+
+ @Override
+ public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+ if (isNull()) {
+ return false;
+ }
+ ptr.set(value.get(), value.getOffset(), value.getLength());
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/NumberSumAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/NumberSumAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/NumberSumAggregator.java
new file mode 100644
index 0000000..fcfef58
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/NumberSumAggregator.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.SizedUtil;
+
+
+/**
+ *
+ * Aggregator that sums integral number values
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+abstract public class NumberSumAggregator extends BaseAggregator {
+ private long sum = 0;
+ private byte[] buffer;
+
+ public NumberSumAggregator(ColumnModifier columnModifier) {
+ super(columnModifier);
+ }
+
+ abstract protected PDataType getInputDataType();
+
+ private int getBufferLength() {
+ return getDataType().getByteSize();
+ }
+
+ private void initBuffer() {
+ buffer = new byte[getBufferLength()];
+ }
+
+ @Override
+ public void aggregate(Tuple tuple, ImmutableBytesWritable ptr) {
+ // Get either IntNative or LongNative depending on input type
+ long value = getInputDataType().getCodec().decodeLong(ptr, columnModifier);
+ sum += value;
+ if (buffer == null) {
+ initBuffer();
+ }
+ }
+
+ @Override
+ public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+ if (buffer == null) {
+ if (isNullable()) {
+ return false;
+ }
+ initBuffer();
+ }
+ ptr.set(buffer);
+ getDataType().getCodec().encodeLong(sum, ptr);
+ return true;
+ }
+
+ @Override
+ public final PDataType getDataType() {
+ return PDataType.LONG;
+ }
+
+ @Override
+ public void reset() {
+ sum = 0;
+ buffer = null;
+ super.reset();
+ }
+
+ @Override
+ public String toString() {
+ return "SUM [sum=" + sum + "]";
+ }
+
+ @Override
+ public int getSize() {
+ return super.getSize() + SizedUtil.LONG_SIZE + SizedUtil.ARRAY_SIZE + getBufferLength();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/PercentRankClientAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/PercentRankClientAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/PercentRankClientAggregator.java
new file mode 100644
index 0000000..42ca267
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/PercentRankClientAggregator.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.math.BigDecimal;
+import java.util.*;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.*;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * Client side Aggregator for PERCENT_RANK aggregations
+ *
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public class PercentRankClientAggregator extends DistinctValueWithCountClientAggregator {
+
+ private final List<Expression> exps;
+ private BigDecimal cachedResult = null;
+
+ public PercentRankClientAggregator(List<Expression> exps, ColumnModifier columnModifier) {
+ super(columnModifier);
+ this.exps = exps;
+ }
+
+ @Override
+ public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+ if (cachedResult == null) {
+ ColumnExpression columnExp = (ColumnExpression)exps.get(0);
+ // Second exp will be a LiteralExpression of Boolean type indicating whether the ordering to
+ // be ASC/DESC
+ LiteralExpression isAscendingExpression = (LiteralExpression)exps.get(1);
+ boolean isAscending = (Boolean)isAscendingExpression.getValue();
+
+ // Third expression will be LiteralExpression
+ LiteralExpression valueExp = (LiteralExpression)exps.get(2);
+ Map<Object, Integer> sorted = getSortedValueVsCount(isAscending, columnExp.getDataType());
+ long distinctCountsSum = 0;
+ Object value = valueExp.getValue();
+ for (Entry<Object, Integer> entry : sorted.entrySet()) {
+ Object colValue = entry.getKey();
+ int compareResult = columnExp.getDataType().compareTo(colValue, value, valueExp.getDataType());
+ boolean done = isAscending ? compareResult > 0 : compareResult <= 0;
+ if (done) break;
+ distinctCountsSum += entry.getValue();
+ }
+
+ float result = (float)distinctCountsSum / totalCount;
+ this.cachedResult = new BigDecimal(result);
+ }
+ if (buffer == null) {
+ initBuffer();
+ }
+ buffer = PDataType.DECIMAL.toBytes(this.cachedResult);
+ ptr.set(buffer);
+ return true;
+ }
+
+ @Override
+ protected int getBufferLength() {
+ return PDataType.DECIMAL.getByteSize();
+ }
+
+ @Override
+ public void reset() {
+ super.reset();
+ this.cachedResult = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/PercentileClientAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/PercentileClientAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/PercentileClientAggregator.java
new file mode 100644
index 0000000..095842a
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/PercentileClientAggregator.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.math.BigDecimal;
+import java.util.*;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.*;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * Client side Aggregator for PERCENTILE_CONT aggregations
+ *
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public class PercentileClientAggregator extends DistinctValueWithCountClientAggregator {
+
+ private final List<Expression> exps;
+ private BigDecimal cachedResult = null;
+
+ public PercentileClientAggregator(List<Expression> exps, ColumnModifier columnModifier) {
+ super(columnModifier);
+ this.exps = exps;
+ }
+
+ @Override
+ public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+ if (cachedResult == null) {
+ ColumnExpression columnExp = (ColumnExpression)exps.get(0);
+ // Second exp will be a LiteralExpression of Boolean type indicating whether the ordering to
+ // be ASC/DESC
+ LiteralExpression isAscendingExpression = (LiteralExpression)exps.get(1);
+ boolean isAscending = (Boolean)isAscendingExpression.getValue();
+
+ // Third expression will be LiteralExpression
+ LiteralExpression percentileExp = (LiteralExpression)exps.get(2);
+ float p = ((Number)percentileExp.getValue()).floatValue();
+ Map<Object, Integer> sorted = getSortedValueVsCount(isAscending, columnExp.getDataType());
+ float i = (p * this.totalCount) + 0.5F;
+ long k = (long)i;
+ float f = i - k;
+ Object o1 = null;
+ Object o2 = null;
+ long distinctCountsSum = 0;
+ for (Entry<Object, Integer> entry : sorted.entrySet()) {
+ if (o1 != null) {
+ o2 = entry.getKey();
+ break;
+ }
+ distinctCountsSum += entry.getValue();
+ if (distinctCountsSum == k) {
+ o1 = entry.getKey();
+ } else if (distinctCountsSum > k) {
+ o1 = o2 = entry.getKey();
+ break;
+ }
+ }
+
+ double result = 0.0;
+ Number n1 = (Number)o1;
+ if (o2 == null || o1 == o2) {
+ result = n1.doubleValue();
+ } else {
+ Number n2 = (Number)o2;
+ result = (n1.doubleValue() * (1.0F - f)) + (n2.doubleValue() * f);
+ }
+ this.cachedResult = new BigDecimal(result);
+ }
+ if (buffer == null) {
+ initBuffer();
+ }
+ buffer = PDataType.DECIMAL.toBytes(this.cachedResult);
+ ptr.set(buffer);
+ return true;
+ }
+
+ @Override
+ protected int getBufferLength() {
+ return PDataType.DECIMAL.getByteSize();
+ }
+
+ @Override
+ public void reset() {
+ super.reset();
+ this.cachedResult = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/PercentileDiscClientAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/PercentileDiscClientAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/PercentileDiscClientAggregator.java
new file mode 100644
index 0000000..206ed1d
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/PercentileDiscClientAggregator.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.util.*;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.*;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ *
+ * Built-in function for PERCENTILE_DISC(<expression>) WITHIN GROUP (ORDER BY <expression> ASC/DESC) aggregate function
+ *
+ * @author ramkrishna
+ * @since 1.2.1
+ */
+public class PercentileDiscClientAggregator extends
+ DistinctValueWithCountClientAggregator {
+
+ private final List<Expression> exps;
+ private Object cachedResult = null;
+ ColumnExpression columnExp = null;
+
+ public PercentileDiscClientAggregator(List<Expression> exps, ColumnModifier columnModifier) {
+ super(columnModifier);
+ this.exps = exps;
+ }
+
+ @Override
+ public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+ // Reset buffer so that it gets initialized with the current datatype of the column
+ buffer = null;
+ if (cachedResult == null) {
+ columnExp = (ColumnExpression)exps.get(0);
+ // Second exp will be a LiteralExpression of Boolean type indicating
+ // whether the ordering to be ASC/DESC
+ LiteralExpression isAscendingExpression = (LiteralExpression) exps
+ .get(1);
+ boolean isAscending = (Boolean) isAscendingExpression.getValue();
+
+ // Third expression will be LiteralExpression
+ LiteralExpression percentileExp = (LiteralExpression) exps.get(2);
+ float p = ((Number) percentileExp.getValue()).floatValue();
+ Map<Object, Integer> sorted = getSortedValueVsCount(isAscending, columnExp.getDataType());
+ int currValue = 0;
+ Object result = null;
+ // Here the Percentile_disc returns the cum_dist() that is greater or equal to the
+ // Percentile (p) specified in the query. So the result set will be of that of the
+ // datatype of the column being selected
+ for (Entry<Object, Integer> entry : sorted.entrySet()) {
+ result = entry.getKey();
+ Integer value = entry.getValue();
+ currValue += value;
+ float cum_dist = (float) currValue / (float) totalCount;
+ if (cum_dist >= p) {
+ break;
+ }
+ }
+ this.cachedResult = result;
+ }
+ if (buffer == null) {
+ // Initialize based on the datatype
+ // columnExp cannot be null
+ buffer = new byte[columnExp.getDataType().getByteSize()];
+ }
+ // Copy the result to the buffer.
+ System.arraycopy(columnExp.getDataType().toBytes(this.cachedResult), 0, buffer, 0, buffer.length);
+ ptr.set(buffer);
+ return true;
+ }
+
+ @Override
+ public void reset() {
+ super.reset();
+ this.cachedResult = null;
+ }
+
+ @Override
+ protected int getBufferLength() {
+ // Will be used in the aggregate() call
+ return PDataType.DECIMAL.getByteSize();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/ServerAggregators.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/ServerAggregators.java b/src/main/java/org/apache/phoenix/expression/aggregator/ServerAggregators.java
new file mode 100644
index 0000000..6457793
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/ServerAggregators.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableUtils;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+import org.apache.phoenix.expression.function.SingleAggregateFunction;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+/**
+ *
+ * Aggregators that execute on the server-side
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class ServerAggregators extends Aggregators {
+ public static final ServerAggregators EMPTY_AGGREGATORS = new ServerAggregators(new SingleAggregateFunction[0], new Aggregator[0], new Expression[0], 0);
+ private final Expression[] expressions;
+
+ private ServerAggregators(SingleAggregateFunction[] functions, Aggregator[] aggregators, Expression[] expressions, int minNullableIndex) {
+ super(functions, aggregators, minNullableIndex);
+ if (aggregators.length != expressions.length) {
+ throw new IllegalArgumentException("Number of aggregators (" + aggregators.length
+ + ") must match the number of expressions (" + Arrays.toString(expressions) + ")");
+ }
+ this.expressions = expressions;
+ }
+
+ @Override
+ public void aggregate(Aggregator[] aggregators, Tuple result) {
+ for (int i = 0; i < expressions.length; i++) {
+ if (expressions[i].evaluate(result, ptr)) {
+ aggregators[i].aggregate(result, ptr);
+ }
+ }
+ }
+
+ /**
+ * Serialize an Aggregator into a byte array
+ * @param aggFuncs list of aggregator to serialize
+ * @return serialized byte array respresentation of aggregator
+ */
+ public static byte[] serialize(List<SingleAggregateFunction> aggFuncs, int minNullableIndex) {
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ try {
+ DataOutputStream output = new DataOutputStream(stream);
+ WritableUtils.writeVInt(output, minNullableIndex);
+ WritableUtils.writeVInt(output, aggFuncs.size());
+ for (int i = 0; i < aggFuncs.size(); i++) {
+ SingleAggregateFunction aggFunc = aggFuncs.get(i);
+ WritableUtils.writeVInt(output, ExpressionType.valueOf(aggFunc).ordinal());
+ aggFunc.write(output);
+ }
+ return stream.toByteArray();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public Aggregator[] newAggregators() {
+ return newAggregators(null);
+ }
+
+ public Aggregator[] newAggregators(Configuration conf) {
+ Aggregator[] aggregators = new Aggregator[functions.length];
+ for (int i = 0; i < functions.length; i++) {
+ aggregators[i] = functions[i].newServerAggregator(conf);
+ }
+ return aggregators;
+ }
+
+ /**
+ * Deserialize aggregators from the serialized byte array representation
+ * @param b byte array representation of a list of Aggregators
+ * @param conf Server side configuration used by HBase
+ * @return newly instantiated Aggregators instance
+ */
+ public static ServerAggregators deserialize(byte[] b, Configuration conf) {
+ if (b == null) {
+ return ServerAggregators.EMPTY_AGGREGATORS;
+ }
+ ByteArrayInputStream stream = new ByteArrayInputStream(b);
+ try {
+ DataInputStream input = new DataInputStream(stream);
+ int minNullableIndex = WritableUtils.readVInt(input);
+ int len = WritableUtils.readVInt(input);
+ Aggregator[] aggregators = new Aggregator[len];
+ Expression[] expressions = new Expression[len];
+ SingleAggregateFunction[] functions = new SingleAggregateFunction[len];
+ for (int i = 0; i < aggregators.length; i++) {
+ SingleAggregateFunction aggFunc = (SingleAggregateFunction)ExpressionType.values()[WritableUtils.readVInt(input)].newInstance();
+ aggFunc.readFields(input, conf);
+ functions[i] = aggFunc;
+ aggregators[i] = aggFunc.getAggregator();
+ expressions[i] = aggFunc.getAggregatorExpression();
+ }
+ return new ServerAggregators(functions, aggregators,expressions, minNullableIndex);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/StddevPopAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/StddevPopAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/StddevPopAggregator.java
new file mode 100644
index 0000000..30276e1
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/StddevPopAggregator.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.util.List;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.ColumnModifier;
+
+/**
+ * Client side Aggregator for STDDEV_POP aggregations
+ *
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public class StddevPopAggregator extends BaseStddevAggregator {
+
+ public StddevPopAggregator(List<Expression> exps, ColumnModifier columnModifier) {
+ super(exps, columnModifier);
+ }
+
+ @Override
+ protected long getDataPointsCount() {
+ return totalCount;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/StddevSampAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/StddevSampAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/StddevSampAggregator.java
new file mode 100644
index 0000000..49f52d2
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/StddevSampAggregator.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.util.List;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.ColumnModifier;
+
+/**
+ * Client side Aggregator for STDDEV_SAMP aggregations
+ *
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public class StddevSampAggregator extends BaseStddevAggregator {
+
+ public StddevSampAggregator(List<Expression> exps, ColumnModifier columnModifier) {
+ super(exps, columnModifier);
+ }
+
+ @Override
+ protected long getDataPointsCount() {
+ return totalCount - 1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/UnsignedIntSumAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/UnsignedIntSumAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/UnsignedIntSumAggregator.java
new file mode 100644
index 0000000..c8befca
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/UnsignedIntSumAggregator.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+
+/**
+ *
+ * Aggregator that sums unsigned integer values
+ *
+ * @author jtaylor
+ * @since 0.12
+ */
+public class UnsignedIntSumAggregator extends NumberSumAggregator {
+
+ public UnsignedIntSumAggregator(ColumnModifier columnModifier) {
+ super(columnModifier);
+ }
+
+ @Override
+ protected PDataType getInputDataType() {
+ return PDataType.UNSIGNED_INT;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/UnsignedLongSumAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/UnsignedLongSumAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/UnsignedLongSumAggregator.java
new file mode 100644
index 0000000..b91a934
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/UnsignedLongSumAggregator.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+
+/**
+ *
+ * Aggregator that sums unsigned long values
+ * TODO: create these classes dynamically based on the type passed through
+ *
+ * @author jtaylor
+ * @since 0.12
+ */
+public class UnsignedLongSumAggregator extends NumberSumAggregator {
+
+ public UnsignedLongSumAggregator(ColumnModifier columnModifier) {
+ super(columnModifier);
+ }
+
+ @Override
+ protected PDataType getInputDataType() {
+ return PDataType.UNSIGNED_LONG;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/function/AggregateFunction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/function/AggregateFunction.java b/src/main/java/org/apache/phoenix/expression/function/AggregateFunction.java
new file mode 100644
index 0000000..8c9ef5b
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/function/AggregateFunction.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.util.List;
+
+import org.apache.phoenix.expression.Expression;
+
+
+
+
+/**
+ *
+ * Compiled representation of a built-in aggregate function
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+abstract public class AggregateFunction extends FunctionExpression {
+
+ public AggregateFunction() {
+ }
+
+ public AggregateFunction(List<Expression> children) {
+ super(children);
+ }
+}