You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 23:15:46 UTC
[30/51] [partial] Initial commit of master branch from github
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
new file mode 100644
index 0000000..ef0a9c2
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
@@ -0,0 +1,181 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.SizedUtil;
+
+/**
+ * Server side Aggregator which will aggregate data and find distinct values with number of occurrences for each.
+ * <p>
+ * {@link #evaluate(Tuple, ImmutableBytesWritable)} serializes the value-vs-count map as one leading
+ * marker byte, the map size as a vint and then, for every entry, the key length (vint), the key bytes
+ * and the occurrence count (vint). When the serialized form is larger than the configured threshold
+ * the payload (everything after the marker byte) is compressed with {@link #COMPRESS_ALGO} and
+ * prefixed with {@link #COMPRESS_MARKER} instead.
+ *
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public class DistinctValueWithCountServerAggregator extends BaseAggregator {
+    private static final Logger LOG = LoggerFactory.getLogger(DistinctValueWithCountServerAggregator.class);
+    public static final int DEFAULT_ESTIMATED_DISTINCT_VALUES = 10000;
+    public static final byte[] COMPRESS_MARKER = new byte[] { (byte)1 };
+    public static final Algorithm COMPRESS_ALGO = Compression.Algorithm.SNAPPY;
+    // Upper bound on the bytes needed for one vint encoded int (the serialization format uses 1 to 5 bytes)
+    private static final int MAX_VINT_SIZE = 5;
+
+    private int compressThreshold;
+    private byte[] buffer = null;
+    private Map<ImmutableBytesPtr, Integer> valueVsCount = new HashMap<ImmutableBytesPtr, Integer>();
+
+    /**
+     * @param conf configuration from which the compression threshold is read
+     */
+    public DistinctValueWithCountServerAggregator(Configuration conf) {
+        super(null);
+        compressThreshold = conf.getInt(QueryServices.DISTINCT_VALUE_COMPRESS_THRESHOLD_ATTRIB,
+                QueryServicesOptions.DEFAULT_DISTINCT_VALUE_COMPRESS_THRESHOLD);
+    }
+
+    /**
+     * Creates an aggregator that works directly on the map of the passed client aggregator. The map is
+     * shared, not copied.
+     *
+     * @param conf configuration from which the compression threshold is read
+     * @param clientAgg client aggregator whose value-vs-count map is reused
+     */
+    public DistinctValueWithCountServerAggregator(Configuration conf, DistinctValueWithCountClientAggregator clientAgg) {
+        this(conf);
+        valueVsCount = clientAgg.valueVsCount;
+    }
+
+    /**
+     * Counts one more occurrence of the value that ptr currently points to.
+     */
+    @Override
+    public void aggregate(Tuple tuple, ImmutableBytesWritable ptr) {
+        ImmutableBytesPtr key = new ImmutableBytesPtr(ptr.get(), ptr.getOffset(), ptr.getLength());
+        Integer count = this.valueVsCount.get(key);
+        if (count == null) {
+            this.valueVsCount.put(key, 1);
+        } else {
+            this.valueVsCount.put(key, count + 1);
+        }
+    }
+
+    @Override
+    public boolean isNullable() {
+        return false;
+    }
+
+    /**
+     * Serializes the value-vs-count map and points ptr at the result.
+     *
+     * @return always true
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        // This serializes the Map. The format is as follows
+        // Map size(VInt ie. 1 to 5 bytes) +
+        // ( key length [VInt ie. 1 to 5 bytes] + key bytes + value [VInt ie. 1 to 5 bytes] )*
+        buffer = new byte[countMapSerializationSize()];
+        // Byte 0 is left as 0, which differs from COMPRESS_MARKER; the map data starts at index 1.
+        int offset = 1;
+        offset += ByteUtil.vintToBytes(buffer, offset, this.valueVsCount.size());
+        for (Entry<ImmutableBytesPtr, Integer> entry : this.valueVsCount.entrySet()) {
+            ImmutableBytesPtr key = entry.getKey();
+            offset += ByteUtil.vintToBytes(buffer, offset, key.getLength());
+            System.arraycopy(key.get(), key.getOffset(), buffer, offset, key.getLength());
+            offset += key.getLength();
+            offset += ByteUtil.vintToBytes(buffer, offset, entry.getValue().intValue());
+        }
+        // offset is now the exact serialized length; the buffer itself is sized for the worst case.
+        if (offset > compressThreshold) {
+            // The size for the map serialization is above the threshold. We will do the Snappy compression here.
+            ByteArrayOutputStream compressedByteStream = new ByteArrayOutputStream();
+            try {
+                compressedByteStream.write(COMPRESS_MARKER);
+                // NOTE(review): the compressor obtained here is never handed back to COMPRESS_ALGO;
+                // confirm whether it should be returned to the pool after use.
+                OutputStream compressionStream = COMPRESS_ALGO.createCompressionStream(compressedByteStream,
+                        COMPRESS_ALGO.getCompressor(), 0);
+                // Compress only the bytes actually written, not the unused tail of the buffer.
+                compressionStream.write(buffer, 1, offset - 1);
+                compressionStream.flush();
+                ptr.set(compressedByteStream.toByteArray(), 0, compressedByteStream.size());
+                return true;
+            } catch (Exception e) {
+                // Best effort: fall through and return the uncompressed form.
+                LOG.error("Exception while Snappy compression of data.", e);
+            }
+        }
+        ptr.set(buffer, 0, offset);
+        return true;
+    }
+
+    // Upper bound for the #bytes required to serialize the count map.
+    // Every int item is budgeted with the maximum vint size and the leading marker byte is included,
+    // so the serialization can not run past the end of the buffer. Normally fewer bytes are consumed.
+    private int countMapSerializationSize() {
+        int size = 1 + MAX_VINT_SIZE; // Marker byte + the number of entries in the Map
+        for (ImmutableBytesPtr key : this.valueVsCount.keySet()) {
+            // Add up the key and key's lengths (vint) and the value (vint)
+            size += MAX_VINT_SIZE + key.getLength() + MAX_VINT_SIZE;
+        }
+        return size;
+    }
+
+    // The heap size which will be taken by the count map.
+    private int countMapHeapSize() {
+        int size = 0;
+        if (this.valueVsCount.size() > 0) {
+            for (ImmutableBytesPtr key : this.valueVsCount.keySet()) {
+                size += SizedUtil.MAP_ENTRY_SIZE + // entry
+                        Bytes.SIZEOF_INT + // key size
+                        key.getLength() + SizedUtil.ARRAY_SIZE; // value size
+            }
+        } else {
+            // Initially when the getSize() is called, we dont have any entries in the map so as to
+            // tell the exact heap need. Let us approximate the #entries
+            size = SizedUtil.sizeOfMap(DEFAULT_ESTIMATED_DISTINCT_VALUES,
+                    SizedUtil.IMMUTABLE_BYTES_PTR_SIZE, Bytes.SIZEOF_INT);
+        }
+        return size;
+    }
+
+    @Override
+    public final PDataType getDataType() {
+        return PDataType.VARBINARY;
+    }
+
+    /**
+     * Drops all the counted values and the last serialization buffer.
+     */
+    @Override
+    public void reset() {
+        valueVsCount = new HashMap<ImmutableBytesPtr, Integer>();
+        buffer = null;
+        super.reset();
+    }
+
+    @Override
+    public String toString() {
+        return "DISTINCT VALUE vs COUNT";
+    }
+
+    @Override
+    public int getSize() {
+        // TODO make this size correct.??
+        // This size is being called initially at the begin of the scanner open. At that time we any
+        // way can not tell the exact size of the Map. The Aggregators get size from all Aggregator
+        // and stores in a variable for future use. This size of the Aggregators is being used in
+        // Grouped unordered scan. Do we need some changes there in that calculation?
+        return super.getSize() + SizedUtil.ARRAY_SIZE + countMapHeapSize();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DoubleSumAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DoubleSumAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DoubleSumAggregator.java
new file mode 100644
index 0000000..fa1dda7
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DoubleSumAggregator.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.SizedUtil;
+
+/**
+ * Aggregator that sums double values.
+ */
+public class DoubleSumAggregator extends BaseAggregator {
+
+    // Running total of all the values aggregated so far
+    private double sum = 0;
+    // Output buffer; stays null until a value has been seen or a result is requested
+    private byte[] buffer;
+
+    /**
+     * @param columnModifier modifier applied when decoding input values
+     * @param ptr optional pointer to an encoded double used as the starting sum; may be null
+     */
+    public DoubleSumAggregator(ColumnModifier columnModifier, ImmutableBytesWritable ptr) {
+        super(columnModifier);
+        if (ptr == null) {
+            return;
+        }
+        initBuffer();
+        sum = PDataType.DOUBLE.getCodec().decodeDouble(ptr, columnModifier);
+    }
+
+    protected PDataType getInputDataType() {
+        return PDataType.DOUBLE;
+    }
+
+    private void initBuffer() {
+        buffer = new byte[getDataType().getByteSize()];
+    }
+
+    /**
+     * Decodes the double that ptr points to and adds it to the running sum.
+     */
+    @Override
+    public void aggregate(Tuple tuple, ImmutableBytesWritable ptr) {
+        sum += getInputDataType().getCodec().decodeDouble(ptr, columnModifier);
+        if (buffer == null) {
+            initBuffer();
+        }
+    }
+
+    /**
+     * Encodes the current sum into the buffer and points ptr at it.
+     *
+     * @return false when nothing was aggregated and this aggregator is nullable, true otherwise
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (buffer == null && isNullable()) {
+            return false;
+        }
+        if (buffer == null) {
+            initBuffer();
+        }
+        ptr.set(buffer);
+        getDataType().getCodec().encodeDouble(sum, ptr);
+        return true;
+    }
+
+    @Override
+    public PDataType getDataType() {
+        return PDataType.DOUBLE;
+    }
+
+    @Override
+    public String toString() {
+        return "SUM [sum=" + sum + "]";
+    }
+
+    /**
+     * Clears the running sum and releases the output buffer.
+     */
+    @Override
+    public void reset() {
+        sum = 0;
+        buffer = null;
+        super.reset();
+    }
+
+    @Override
+    public int getSize() {
+        int baseSize = super.getSize();
+        return baseSize + SizedUtil.LONG_SIZE + SizedUtil.ARRAY_SIZE + getDataType().getByteSize();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/IntSumAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/IntSumAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/IntSumAggregator.java
new file mode 100644
index 0000000..e96a993
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/IntSumAggregator.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+
+/**
+ * Sum aggregator whose input values are decoded as {@link PDataType#INTEGER}.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class IntSumAggregator extends NumberSumAggregator {
+
+    /**
+     * @param columnModifier modifier handed to the parent aggregator
+     */
+    public IntSumAggregator(ColumnModifier columnModifier) {
+        super(columnModifier);
+    }
+
+    /**
+     * @return the type used to decode each input value, i.e. INTEGER
+     */
+    @Override
+    protected PDataType getInputDataType() {
+        return PDataType.INTEGER;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/LongSumAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/LongSumAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/LongSumAggregator.java
new file mode 100644
index 0000000..bfdadc9
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/LongSumAggregator.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+
+/**
+ * Sum aggregator whose input values are decoded as {@link PDataType#LONG}.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class LongSumAggregator extends NumberSumAggregator {
+
+    /**
+     * @param columnModifier modifier handed to the parent aggregator
+     */
+    public LongSumAggregator(ColumnModifier columnModifier) {
+        super(columnModifier);
+    }
+
+    /**
+     * @return the type used to decode each input value, i.e. LONG
+     */
+    @Override
+    protected PDataType getInputDataType() {
+        return PDataType.LONG;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/MaxAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/MaxAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/MaxAggregator.java
new file mode 100644
index 0000000..890e14a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/MaxAggregator.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.phoenix.schema.ColumnModifier;
+
+
+/**
+ * Aggregator that finds the max of values. Inverse of {@link MinAggregator}: it reuses the MIN
+ * machinery and only flips the decision about which of two values to keep.
+ *
+ * @author syyang
+ * @since 0.1
+ */
+abstract public class MaxAggregator extends MinAggregator {
+
+    public MaxAggregator(ColumnModifier columnModifier) {
+        super(columnModifier);
+    }
+
+    /**
+     * @return the negation of what {@link MinAggregator#keepFirst} answers for the same arguments
+     */
+    @Override
+    protected boolean keepFirst(ImmutableBytesWritable ibw1, ImmutableBytesWritable ibw2) {
+        boolean minWouldKeepFirst = super.keepFirst(ibw1, ibw2);
+        return !minWouldKeepFirst;
+    }
+
+    @Override
+    public String toString() {
+        String current = Bytes.toStringBinary(value.get(), value.getOffset(), value.getLength());
+        return "MAX [value=" + current + "]";
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/MinAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/MinAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/MinAggregator.java
new file mode 100644
index 0000000..8954de2
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/MinAggregator.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.SizedUtil;
+
+
+/**
+ * Aggregator that finds the min of values. Inverse of {@link MaxAggregator}.
+ *
+ * @author syyang
+ * @since 0.1
+ */
+abstract public class MinAggregator extends BaseAggregator {
+    /**
+     * Accumulates the result of the MIN function. While it still wraps
+     * {@link ByteUtil#EMPTY_BYTE_ARRAY} no value has been aggregated yet.
+     */
+    protected final ImmutableBytesWritable value = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
+
+    public MinAggregator(ColumnModifier columnModifier) {
+        super(columnModifier);
+    }
+
+    /**
+     * Puts the aggregator back into the "no value seen" state.
+     */
+    @Override
+    public void reset() {
+        value.set(ByteUtil.EMPTY_BYTE_ARRAY);
+        super.reset();
+    }
+
+    @Override
+    public int getSize() {
+        return super.getSize() + /*value*/ SizedUtil.IMMUTABLE_BYTES_WRITABLE_SIZE;
+    }
+
+    /**
+     * Decides which of two bytes writables survives. For the MIN function the first one is kept
+     * when it compares less than or equal to the second.
+     *
+     * @param ibw1 the first bytes writable
+     * @param ibw2 the second bytes writable
+     * @return true if the first bytes writable should be kept
+     */
+    protected boolean keepFirst(ImmutableBytesWritable ibw1, ImmutableBytesWritable ibw2) {
+        int comparison = getDataType().compareTo(ibw1, columnModifier, ibw2, columnModifier, getDataType());
+        return comparison <= 0;
+    }
+
+    // Identity check against the shared empty array, which is used as the "nothing yet" sentinel
+    private boolean isNull() {
+        return value.get() == ByteUtil.EMPTY_BYTE_ARRAY;
+    }
+
+    /**
+     * Takes over the value ptr points to when it is the first value seen or when the current
+     * value should not be kept in favour of it.
+     */
+    @Override
+    public void aggregate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (isNull() || !keepFirst(value, ptr)) {
+            value.set(ptr.get(), ptr.getOffset(), ptr.getLength());
+        }
+    }
+
+    @Override
+    public String toString() {
+        String current = Bytes.toStringBinary(value.get(), value.getOffset(), value.getLength());
+        return "MIN [value=" + current + "]";
+    }
+
+    /**
+     * Points ptr at the accumulated value.
+     *
+     * @return false when no value has been aggregated, true otherwise
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (!isNull()) {
+            ptr.set(value.get(), value.getOffset(), value.getLength());
+            return true;
+        }
+        return false;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/NumberSumAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/NumberSumAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/NumberSumAggregator.java
new file mode 100644
index 0000000..a7cfcd1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/NumberSumAggregator.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.SizedUtil;
+
+/**
+ * Base aggregator that sums integral number values into a long. Subclasses state how the input
+ * values are decoded through {@link #getInputDataType()}; the result type is always LONG.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+abstract public class NumberSumAggregator extends BaseAggregator {
+    // Running total of all the values aggregated so far
+    private long sum = 0;
+    // Output buffer; stays null until a value has been seen or a result is requested
+    private byte[] buffer;
+
+    public NumberSumAggregator(ColumnModifier columnModifier) {
+        super(columnModifier);
+    }
+
+    /**
+     * @param columnModifier modifier applied when decoding values
+     * @param ptr optional pointer to an encoded long used as the starting sum; may be null
+     */
+    public NumberSumAggregator(ColumnModifier columnModifier,
+            ImmutableBytesWritable ptr) {
+        this(columnModifier);
+        if (ptr == null) {
+            return;
+        }
+        initBuffer();
+        sum = PDataType.LONG.getCodec().decodeLong(ptr, columnModifier);
+    }
+
+    public long getSum() {
+        return sum;
+    }
+
+    /**
+     * @return the type used to decode each input value
+     */
+    abstract protected PDataType getInputDataType();
+
+    private int getBufferLength() {
+        return getDataType().getByteSize();
+    }
+
+    private void initBuffer() {
+        buffer = new byte[getBufferLength()];
+    }
+
+    /**
+     * Decodes the value ptr points to as a long and adds it to the running sum.
+     */
+    @Override
+    public void aggregate(Tuple tuple, ImmutableBytesWritable ptr) {
+        // Get either IntNative or LongNative depending on input type
+        sum += getInputDataType().getCodec().decodeLong(ptr, columnModifier);
+        if (buffer == null) {
+            initBuffer();
+        }
+    }
+
+    /**
+     * Encodes the current sum into the buffer and points ptr at it.
+     *
+     * @return false when nothing was aggregated and this aggregator is nullable, true otherwise
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (buffer == null && isNullable()) {
+            return false;
+        }
+        if (buffer == null) {
+            initBuffer();
+        }
+        ptr.set(buffer);
+        getDataType().getCodec().encodeLong(sum, ptr);
+        return true;
+    }
+
+    @Override
+    public final PDataType getDataType() {
+        return PDataType.LONG;
+    }
+
+    /**
+     * Clears the running sum and releases the output buffer.
+     */
+    @Override
+    public void reset() {
+        sum = 0;
+        buffer = null;
+        super.reset();
+    }
+
+    @Override
+    public String toString() {
+        return "SUM [sum=" + sum + "]";
+    }
+
+    @Override
+    public int getSize() {
+        int baseSize = super.getSize();
+        return baseSize + SizedUtil.LONG_SIZE + SizedUtil.ARRAY_SIZE + getBufferLength();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/PercentRankClientAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/PercentRankClientAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/PercentRankClientAggregator.java
new file mode 100644
index 0000000..42ca267
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/PercentRankClientAggregator.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.math.BigDecimal;
+import java.util.*;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.*;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * Client side Aggregator for PERCENT_RANK aggregations
+ *
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public class PercentRankClientAggregator extends DistinctValueWithCountClientAggregator {
+
+    // Child expressions: [0] the column, [1] boolean literal for ASC/DESC, [2] literal value to rank
+    private final List<Expression> exps;
+    // Result of the first evaluation; cleared again by reset()
+    private BigDecimal cachedResult = null;
+
+    public PercentRankClientAggregator(List<Expression> exps, ColumnModifier columnModifier) {
+        super(columnModifier);
+        this.exps = exps;
+    }
+
+    /**
+     * Computes (once, then cached) the sum of the counts of the sorted distinct values that come
+     * before the stop condition, divided by the total count, and points ptr at its DECIMAL bytes.
+     *
+     * @return always true
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (cachedResult == null) {
+            ColumnExpression columnExp = (ColumnExpression)exps.get(0);
+            // Second exp will be a LiteralExpression of Boolean type indicating whether the ordering to
+            // be ASC/DESC
+            LiteralExpression orderingExp = (LiteralExpression)exps.get(1);
+            boolean isAscending = (Boolean)orderingExp.getValue();
+
+            // Third expression will be LiteralExpression
+            LiteralExpression valueExp = (LiteralExpression)exps.get(2);
+            Map<Object, Integer> sorted = getSortedValueVsCount(isAscending, columnExp.getDataType());
+            Object value = valueExp.getValue();
+            long countBefore = 0;
+            for (Entry<Object, Integer> entry : sorted.entrySet()) {
+                int cmp = columnExp.getDataType().compareTo(entry.getKey(), value, valueExp.getDataType());
+                // Ascending: stop at the first column value above the literal.
+                // Descending: stop at the first column value at or below the literal.
+                if (isAscending ? cmp > 0 : cmp <= 0) {
+                    break;
+                }
+                countBefore += entry.getValue();
+            }
+
+            float result = (float)countBefore / totalCount;
+            this.cachedResult = new BigDecimal(result);
+        }
+        if (buffer == null) {
+            initBuffer();
+        }
+        buffer = PDataType.DECIMAL.toBytes(this.cachedResult);
+        ptr.set(buffer);
+        return true;
+    }
+
+    @Override
+    protected int getBufferLength() {
+        return PDataType.DECIMAL.getByteSize();
+    }
+
+    /**
+     * Resets the parent state and forgets the cached result.
+     */
+    @Override
+    public void reset() {
+        super.reset();
+        this.cachedResult = null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/PercentileClientAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/PercentileClientAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/PercentileClientAggregator.java
new file mode 100644
index 0000000..095842a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/PercentileClientAggregator.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.math.BigDecimal;
+import java.util.*;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.*;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * Client side Aggregator for PERCENTILE_CONT aggregations
+ *
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public class PercentileClientAggregator extends DistinctValueWithCountClientAggregator {
+
+    // Child expressions: [0] the column, [1] boolean literal for ASC/DESC, [2] literal percentile
+    private final List<Expression> exps;
+    // Result of the first evaluation; cleared again by reset()
+    private BigDecimal cachedResult = null;
+
+    public PercentileClientAggregator(List<Expression> exps, ColumnModifier columnModifier) {
+        super(columnModifier);
+        this.exps = exps;
+    }
+
+    /**
+     * Computes (once, then cached) the percentile by locating position p * totalCount + 0.5 in the
+     * sorted distinct values and interpolating linearly between the two neighbouring values when
+     * the position falls between them. Points ptr at the DECIMAL bytes of the result.
+     *
+     * @return always true
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (cachedResult == null) {
+            ColumnExpression columnExp = (ColumnExpression)exps.get(0);
+            // Second exp will be a LiteralExpression of Boolean type indicating whether the ordering to
+            // be ASC/DESC
+            LiteralExpression orderingExp = (LiteralExpression)exps.get(1);
+            boolean isAscending = (Boolean)orderingExp.getValue();
+
+            // Third expression will be LiteralExpression
+            LiteralExpression percentileExp = (LiteralExpression)exps.get(2);
+            float p = ((Number)percentileExp.getValue()).floatValue();
+            Map<Object, Integer> sorted = getSortedValueVsCount(isAscending, columnExp.getDataType());
+            float position = (p * this.totalCount) + 0.5F;
+            long wholePosition = (long)position;
+            float fraction = position - wholePosition;
+            Object lower = null;
+            Object upper = null;
+            long runningCount = 0;
+            for (Entry<Object, Integer> entry : sorted.entrySet()) {
+                if (lower != null) {
+                    // The previous entry ended exactly on the position; this entry is its neighbour
+                    upper = entry.getKey();
+                    break;
+                }
+                runningCount += entry.getValue();
+                if (runningCount == wholePosition) {
+                    lower = entry.getKey();
+                } else if (runningCount > wholePosition) {
+                    // The position lies inside this entry's run of equal values
+                    lower = upper = entry.getKey();
+                    break;
+                }
+            }
+
+            double result = 0.0;
+            Number lowerNumber = (Number)lower;
+            if (upper == null || lower == upper) {
+                result = lowerNumber.doubleValue();
+            } else {
+                Number upperNumber = (Number)upper;
+                result = (lowerNumber.doubleValue() * (1.0F - fraction)) + (upperNumber.doubleValue() * fraction);
+            }
+            this.cachedResult = new BigDecimal(result);
+        }
+        if (buffer == null) {
+            initBuffer();
+        }
+        buffer = PDataType.DECIMAL.toBytes(this.cachedResult);
+        ptr.set(buffer);
+        return true;
+    }
+
+    @Override
+    protected int getBufferLength() {
+        return PDataType.DECIMAL.getByteSize();
+    }
+
+    /**
+     * Resets the parent state and forgets the cached result.
+     */
+    @Override
+    public void reset() {
+        super.reset();
+        this.cachedResult = null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/PercentileDiscClientAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/PercentileDiscClientAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/PercentileDiscClientAggregator.java
new file mode 100644
index 0000000..206ed1d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/PercentileDiscClientAggregator.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.util.*;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.*;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ *
+ * Built-in function for PERCENTILE_DISC(<expression>) WITHIN GROUP (ORDER BY <expression> ASC/DESC) aggregate function
+ *
+ * @author ramkrishna
+ * @since 1.2.1
+ */
+public class PercentileDiscClientAggregator extends
+ DistinctValueWithCountClientAggregator {
+
+ private final List<Expression> exps;
+ private Object cachedResult = null;
+ ColumnExpression columnExp = null;
+
+ public PercentileDiscClientAggregator(List<Expression> exps, ColumnModifier columnModifier) {
+ super(columnModifier);
+ this.exps = exps;
+ }
+
+ @Override
+ public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+ // Reset buffer so that it gets initialized with the current datatype of the column
+ buffer = null;
+ if (cachedResult == null) {
+ columnExp = (ColumnExpression)exps.get(0);
+ // Second exp will be a LiteralExpression of Boolean type indicating
+ // whether the ordering to be ASC/DESC
+ LiteralExpression isAscendingExpression = (LiteralExpression) exps
+ .get(1);
+ boolean isAscending = (Boolean) isAscendingExpression.getValue();
+
+ // Third expression will be LiteralExpression
+ LiteralExpression percentileExp = (LiteralExpression) exps.get(2);
+ float p = ((Number) percentileExp.getValue()).floatValue();
+ Map<Object, Integer> sorted = getSortedValueVsCount(isAscending, columnExp.getDataType());
+ int currValue = 0;
+ Object result = null;
+ // Here the Percentile_disc returns the cum_dist() that is greater or equal to the
+ // Percentile (p) specified in the query. So the result set will be of that of the
+ // datatype of the column being selected
+ for (Entry<Object, Integer> entry : sorted.entrySet()) {
+ result = entry.getKey();
+ Integer value = entry.getValue();
+ currValue += value;
+ float cum_dist = (float) currValue / (float) totalCount;
+ if (cum_dist >= p) {
+ break;
+ }
+ }
+ this.cachedResult = result;
+ }
+ if (buffer == null) {
+ // Initialize based on the datatype
+ // columnExp cannot be null
+ buffer = new byte[columnExp.getDataType().getByteSize()];
+ }
+ // Copy the result to the buffer.
+ System.arraycopy(columnExp.getDataType().toBytes(this.cachedResult), 0, buffer, 0, buffer.length);
+ ptr.set(buffer);
+ return true;
+ }
+
+ @Override
+ public void reset() {
+ super.reset();
+ this.cachedResult = null;
+ }
+
+ @Override
+ protected int getBufferLength() {
+ // Will be used in the aggregate() call
+ return PDataType.DECIMAL.getByteSize();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ServerAggregators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ServerAggregators.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ServerAggregators.java
new file mode 100644
index 0000000..6457793
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ServerAggregators.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableUtils;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+import org.apache.phoenix.expression.function.SingleAggregateFunction;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+/**
+ *
+ * Aggregators that execute on the server-side
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class ServerAggregators extends Aggregators {
+ public static final ServerAggregators EMPTY_AGGREGATORS = new ServerAggregators(new SingleAggregateFunction[0], new Aggregator[0], new Expression[0], 0);
+ private final Expression[] expressions;
+
+ private ServerAggregators(SingleAggregateFunction[] functions, Aggregator[] aggregators, Expression[] expressions, int minNullableIndex) {
+ super(functions, aggregators, minNullableIndex);
+ if (aggregators.length != expressions.length) {
+ throw new IllegalArgumentException("Number of aggregators (" + aggregators.length
+ + ") must match the number of expressions (" + Arrays.toString(expressions) + ")");
+ }
+ this.expressions = expressions;
+ }
+
+ @Override
+ public void aggregate(Aggregator[] aggregators, Tuple result) {
+ for (int i = 0; i < expressions.length; i++) {
+ if (expressions[i].evaluate(result, ptr)) {
+ aggregators[i].aggregate(result, ptr);
+ }
+ }
+ }
+
+ /**
+ * Serialize an Aggregator into a byte array
+ * @param aggFuncs list of aggregator to serialize
+ * @return serialized byte array respresentation of aggregator
+ */
+ public static byte[] serialize(List<SingleAggregateFunction> aggFuncs, int minNullableIndex) {
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ try {
+ DataOutputStream output = new DataOutputStream(stream);
+ WritableUtils.writeVInt(output, minNullableIndex);
+ WritableUtils.writeVInt(output, aggFuncs.size());
+ for (int i = 0; i < aggFuncs.size(); i++) {
+ SingleAggregateFunction aggFunc = aggFuncs.get(i);
+ WritableUtils.writeVInt(output, ExpressionType.valueOf(aggFunc).ordinal());
+ aggFunc.write(output);
+ }
+ return stream.toByteArray();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public Aggregator[] newAggregators() {
+ return newAggregators(null);
+ }
+
+ public Aggregator[] newAggregators(Configuration conf) {
+ Aggregator[] aggregators = new Aggregator[functions.length];
+ for (int i = 0; i < functions.length; i++) {
+ aggregators[i] = functions[i].newServerAggregator(conf);
+ }
+ return aggregators;
+ }
+
+ /**
+ * Deserialize aggregators from the serialized byte array representation
+ * @param b byte array representation of a list of Aggregators
+ * @param conf Server side configuration used by HBase
+ * @return newly instantiated Aggregators instance
+ */
+ public static ServerAggregators deserialize(byte[] b, Configuration conf) {
+ if (b == null) {
+ return ServerAggregators.EMPTY_AGGREGATORS;
+ }
+ ByteArrayInputStream stream = new ByteArrayInputStream(b);
+ try {
+ DataInputStream input = new DataInputStream(stream);
+ int minNullableIndex = WritableUtils.readVInt(input);
+ int len = WritableUtils.readVInt(input);
+ Aggregator[] aggregators = new Aggregator[len];
+ Expression[] expressions = new Expression[len];
+ SingleAggregateFunction[] functions = new SingleAggregateFunction[len];
+ for (int i = 0; i < aggregators.length; i++) {
+ SingleAggregateFunction aggFunc = (SingleAggregateFunction)ExpressionType.values()[WritableUtils.readVInt(input)].newInstance();
+ aggFunc.readFields(input, conf);
+ functions[i] = aggFunc;
+ aggregators[i] = aggFunc.getAggregator();
+ expressions[i] = aggFunc.getAggregatorExpression();
+ }
+ return new ServerAggregators(functions, aggregators,expressions, minNullableIndex);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/StddevPopAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/StddevPopAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/StddevPopAggregator.java
new file mode 100644
index 0000000..30276e1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/StddevPopAggregator.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.util.List;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.ColumnModifier;
+
+/**
+ * Client side Aggregator for STDDEV_POP aggregations
+ *
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public class StddevPopAggregator extends BaseStddevAggregator {
+
+ public StddevPopAggregator(List<Expression> exps, ColumnModifier columnModifier) {
+ super(exps, columnModifier);
+ }
+
+ @Override
+ protected long getDataPointsCount() {
+ return totalCount;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/StddevSampAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/StddevSampAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/StddevSampAggregator.java
new file mode 100644
index 0000000..49f52d2
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/StddevSampAggregator.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.util.List;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.ColumnModifier;
+
+/**
+ * Client side Aggregator for STDDEV_SAMP aggregations
+ *
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public class StddevSampAggregator extends BaseStddevAggregator {
+
+ public StddevSampAggregator(List<Expression> exps, ColumnModifier columnModifier) {
+ super(exps, columnModifier);
+ }
+
+ @Override
+ protected long getDataPointsCount() {
+ return totalCount - 1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/UnsignedIntSumAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/UnsignedIntSumAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/UnsignedIntSumAggregator.java
new file mode 100644
index 0000000..c8befca
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/UnsignedIntSumAggregator.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+
+/**
+ *
+ * Aggregator that sums unsigned integer values
+ *
+ * @author jtaylor
+ * @since 0.12
+ */
+public class UnsignedIntSumAggregator extends NumberSumAggregator {
+
+ public UnsignedIntSumAggregator(ColumnModifier columnModifier) {
+ super(columnModifier);
+ }
+
+ @Override
+ protected PDataType getInputDataType() {
+ return PDataType.UNSIGNED_INT;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/UnsignedLongSumAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/UnsignedLongSumAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/UnsignedLongSumAggregator.java
new file mode 100644
index 0000000..b91a934
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/UnsignedLongSumAggregator.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+
+/**
+ *
+ * Aggregator that sums unsigned long values
+ * TODO: create these classes dynamically based on the type passed through
+ *
+ * @author jtaylor
+ * @since 0.12
+ */
+public class UnsignedLongSumAggregator extends NumberSumAggregator {
+
+ public UnsignedLongSumAggregator(ColumnModifier columnModifier) {
+ super(columnModifier);
+ }
+
+ @Override
+ protected PDataType getInputDataType() {
+ return PDataType.UNSIGNED_LONG;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/function/AggregateFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/AggregateFunction.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/AggregateFunction.java
new file mode 100644
index 0000000..72809f5
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/AggregateFunction.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.util.List;
+
+import org.apache.phoenix.expression.Expression;
+
+
+
+
+/**
+ *
+ * Compiled representation of a built-in aggregate function
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+abstract public class AggregateFunction extends FunctionExpression {
+
+ public AggregateFunction() {
+ }
+
+ public AggregateFunction(List<Expression> children) {
+ super(children);
+ }
+
+ @Override
+ public boolean isStateless() {
+ return false;
+ }
+
+ @Override
+ public boolean isDeterministic() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/function/ArrayIndexFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/ArrayIndexFunction.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/ArrayIndexFunction.java
new file mode 100644
index 0000000..a4185e6
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/ArrayIndexFunction.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.sql.Types;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.parse.FunctionParseNode.Argument;
+import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
+import org.apache.phoenix.parse.ParseException;
+import org.apache.phoenix.schema.PArrayDataType;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+ /**
+  * Built-in scalar function ARRAY_ELEM(array, index): positions the result
+  * pointer at one element of an array value.
+  */
+ @BuiltInFunction(name = ArrayIndexFunction.NAME, args = {
+         @Argument(allowedTypes = { PDataType.BINARY_ARRAY, PDataType.VARBINARY_ARRAY }),
+         @Argument(allowedTypes = { PDataType.INTEGER }) })
+ public class ArrayIndexFunction extends ScalarFunction {
+
+     public static final String NAME = "ARRAY_ELEM";
+
+     public ArrayIndexFunction() {
+     }
+
+     public ArrayIndexFunction(List<Expression> children) {
+         super(children);
+     }
+
+     /**
+      * Evaluates the index (second child) and then the array (first child), and
+      * leaves {@code ptr} positioned at the indexed element.
+      *
+      * @return false if either child fails to evaluate; true otherwise, including
+      *         when either child evaluates to an empty value
+      * @throws ParseException if the decoded index is negative
+      */
+     @Override
+     public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+         // The index must be decoded before ptr is reused for the array value.
+         final Expression indexExpr = children.get(1);
+         if (!indexExpr.evaluate(tuple, ptr)) {
+             return false;
+         }
+         if (ptr.getLength() == 0) {
+             return true;
+         }
+         // Decode through the codec so that no Integer object is allocated
+         final int index = PDataType.INTEGER.getCodec().decodeInt(ptr, indexExpr.getColumnModifier());
+         if (index < 0) {
+             throw new ParseException("Index cannot be negative :" + index);
+         }
+
+         final Expression arrayExpr = children.get(0);
+         if (!arrayExpr.evaluate(tuple, ptr)) {
+             return false;
+         }
+         if (ptr.getLength() == 0) {
+             return true;
+         }
+
+         // ptr now covers the whole array; narrow it to the requested element,
+         // using the element type (see PArrayDataType).
+         PArrayDataType.positionAtArrayElement(ptr, index, getDataType());
+         return true;
+     }
+
+     /**
+      * @return the type whose id is the array child's SQL type minus
+      *         {@link Types#ARRAY}
+      */
+     @Override
+     public PDataType getDataType() {
+         final int elementTypeId = children.get(0).getDataType().getSqlType() - Types.ARRAY;
+         return PDataType.fromTypeId(elementTypeId);
+     }
+
+     @Override
+     public String getName() {
+         return NAME;
+     }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/function/ArrayLengthFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/ArrayLengthFunction.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/ArrayLengthFunction.java
new file mode 100644
index 0000000..9a94129
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/ArrayLengthFunction.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.sql.Types;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.parse.FunctionParseNode.Argument;
+import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
+import org.apache.phoenix.schema.PArrayDataType;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+ /**
+  * Built-in scalar function ARRAY_LENGTH(array): returns the length of an
+  * array value as an INTEGER.
+  */
+ @BuiltInFunction(name = ArrayLengthFunction.NAME, args = {
+         @Argument(allowedTypes = { PDataType.BINARY_ARRAY, PDataType.VARBINARY_ARRAY }) })
+ public class ArrayLengthFunction extends ScalarFunction {
+     public static final String NAME = "ARRAY_LENGTH";
+
+     public ArrayLengthFunction() {
+     }
+
+     public ArrayLengthFunction(List<Expression> children) {
+         super(children);
+     }
+
+     /**
+      * Evaluates the array child and replaces {@code ptr} with the INTEGER
+      * encoding of the array's length.
+      *
+      * @return false if the array child fails to evaluate; true otherwise,
+      *         including when it evaluates to an empty value (ptr is then left
+      *         as the child set it)
+      */
+     @Override
+     public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+         final Expression arrayExpr = children.get(0);
+         if (!arrayExpr.evaluate(tuple, ptr)) {
+             return false;
+         }
+         if (ptr.getLength() == 0) {
+             return true;
+         }
+         // Element type id = array SQL type minus Types.ARRAY
+         final int elementTypeId = arrayExpr.getDataType().getSqlType() - Types.ARRAY;
+         final PDataType elementType = PDataType.fromTypeId(elementTypeId);
+         final int arrayLength = PArrayDataType.getArrayLength(ptr, elementType);
+         // Encode the length into a fresh INTEGER-sized buffer
+         final byte[] encoded = new byte[PDataType.INTEGER.getByteSize()];
+         PDataType.INTEGER.getCodec().encodeInt(arrayLength, encoded, 0);
+         ptr.set(encoded);
+         return true;
+     }
+
+     /**
+      * @return {@link PDataType#INTEGER}, the type of the encoded length
+      */
+     @Override
+     public PDataType getDataType() {
+         return PDataType.INTEGER;
+     }
+
+     @Override
+     public String getName() {
+         return NAME;
+     }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/function/AvgAggregateFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/AvgAggregateFunction.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/AvgAggregateFunction.java
new file mode 100644
index 0000000..62bb565
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/AvgAggregateFunction.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.math.BigDecimal;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.parse.*;
+import org.apache.phoenix.parse.FunctionParseNode.Argument;
+import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+ /**
+  * Built-in AVG aggregate function. The average is computed at evaluation
+  * time by dividing the result of a SUM function by the result of a COUNT
+  * function; the result type is always DECIMAL.
+  */
+ @BuiltInFunction(name=AvgAggregateFunction.NAME, nodeClass=AvgAggregateParseNode.class, args= {@Argument(allowedTypes={PDataType.DECIMAL})} )
+ public class AvgAggregateFunction extends CompositeAggregateFunction {
+     public static final String NAME = "AVG";
+     private final CountAggregateFunction countFunc;
+     private final SumAggregateFunction sumFunc;
+     // Scale applied to the result; see setScale(List)
+     private Integer scale;
+
+     /**
+      * Creates an instance without COUNT/SUM delegates. Such an instance cannot
+      * be evaluated, because {@link #evaluate} dereferences the COUNT delegate.
+      */
+     // TODO: remove when not required at built-in func register time
+     public AvgAggregateFunction(List<Expression> children) {
+         super(children);
+         this.countFunc = null;
+         this.sumFunc = null;
+         setScale(children);
+     }
+
+     /**
+      * @param children argument expressions; also used to derive the result scale
+      * @param countFunc COUNT function providing the divisor
+      * @param sumFunc SUM function providing the dividend
+      */
+     public AvgAggregateFunction(List<Expression> children, CountAggregateFunction countFunc, SumAggregateFunction sumFunc) {
+         super(children);
+         this.countFunc = countFunc;
+         this.sumFunc = sumFunc;
+         setScale(children);
+     }
+
+     /**
+      * Sets the result scale to the largest scale declared by any child, but
+      * never below {@code PDataType.MIN_DECIMAL_AVG_SCALE}.
+      */
+     private void setScale(List<Expression> children) {
+         scale = PDataType.MIN_DECIMAL_AVG_SCALE; // At least 4;
+         for (Expression child: children) {
+             if (child.getScale() != null) {
+                 scale = Math.max(scale, child.getScale());
+             }
+         }
+     }
+
+     @Override
+     public PDataType getDataType() {
+         return PDataType.DECIMAL;
+     }
+
+     /**
+      * Computes the average and points {@code ptr} at its DECIMAL encoding.
+      *
+      * @return false when the count cannot be evaluated, when it is zero, or
+      *         when the sum cannot be evaluated; true otherwise
+      */
+     @Override
+     public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+         if (!countFunc.evaluate(tuple, ptr)) {
+             return false;
+         }
+         long count = countFunc.getDataType().getCodec().decodeLong(ptr, null);
+         if (count == 0) {
+             return false;
+         }
+
+         // Normal case where a column reference was used as the argument to AVG
+         if (!countFunc.isConstantExpression()) {
+             // Do not decode ptr unless the sum was actually written to it;
+             // otherwise it still holds whatever the count evaluation left behind.
+             if (!sumFunc.evaluate(tuple, ptr)) {
+                 return false;
+             }
+             BigDecimal sum = (BigDecimal)PDataType.DECIMAL.toObject(ptr, sumFunc.getDataType());
+             // For the final column projection, we divide the sum by the count, both coerced to BigDecimal.
+             // TODO: base the precision on column metadata instead of constant
+             BigDecimal avg = sum.divide(BigDecimal.valueOf(count), PDataType.DEFAULT_MATH_CONTEXT);
+             // RoundingMode.DOWN is the enum equivalent of the deprecated
+             // BigDecimal.ROUND_DOWN int constant.
+             avg = avg.setScale(scale, java.math.RoundingMode.DOWN);
+             ptr.set(PDataType.DECIMAL.toBytes(avg));
+             return true;
+         }
+         // Constant argument: the result is the literal itself, truncated to the
+         // result scale.
+         BigDecimal value = (BigDecimal) ((LiteralExpression)countFunc.getChildren().get(0)).getValue();
+         value = value.setScale(scale, java.math.RoundingMode.DOWN);
+         ptr.set(PDataType.DECIMAL.toBytes(value));
+         return true;
+     }
+
+     /**
+      * @return whether the SUM delegate is nullable; false when there is no
+      *         SUM delegate
+      */
+     @Override
+     public boolean isNullable() {
+         return sumFunc != null && sumFunc.isNullable();
+     }
+
+     @Override
+     public String getName() {
+         return NAME;
+     }
+
+     @Override
+     public Integer getScale() {
+         return scale;
+     }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CeilDateExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CeilDateExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CeilDateExpression.java
new file mode 100644
index 0000000..d0951cf
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CeilDateExpression.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.phoenix.expression.Expression;
+
+/**
+ *
+ * Ceiling (CEIL) variant of {@link RoundDateExpression}, operating on
+ * {@link org.apache.phoenix.schema.PDataType#DATE}.
+ *
+ * @author samarth.jain
+ * @since 3.0.0
+ */
+public class CeilDateExpression extends RoundDateExpression {
+
+    /** No-argument constructor; performs no initialization of its own. */
+    public CeilDateExpression() {}
+
+    /**
+     * Package-private constructor used by the {@code create} factories.
+     *
+     * @param children child expressions, handed straight to the superclass
+     */
+    CeilDateExpression(List<Expression> children) {
+        super(children);
+    }
+
+    /**
+     * Creates a {@link CeilDateExpression} from an already assembled child list.
+     *
+     * @param children child expressions for the new instance
+     */
+    public static Expression create(List<Expression> children) throws SQLException {
+        return new CeilDateExpression(children);
+    }
+
+    /**
+     * Creates a {@link CeilDateExpression} with the default multiplier of 1.
+     *
+     * @param expr the expression to apply CEIL to
+     * @param timeUnit unit of time to round up to
+     */
+    public static Expression create(Expression expr, TimeUnit timeUnit) throws SQLException {
+        return create(expr, timeUnit, 1);
+    }
+
+    /**
+     * Creates a {@link CeilDateExpression} whose children are the given expression,
+     * the time unit expression and the multiplier expression, in that order.
+     *
+     * @param expr the expression to apply CEIL to
+     * @param timeUnit unit of time to round up to
+     * @param multiplier determines the roll up window size
+     */
+    public static Expression create(Expression expr, TimeUnit timeUnit, int multiplier) throws SQLException {
+        List<Expression> children =
+                Lists.newArrayList(expr, getTimeUnitExpr(timeUnit), getMultiplierExpr(multiplier));
+        return create(children);
+    }
+
+    /**
+     * @return {@code divBy - 1}
+     */
+    // NOTE(review): presumably the superclass adds this amount before truncating so that
+    // values round up - confirm in RoundDateExpression.
+    @Override
+    protected long getRoundUpAmount() {
+        return divBy - 1;
+    }
+
+    /**
+     * @return {@link CeilFunction#NAME}
+     */
+    @Override
+    public String getName() {
+        return CeilFunction.NAME;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CeilDecimalExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CeilDecimalExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CeilDecimalExpression.java
new file mode 100644
index 0000000..dd2c22a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CeilDecimalExpression.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.math.RoundingMode;
+import java.sql.SQLException;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.schema.PDataType;
+
+/**
+ *
+ * CEIL operation on a {@link org.apache.phoenix.schema.PDataType#DECIMAL}, implemented as a
+ * {@link RoundDecimalExpression} that rounds with {@link RoundingMode#CEILING}.
+ *
+ * @author samarth.jain
+ * @since 3.0.0
+ */
+public class CeilDecimalExpression extends RoundDecimalExpression {
+
+    /** No-argument constructor; performs no initialization of its own. */
+    public CeilDecimalExpression() {}
+
+    /**
+     * @param children child expressions, handed straight to the superclass
+     */
+    public CeilDecimalExpression(List<Expression> children) {
+        super(children);
+    }
+
+    /**
+     * Creates a {@link CeilDecimalExpression} with a default scale of 0 used for rounding.
+     *
+     * @param expr the expression to apply CEIL to
+     */
+    public static Expression create(Expression expr) throws SQLException {
+        return create(expr, 0);
+    }
+
+    /**
+     * Creates a {@link CeilDecimalExpression} that rounds to the given scale.
+     *
+     * @param expr the expression to apply CEIL to
+     * @param scale the rounding scale, wrapped in an INTEGER literal as the second child
+     * @return {@code expr} itself when its data type is coercible to
+     *         {@link PDataType#LONG}; otherwise a new {@link CeilDecimalExpression}
+     */
+    public static Expression create(Expression expr, int scale) throws SQLException {
+        PDataType exprType = expr.getDataType();
+        if (exprType.isCoercibleTo(PDataType.LONG)) {
+            // Returned untouched: no wrapping expression is created for such types.
+            return expr;
+        }
+        Expression roundingScale = LiteralExpression.newConstant(scale, PDataType.INTEGER, true);
+        List<Expression> children = Lists.newArrayList(expr, roundingScale);
+        return new CeilDecimalExpression(children);
+    }
+
+    /**
+     * @return {@link RoundingMode#CEILING}
+     */
+    @Override
+    protected RoundingMode getRoundingMode() {
+        return RoundingMode.CEILING;
+    }
+
+    /**
+     * @return {@link CeilFunction#NAME}
+     */
+    @Override
+    public String getName() {
+        return CeilFunction.NAME;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CeilFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CeilFunction.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CeilFunction.java
new file mode 100644
index 0000000..60ad329
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CeilFunction.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.util.List;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.parse.CeilParseNode;
+import org.apache.phoenix.parse.FunctionParseNode.Argument;
+import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
+import org.apache.phoenix.schema.PDataType;
+/**
+ *
+ * Abstract base for the built-in CEIL function. The {@link BuiltInFunction} annotation
+ * registers the name {@link #NAME}, the parse node class {@link CeilParseNode} and three
+ * arguments: a TIMESTAMP or DECIMAL value, an optional constant VARCHAR or INTEGER, and an
+ * optional constant INTEGER that defaults to 1.
+ *
+ * @author samarth.jain
+ * @since 3.0.0
+ */
+@BuiltInFunction(
+    name = CeilFunction.NAME,
+    nodeClass = CeilParseNode.class,
+    args = {
+        // The value CEIL is applied to.
+        @Argument(allowedTypes={PDataType.TIMESTAMP, PDataType.DECIMAL}),
+        // NOTE(review): presumably the time unit (VARCHAR) or the scale (INTEGER) - confirm in CeilParseNode.
+        @Argument(allowedTypes={PDataType.VARCHAR, PDataType.INTEGER}, defaultValue = "null", isConstant=true),
+        // NOTE(review): presumably the multiplier for date/time rounding - confirm in CeilParseNode.
+        @Argument(allowedTypes={PDataType.INTEGER}, defaultValue="1", isConstant=true)
+    }
+)
+public abstract class CeilFunction extends ScalarFunction {
+
+    /** Name under which the function is registered. */
+    public static final String NAME = "CEIL";
+
+    /**
+     * @param children child expressions, handed straight to the superclass
+     */
+    public CeilFunction(List<Expression> children) {
+        super(children);
+    }
+
+    /**
+     * @return {@link #NAME}
+     */
+    @Override
+    public String getName() {
+        return NAME;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CeilTimestampExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CeilTimestampExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CeilTimestampExpression.java
new file mode 100644
index 0000000..7f54cbc
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CeilTimestampExpression.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import com.google.common.collect.Lists;
+import org.apache.phoenix.expression.CoerceExpression;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PDataType.PDataCodec;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ *
+ * CEIL operation on {@link org.apache.phoenix.schema.PDataType#TIMESTAMP}.
+ * This class only supports CEIL to {@link TimeUnit#MILLISECOND}. For other units, such as
+ * {@link TimeUnit#HOUR}, use {@link CeilDateExpression}; {@link #create(List)} makes that
+ * choice automatically.
+ *
+ * @author samarth.jain
+ * @since 3.0.0
+ */
+public class CeilTimestampExpression extends CeilDateExpression {
+
+    /** No-argument constructor; performs no initialization of its own. */
+    public CeilTimestampExpression() {}
+
+    /**
+     * @param children child expressions, handed straight to the superclass
+     */
+    private CeilTimestampExpression(List<Expression> children) {
+        super(children);
+    }
+
+    /**
+     * Creates a {@link CeilTimestampExpression} that uses {@link TimeUnit#MILLISECOND}
+     * as the time unit for rounding and a multiplier of 1.
+     *
+     * @param expr the expression to apply CEIL to
+     */
+    public static CeilTimestampExpression create (Expression expr) throws SQLException {
+        return create(expr, 1);
+    }
+
+    /**
+     * Creates a {@link CeilTimestampExpression} that uses {@link TimeUnit#MILLISECOND}
+     * as the time unit for rounding.
+     *
+     * @param expr the expression to apply CEIL to
+     * @param multiplier value wrapped into the multiplier child expression
+     */
+    public static CeilTimestampExpression create(Expression expr, int multiplier) throws SQLException {
+        List<Expression> operands =
+                Lists.newArrayList(expr, getTimeUnitExpr(TimeUnit.MILLISECOND), getMultiplierExpr(multiplier));
+        return new CeilTimestampExpression(operands);
+    }
+
+    /**
+     * Chooses the implementation from the time unit literal in {@code children.get(1)}.
+     *
+     * @param children the value expression, followed by the time unit literal and any
+     *                 remaining arguments
+     * @return a {@link CeilTimestampExpression} when the unit is MILLISECOND (compared
+     *         case-insensitively); otherwise a {@link CeilDateExpression} over the first
+     *         child coerced to a date type
+     */
+    public static Expression create(List<Expression> children) throws SQLException {
+        Expression valueExpr = children.get(0);
+        PDataType valueType = valueExpr.getDataType();
+        LiteralExpression unitLiteral = (LiteralExpression) children.get(1);
+        String timeUnit = (String) unitLiteral.getValue();
+        if (TimeUnit.MILLISECOND.toString().equalsIgnoreCase(timeUnit)) {
+            return new CeilTimestampExpression(children);
+        }
+        // Coerce TIMESTAMP to DATE, as the nanos has no effect
+        PDataType dateType;
+        if (valueType == PDataType.TIMESTAMP) {
+            dateType = PDataType.DATE;
+        } else {
+            dateType = PDataType.UNSIGNED_DATE;
+        }
+        List<Expression> dateChildren = Lists.newArrayListWithExpectedSize(children.size());
+        dateChildren.add(CoerceExpression.create(valueExpr, dateType));
+        dateChildren.addAll(children.subList(1, children.size()));
+        return CeilDateExpression.create(dateChildren);
+    }
+
+    /**
+     * Selects the codec for key ranges: the DATE codec for TIMESTAMP columns, the
+     * UNSIGNED_DATE codec for UNSIGNED_TIMESTAMP columns, and the superclass choice for
+     * every other type.
+     *
+     * @param columnDataType data type of the column
+     */
+    @Override
+    protected PDataCodec getKeyRangeCodec(PDataType columnDataType) {
+        if (columnDataType == PDataType.TIMESTAMP) {
+            return PDataType.DATE.getCodec();
+        }
+        if (columnDataType == PDataType.UNSIGNED_TIMESTAMP) {
+            return PDataType.UNSIGNED_DATE.getCodec();
+        }
+        return super.getKeyRangeCodec(columnDataType);
+    }
+
+    /**
+     * Evaluates the first child and, when the result has a positive nanos component,
+     * replaces it with a timestamp one millisecond later.
+     *
+     * @param tuple the tuple handed to the first child
+     * @param ptr in/out pointer holding the child's value and then the rounded value
+     * @return {@code false} when the first child does not evaluate, {@code true} otherwise
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        Expression valueExpr = children.get(0);
+        if (!valueExpr.evaluate(tuple, ptr)) {
+            return false;
+        }
+        ColumnModifier modifier = valueExpr.getColumnModifier();
+        PDataType type = getDataType();
+        // NOTE(review): getNanos presumably yields only the sub-millisecond remainder - confirm.
+        boolean hasNanos = type.getNanos(ptr, modifier) > 0;
+        if (hasNanos) {
+            long millis = type.getMillis(ptr, modifier);
+            Timestamp nextMilli = new Timestamp(millis + 1);
+            ptr.set(type.toBytes(nextMilli));
+        }
+        // for timestamp we only support rounding up the milliseconds.
+        return true;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CoalesceFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CoalesceFunction.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CoalesceFunction.java
new file mode 100644
index 0000000..05f6bd8
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CoalesceFunction.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.parse.FunctionParseNode.Argument;
+import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+/**
+ *
+ * Function used to provide an alternative value when the first argument is null.
+ * Usage:
+ * COALESCE(expr1,expr2)
+ * If expr1 is not null, then it is returned, otherwise expr2 is returned.
+ *
+ * TODO: better bind parameter type matching, since arg2 must be coercible
+ * to arg1. consider allowing a common base type?
+ * @author jtaylor
+ * @since 0.1
+ */
+@BuiltInFunction(name=CoalesceFunction.NAME, args= {
+    @Argument(),
+    @Argument()} )
+public class CoalesceFunction extends ScalarFunction {
+    public static final String NAME = "COALESCE";
+
+    /** No-argument constructor; performs no initialization of its own. */
+    public CoalesceFunction() {
+    }
+
+    /**
+     * @param children the two arguments; the second must be coercible to the type of the first
+     * @throws SQLException with code {@link SQLExceptionCode#CANNOT_CONVERT_TYPE} when the
+     *         second argument's type is not coercible to the first argument's type
+     */
+    public CoalesceFunction(List<Expression> children) throws SQLException {
+        super(children);
+        PDataType firstType = children.get(0).getDataType();
+        PDataType secondType = children.get(1).getDataType();
+        if (!secondType.isCoercibleTo(firstType)) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_CONVERT_TYPE)
+                .setMessage(getName() + " expected " + firstType + ", but got " + secondType)
+                .build().buildException();
+        }
+    }
+
+    /**
+     * Evaluates the first argument and falls back to the second when the first yields
+     * no value.
+     *
+     * @param tuple the tuple handed to the child expressions
+     * @param ptr in/out pointer set by whichever child supplies the result
+     * @return {@code true} when either child supplied a value, {@code false} otherwise
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        // A first argument that evaluates but leaves ptr empty carries no value, so it must
+        // not short-circuit the fallback.
+        // NOTE(review): assumes an evaluated zero-length ptr represents SQL NULL - confirm.
+        if (children.get(0).evaluate(tuple, ptr) && ptr.getLength() > 0) {
+            return true;
+        }
+        // NOTE(review): the second child's bytes are returned as-is although getDataType()
+        // reports the first child's type - confirm whether they need to be coerced.
+        return children.get(1).evaluate(tuple, ptr);
+    }
+
+    /**
+     * @return the data type of the first argument
+     */
+    @Override
+    public PDataType getDataType() {
+        return children.get(0).getDataType();
+    }
+
+    /**
+     * @return the larger of the two arguments' byte sizes, or {@code null} when either
+     *         argument reports no byte size
+     */
+    @Override
+    public Integer getByteSize() {
+        Integer maxByteSize1 = children.get(0).getByteSize();
+        if (maxByteSize1 == null) {
+            return null;
+        }
+        Integer maxByteSize2 = children.get(1).getByteSize();
+        if (maxByteSize2 == null) {
+            return null;
+        }
+        return Math.max(maxByteSize1, maxByteSize2);
+    }
+
+    /**
+     * @return {@code true} only when both arguments are nullable
+     */
+    @Override
+    public boolean isNullable() {
+        return children.get(0).isNullable() && children.get(1).isNullable();
+    }
+
+    /**
+     * @return {@link #NAME}
+     */
+    @Override
+    public String getName() {
+        return NAME;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CompositeAggregateFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CompositeAggregateFunction.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CompositeAggregateFunction.java
new file mode 100644
index 0000000..2c8da08
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/CompositeAggregateFunction.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.util.List;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+
+
+/**
+ *
+ * Base class for aggregate functions that are built out of other aggregate
+ * functions (for example, AVG is modeled as a SUM aggregate function and a
+ * COUNT aggregate function).
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public abstract class CompositeAggregateFunction extends AggregateFunction {
+
+    /**
+     * @param children child expressions, handed straight to the superclass
+     */
+    public CompositeAggregateFunction(List<Expression> children) {
+        super(children);
+    }
+
+    /**
+     * Never invokes the visitor. Declared {@code final}, so subclasses cannot change this.
+     *
+     * @param visitor ignored
+     * @return always {@code null}
+     */
+    @Override
+    public final <T> T accept(ExpressionVisitor<T> visitor) {
+        return null;
+    }
+}