You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 23:15:46 UTC

[30/51] [partial] Initial commit of master branch from github

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
new file mode 100644
index 0000000..ef0a9c2
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
@@ -0,0 +1,181 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.SizedUtil;
+
+/**
+ * Server side Aggregator which will aggregate data and find distinct values with number of occurrences for each.
+ * 
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public class DistinctValueWithCountServerAggregator extends BaseAggregator {
+    private static final Logger LOG = LoggerFactory.getLogger(DistinctValueWithCountServerAggregator.class);
+    public static final int DEFAULT_ESTIMATED_DISTINCT_VALUES = 10000;
+    public static final byte[] COMPRESS_MARKER = new byte[] { (byte)1 };
+    public static final Algorithm COMPRESS_ALGO = Compression.Algorithm.SNAPPY;
+
+    private int compressThreshold;
+    private byte[] buffer = null;
+    private Map<ImmutableBytesPtr, Integer> valueVsCount = new HashMap<ImmutableBytesPtr, Integer>();
+
+    /**
+     * Creates an aggregator that starts with an empty value-to-count map and passes {@code null} to the
+     * {@link BaseAggregator} constructor.
+     *
+     * @param conf configuration from which the compression threshold is read under
+     *            {@link QueryServices#DISTINCT_VALUE_COMPRESS_THRESHOLD_ATTRIB}, defaulting to
+     *            {@link QueryServicesOptions#DEFAULT_DISTINCT_VALUE_COMPRESS_THRESHOLD}
+     */
+    public DistinctValueWithCountServerAggregator(Configuration conf) {
+        super(null);
+        compressThreshold = conf.getInt(QueryServices.DISTINCT_VALUE_COMPRESS_THRESHOLD_ATTRIB,
+                QueryServicesOptions.DEFAULT_DISTINCT_VALUE_COMPRESS_THRESHOLD);
+    }
+
+    /**
+     * Creates an aggregator that continues counting in the map already held by a client aggregator.
+     * <p>
+     * The map is shared by reference, not copied: values aggregated here are visible through
+     * {@code clientAgg.valueVsCount} until {@link #reset()} swaps in a new map.
+     *
+     * @param conf configuration used to look up the compression threshold
+     * @param clientAgg client aggregator whose value-to-count map is adopted
+     */
+    public DistinctValueWithCountServerAggregator(Configuration conf, DistinctValueWithCountClientAggregator clientAgg) {
+        this(conf);
+        valueVsCount = clientAgg.valueVsCount;
+    }
+
+    @Override
+    public void aggregate(Tuple tuple, ImmutableBytesWritable ptr) {
+        ImmutableBytesPtr key = new ImmutableBytesPtr(ptr.get(), ptr.getOffset(), ptr.getLength());
+        Integer count = this.valueVsCount.get(key);
+        if (count == null) {
+            this.valueVsCount.put(key, 1);
+        } else {
+            this.valueVsCount.put(key, ++count);
+        }
+    }
+
+    /**
+     * @return always {@code false}
+     */
+    @Override
+    public boolean isNullable() {
+        return false;
+    }
+
+    /**
+     * Serializes the value-to-count map into {@code ptr}.
+     * <p>
+     * Layout: one marker byte (0 for the plain form, {@link #COMPRESS_MARKER} for the compressed form),
+     * then the map size as a VInt (1 to 5 bytes), then for every entry: key length (VInt) + key bytes +
+     * count (VInt). When the serialized form is larger than the configured threshold, everything after the
+     * marker byte is compressed with {@link #COMPRESS_ALGO}. If compression fails the error is logged and
+     * the plain form is returned instead.
+     *
+     * @param tuple not used
+     * @param ptr receives the serialized, possibly compressed, map
+     * @return always {@code true}
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        // Worst-case sized, so none of the VInt writes below can run past the end of the array.
+        buffer = new byte[countMapSerializationSize()];
+        // Byte 0 is reserved for the compression marker and stays 0 in the plain form.
+        int offset = 1;
+        offset += ByteUtil.vintToBytes(buffer, offset, this.valueVsCount.size());
+        for (Entry<ImmutableBytesPtr, Integer> entry : this.valueVsCount.entrySet()) {
+            ImmutableBytesPtr key = entry.getKey();
+            offset += ByteUtil.vintToBytes(buffer, offset, key.getLength());
+            System.arraycopy(key.get(), key.getOffset(), buffer, offset, key.getLength());
+            offset += key.getLength();
+            offset += ByteUtil.vintToBytes(buffer, offset, entry.getValue().intValue());
+        }
+        // Decide on the number of bytes actually written, not on the worst-case estimate.
+        if (offset > compressThreshold) {
+            // The size for the map serialization is above the threshold. We will do the Snappy compression here.
+            org.apache.hadoop.io.compress.Compressor compressor = null;
+            try {
+                compressor = COMPRESS_ALGO.getCompressor();
+                ByteArrayOutputStream compressedByteStream = new ByteArrayOutputStream();
+                compressedByteStream.write(COMPRESS_MARKER);
+                OutputStream compressionStream = COMPRESS_ALGO.createCompressionStream(compressedByteStream,
+                        compressor, 0);
+                // Compress only the bytes that were written; the tail of the buffer is unused slack.
+                compressionStream.write(buffer, 1, offset - 1);
+                compressionStream.flush();
+                ptr.set(compressedByteStream.toByteArray(), 0, compressedByteStream.size());
+                return true;
+            } catch (Exception e) {
+                // Best effort: fall through and hand back the uncompressed form.
+                LOG.error("Exception while Snappy compression of data.", e);
+            } finally {
+                // Give the pooled compressor back, otherwise every call leaks one.
+                if (compressor != null) {
+                    COMPRESS_ALGO.returnCompressor(compressor);
+                }
+            }
+        }
+        ptr.set(buffer, 0, offset);
+        return true;
+    }
+
+    /** Largest number of bytes a VInt-encoded int can occupy: one length byte plus four value bytes. */
+    private static final int MAX_VINT_SIZE = Bytes.SIZEOF_INT + 1;
+
+    /**
+     * Upper bound on the number of bytes needed to serialize the count map in {@link #evaluate}.
+     * <p>
+     * Every int field is budgeted at the maximum VInt width, and the leading marker byte is included, so the
+     * buffer can never be too small. Normally fewer bytes are consumed because small values encode to a
+     * single byte.
+     *
+     * @return worst-case serialized size in bytes
+     */
+    private int countMapSerializationSize() {
+        // Marker byte plus the VInt holding the number of entries in the map.
+        int size = 1 + MAX_VINT_SIZE;
+        for (ImmutableBytesPtr key : this.valueVsCount.keySet()) {
+            // Key length (VInt) + key bytes + occurrence count (VInt).
+            size += MAX_VINT_SIZE + key.getLength() + MAX_VINT_SIZE;
+        }
+        return size;
+    }
+
+    /**
+     * Estimates the heap taken by the count map.
+     * <p>
+     * With entries present, the estimate adds up a map entry, an int and the key bytes plus array overhead
+     * for each key. With an empty map (the state in which {@link #getSize()} is first called) the exact need
+     * is unknown, so the size of a map holding {@link #DEFAULT_ESTIMATED_DISTINCT_VALUES} entries is used.
+     *
+     * @return estimated heap size in bytes
+     */
+    private int countMapHeapSize() {
+        int size = 0;
+        if (this.valueVsCount.size() > 0) {
+            for (ImmutableBytesPtr key : this.valueVsCount.keySet()) {
+                size += SizedUtil.MAP_ENTRY_SIZE + // entry
+                        Bytes.SIZEOF_INT + // key size
+                        key.getLength() + SizedUtil.ARRAY_SIZE; // value size
+            }
+        } else {
+            // Initially when the getSize() is called, we don't have any entries in the map so as to
+            // tell the exact heap need. Let us approximate the #entries.
+            // The result was previously computed and thrown away, leaving the estimate at 0.
+            size += SizedUtil.sizeOfMap(DEFAULT_ESTIMATED_DISTINCT_VALUES,
+                    SizedUtil.IMMUTABLE_BYTES_PTR_SIZE, Bytes.SIZEOF_INT);
+        }
+        return size;
+    }
+
+    /**
+     * @return {@link PDataType#VARBINARY}, the type reported for this aggregator's result
+     */
+    @Override
+    public final PDataType getDataType() {
+        return PDataType.VARBINARY;
+    }
+
+    /**
+     * Discards all counts by installing a new empty map (the previous map object is left untouched), drops
+     * the serialization buffer and then delegates to {@code super.reset()}.
+     */
+    @Override
+    public void reset() {
+        valueVsCount = new HashMap<ImmutableBytesPtr, Integer>();
+        buffer = null;
+        super.reset();
+    }
+
+    /**
+     * @return a fixed label for this aggregator; the map contents are not included
+     */
+    @Override
+    public String toString() {
+        return "DISTINCT VALUE vs COUNT";
+    }
+
+    @Override
+    public int getSize() {
+        // TODO: make this size correct?
+        // This method is called once, when the scanner is opened. At that point the exact size of the
+        // map cannot be known. The Aggregators collect the size of every Aggregator and store it in a
+        // variable for later use, and that stored size is used by the grouped unordered scan.
+        // Does that calculation need to change?
+        return super.getSize() + SizedUtil.ARRAY_SIZE + countMapHeapSize();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DoubleSumAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DoubleSumAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DoubleSumAggregator.java
new file mode 100644
index 0000000..fa1dda7
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DoubleSumAggregator.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.SizedUtil;
+
+/**
+ * Aggregator that sums double values.
+ * <p>
+ * The running total is kept in a primitive field. The output buffer is allocated lazily, and its presence
+ * doubles as the "at least one value was seen" flag consulted by {@link #evaluate}.
+ */
+public class DoubleSumAggregator extends BaseAggregator {
+
+    /** Running total of every value aggregated so far. */
+    private double sum = 0;
+    /** Scratch space the result is encoded into; {@code null} until a value has been seen. */
+    private byte[] buffer;
+
+    /**
+     * @param columnModifier modifier handed to the base class and to the codec when decoding
+     * @param ptr optional initial sum encoded as a DOUBLE; when {@code null} the sum starts at zero
+     */
+    public DoubleSumAggregator(ColumnModifier columnModifier, ImmutableBytesWritable ptr) {
+        super(columnModifier);
+        if (ptr == null) {
+            return;
+        }
+        initBuffer();
+        sum = PDataType.DOUBLE.getCodec().decodeDouble(ptr, columnModifier);
+    }
+
+    /**
+     * @return the type whose codec is used to decode incoming values
+     */
+    protected PDataType getInputDataType() {
+        return PDataType.DOUBLE;
+    }
+
+    /** Allocates the output buffer, sized to the byte size of the result type. */
+    private void initBuffer() {
+        buffer = new byte[getDataType().getByteSize()];
+    }
+
+    /**
+     * Decodes the value at {@code ptr} and adds it to the running total.
+     */
+    @Override
+    public void aggregate(Tuple tuple, ImmutableBytesWritable ptr) {
+        double decoded = getInputDataType().getCodec().decodeDouble(ptr, columnModifier);
+        sum += decoded;
+        if (buffer == null) {
+            initBuffer();
+        }
+    }
+
+    /**
+     * Encodes the current sum into {@code ptr}.
+     *
+     * @return {@code false} when nothing has been aggregated and the aggregator is nullable, otherwise
+     *         {@code true}
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (buffer == null && isNullable()) {
+            return false;
+        }
+        if (buffer == null) {
+            initBuffer();
+        }
+        ptr.set(buffer);
+        getDataType().getCodec().encodeDouble(sum, ptr);
+        return true;
+    }
+
+    @Override
+    public PDataType getDataType() {
+        return PDataType.DOUBLE;
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder("SUM [sum=").append(sum).append("]").toString();
+    }
+
+    /** Zeroes the sum, drops the buffer and then resets the base class. */
+    @Override
+    public void reset() {
+        sum = 0;
+        buffer = null;
+        super.reset();
+    }
+
+    @Override
+    public int getSize() {
+        // Base size, the 8-byte sum field, and the buffer array with its contents.
+        int fixedPart = super.getSize() + SizedUtil.LONG_SIZE + SizedUtil.ARRAY_SIZE;
+        return fixedPart + getDataType().getByteSize();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/IntSumAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/IntSumAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/IntSumAggregator.java
new file mode 100644
index 0000000..e96a993
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/IntSumAggregator.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+
+/**
+ * Aggregator that sums integer values.
+ * <p>
+ * This class adds no summing logic of its own; it extends {@link NumberSumAggregator} and only declares
+ * that its input values are decoded as {@link PDataType#INTEGER}.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class IntSumAggregator extends NumberSumAggregator {
+    
+    public IntSumAggregator(ColumnModifier columnModifier) {
+        super(columnModifier);
+    }
+    
+    @Override
+    protected PDataType getInputDataType() {
+        return PDataType.INTEGER;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/LongSumAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/LongSumAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/LongSumAggregator.java
new file mode 100644
index 0000000..bfdadc9
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/LongSumAggregator.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+
+/**
+ * Aggregator that sums long values.
+ * <p>
+ * This class adds no summing logic of its own; it extends {@link NumberSumAggregator} and only declares
+ * that its input values are decoded as {@link PDataType#LONG}.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class LongSumAggregator extends NumberSumAggregator {
+    
+    public LongSumAggregator(ColumnModifier columnModifier) {
+        super(columnModifier);
+    }
+    
+    @Override
+    protected PDataType getInputDataType() {
+        return PDataType.LONG;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/MaxAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/MaxAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/MaxAggregator.java
new file mode 100644
index 0000000..890e14a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/MaxAggregator.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.phoenix.schema.ColumnModifier;
+
+
+/**
+ * Aggregator that finds the max of values. Inverse of {@link MinAggregator}.
+ *
+ * @author syyang
+ * @since 0.1
+ */
+abstract public class MaxAggregator extends MinAggregator {
+    
+    public MaxAggregator(ColumnModifier columnModifier) {
+        super(columnModifier);
+    }
+    
+    @Override
+    protected boolean keepFirst(ImmutableBytesWritable ibw1, ImmutableBytesWritable ibw2) {
+        return !super.keepFirst(ibw1, ibw2);
+    }
+    
+    @Override
+    public String toString() {
+        return "MAX [value=" + Bytes.toStringBinary(value.get(),value.getOffset(),value.getLength()) + "]";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/MinAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/MinAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/MinAggregator.java
new file mode 100644
index 0000000..8954de2
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/MinAggregator.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.SizedUtil;
+
+
+/**
+ * Aggregator that finds the min of values. Inverse of {@link MaxAggregator}.
+ *
+ * @author syyang
+ * @since 0.1
+ */
+abstract public class MinAggregator extends BaseAggregator {
+    /** Used to store the accumulate the results of the MIN function */
+    protected final ImmutableBytesWritable value = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
+    
+    public MinAggregator(ColumnModifier columnModifier) {
+        super(columnModifier);
+    }
+
+    @Override
+    public void reset() {
+        value.set(ByteUtil.EMPTY_BYTE_ARRAY);
+        super.reset();
+    }
+
+    @Override
+    public int getSize() {
+        return super.getSize() + /*value*/ SizedUtil.IMMUTABLE_BYTES_WRITABLE_SIZE;
+    }
+
+    /**
+     * Compares two bytes writables, and returns true if the first one should be
+     * kept, and false otherwise. For the MIN function, this method will return
+     * true if the first bytes writable is less than the second.
+     * 
+     * @param ibw1 the first bytes writable
+     * @param ibw2 the second bytes writable
+     * @return true if the first bytes writable should be kept
+     */
+    protected boolean keepFirst(ImmutableBytesWritable ibw1, ImmutableBytesWritable ibw2) {
+        return 0 >= getDataType().compareTo(ibw1, columnModifier, ibw2, columnModifier, getDataType());
+    }
+
+    private boolean isNull() {
+        return value.get() == ByteUtil.EMPTY_BYTE_ARRAY;
+    }
+    
+    @Override
+    public void aggregate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (isNull()) {
+            value.set(ptr.get(), ptr.getOffset(), ptr.getLength());
+        } else {
+            if (!keepFirst(value, ptr)) {
+                // replace the value with the new value
+                value.set(ptr.get(), ptr.getOffset(), ptr.getLength());
+            }
+        }
+    }
+    
+    @Override
+    public String toString() {
+        return "MIN [value=" + Bytes.toStringBinary(value.get(),value.getOffset(),value.getLength()) + "]";
+    }
+
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (isNull()) {
+            return false;
+        }
+        ptr.set(value.get(), value.getOffset(), value.getLength());
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/NumberSumAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/NumberSumAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/NumberSumAggregator.java
new file mode 100644
index 0000000..a7cfcd1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/NumberSumAggregator.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.SizedUtil;
+
+/**
+ * Aggregator that sums integral number values. Subclasses choose the input type; the result is always
+ * reported as {@link PDataType#LONG}.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+abstract public class NumberSumAggregator extends BaseAggregator {
+    /** Running total of every value aggregated so far. */
+    private long sum = 0;
+    /** Scratch space the result is encoded into; {@code null} until a value has been seen. */
+    private byte[] buffer;
+
+    public NumberSumAggregator(ColumnModifier columnModifier) {
+        super(columnModifier);
+    }
+
+    /**
+     * @param columnModifier modifier handed to the base class and to the codec when decoding
+     * @param ptr optional initial sum encoded as a LONG; when {@code null} the sum starts at zero
+     */
+    public NumberSumAggregator(ColumnModifier columnModifier,
+            ImmutableBytesWritable ptr) {
+        this(columnModifier);
+        if (ptr == null) {
+            return;
+        }
+        initBuffer();
+        sum = PDataType.LONG.getCodec().decodeLong(ptr, columnModifier);
+    }
+
+    public long getSum() {
+        return sum;
+    }
+
+    /**
+     * @return the type whose codec is used to decode incoming values
+     */
+    abstract protected PDataType getInputDataType();
+
+    /** Number of bytes needed to hold the encoded result. */
+    private int getBufferLength() {
+        return getDataType().getByteSize();
+    }
+
+    private void initBuffer() {
+        buffer = new byte[getBufferLength()];
+    }
+
+    /**
+     * Decodes the value at {@code ptr} with the input type's codec and adds it to the running total.
+     */
+    @Override
+    public void aggregate(Tuple tuple, ImmutableBytesWritable ptr) {
+        // The codec comes from the input type, so int and long inputs are both decoded to a long.
+        long decoded = getInputDataType().getCodec().decodeLong(ptr, columnModifier);
+        sum += decoded;
+        if (buffer == null) {
+            initBuffer();
+        }
+    }
+
+    /**
+     * Encodes the current sum into {@code ptr}.
+     *
+     * @return {@code false} when nothing has been aggregated and the aggregator is nullable, otherwise
+     *         {@code true}
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (buffer == null && isNullable()) {
+            return false;
+        }
+        if (buffer == null) {
+            initBuffer();
+        }
+        ptr.set(buffer);
+        getDataType().getCodec().encodeLong(sum, ptr);
+        return true;
+    }
+
+    @Override
+    public final PDataType getDataType() {
+        return PDataType.LONG;
+    }
+
+    /** Zeroes the sum, drops the buffer and then resets the base class. */
+    @Override
+    public void reset() {
+        sum = 0;
+        buffer = null;
+        super.reset();
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder("SUM [sum=").append(sum).append("]").toString();
+    }
+
+    @Override
+    public int getSize() {
+        // Base size, the 8-byte sum field, and the buffer array with its contents.
+        int fixedPart = super.getSize() + SizedUtil.LONG_SIZE + SizedUtil.ARRAY_SIZE;
+        return fixedPart + getBufferLength();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/PercentRankClientAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/PercentRankClientAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/PercentRankClientAggregator.java
new file mode 100644
index 0000000..42ca267
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/PercentRankClientAggregator.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.math.BigDecimal;
+import java.util.*;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.*;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * Client side Aggregator for PERCENT_RANK aggregations
+ * 
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public class PercentRankClientAggregator extends DistinctValueWithCountClientAggregator {
+
+    /** Function arguments: [0] the column, [1] a Boolean literal for ASC/DESC, [2] the literal to rank. */
+    private final List<Expression> exps;
+    /** Rank computed by the first {@link #evaluate} call; cleared by {@link #reset()}. */
+    private BigDecimal cachedResult = null;
+
+    public PercentRankClientAggregator(List<Expression> exps, ColumnModifier columnModifier) {
+        super(columnModifier);
+        this.exps = exps;
+    }
+
+    /**
+     * Computes the rank on first use, caches it, and points {@code ptr} at its DECIMAL encoding.
+     *
+     * @return always {@code true}
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (cachedResult == null) {
+            this.cachedResult = computePercentRank();
+        }
+        if (buffer == null) {
+            initBuffer();
+        }
+        buffer = PDataType.DECIMAL.toBytes(this.cachedResult);
+        ptr.set(buffer);
+        return true;
+    }
+
+    /**
+     * Walks the sorted distinct values, summing their counts until the stop condition for the requested
+     * ordering is met, and divides that sum by {@code totalCount} using float arithmetic.
+     */
+    private BigDecimal computePercentRank() {
+        ColumnExpression columnExp = (ColumnExpression)exps.get(0);
+        // Second exp will be a LiteralExpression of Boolean type indicating whether the ordering is
+        // to be ASC/DESC
+        LiteralExpression orderingExp = (LiteralExpression)exps.get(1);
+        boolean ascending = (Boolean)orderingExp.getValue();
+
+        // Third expression will be LiteralExpression
+        LiteralExpression targetExp = (LiteralExpression)exps.get(2);
+        Map<Object, Integer> ordered = getSortedValueVsCount(ascending, columnExp.getDataType());
+        long countedSoFar = 0;
+        Object target = targetExp.getValue();
+        for (Entry<Object, Integer> candidate : ordered.entrySet()) {
+            int cmp = columnExp.getDataType().compareTo(candidate.getKey(), target, targetExp.getDataType());
+            // Ascending stops at the first value greater than the target; descending stops at the first
+            // value less than or equal to it.
+            boolean reachedBoundary = ascending ? cmp > 0 : cmp <= 0;
+            if (reachedBoundary) {
+                break;
+            }
+            countedSoFar += candidate.getValue();
+        }
+
+        float rank = (float)countedSoFar / totalCount;
+        return new BigDecimal(rank);
+    }
+
+    @Override
+    protected int getBufferLength() {
+        return PDataType.DECIMAL.getByteSize();
+    }
+
+    /** Resets the base class state and then forgets the cached rank. */
+    @Override
+    public void reset() {
+        super.reset();
+        this.cachedResult = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/PercentileClientAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/PercentileClientAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/PercentileClientAggregator.java
new file mode 100644
index 0000000..095842a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/PercentileClientAggregator.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.math.BigDecimal;
+import java.util.*;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.*;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * Client side Aggregator for PERCENTILE_CONT aggregations. The result is computed on the first
+ * evaluate() call after construction or reset(), cached, and emitted encoded as a DECIMAL.
+ *
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public class PercentileClientAggregator extends DistinctValueWithCountClientAggregator {
+
+    private final List<Expression> exps;
+    private BigDecimal cachedResult = null;
+
+    public PercentileClientAggregator(List<Expression> exps, ColumnModifier columnModifier) {
+        super(columnModifier);
+        this.exps = exps;
+    }
+
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (cachedResult == null) {
+            cachedResult = new BigDecimal(interpolatePercentile());
+        }
+        if (buffer == null) {
+            initBuffer();
+        }
+        buffer = PDataType.DECIMAL.toBytes(cachedResult);
+        ptr.set(buffer);
+        return true;
+    }
+
+    /**
+     * Reads children [0] column, [1] Boolean ascending flag, [2] percentile, then sums per-value
+     * counts in sort order up to rank (long)(percentile * totalCount + 0.5). A sum landing exactly on
+     * the rank interpolates with the next value by the rank's fraction; otherwise no interpolation.
+     */
+    private double interpolatePercentile() {
+        ColumnExpression column = (ColumnExpression)exps.get(0);
+        boolean ascending = (Boolean)((LiteralExpression)exps.get(1)).getValue();
+        float percentile = ((Number)((LiteralExpression)exps.get(2)).getValue()).floatValue();
+        Map<Object, Integer> sorted = getSortedValueVsCount(ascending, column.getDataType());
+        float position = (percentile * this.totalCount) + 0.5F;
+        long targetRank = (long)position;
+        float fraction = position - targetRank;
+        Object lower = null, upper = null;
+        long seen = 0;
+        for (Entry<Object, Integer> valueAndCount : sorted.entrySet()) {
+            Object key = valueAndCount.getKey();
+            if (lower != null) {
+                upper = key;
+                break;
+            }
+            seen += valueAndCount.getValue();
+            if (seen > targetRank) {
+                lower = upper = key;
+                break;
+            } else if (seen == targetRank) {
+                lower = key;
+            }
+        }
+        Number low = (Number)lower;
+        if (upper == null || lower == upper) {
+            return low.doubleValue();
+        }
+        Number high = (Number)upper;
+        return (low.doubleValue() * (1.0F - fraction)) + (high.doubleValue() * fraction);
+    }
+
+    @Override
+    protected int getBufferLength() {
+        return PDataType.DECIMAL.getByteSize();
+    }
+
+    @Override
+    public void reset() {
+        super.reset();
+        this.cachedResult = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/PercentileDiscClientAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/PercentileDiscClientAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/PercentileDiscClientAggregator.java
new file mode 100644
index 0000000..206ed1d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/PercentileDiscClientAggregator.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.util.*;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.*;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * Client side aggregator for PERCENTILE_DISC(<expression>) WITHIN GROUP (ORDER BY <expression> ASC/DESC).
+ * Returns the first column value, in the requested order, whose cumulative distribution is greater
+ * than or equal to the requested percentile, encoded with the data type of the column.
+ *
+ * @author ramkrishna
+ * @since 1.2.1
+ */
+public class PercentileDiscClientAggregator extends DistinctValueWithCountClientAggregator {
+
+    private final List<Expression> exps;
+    private Object cachedResult = null;
+    // Resolved from the first child expression the first time evaluate() computes a result
+    ColumnExpression columnExp = null;
+
+    public PercentileDiscClientAggregator(List<Expression> exps, ColumnModifier columnModifier) {
+        super(columnModifier);
+        this.exps = exps;
+    }
+
+    /**
+     * Computes the percentile value on first use (and again after {@link #reset()}) and points
+     * {@code ptr} at its encoded bytes.
+     *
+     * @return always true
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (cachedResult == null) {
+            columnExp = (ColumnExpression)exps.get(0);
+            // Second exp will be a LiteralExpression of Boolean type indicating
+            // whether the ordering to be ASC/DESC
+            LiteralExpression isAscendingExpression = (LiteralExpression)exps.get(1);
+            boolean isAscending = (Boolean)isAscendingExpression.getValue();
+
+            // Third expression will be a LiteralExpression holding the percentile
+            LiteralExpression percentileExp = (LiteralExpression)exps.get(2);
+            float p = ((Number)percentileExp.getValue()).floatValue();
+            Map<Object, Integer> sorted = getSortedValueVsCount(isAscending, columnExp.getDataType());
+            // long, so that summing the per-value counts cannot overflow an int
+            long cumulativeCount = 0;
+            Object result = null;
+            // PERCENTILE_DISC returns the first value whose cum_dist() is greater or equal to the
+            // percentile (p) specified in the query, so the result has the datatype of the column
+            for (Entry<Object, Integer> entry : sorted.entrySet()) {
+                result = entry.getKey();
+                cumulativeCount += entry.getValue();
+                float cumDist = (float)cumulativeCount / (float)totalCount;
+                if (cumDist >= p) {
+                    break;
+                }
+            }
+            this.cachedResult = result;
+        }
+        // Point at the encoded bytes directly. Their length is decided by the column type's
+        // toBytes() and need not equal getByteSize(), so copying getByteSize() bytes out of them
+        // into a pre-sized buffer can fail for types that are not encoded at that exact width.
+        buffer = columnExp.getDataType().toBytes(this.cachedResult);
+        ptr.set(buffer);
+        return true;
+    }
+
+    @Override
+    public void reset() {
+        super.reset();
+        this.cachedResult = null;
+    }
+
+    @Override
+    protected int getBufferLength() {
+        // Will be used in the aggregate() call
+        return PDataType.DECIMAL.getByteSize();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ServerAggregators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ServerAggregators.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ServerAggregators.java
new file mode 100644
index 0000000..6457793
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ServerAggregators.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableUtils;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+import org.apache.phoenix.expression.function.SingleAggregateFunction;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+/**
+ * Aggregators that execute on the server-side; built by deserialize() from serialize() output
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class ServerAggregators extends Aggregators {
+    public static final ServerAggregators EMPTY_AGGREGATORS = new ServerAggregators(new SingleAggregateFunction[0], new Aggregator[0], new Expression[0], 0);
+    private final Expression[] expressions;
+
+    private ServerAggregators(SingleAggregateFunction[] functions, Aggregator[] aggregators, Expression[] expressions, int minNullableIndex) {
+        super(functions, aggregators, minNullableIndex);
+        if (aggregators.length != expressions.length) {
+            throw new IllegalArgumentException("Number of aggregators (" + aggregators.length 
+                    + ") must match the number of expressions (" + Arrays.toString(expressions) + ")");
+        }
+        this.expressions = expressions;
+    }
+
+    @Override
+    public void aggregate(Aggregator[] aggregators, Tuple result) {
+        for (int i = 0; i < expressions.length; i++) {
+            if (!expressions[i].evaluate(result, ptr)) {
+                continue; // nothing was evaluated for this row, so its aggregator is not fed
+            }
+            aggregators[i].aggregate(result, ptr);
+        }
+    }
+
+    /**
+     * Serialize aggregate functions as: minNullableIndex, count, then (ExpressionType ordinal, fields) each
+     * @param aggFuncs list of aggregate functions to serialize
+     * @param minNullableIndex value written ahead of the functions and read back by deserialize
+     * @return serialized byte array representation of the aggregate functions
+     */
+    public static byte[] serialize(List<SingleAggregateFunction> aggFuncs, int minNullableIndex) {
+        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+        try {
+            DataOutputStream out = new DataOutputStream(bytes);
+            WritableUtils.writeVInt(out, minNullableIndex);
+            WritableUtils.writeVInt(out, aggFuncs.size());
+            for (SingleAggregateFunction function : aggFuncs) {
+                WritableUtils.writeVInt(out, ExpressionType.valueOf(function).ordinal());
+                function.write(out);
+            }
+            return bytes.toByteArray();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            closeStream(bytes);
+        }
+    }
+
+    @Override
+    public Aggregator[] newAggregators() {
+        return newAggregators(null);
+    }
+
+    public Aggregator[] newAggregators(Configuration conf) {
+        Aggregator[] created = new Aggregator[functions.length];
+        for (int i = 0; i < created.length; i++) {
+            created[i] = functions[i].newServerAggregator(conf);
+        }
+        return created;
+    }
+
+    /**
+     * Deserialize aggregators from the serialized byte array representation
+     * @param b byte array representation of a list of Aggregators
+     * @param conf Server side configuration used by HBase
+     * @return newly instantiated Aggregators instance
+     */
+    public static ServerAggregators deserialize(byte[] b, Configuration conf) {
+        if (b == null) {
+            return ServerAggregators.EMPTY_AGGREGATORS;
+        }
+        ByteArrayInputStream bytes = new ByteArrayInputStream(b);
+        try {
+            DataInputStream in = new DataInputStream(bytes);
+            int minNullableIndex = WritableUtils.readVInt(in);
+            int count = WritableUtils.readVInt(in);
+            Aggregator[] aggregators = new Aggregator[count];
+            Expression[] expressions = new Expression[count];
+            SingleAggregateFunction[] functions = new SingleAggregateFunction[count];
+            for (int i = 0; i < count; i++) {
+                functions[i] = (SingleAggregateFunction)ExpressionType.values()[WritableUtils.readVInt(in)].newInstance();
+                functions[i].readFields(in, conf);
+                aggregators[i] = functions[i].getAggregator();
+                expressions[i] = functions[i].getAggregatorExpression();
+            }
+            return new ServerAggregators(functions, aggregators, expressions, minNullableIndex);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            closeStream(bytes);
+        }
+    }
+
+    /** Closes the stream, rethrowing an IOException wrapped in a RuntimeException. */
+    private static void closeStream(java.io.Closeable stream) {
+        try {
+            stream.close();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/StddevPopAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/StddevPopAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/StddevPopAggregator.java
new file mode 100644
index 0000000..30276e1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/StddevPopAggregator.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.util.List;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.ColumnModifier;
+
+/**
+ * Client side Aggregator for STDDEV_POP aggregations; reports totalCount as the data point count
+ * 
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public class StddevPopAggregator extends BaseStddevAggregator {
+
+    public StddevPopAggregator(List<Expression> exps, ColumnModifier columnModifier) {
+        super(exps, columnModifier);
+    }
+
+    @Override
+    protected long getDataPointsCount() {
+        return totalCount;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/StddevSampAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/StddevSampAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/StddevSampAggregator.java
new file mode 100644
index 0000000..49f52d2
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/StddevSampAggregator.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.util.List;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.ColumnModifier;
+
+/**
+ * Client side Aggregator for STDDEV_SAMP aggregations; reports totalCount - 1 as the data point count
+ * 
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public class StddevSampAggregator extends BaseStddevAggregator {
+
+    public StddevSampAggregator(List<Expression> exps, ColumnModifier columnModifier) {
+        super(exps, columnModifier);
+    }
+
+    @Override
+    protected long getDataPointsCount() {
+        return totalCount - 1; // NOTE(review): 0 when totalCount is 1 - confirm the caller tolerates a zero count
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/UnsignedIntSumAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/UnsignedIntSumAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/UnsignedIntSumAggregator.java
new file mode 100644
index 0000000..c8befca
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/UnsignedIntSumAggregator.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+
+/**
+ * Aggregator that sums unsigned integer values. It adds no logic of its own beyond overriding
+ * getInputDataType() to return PDataType.UNSIGNED_INT.
+ *
+ * @author jtaylor
+ * @since 0.12
+ */
+public class UnsignedIntSumAggregator extends NumberSumAggregator {
+    
+    public UnsignedIntSumAggregator(ColumnModifier columnModifier) {
+        super(columnModifier);
+    }
+    
+    @Override
+    protected PDataType getInputDataType() {
+        return PDataType.UNSIGNED_INT;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/UnsignedLongSumAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/UnsignedLongSumAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/UnsignedLongSumAggregator.java
new file mode 100644
index 0000000..b91a934
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/UnsignedLongSumAggregator.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+
+/**
+ * Aggregator that sums unsigned long values. It adds no logic of its own beyond overriding
+ * getInputDataType() to return PDataType.UNSIGNED_LONG.
+ * TODO: create these classes dynamically based on the type passed through
+ *
+ * @author jtaylor
+ * @since 0.12
+ */
+public class UnsignedLongSumAggregator extends NumberSumAggregator {
+    
+    public UnsignedLongSumAggregator(ColumnModifier columnModifier) {
+        super(columnModifier);
+    }
+    
+    @Override
+    protected PDataType getInputDataType() {
+        return PDataType.UNSIGNED_LONG;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/function/AggregateFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/AggregateFunction.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/AggregateFunction.java
new file mode 100644
index 0000000..72809f5
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/AggregateFunction.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.util.List;
+
+import org.apache.phoenix.expression.Expression;
+
+
+
+
+/**
+ * Compiled representation of a built-in aggregate function. Every aggregate function
+ * reports itself as neither stateless nor deterministic.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+abstract public class AggregateFunction extends FunctionExpression {
+
+    public AggregateFunction() {
+    }
+
+    public AggregateFunction(List<Expression> children) {
+        super(children);
+    }
+    
+    @Override
+    public boolean isStateless() {
+        return false;
+    }
+
+    @Override
+    public boolean isDeterministic() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/function/ArrayIndexFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/ArrayIndexFunction.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/ArrayIndexFunction.java
new file mode 100644
index 0000000..a4185e6
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/ArrayIndexFunction.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.sql.Types;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.parse.FunctionParseNode.Argument;
+import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
+import org.apache.phoenix.parse.ParseException;
+import org.apache.phoenix.schema.PArrayDataType;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * ARRAY_ELEM(array, index): positions the result pointer on one element of an array value. The
+ * index is decoded as an INTEGER and must not be negative.
+ */
+@BuiltInFunction(name = ArrayIndexFunction.NAME, args = {
+        @Argument(allowedTypes = { PDataType.BINARY_ARRAY, PDataType.VARBINARY_ARRAY }),
+        @Argument(allowedTypes = { PDataType.INTEGER }) })
+public class ArrayIndexFunction extends ScalarFunction {
+    public static final String NAME = "ARRAY_ELEM";
+
+    public ArrayIndexFunction() {
+    }
+
+    public ArrayIndexFunction(List<Expression> children) {
+        super(children);
+    }
+
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        Expression indexChild = children.get(1);
+        if (!indexChild.evaluate(tuple, ptr)) {
+            return false;
+        }
+        if (ptr.getLength() == 0) {
+            return true;
+        }
+        // Use Codec to prevent Integer object allocation
+        int position = PDataType.INTEGER.getCodec().decodeInt(ptr, indexChild.getColumnModifier());
+        if (position < 0) {
+            throw new ParseException("Index cannot be negative :" + position);
+        }
+        Expression arrayChild = children.get(0);
+        if (!arrayChild.evaluate(tuple, ptr)) {
+            return false;
+        }
+        if (ptr.getLength() == 0) {
+            return true;
+        }
+        // Given a ptr to the entire array, set ptr to point to a particular element within that array
+        // given the type of an array element (see comments in PDataTypeForArray)
+        PArrayDataType.positionAtArrayElement(ptr, position, getDataType());
+        return true;
+    }
+
+    @Override
+    public PDataType getDataType() {
+        return PDataType.fromTypeId(children.get(0).getDataType().getSqlType() - Types.ARRAY);
+    }
+
+    @Override
+    public String getName() {
+        return NAME;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/function/ArrayLengthFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/ArrayLengthFunction.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/ArrayLengthFunction.java
new file mode 100644
index 0000000..9a94129
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/ArrayLengthFunction.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.sql.Types;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.parse.FunctionParseNode.Argument;
+import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
+import org.apache.phoenix.schema.PArrayDataType;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/** ARRAY_LENGTH(array): encodes PArrayDataType.getArrayLength() of the array value as an INTEGER. */
+@BuiltInFunction(name = ArrayLengthFunction.NAME, args = {
+        @Argument(allowedTypes = { PDataType.BINARY_ARRAY, PDataType.VARBINARY_ARRAY }) })
+public class ArrayLengthFunction extends ScalarFunction {
+    public static final String NAME = "ARRAY_LENGTH";
+
+    public ArrayLengthFunction() {
+    }
+
+    public ArrayLengthFunction(List<Expression> children) {
+        super(children);
+    }
+
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        Expression arrayChild = children.get(0);
+        if (!arrayChild.evaluate(tuple, ptr)) {
+            return false;
+        }
+        if (ptr.getLength() == 0) {
+            return true;
+        }
+        // The element type id is the array's SQL type id minus Types.ARRAY
+        int elementTypeId = children.get(0).getDataType().getSqlType() - Types.ARRAY;
+        int elementCount = PArrayDataType.getArrayLength(ptr, PDataType.fromTypeId(elementTypeId));
+        byte[] encoded = new byte[PDataType.INTEGER.getByteSize()];
+        PDataType.INTEGER.getCodec().encodeInt(elementCount, encoded, 0);
+        ptr.set(encoded);
+        return true;
+    }
+
+    @Override
+    public PDataType getDataType() {
+        // Array length will return an Integer
+        return PDataType.INTEGER;
+    }
+
+    @Override
+    public String getName() {
+        return NAME;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/function/AvgAggregateFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/AvgAggregateFunction.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/AvgAggregateFunction.java
new file mode 100644
index 0000000..62bb565
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/AvgAggregateFunction.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.math.BigDecimal;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.parse.*;
+import org.apache.phoenix.parse.FunctionParseNode.Argument;
+import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+/**
+ * Built-in AVG aggregate, composed of a COUNT and a SUM aggregate function.
+ * The result is a DECIMAL computed as sum / count, truncated (rounded down) to
+ * {@link #getScale()} digits after the decimal point.
+ */
+@BuiltInFunction(name=AvgAggregateFunction.NAME, nodeClass=AvgAggregateParseNode.class, args= {@Argument(allowedTypes={PDataType.DECIMAL})} )
+public class AvgAggregateFunction extends CompositeAggregateFunction {
+    public static final String NAME = "AVG";
+    private final CountAggregateFunction countFunc;
+    private final SumAggregateFunction sumFunc;
+    // Scale of the result: the largest child scale, but never less than
+    // PDataType.MIN_DECIMAL_AVG_SCALE.
+    private Integer scale;
+
+    /**
+     * Creates an instance without COUNT/SUM delegates. Both delegates are left
+     * null, so {@link #evaluate(Tuple, ImmutableBytesWritable)} must not be
+     * called on an instance built this way.
+     */
+    // TODO: remove when not required at built-in func register time
+    public AvgAggregateFunction(List<Expression> children) {
+        super(children);
+        this.countFunc = null;
+        this.sumFunc = null;
+        setScale(children);
+    }
+
+    /**
+     * @param children  the argument expressions of AVG
+     * @param countFunc the COUNT aggregate providing the divisor
+     * @param sumFunc   the SUM aggregate providing the dividend
+     */
+    public AvgAggregateFunction(List<Expression> children, CountAggregateFunction countFunc, SumAggregateFunction sumFunc) {
+        super(children);
+        this.countFunc = countFunc;
+        this.sumFunc = sumFunc;
+        setScale(children);
+    }
+
+    /**
+     * Sets the result scale to the maximum of the minimum AVG scale and every
+     * non-null child scale.
+     */
+    private void setScale(List<Expression> children) {
+        scale = PDataType.MIN_DECIMAL_AVG_SCALE; // At least 4;
+        for (Expression child: children) {
+            Integer childScale = child.getScale();
+            if (childScale != null) {
+                scale = Math.max(scale, childScale);
+            }
+        }
+    }
+
+    @Override
+    public PDataType getDataType() {
+        return PDataType.DECIMAL;
+    }
+
+    /**
+     * Computes the average and stores it in {@code ptr} as DECIMAL bytes.
+     *
+     * @return false when the count cannot be evaluated, when the count is zero,
+     *         or when the sum cannot be evaluated; true when {@code ptr} holds
+     *         the average
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (!countFunc.evaluate(tuple, ptr)) {
+            return false;
+        }
+        long count = countFunc.getDataType().getCodec().decodeLong(ptr, null);
+        if (count == 0) {
+            return false;
+        }
+        
+        // Normal case where a column reference was used as the argument to AVG
+        if (!countFunc.isConstantExpression()) {
+            // Honor the SUM result: if it has no value, ptr does not hold a sum
+            // and must not be decoded as one.
+            if (!sumFunc.evaluate(tuple, ptr)) {
+                return false;
+            }
+            BigDecimal sum = (BigDecimal)PDataType.DECIMAL.toObject(ptr, sumFunc.getDataType());
+            // For the final column projection, we divide the sum by the count, both coerced to BigDecimal.
+            // TODO: base the precision on column metadata instead of constant
+            BigDecimal avg = sum.divide(BigDecimal.valueOf(count), PDataType.DEFAULT_MATH_CONTEXT);
+            avg = avg.setScale(scale, BigDecimal.ROUND_DOWN);
+            ptr.set(PDataType.DECIMAL.toBytes(avg));
+            return true;
+        }
+        // Constant argument: the average of a constant is the constant itself.
+        BigDecimal value = (BigDecimal) ((LiteralExpression)countFunc.getChildren().get(0)).getValue();
+        value = value.setScale(scale, BigDecimal.ROUND_DOWN);
+        ptr.set(PDataType.DECIMAL.toBytes(value));
+        return true;
+    }
+
+    /** @return true only when a SUM delegate is present and is itself nullable */
+    @Override
+    public boolean isNullable() {
+        return sumFunc != null && sumFunc.isNullable();
+    }
+
+    @Override
+    public String getName() {
+        return NAME;
+    }
+
+    @Override
+    public Integer getScale() {
+        return scale;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CeilDateExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CeilDateExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CeilDateExpression.java
new file mode 100644
index 0000000..d0951cf
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CeilDateExpression.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.phoenix.expression.Expression;
+
+/**
+ * Ceiling (round-up) operation on {@link org.apache.phoenix.schema.PDataType#DATE},
+ * implemented as a {@link RoundDateExpression} whose round-up amount is
+ * {@code divBy - 1}.
+ *
+ * @author samarth.jain
+ * @since 3.0.0
+ */
+public class CeilDateExpression extends RoundDateExpression {
+
+    public CeilDateExpression() {}
+
+    CeilDateExpression(List<Expression> children) {
+        super(children);
+    }
+
+    /**
+     * Creates a {@link CeilDateExpression} with the default multiplier of 1.
+     *
+     * @param expr the expression to round up
+     * @param timeUnit unit of time to round up to
+     */
+    public static Expression create(Expression expr, TimeUnit timeUnit) throws SQLException {
+        return create(expr, timeUnit, 1);
+    }
+
+    /**
+     * Creates a {@link CeilDateExpression}.
+     *
+     * @param expr the expression to round up
+     * @param timeUnit unit of time to round up to
+     * @param multiplier determines the roll up window size
+     */
+    public static Expression create(Expression expr, TimeUnit timeUnit, int multiplier) throws SQLException {
+        Expression unitArg = getTimeUnitExpr(timeUnit);
+        Expression multiplierArg = getMultiplierExpr(multiplier);
+        List<Expression> args = Lists.newArrayList(expr, unitArg, multiplierArg);
+        return CeilDateExpression.create(args);
+    }
+
+    /**
+     * Wraps the already assembled argument list in a {@link CeilDateExpression}.
+     *
+     * @param children the argument expressions, passed to the constructor as is
+     */
+    public static Expression create(List<Expression> children) throws SQLException {
+        return new CeilDateExpression(children);
+    }
+
+    /** @return {@code divBy - 1} */
+    @Override
+    protected long getRoundUpAmount() {
+        return divBy - 1;
+    }
+
+    @Override
+    public String getName() {
+        return CeilFunction.NAME;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CeilDecimalExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CeilDecimalExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CeilDecimalExpression.java
new file mode 100644
index 0000000..dd2c22a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CeilDecimalExpression.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.math.RoundingMode;
+import java.sql.SQLException;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.schema.PDataType;
+
+/**
+ * 
+ * Class encapsulating the CEIL operation on a {@link org.apache.phoenix.schema.PDataType#DECIMAL}
+ *
+ * @author samarth.jain
+ * @since 3.0.0
+ */
+public class CeilDecimalExpression extends RoundDecimalExpression {
+    
+    public CeilDecimalExpression() {}
+    
+    public CeilDecimalExpression(List<Expression> children) {
+        super(children);
+    }
+    
+  /**
+   * Creates a {@link CeilDecimalExpression} with rounding scale given by @param scale.
+   *
+   */
+   public static Expression create(Expression expr, int scale) throws SQLException {
+       if (expr.getDataType().isCoercibleTo(PDataType.LONG)) {
+           return expr;
+       }
+       Expression scaleExpr = LiteralExpression.newConstant(scale, PDataType.INTEGER, true);
+       List<Expression> expressions = Lists.newArrayList(expr, scaleExpr);
+       return new CeilDecimalExpression(expressions);
+   }
+   
+   /**
+    * Creates a {@link CeilDecimalExpression} with a default scale of 0 used for rounding. 
+    *
+    */
+   public static Expression create(Expression expr) throws SQLException {
+       return create(expr, 0);
+   }
+    
+    @Override
+    protected RoundingMode getRoundingMode() {
+        return RoundingMode.CEILING;
+    }
+    
+    @Override
+    public String getName() {
+        return CeilFunction.NAME;
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CeilFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CeilFunction.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CeilFunction.java
new file mode 100644
index 0000000..60ad329
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CeilFunction.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.util.List;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.parse.CeilParseNode;
+import org.apache.phoenix.parse.FunctionParseNode.Argument;
+import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
+import org.apache.phoenix.schema.PDataType;
+/**
+ * Abstract base for the built-in {@code CEIL} function. The annotation
+ * registers the function under {@link #NAME} with {@link CeilParseNode} as its
+ * node class and declares three arguments: a TIMESTAMP or DECIMAL value, a
+ * constant VARCHAR or INTEGER (default "null") and a constant INTEGER
+ * (default "1").
+ *
+ * @author samarth.jain
+ * @since 3.0.0
+ */
+@BuiltInFunction(
+        name = CeilFunction.NAME,
+        nodeClass = CeilParseNode.class,
+        args = {
+            @Argument(allowedTypes={PDataType.TIMESTAMP, PDataType.DECIMAL}),
+            @Argument(allowedTypes={PDataType.VARCHAR, PDataType.INTEGER}, defaultValue = "null", isConstant=true),
+            @Argument(allowedTypes={PDataType.INTEGER}, defaultValue="1", isConstant=true)
+        })
+public abstract class CeilFunction extends ScalarFunction {
+
+    public static final String NAME = "CEIL";
+
+    /**
+     * @param children the argument expressions of the function
+     */
+    public CeilFunction(List<Expression> children) {
+        super(children);
+    }
+
+    /** @return {@link #NAME} */
+    @Override
+    public String getName() {
+        return NAME;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CeilTimestampExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CeilTimestampExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CeilTimestampExpression.java
new file mode 100644
index 0000000..7f54cbc
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CeilTimestampExpression.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import com.google.common.collect.Lists;
+import org.apache.phoenix.expression.CoerceExpression;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PDataType.PDataCodec;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * CEIL operation on {@link org.apache.phoenix.schema.PDataType#TIMESTAMP}.
+ * This class only supports CEIL to {@link TimeUnit#MILLISECOND}; for any other
+ * unit, such as {@link TimeUnit#HOUR}, use {@link CeilDateExpression}.
+ *
+ * @author samarth.jain
+ * @since 3.0.0
+ */
+public class CeilTimestampExpression extends CeilDateExpression {
+
+    public CeilTimestampExpression() {}
+
+    private CeilTimestampExpression(List<Expression> children) {
+        super(children);
+    }
+
+    /**
+     * Creates a {@link CeilTimestampExpression} that uses
+     * {@link TimeUnit#MILLISECOND} as the time unit and a multiplier of 1.
+     */
+    public static CeilTimestampExpression create (Expression expr) throws SQLException {
+        return create(expr, 1);
+    }
+
+    /**
+     * Creates a {@link CeilTimestampExpression} that uses
+     * {@link TimeUnit#MILLISECOND} as the time unit for rounding.
+     */
+    public static CeilTimestampExpression create(Expression expr, int multiplier) throws SQLException {
+        List<Expression> args = Lists.newArrayList(expr, getTimeUnitExpr(TimeUnit.MILLISECOND), getMultiplierExpr(multiplier));
+        return new CeilTimestampExpression(args);
+    }
+
+    /**
+     * Picks the implementation from the time unit literal in {@code children.get(1)}:
+     * a {@link CeilTimestampExpression} for MILLISECOND (case-insensitive),
+     * otherwise a {@link CeilDateExpression} over the first child coerced to a
+     * date type.
+     */
+    public static Expression create(List<Expression> children) throws SQLException {
+        Expression source = children.get(0);
+        PDataType sourceType = source.getDataType();
+        LiteralExpression unitLiteral = (LiteralExpression) children.get(1);
+        String unitName = (String) unitLiteral.getValue();
+        if (TimeUnit.MILLISECOND.toString().equalsIgnoreCase(unitName)) {
+            return new CeilTimestampExpression(children);
+        }
+        // Coerce TIMESTAMP to DATE, as the nanos have no effect
+        List<Expression> dateChildren = Lists.newArrayListWithExpectedSize(children.size());
+        PDataType dateType;
+        if (sourceType == PDataType.TIMESTAMP) {
+            dateType = PDataType.DATE;
+        } else {
+            dateType = PDataType.UNSIGNED_DATE;
+        }
+        dateChildren.add(CoerceExpression.create(source, dateType));
+        dateChildren.addAll(children.subList(1, children.size()));
+        return CeilDateExpression.create(dateChildren);
+    }
+
+    /**
+     * Uses the DATE codec for TIMESTAMP columns and the UNSIGNED_DATE codec for
+     * UNSIGNED_TIMESTAMP columns; every other type is delegated to the superclass.
+     */
+    @Override
+    protected PDataCodec getKeyRangeCodec(PDataType columnDataType) {
+        if (columnDataType == PDataType.TIMESTAMP) {
+            return PDataType.DATE.getCodec();
+        }
+        if (columnDataType == PDataType.UNSIGNED_TIMESTAMP) {
+            return PDataType.UNSIGNED_DATE.getCodec();
+        }
+        return super.getKeyRangeCodec(columnDataType);
+    }
+
+    /**
+     * Evaluates the first child; when the value has a positive nanos part, the
+     * pointer is replaced by a timestamp built from the value's millis plus one.
+     *
+     * @return false when the first child could not be evaluated, true otherwise
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        Expression source = children.get(0);
+        if (!source.evaluate(tuple, ptr)) {
+            return false;
+        }
+        ColumnModifier modifier = source.getColumnModifier();
+        PDataType type = getDataType();
+        if (type.getNanos(ptr, modifier) > 0) {
+            long millis = type.getMillis(ptr, modifier);
+            Timestamp ceiled = new Timestamp(millis + 1);
+            ptr.set(type.toBytes(ceiled));
+        }
+        // for timestamp we only support rounding up the milliseconds.
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CoalesceFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CoalesceFunction.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CoalesceFunction.java
new file mode 100644
index 0000000..05f6bd8
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CoalesceFunction.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.parse.FunctionParseNode.Argument;
+import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+/**
+ *
+ * Function that supplies an alternative value when its first argument is null.
+ * Usage:
+ * COALESCE(expr1,expr2)
+ * Returns expr1 when it is not null, otherwise expr2.
+ *
+ * TODO: better bind parameter type matching, since arg2 must be coercible
+ * to arg1. consider allowing a common base type?
+ * @author jtaylor
+ * @since 0.1
+ */
+@BuiltInFunction(name=CoalesceFunction.NAME, args= {
+    @Argument(),
+    @Argument()} )
+public class CoalesceFunction extends ScalarFunction {
+    public static final String NAME = "COALESCE";
+
+    public CoalesceFunction() {
+    }
+
+    /**
+     * @param children the two arguments; the second one's type must be
+     *        coercible to the first one's type
+     * @throws SQLException with code CANNOT_CONVERT_TYPE when it is not
+     */
+    public CoalesceFunction(List<Expression> children) throws SQLException {
+        super(children);
+        PDataType fallbackType = children.get(1).getDataType();
+        PDataType primaryType = children.get(0).getDataType();
+        if (!fallbackType.isCoercibleTo(primaryType)) {
+            String message = getName() + " expected " + primaryType + ", but got " + fallbackType;
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_CONVERT_TYPE)
+                .setMessage(message)
+                .build().buildException();
+        }
+    }
+
+    /**
+     * Evaluates the first argument and falls back to the second one only when
+     * the first one yields no value.
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        return children.get(0).evaluate(tuple, ptr)
+                || children.get(1).evaluate(tuple, ptr);
+    }
+
+    /** @return the data type of the first argument */
+    @Override
+    public PDataType getDataType() {
+        return children.get(0).getDataType();
+    }
+
+    /**
+     * @return the larger of the two argument byte sizes, or null when either
+     *         argument reports no byte size
+     */
+    @Override
+    public Integer getByteSize() {
+        Integer primarySize = children.get(0).getByteSize();
+        if (primarySize == null) {
+            return null;
+        }
+        Integer fallbackSize = children.get(1).getByteSize();
+        if (fallbackSize == null) {
+            return null;
+        }
+        return primarySize > fallbackSize ? primarySize : fallbackSize;
+    }
+
+    /** @return true only when both arguments are nullable */
+    @Override
+    public boolean isNullable() {
+        if (!children.get(0).isNullable()) {
+            return false;
+        }
+        return children.get(1).isNullable();
+    }
+
+    @Override
+    public String getName() {
+        return NAME;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CompositeAggregateFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CompositeAggregateFunction.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CompositeAggregateFunction.java
new file mode 100644
index 0000000..2c8da08
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CompositeAggregateFunction.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.util.List;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+
+
+/**
+ *
+ * Base class for aggregate functions that are built out of other aggregate
+ * functions (AVG, for instance, is modeled as a SUM aggregate function plus a
+ * COUNT aggregate function).
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public abstract class CompositeAggregateFunction extends AggregateFunction {
+
+    /**
+     * @param children the argument expressions of the aggregate
+     */
+    public CompositeAggregateFunction(List<Expression> children) {
+        super(children);
+    }
+
+    /**
+     * Always returns null; the visitor is never invoked.
+     */
+    @Override
+    public final <T> T accept(ExpressionVisitor<T> visitor) {
+        return null;
+    }
+}