You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 20:23:32 UTC

[31/51] [partial] Initial commit

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/BaseAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/BaseAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/BaseAggregator.java
new file mode 100644
index 0000000..4c21a04
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/BaseAggregator.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+
+import org.apache.phoenix.expression.BaseTerminalExpression;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.util.SizedUtil;
+
+/**
+ * Base class for Aggregator implementations
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public abstract class BaseAggregator extends BaseTerminalExpression implements Aggregator {
+    
+    protected final ColumnModifier columnModifier;    
+    
+    public BaseAggregator(ColumnModifier columnModifier) {
+        this.columnModifier = columnModifier;
+    }
+    
+    @Override
+    public boolean isNullable() {
+        return true;
+    }
+    
+    @Override
+    public int getSize() {
+        return SizedUtil.OBJECT_SIZE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/BaseDecimalStddevAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/BaseDecimalStddevAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/BaseDecimalStddevAggregator.java
new file mode 100644
index 0000000..ff12a1d
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/BaseDecimalStddevAggregator.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.math.*;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Pair;
+
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.expression.ColumnExpression;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.*;
+import org.apache.phoenix.util.BigDecimalUtil.Operation;
+
+/**
+ * 
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public abstract class BaseDecimalStddevAggregator extends DistinctValueWithCountClientAggregator {
+
+    private BigDecimal cachedResult = null;
+    private int colPrecision;
+    private int colScale;
+
+    public BaseDecimalStddevAggregator(List<Expression> exps, ColumnModifier columnModifier) {
+        super(columnModifier);
+        ColumnExpression stdDevColExp = (ColumnExpression)exps.get(0);
+        this.colPrecision = stdDevColExp.getMaxLength();
+        this.colScale = stdDevColExp.getScale();
+    }
+
+    @Override
+    protected int getBufferLength() {
+        return PDataType.DECIMAL.getByteSize();
+    }
+
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (cachedResult == null) {
+            BigDecimal ssd = sumSquaredDeviation();
+            ssd = ssd.divide(new BigDecimal(getDataPointsCount()), PDataType.DEFAULT_MATH_CONTEXT);
+            // Calculate the precision for the stddev result.
+            // There are totalCount #Decimal values for which we are calculating the stddev
+            // The resultant precision depends on precision and scale of all these values. (See
+            // BigDecimalUtil.getResultPrecisionScale)
+            // As of now we are not using the actual precision and scale of individual values but just using the table
+            // column's max length(precision) and scale for each of the values.
+            int resultPrecision = colPrecision;
+            for (int i = 1; i < this.totalCount; i++) {
+                // Max precision that we can support is 38 See PDataType.MAX_PRECISION
+                if (resultPrecision >= PDataType.MAX_PRECISION) break;
+                Pair<Integer, Integer> precisionScale = BigDecimalUtil.getResultPrecisionScale(this.colPrecision,
+                        this.colScale, this.colPrecision, this.colScale, Operation.OTHERS);
+                resultPrecision = precisionScale.getFirst();
+            }
+            cachedResult = new BigDecimal(Math.sqrt(ssd.doubleValue()), new MathContext(resultPrecision,
+                    RoundingMode.HALF_UP));
+            cachedResult.setScale(this.colScale, RoundingMode.HALF_UP);
+        }
+        if (buffer == null) {
+            initBuffer();
+        }
+        buffer = PDataType.DECIMAL.toBytes(cachedResult);
+        ptr.set(buffer);
+        return true;
+    }
+
+    protected abstract long getDataPointsCount();
+
+    private BigDecimal sumSquaredDeviation() {
+        BigDecimal m = mean();
+        BigDecimal result = BigDecimal.ZERO;
+        for (Entry<ImmutableBytesPtr, Integer> entry : valueVsCount.entrySet()) {
+            BigDecimal colValue = (BigDecimal)PDataType.DECIMAL.toObject(entry.getKey());
+            BigDecimal delta = colValue.subtract(m);
+            result = result.add(delta.multiply(delta).multiply(new BigDecimal(entry.getValue())));
+        }
+        return result;
+    }
+
+    private BigDecimal mean() {
+        BigDecimal sum = BigDecimal.ZERO;
+        for (Entry<ImmutableBytesPtr, Integer> entry : valueVsCount.entrySet()) {
+            BigDecimal colValue = (BigDecimal)PDataType.DECIMAL.toObject(entry.getKey());
+            sum = sum.add(colValue.multiply(new BigDecimal(entry.getValue())));
+        }
+        return sum.divide(new BigDecimal(totalCount), PDataType.DEFAULT_MATH_CONTEXT);
+    }
+
+    @Override
+    public void reset() {
+        super.reset();
+        this.cachedResult = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/BaseStddevAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/BaseStddevAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/BaseStddevAggregator.java
new file mode 100644
index 0000000..89f6fca
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/BaseStddevAggregator.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * 
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public abstract class BaseStddevAggregator extends DistinctValueWithCountClientAggregator {
+
+    protected Expression stdDevColExp;
+    private BigDecimal cachedResult = null;
+
+    public BaseStddevAggregator(List<Expression> exps, ColumnModifier columnModifier) {
+        super(columnModifier);
+        this.stdDevColExp = exps.get(0);
+    }
+
+    @Override
+    protected int getBufferLength() {
+        return PDataType.DECIMAL.getByteSize();
+    }
+
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (cachedResult == null) {
+            double ssd = sumSquaredDeviation();
+            double result = Math.sqrt(ssd / getDataPointsCount());
+            cachedResult = new BigDecimal(result);
+        }
+        if (buffer == null) {
+            initBuffer();
+        }
+        buffer = PDataType.DECIMAL.toBytes(cachedResult);
+        ptr.set(buffer);
+        return true;
+    }
+    
+    protected abstract long getDataPointsCount();
+    
+    private double sumSquaredDeviation() {
+        double m = mean();
+        double result = 0.0;
+        for (Entry<ImmutableBytesPtr, Integer> entry : valueVsCount.entrySet()) {
+            double colValue = (Double)PDataType.DOUBLE.toObject(entry.getKey(), this.stdDevColExp.getDataType());
+            double delta = colValue - m;
+            result += (delta * delta) * entry.getValue();
+        }
+        return result;
+    }
+
+    private double mean() {
+        double sum = 0.0;
+        for (Entry<ImmutableBytesPtr, Integer> entry : valueVsCount.entrySet()) {
+            double colValue = (Double)PDataType.DOUBLE.toObject(entry.getKey(), this.stdDevColExp.getDataType());
+            sum += colValue * entry.getValue();
+        }
+        return sum / totalCount;
+    }
+    
+    @Override
+    public void reset() {
+        super.reset();
+        this.cachedResult = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/ClientAggregators.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/ClientAggregators.java b/src/main/java/org/apache/phoenix/expression/aggregator/ClientAggregators.java
new file mode 100644
index 0000000..0ac5baf
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/ClientAggregators.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.util.List;
+
+import org.apache.phoenix.expression.function.SingleAggregateFunction;
+import org.apache.phoenix.schema.ValueBitSet;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.TupleUtil;
+
+
+
+/**
+ * 
+ * Aggregators that execute on the client-side
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class ClientAggregators extends Aggregators {
+    private final ValueBitSet tempValueSet; 
+  
+    private static Aggregator[] getAggregators(List<SingleAggregateFunction> aggFuncs) {
+        Aggregator[] aggregators = new Aggregator[aggFuncs.size()];
+        for (int i = 0; i < aggregators.length; i++) {
+            aggregators[i] = aggFuncs.get(i).getAggregator();
+        }
+        return aggregators;
+    }
+    
+    public ClientAggregators(List<SingleAggregateFunction> functions, int minNullableIndex) {
+        super(functions.toArray(new SingleAggregateFunction[functions.size()]), getAggregators(functions), minNullableIndex);
+        this.tempValueSet = ValueBitSet.newInstance(schema);
+    }
+    
+    @Override
+    public void aggregate(Aggregator[] aggregators, Tuple result) {
+        TupleUtil.getAggregateValue(result, ptr);
+        tempValueSet.clear();
+        tempValueSet.or(ptr);
+
+        int i = 0, maxOffset = ptr.getOffset() + ptr.getLength();
+        Boolean hasValue;
+        schema.iterator(ptr);
+        while ((hasValue=schema.next(ptr, i, maxOffset, tempValueSet)) != null) {
+            if (hasValue) {
+                aggregators[i].aggregate(result, ptr);
+            }
+            i++;
+        }
+    }
+    
+    @Override
+    public Aggregator[] newAggregators() {
+        Aggregator[] aggregators = new Aggregator[functions.length];
+        for (int i = 0; i < functions.length; i++) {
+            aggregators[i] = functions[i].newClientAggregator();
+        }
+        return aggregators;
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/CountAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/CountAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/CountAggregator.java
new file mode 100644
index 0000000..47494ee
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/CountAggregator.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.SizedUtil;
+
+
+/**
+ * 
+ * Aggregator for COUNT aggregations
+ * @author jtaylor
+ * @since 0.1
+ */
+public class CountAggregator extends BaseAggregator {
+    private long count = 0;
+    private byte[] buffer = null;
+    
+    public CountAggregator() {
+        super(null);
+    }
+    
+    @Override
+    public void aggregate(Tuple tuple, ImmutableBytesWritable ptr) {
+        count++;
+    }
+    
+    @Override
+    public boolean isNullable() {
+        return false;
+    }
+    
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (buffer == null) {
+            buffer = new byte[getDataType().getByteSize()];
+        }
+        getDataType().getCodec().encodeLong(count, buffer, 0);
+        ptr.set(buffer);
+        return true;
+    }
+    
+    @Override
+    public final PDataType getDataType() {
+        return PDataType.LONG;
+    }
+
+    @Override
+    public void reset() {
+        count = 0;
+        buffer = null;
+        super.reset();
+    }
+    
+    @Override
+    public String toString() {
+        return "COUNT [count=" + count + "]";
+    }
+
+    @Override
+    public int getSize() {
+        return super.getSize() + SizedUtil.LONG_SIZE + SizedUtil.ARRAY_SIZE + getDataType().getByteSize();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/DecimalStddevPopAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/DecimalStddevPopAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/DecimalStddevPopAggregator.java
new file mode 100644
index 0000000..c707057
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/DecimalStddevPopAggregator.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.util.List;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.ColumnModifier;
+
+/**
+ * Client side Aggregator for STDDEV_POP aggregations for DECIMAL data type.
+ * 
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public class DecimalStddevPopAggregator extends BaseDecimalStddevAggregator {
+
+    public DecimalStddevPopAggregator(List<Expression> exps, ColumnModifier columnModifier) {
+        super(exps, columnModifier);
+    }
+
+    @Override
+    protected long getDataPointsCount() {
+        return totalCount;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/DecimalStddevSampAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/DecimalStddevSampAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/DecimalStddevSampAggregator.java
new file mode 100644
index 0000000..0aa1928
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/DecimalStddevSampAggregator.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.util.List;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.ColumnModifier;
+
+/**
+ * Client side Aggregator for STDDEV_SAMP aggregations for DECIMAL data type.
+ * 
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public class DecimalStddevSampAggregator extends BaseDecimalStddevAggregator {
+
+    public DecimalStddevSampAggregator(List<Expression> exps, ColumnModifier columnModifier) {
+        super(exps, columnModifier);
+    }
+
+    @Override
+    protected long getDataPointsCount() {
+        return totalCount - 1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/DecimalSumAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/DecimalSumAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/DecimalSumAggregator.java
new file mode 100644
index 0000000..bd15969
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/DecimalSumAggregator.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.math.BigDecimal;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.SizedUtil;
+
+
+/**
+ * 
+ * Aggregator that sums BigDecimal values
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class DecimalSumAggregator extends BaseAggregator {
+    private BigDecimal sum = BigDecimal.ZERO;
+    private byte[] sumBuffer;
+    
+    public DecimalSumAggregator(ColumnModifier columnModifier) {
+        super(columnModifier);
+    }
+    
+    private PDataType getInputDataType() {
+        return PDataType.DECIMAL;
+    }
+    
+    @Override
+    public void aggregate(Tuple tuple, ImmutableBytesWritable ptr) {
+        BigDecimal value = (BigDecimal)getDataType().toObject(ptr, getInputDataType(), columnModifier);
+        sum = sum.add(value);
+        if (sumBuffer == null) {
+            sumBuffer = new byte[getDataType().getByteSize()];
+        }
+    }
+    
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (sumBuffer == null) {
+            return false;
+        }
+        int len = getDataType().toBytes(sum, sumBuffer, 0);
+        ptr.set(sumBuffer, 0, len);
+        return true;
+    }
+    
+    @Override
+    public final PDataType getDataType() {
+        return PDataType.DECIMAL;
+    }
+    
+    @Override
+    public void reset() {
+        sum = BigDecimal.ZERO;
+        sumBuffer = null;
+        super.reset();
+    }
+
+    @Override
+    public String toString() {
+        return "DECIMAL SUM [sum=" + sum + "]";
+    }
+
+    @Override
+    public int getSize() {
+        return super.getSize() + SizedUtil.BIG_DECIMAL_SIZE + SizedUtil.ARRAY_SIZE + getDataType().getByteSize();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/DistinctCountClientAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/DistinctCountClientAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/DistinctCountClientAggregator.java
new file mode 100644
index 0000000..d221e91
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/DistinctCountClientAggregator.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * Client side Aggregator for DISTINCT COUNT aggregations
+ * 
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public class DistinctCountClientAggregator extends DistinctValueWithCountClientAggregator {
+
+    public DistinctCountClientAggregator(ColumnModifier columnModifier) {
+        super(columnModifier);
+    }
+
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (buffer == null) {
+            initBuffer();
+        }
+        long value = this.valueVsCount.size();
+        buffer = PDataType.LONG.toBytes(value);
+        ptr.set(buffer);
+        return true;
+    }
+
+    @Override
+    protected int getBufferLength() {
+        return PDataType.LONG.getByteSize();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java
new file mode 100644
index 0000000..0032b6b
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * Client side Aggregator which will aggregate data and find distinct values with number of occurrences for each.
+ * 
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public abstract class DistinctValueWithCountClientAggregator extends BaseAggregator {
+    protected Map<ImmutableBytesPtr, Integer> valueVsCount = new HashMap<ImmutableBytesPtr, Integer>();
+    protected byte[] buffer;
+    protected long totalCount = 0L;
+
+    public DistinctValueWithCountClientAggregator(ColumnModifier columnModifier) {
+        super(columnModifier);
+    }
+
+    @Override
+    public void aggregate(Tuple tuple, ImmutableBytesWritable ptr) {
+        InputStream is = new ByteArrayInputStream(ptr.get(), ptr.getOffset() + 1, ptr.getLength() - 1);
+        try {
+            if (Bytes.equals(ptr.get(), ptr.getOffset(), 1, DistinctValueWithCountServerAggregator.COMPRESS_MARKER, 0,
+                    1)) {
+                InputStream decompressionStream = DistinctValueWithCountServerAggregator.COMPRESS_ALGO
+                        .createDecompressionStream(is,
+                                DistinctValueWithCountServerAggregator.COMPRESS_ALGO.getDecompressor(), 0);
+                is = decompressionStream;
+            }
+            DataInputStream in = new DataInputStream(is);
+            int mapSize = WritableUtils.readVInt(in);
+            for (int i = 0; i < mapSize; i++) {
+                int keyLen = WritableUtils.readVInt(in);
+                byte[] keyBytes = new byte[keyLen];
+                in.read(keyBytes, 0, keyLen);
+                ImmutableBytesPtr key = new ImmutableBytesPtr(keyBytes);
+                int value = WritableUtils.readVInt(in);
+                Integer curCount = valueVsCount.get(key);
+                if (curCount == null) {
+                    valueVsCount.put(key, value);
+                } else {
+                    valueVsCount.put(key, curCount + value);
+                }
+                totalCount += value;
+            }
+        } catch (IOException ioe) {
+            throw new RuntimeException(ioe); // Impossible as we're using a ByteArrayInputStream
+        }
+        if (buffer == null) {
+            initBuffer();
+        }
+    }
+
+    protected abstract int getBufferLength();
+
+    protected void initBuffer() {
+        buffer = new byte[getBufferLength()];
+    }
+
+    @Override
+    public boolean isNullable() {
+        return false;
+    }
+
+    @Override
+    public PDataType getDataType() {
+        return PDataType.VARBINARY;
+    }
+
+    @Override
+    public void reset() {
+        valueVsCount = new HashMap<ImmutableBytesPtr, Integer>();
+        buffer = null;
+        totalCount = 0L;
+        super.reset();
+    }
+    
+    protected Map<Object, Integer> getSortedValueVsCount(final boolean ascending, final PDataType type) {
+        // To sort the valueVsCount
+        Comparator<Object> comparator = new Comparator<Object>() {
+            @Override
+            public int compare(Object o1, Object o2) {
+                if (ascending) { 
+                    return type.compareTo(o1, o2); 
+                }
+                return type.compareTo(o2, o1);
+            }
+        };
+        Map<Object, Integer> sorted = new TreeMap<Object, Integer>(comparator);
+        for (Entry<ImmutableBytesPtr, Integer> entry : valueVsCount.entrySet()) {
+            sorted.put(type.toObject(entry.getKey(), columnModifier), entry.getValue());
+        }
+        return sorted;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
new file mode 100644
index 0000000..16b697e
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStream;
+import java.util.*;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.*;
+
+/**
+ * Server side Aggregator which will aggregate data and find distinct values with number of occurrences for each.
+ * 
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public class DistinctValueWithCountServerAggregator extends BaseAggregator {
+    private static final Logger LOG = LoggerFactory.getLogger(DistinctValueWithCountServerAggregator.class);
+    public static final int DEFAULT_ESTIMATED_DISTINCT_VALUES = 10000;
+    public static final byte[] COMPRESS_MARKER = new byte[] { (byte)1 };
+    public static final Algorithm COMPRESS_ALGO = Compression.Algorithm.SNAPPY;
+
+    private int compressThreshold;
+    private byte[] buffer = null;
+    private Map<ImmutableBytesPtr, Integer> valueVsCount = new HashMap<ImmutableBytesPtr, Integer>();
+
+    public DistinctValueWithCountServerAggregator(Configuration conf) {
+        super(null);
+        compressThreshold = conf.getInt(QueryServices.DISTINCT_VALUE_COMPRESS_THRESHOLD_ATTRIB,
+                QueryServicesOptions.DEFAULT_DISTINCT_VALUE_COMPRESS_THRESHOLD);
+    }
+
+    @Override
+    public void aggregate(Tuple tuple, ImmutableBytesWritable ptr) {
+        ImmutableBytesPtr key = new ImmutableBytesPtr(ptr.get(), ptr.getOffset(), ptr.getLength());
+        Integer count = this.valueVsCount.get(key);
+        if (count == null) {
+            this.valueVsCount.put(key, 1);
+        } else {
+            this.valueVsCount.put(key, ++count);
+        }
+    }
+
+    @Override
+    public boolean isNullable() {
+        return false;
+    }
+
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        // This serializes the Map. The format is as follows
+        // Map size(VInt ie. 1 to 5 bytes) +
+        // ( key length [VInt ie. 1 to 5 bytes] + key bytes + value [VInt ie. 1 to 5 bytes] )*
+        int serializationSize = countMapSerializationSize();
+        buffer = new byte[serializationSize];
+        int offset = 1;
+        offset += ByteUtil.vintToBytes(buffer, offset, this.valueVsCount.size());
+        for (Entry<ImmutableBytesPtr, Integer> entry : this.valueVsCount.entrySet()) {
+            ImmutableBytesPtr key = entry.getKey();
+            offset += ByteUtil.vintToBytes(buffer, offset, key.getLength());
+            System.arraycopy(key.get(), key.getOffset(), buffer, offset, key.getLength());
+            offset += key.getLength();
+            offset += ByteUtil.vintToBytes(buffer, offset, entry.getValue().intValue());
+        }
+        if (serializationSize > compressThreshold) {
+            // The size for the map serialization is above the threshold. We will do the Snappy compression here.
+            ByteArrayOutputStream compressedByteStream = new ByteArrayOutputStream();
+            try {
+                compressedByteStream.write(COMPRESS_MARKER);
+                OutputStream compressionStream = COMPRESS_ALGO.createCompressionStream(compressedByteStream,
+                        COMPRESS_ALGO.getCompressor(), 0);
+                compressionStream.write(buffer, 1, buffer.length - 1);
+                compressionStream.flush();
+                ptr.set(compressedByteStream.toByteArray(), 0, compressedByteStream.size());
+                return true;
+            } catch (Exception e) {
+                LOG.error("Exception while Snappy compression of data.", e);
+            }
+        }
+        ptr.set(buffer, 0, offset);
+        return true;
+    }
+
+    // The #bytes required to serialize the count map.
+    // Here let us assume to use 4 bytes for each of the int items. Normally it will consume lesser
+    // bytes as we will use vints.
+    // TODO Do we need to consider 5 as the number of bytes for each of the int field? Else there is
+    // a chance of ArrayIndexOutOfBoundsException when all the int fields are having very large
+    // values. Will that ever occur?
+    private int countMapSerializationSize() {
+        int size = Bytes.SIZEOF_INT;// Write the number of entries in the Map
+        for (ImmutableBytesPtr key : this.valueVsCount.keySet()) {
+            // Add up the key and key's lengths (Int) and the value
+            size += key.getLength() + Bytes.SIZEOF_INT + Bytes.SIZEOF_INT;
+        }
+        return size;
+    }
+
+    // The heap size which will be taken by the count map.
+    private int countMapHeapSize() {
+        int size = 0;
+        if (this.valueVsCount.size() > 0) {
+            for (ImmutableBytesPtr key : this.valueVsCount.keySet()) {
+                size += SizedUtil.MAP_ENTRY_SIZE + // entry
+                        Bytes.SIZEOF_INT + // key size
+                        key.getLength() + SizedUtil.ARRAY_SIZE; // value size
+            }
+        } else {
+            // Initially when the getSize() is called, we dont have any entries in the map so as to
+            // tell the exact heap need. Let us approximate the #entries
+            SizedUtil.sizeOfMap(DEFAULT_ESTIMATED_DISTINCT_VALUES,
+                    SizedUtil.IMMUTABLE_BYTES_PTR_SIZE, Bytes.SIZEOF_INT);
+        }
+        return size;
+    }
+
+    @Override
+    public final PDataType getDataType() {
+        return PDataType.VARBINARY;
+    }
+
+    @Override
+    public void reset() {
+        valueVsCount = new HashMap<ImmutableBytesPtr, Integer>();
+        buffer = null;
+        super.reset();
+    }
+
+    @Override
+    public String toString() {
+        return "DISTINCT VALUE vs COUNT";
+    }
+
+    @Override
+    public int getSize() {
+        // TODO make this size correct.??
+        // This size is being called initially at the begin of the scanner open. At that time we any
+        // way can not tell the exact size of the Map. The Aggregators get size from all Aggregator
+        // and stores in a variable for future use. This size of the Aggregators is being used in
+        // Grouped unordered scan. Do we need some changes there in that calculation?
+        return super.getSize() + SizedUtil.ARRAY_SIZE + countMapHeapSize();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/DoubleSumAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/DoubleSumAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/DoubleSumAggregator.java
new file mode 100644
index 0000000..883a1ba
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/DoubleSumAggregator.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.SizedUtil;
+
+public class DoubleSumAggregator extends BaseAggregator {
+    
+    private double sum = 0;
+    private byte[] buffer;
+
+    public DoubleSumAggregator(ColumnModifier columnModifier) {
+        super(columnModifier);
+    }
+    
+    protected PDataType getInputDataType() {
+        return PDataType.DOUBLE;
+    }
+    
+    private void initBuffer() {
+        buffer = new byte[getDataType().getByteSize()];
+    }
+
+    @Override
+    public void aggregate(Tuple tuple, ImmutableBytesWritable ptr) {
+        double value = getInputDataType().getCodec().decodeDouble(ptr, columnModifier);
+        sum += value;
+        if (buffer == null) {
+            initBuffer();
+        }
+    }
+
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (buffer == null) {
+            if (isNullable()) {
+                return false;
+            }
+            initBuffer();
+        }
+        ptr.set(buffer);
+        getDataType().getCodec().encodeDouble(sum, ptr);
+        return true;
+    }
+
+    @Override
+    public PDataType getDataType() {
+        return PDataType.DOUBLE;
+    }
+    
+    @Override
+    public String toString() {
+        return "SUM [sum=" + sum + "]";
+    }
+    
+    @Override
+    public void reset() {
+        sum = 0;
+        buffer = null;
+        super.reset();
+    }
+    
+    @Override
+    public int getSize() {
+        return super.getSize() + SizedUtil.LONG_SIZE + SizedUtil.ARRAY_SIZE + getDataType().getByteSize();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/IntSumAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/IntSumAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/IntSumAggregator.java
new file mode 100644
index 0000000..e96a993
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/IntSumAggregator.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+
+/**
+ * 
+ * Aggregator that sums integer values
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class IntSumAggregator extends NumberSumAggregator {
+    
+    public IntSumAggregator(ColumnModifier columnModifier) {
+        super(columnModifier);
+    }
+    
+    @Override
+    protected PDataType getInputDataType() {
+        return PDataType.INTEGER;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/LongSumAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/LongSumAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/LongSumAggregator.java
new file mode 100644
index 0000000..bfdadc9
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/LongSumAggregator.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+
+/**
+ * 
+ * Aggregator that sums long values
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class LongSumAggregator extends NumberSumAggregator {
+    
+    public LongSumAggregator(ColumnModifier columnModifier) {
+        super(columnModifier);
+    }
+    
+    @Override
+    protected PDataType getInputDataType() {
+        return PDataType.LONG;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/MaxAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/MaxAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/MaxAggregator.java
new file mode 100644
index 0000000..890e14a
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/MaxAggregator.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.phoenix.schema.ColumnModifier;
+
+
+/**
+ * Aggregator that finds the max of values. Inverse of {@link MinAggregator}.
+ *
+ * @author syyang
+ * @since 0.1
+ */
+abstract public class MaxAggregator extends MinAggregator {
+    
+    public MaxAggregator(ColumnModifier columnModifier) {
+        super(columnModifier);
+    }
+    
+    @Override
+    protected boolean keepFirst(ImmutableBytesWritable ibw1, ImmutableBytesWritable ibw2) {
+        return !super.keepFirst(ibw1, ibw2);
+    }
+    
+    @Override
+    public String toString() {
+        return "MAX [value=" + Bytes.toStringBinary(value.get(),value.getOffset(),value.getLength()) + "]";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/MinAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/MinAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/MinAggregator.java
new file mode 100644
index 0000000..8954de2
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/MinAggregator.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.SizedUtil;
+
+
+/**
+ * Aggregator that finds the min of values. Inverse of {@link MaxAggregator}.
+ *
+ * @author syyang
+ * @since 0.1
+ */
+abstract public class MinAggregator extends BaseAggregator {
+    /** Used to store the accumulate the results of the MIN function */
+    protected final ImmutableBytesWritable value = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
+    
+    public MinAggregator(ColumnModifier columnModifier) {
+        super(columnModifier);
+    }
+
+    @Override
+    public void reset() {
+        value.set(ByteUtil.EMPTY_BYTE_ARRAY);
+        super.reset();
+    }
+
+    @Override
+    public int getSize() {
+        return super.getSize() + /*value*/ SizedUtil.IMMUTABLE_BYTES_WRITABLE_SIZE;
+    }
+
+    /**
+     * Compares two bytes writables, and returns true if the first one should be
+     * kept, and false otherwise. For the MIN function, this method will return
+     * true if the first bytes writable is less than the second.
+     * 
+     * @param ibw1 the first bytes writable
+     * @param ibw2 the second bytes writable
+     * @return true if the first bytes writable should be kept
+     */
+    protected boolean keepFirst(ImmutableBytesWritable ibw1, ImmutableBytesWritable ibw2) {
+        return 0 >= getDataType().compareTo(ibw1, columnModifier, ibw2, columnModifier, getDataType());
+    }
+
+    private boolean isNull() {
+        return value.get() == ByteUtil.EMPTY_BYTE_ARRAY;
+    }
+    
+    @Override
+    public void aggregate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (isNull()) {
+            value.set(ptr.get(), ptr.getOffset(), ptr.getLength());
+        } else {
+            if (!keepFirst(value, ptr)) {
+                // replace the value with the new value
+                value.set(ptr.get(), ptr.getOffset(), ptr.getLength());
+            }
+        }
+    }
+    
+    @Override
+    public String toString() {
+        return "MIN [value=" + Bytes.toStringBinary(value.get(),value.getOffset(),value.getLength()) + "]";
+    }
+
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (isNull()) {
+            return false;
+        }
+        ptr.set(value.get(), value.getOffset(), value.getLength());
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/NumberSumAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/NumberSumAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/NumberSumAggregator.java
new file mode 100644
index 0000000..fcfef58
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/NumberSumAggregator.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.SizedUtil;
+
+
+/**
+ * 
+ * Aggregator that sums integral number values
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+abstract public class NumberSumAggregator extends BaseAggregator {
+    private long sum = 0;
+    private byte[] buffer;
+    
+    public NumberSumAggregator(ColumnModifier columnModifier) {
+        super(columnModifier);
+    }
+
+    abstract protected PDataType getInputDataType();
+    
+    private int getBufferLength() {
+        return getDataType().getByteSize();
+    }
+    
+    private void initBuffer() {
+        buffer = new byte[getBufferLength()];
+    }
+        
+    @Override
+    public void aggregate(Tuple tuple, ImmutableBytesWritable ptr) {
+        // Get either IntNative or LongNative depending on input type
+        long value = getInputDataType().getCodec().decodeLong(ptr, columnModifier);
+        sum += value;
+        if (buffer == null) {
+            initBuffer();
+        }
+    }
+    
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (buffer == null) {
+            if (isNullable()) {
+                return false;
+            }
+            initBuffer();
+        }
+        ptr.set(buffer);
+        getDataType().getCodec().encodeLong(sum, ptr);
+        return true;
+    }
+    
+    @Override
+    public final PDataType getDataType() {
+        return PDataType.LONG;
+    }
+    
+    @Override
+    public void reset() {
+        sum = 0;
+        buffer = null;
+        super.reset();
+    }
+
+    @Override
+    public String toString() {
+        return "SUM [sum=" + sum + "]";
+    }
+
+    @Override
+    public int getSize() {
+        return super.getSize() + SizedUtil.LONG_SIZE + SizedUtil.ARRAY_SIZE + getBufferLength();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/PercentRankClientAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/PercentRankClientAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/PercentRankClientAggregator.java
new file mode 100644
index 0000000..42ca267
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/PercentRankClientAggregator.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.math.BigDecimal;
+import java.util.*;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.*;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * Client side Aggregator for PERCENT_RANK aggregations
+ * 
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public class PercentRankClientAggregator extends DistinctValueWithCountClientAggregator {
+
+    private final List<Expression> exps;
+    private BigDecimal cachedResult = null;
+
+    public PercentRankClientAggregator(List<Expression> exps, ColumnModifier columnModifier) {
+        super(columnModifier);
+        this.exps = exps;
+    }
+
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (cachedResult == null) {
+            ColumnExpression columnExp = (ColumnExpression)exps.get(0);
+            // Second exp will be a LiteralExpression of Boolean type indicating whether the ordering to
+            // be ASC/DESC
+            LiteralExpression isAscendingExpression = (LiteralExpression)exps.get(1);
+            boolean isAscending = (Boolean)isAscendingExpression.getValue();
+
+            // Third expression will be LiteralExpression
+            LiteralExpression valueExp = (LiteralExpression)exps.get(2);
+            Map<Object, Integer> sorted = getSortedValueVsCount(isAscending, columnExp.getDataType());
+            long distinctCountsSum = 0;
+            Object value = valueExp.getValue();
+            for (Entry<Object, Integer> entry : sorted.entrySet()) {
+                Object colValue = entry.getKey();
+                int compareResult = columnExp.getDataType().compareTo(colValue, value, valueExp.getDataType());
+                boolean done = isAscending ? compareResult > 0 : compareResult <= 0;
+                if (done) break;
+                distinctCountsSum += entry.getValue();
+            }
+
+            float result = (float)distinctCountsSum / totalCount;
+            this.cachedResult = new BigDecimal(result);
+        }
+        if (buffer == null) {
+            initBuffer();
+        }
+        buffer = PDataType.DECIMAL.toBytes(this.cachedResult);
+        ptr.set(buffer);
+        return true;
+    }
+
+    @Override
+    protected int getBufferLength() {
+        return PDataType.DECIMAL.getByteSize();
+    }
+    
+    @Override
+    public void reset() {
+        super.reset();
+        this.cachedResult = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/PercentileClientAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/PercentileClientAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/PercentileClientAggregator.java
new file mode 100644
index 0000000..095842a
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/PercentileClientAggregator.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.math.BigDecimal;
+import java.util.*;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.*;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * Client side Aggregator for PERCENTILE_CONT aggregations
+ * 
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public class PercentileClientAggregator extends DistinctValueWithCountClientAggregator {
+
+    private final List<Expression> exps;
+    private BigDecimal cachedResult = null;
+
+    public PercentileClientAggregator(List<Expression> exps, ColumnModifier columnModifier) {
+        super(columnModifier);
+        this.exps = exps;
+    }
+
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (cachedResult == null) {
+            ColumnExpression columnExp = (ColumnExpression)exps.get(0);
+            // Second exp will be a LiteralExpression of Boolean type indicating whether the ordering to
+            // be ASC/DESC
+            LiteralExpression isAscendingExpression = (LiteralExpression)exps.get(1);
+            boolean isAscending = (Boolean)isAscendingExpression.getValue();
+
+            // Third expression will be LiteralExpression
+            LiteralExpression percentileExp = (LiteralExpression)exps.get(2);
+            float p = ((Number)percentileExp.getValue()).floatValue();
+            Map<Object, Integer> sorted = getSortedValueVsCount(isAscending, columnExp.getDataType());
+            float i = (p * this.totalCount) + 0.5F;
+            long k = (long)i;
+            float f = i - k;
+            Object o1 = null;
+            Object o2 = null;
+            long distinctCountsSum = 0;
+            for (Entry<Object, Integer> entry : sorted.entrySet()) {
+                if (o1 != null) {
+                    o2 = entry.getKey();
+                    break;
+                }
+                distinctCountsSum += entry.getValue();
+                if (distinctCountsSum == k) {
+                    o1 = entry.getKey();
+                } else if (distinctCountsSum > k) {
+                    o1 = o2 = entry.getKey();
+                    break;
+                }
+            }
+
+            double result = 0.0;
+            Number n1 = (Number)o1;
+            if (o2 == null || o1 == o2) {
+                result = n1.doubleValue();
+            } else {
+                Number n2 = (Number)o2;
+                result = (n1.doubleValue() * (1.0F - f)) + (n2.doubleValue() * f);
+            }
+            this.cachedResult = new BigDecimal(result);
+        }
+        if (buffer == null) {
+            initBuffer();
+        }
+        buffer = PDataType.DECIMAL.toBytes(this.cachedResult);
+        ptr.set(buffer);
+        return true;
+    }
+
+    @Override
+    protected int getBufferLength() {
+        return PDataType.DECIMAL.getByteSize();
+    }
+    
+    @Override
+    public void reset() {
+        super.reset();
+        this.cachedResult = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/PercentileDiscClientAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/PercentileDiscClientAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/PercentileDiscClientAggregator.java
new file mode 100644
index 0000000..206ed1d
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/PercentileDiscClientAggregator.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.util.*;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.*;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * 
+ * Built-in function for PERCENTILE_DISC(<expression>) WITHIN GROUP (ORDER BY <expression> ASC/DESC) aggregate function
+ *
+ * @author ramkrishna
+ * @since 1.2.1
+ */
+public class PercentileDiscClientAggregator extends
+		DistinctValueWithCountClientAggregator {
+
+	private final List<Expression> exps;
+	private Object cachedResult = null;
+	ColumnExpression columnExp = null;
+
+	public PercentileDiscClientAggregator(List<Expression> exps, ColumnModifier columnModifier) {
+	    super(columnModifier);
+		this.exps = exps;
+	}
+
+	@Override
+	public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+		// Reset buffer so that it gets initialized with the current datatype of the column
+		buffer = null;
+		if (cachedResult == null) {
+			columnExp = (ColumnExpression)exps.get(0);
+			// Second exp will be a LiteralExpression of Boolean type indicating
+			// whether the ordering to be ASC/DESC
+			LiteralExpression isAscendingExpression = (LiteralExpression) exps
+					.get(1);
+			boolean isAscending = (Boolean) isAscendingExpression.getValue();
+
+			// Third expression will be LiteralExpression
+			LiteralExpression percentileExp = (LiteralExpression) exps.get(2);
+			float p = ((Number) percentileExp.getValue()).floatValue();
+			Map<Object, Integer> sorted = getSortedValueVsCount(isAscending, columnExp.getDataType());
+			int currValue = 0;
+			Object result = null;
+			// Here the Percentile_disc returns the cum_dist() that is greater or equal to the
+			// Percentile (p) specified in the query.  So the result set will be of that of the
+			// datatype of the column being selected
+			for (Entry<Object, Integer> entry : sorted.entrySet()) {
+				result = entry.getKey();
+				Integer value = entry.getValue();
+				currValue += value;
+				float cum_dist = (float) currValue / (float) totalCount;
+				if (cum_dist >= p) {
+					break;
+				}
+			}
+			this.cachedResult = result;
+		}
+		if (buffer == null) {
+			// Initialize based on the datatype
+			// columnExp cannot be null
+			buffer = new byte[columnExp.getDataType().getByteSize()];
+		}
+		// Copy the result to the buffer.
+		System.arraycopy(columnExp.getDataType().toBytes(this.cachedResult), 0, buffer, 0, buffer.length);
+		ptr.set(buffer);
+		return true;
+	}
+
+	@Override
+	public void reset() {
+		super.reset();
+		this.cachedResult = null;
+	}
+
+	@Override
+	protected int getBufferLength() {
+		// Will be used in the aggregate() call
+		return PDataType.DECIMAL.getByteSize();
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/ServerAggregators.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/ServerAggregators.java b/src/main/java/org/apache/phoenix/expression/aggregator/ServerAggregators.java
new file mode 100644
index 0000000..6457793
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/ServerAggregators.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableUtils;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+import org.apache.phoenix.expression.function.SingleAggregateFunction;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+/**
+ * 
+ * Aggregators that execute on the server-side
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class ServerAggregators extends Aggregators {
+    public static final ServerAggregators EMPTY_AGGREGATORS = new ServerAggregators(new SingleAggregateFunction[0], new Aggregator[0], new Expression[0], 0);
+    private final Expression[] expressions;
+    
+    private ServerAggregators(SingleAggregateFunction[] functions, Aggregator[] aggregators, Expression[] expressions, int minNullableIndex) {
+        super(functions, aggregators, minNullableIndex);
+        if (aggregators.length != expressions.length) {
+            throw new IllegalArgumentException("Number of aggregators (" + aggregators.length 
+                    + ") must match the number of expressions (" + Arrays.toString(expressions) + ")");
+        }
+        this.expressions = expressions;
+    }
+    
+    @Override
+    public void aggregate(Aggregator[] aggregators, Tuple result) {
+        for (int i = 0; i < expressions.length; i++) {
+            if (expressions[i].evaluate(result, ptr)) {
+                aggregators[i].aggregate(result, ptr);
+            }
+        }
+    }
+    
+    /**
+     * Serialize an Aggregator into a byte array
+     * @param aggFuncs list of aggregator to serialize
+     * @return serialized byte array respresentation of aggregator
+     */
+    public static byte[] serialize(List<SingleAggregateFunction> aggFuncs, int minNullableIndex) {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        try {
+            DataOutputStream output = new DataOutputStream(stream);
+            WritableUtils.writeVInt(output, minNullableIndex);
+            WritableUtils.writeVInt(output, aggFuncs.size());
+            for (int i = 0; i < aggFuncs.size(); i++) {
+                SingleAggregateFunction aggFunc = aggFuncs.get(i);
+                WritableUtils.writeVInt(output, ExpressionType.valueOf(aggFunc).ordinal());
+                aggFunc.write(output);
+            }
+            return stream.toByteArray();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    @Override
+    public Aggregator[] newAggregators() {
+        return newAggregators(null);
+    }
+
+    public Aggregator[] newAggregators(Configuration conf) {
+        Aggregator[] aggregators = new Aggregator[functions.length];
+        for (int i = 0; i < functions.length; i++) {
+            aggregators[i] = functions[i].newServerAggregator(conf);
+        }
+        return aggregators;
+    }
+
+    /**
+     * Deserialize aggregators from the serialized byte array representation
+     * @param b byte array representation of a list of Aggregators
+     * @param conf Server side configuration used by HBase
+     * @return newly instantiated Aggregators instance
+     */
+    public static ServerAggregators deserialize(byte[] b, Configuration conf) {
+        if (b == null) {
+            return ServerAggregators.EMPTY_AGGREGATORS;
+        }
+        ByteArrayInputStream stream = new ByteArrayInputStream(b);
+        try {
+            DataInputStream input = new DataInputStream(stream);
+            int minNullableIndex = WritableUtils.readVInt(input);
+            int len = WritableUtils.readVInt(input);
+            Aggregator[] aggregators = new Aggregator[len];
+            Expression[] expressions = new Expression[len];
+            SingleAggregateFunction[] functions = new SingleAggregateFunction[len];
+            for (int i = 0; i < aggregators.length; i++) {
+                SingleAggregateFunction aggFunc = (SingleAggregateFunction)ExpressionType.values()[WritableUtils.readVInt(input)].newInstance();
+                aggFunc.readFields(input, conf);
+                functions[i] = aggFunc;
+                aggregators[i] = aggFunc.getAggregator();
+                expressions[i] = aggFunc.getAggregatorExpression();
+            }
+            return new ServerAggregators(functions, aggregators,expressions, minNullableIndex);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/StddevPopAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/StddevPopAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/StddevPopAggregator.java
new file mode 100644
index 0000000..30276e1
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/StddevPopAggregator.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.util.List;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.ColumnModifier;
+
+/**
+ * Client side Aggregator for STDDEV_POP aggregations
+ * 
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public class StddevPopAggregator extends BaseStddevAggregator {
+
+    public StddevPopAggregator(List<Expression> exps, ColumnModifier columnModifier) {
+        super(exps, columnModifier);
+    }
+
+    @Override
+    protected long getDataPointsCount() {
+        return totalCount;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/StddevSampAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/StddevSampAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/StddevSampAggregator.java
new file mode 100644
index 0000000..49f52d2
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/StddevSampAggregator.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.util.List;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.ColumnModifier;
+
+/**
+ * Client side Aggregator for STDDEV_SAMP aggregations
+ * 
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public class StddevSampAggregator extends BaseStddevAggregator {
+
+    public StddevSampAggregator(List<Expression> exps, ColumnModifier columnModifier) {
+        super(exps, columnModifier);
+    }
+
+    @Override
+    protected long getDataPointsCount() {
+        return totalCount - 1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/UnsignedIntSumAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/UnsignedIntSumAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/UnsignedIntSumAggregator.java
new file mode 100644
index 0000000..c8befca
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/UnsignedIntSumAggregator.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+
+/**
+ * 
+ * Aggregator that sums unsigned integer values
+ *
+ * @author jtaylor
+ * @since 0.12
+ */
+public class UnsignedIntSumAggregator extends NumberSumAggregator {
+    
+    public UnsignedIntSumAggregator(ColumnModifier columnModifier) {
+        super(columnModifier);
+    }
+    
+    @Override
+    protected PDataType getInputDataType() {
+        return PDataType.UNSIGNED_INT;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/aggregator/UnsignedLongSumAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/aggregator/UnsignedLongSumAggregator.java b/src/main/java/org/apache/phoenix/expression/aggregator/UnsignedLongSumAggregator.java
new file mode 100644
index 0000000..b91a934
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/aggregator/UnsignedLongSumAggregator.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+
+/**
+ * 
+ * Aggregator that sums unsigned long values
+ * TODO: create these classes dynamically based on the type passed through
+ *
+ * @author jtaylor
+ * @since 0.12
+ */
+public class UnsignedLongSumAggregator extends NumberSumAggregator {
+    
+    public UnsignedLongSumAggregator(ColumnModifier columnModifier) {
+        super(columnModifier);
+    }
+    
+    @Override
+    protected PDataType getInputDataType() {
+        return PDataType.UNSIGNED_LONG;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/function/AggregateFunction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/function/AggregateFunction.java b/src/main/java/org/apache/phoenix/expression/function/AggregateFunction.java
new file mode 100644
index 0000000..8c9ef5b
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/function/AggregateFunction.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.util.List;
+
+import org.apache.phoenix.expression.Expression;
+
+
+
+
+/**
+ * 
+ * Compiled representation of a built-in aggregate function 
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+abstract public class AggregateFunction extends FunctionExpression {
+
+    public AggregateFunction() {
+    }
+
+    public AggregateFunction(List<Expression> children) {
+        super(children);
+    }
+}