You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by jb...@apache.org on 2016/06/23 14:15:53 UTC
[05/56] [abbrv] incubator-carbondata git commit: [Issue 618]Supported
Spark 1.6 in Carbondata (#670)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/AbstractDistinctCountAggregatorObjectSet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/AbstractDistinctCountAggregatorObjectSet.java b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/AbstractDistinctCountAggregatorObjectSet.java
new file mode 100644
index 0000000..0629007
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/AbstractDistinctCountAggregatorObjectSet.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.distinct;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.query.aggregator.MeasureAggregator;
+
+public abstract class AbstractDistinctCountAggregatorObjectSet implements MeasureAggregator {
+
+ private static final long serialVersionUID = 6313463368629960186L;
+
+ protected Set<Object> valueSetForObj;
+
+ public AbstractDistinctCountAggregatorObjectSet() {
+ valueSetForObj = new HashSet<Object>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ }
+
+ /**
+ * just need to add the unique values to agg set
+ */
+ @Override public void agg(double newVal) {
+ }
+
+ /**
+ * Distinct count Aggregate function which update the Distinct count
+ *
+ * @param newVal new value
+ */
+ @Override public void agg(Object newVal) {
+ valueSetForObj.add(newVal);
+ }
+
+ /**
+ * Below method will be used to get the value byte array
+ */
+ @Override public byte[] getByteArray() {
+ return null;
+ }
+
+ @Override public Double getDoubleValue() {
+ return (double) valueSetForObj.size();
+ }
+
+ @Override public Long getLongValue() {
+ return (long) valueSetForObj.size();
+ }
+
+ @Override public BigDecimal getBigDecimalValue() {
+ return new BigDecimal(valueSetForObj.size());
+ }
+
+ @Override public Object getValueObject() {
+ return valueSetForObj.size();
+ }
+
+ @Override public void setNewValue(Object newValue) {
+ valueSetForObj.add(newValue);
+ }
+
+ @Override public boolean isFirstTime() {
+ return false;
+ }
+
+ @Override public void writeData(DataOutput output) throws IOException {
+
+ }
+
+ @Override public void readData(DataInput inPut) throws IOException {
+
+ }
+
+ public String toString() {
+ return valueSetForObj.size() + "";
+ }
+
+ @Override public void merge(byte[] value) {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregator.java
new file mode 100644
index 0000000..1b2b33d
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregator.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.distinct;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.carbondata.common.logging.LogService;
+import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.query.aggregator.MeasureAggregator;
+
+import org.roaringbitmap.IntIterator;
+import org.roaringbitmap.RoaringBitmap;
+
+/**
+ * * The distinct count aggregator
+ * Ex:
+ * ID NAME Sales
+ * <p>1 a 200
+ * <p>2 a 100
+ * <p>3 a 200
+ * select count(distinct sales) # would result 2
+ * select count(sales) # would result 3
+ */
+public class DistinctCountAggregator implements MeasureAggregator {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(DistinctCountAggregator.class.getName());
+ /**
+ *
+ */
+ private static final long serialVersionUID = 6313463368629960186L;
+ /**
+ * For Spark CARBON to avoid heavy object transfer it better to flatten
+ * the Aggregators. There is no aggregation expected after setting this value.
+ */
+ private Double computedFixedValue;
+ /**
+ *
+ */
+ // private Set<Double> valueSet;
+ private RoaringBitmap valueSet;
+
+ private byte[] data;
+
+ private double minValue;
+
+ public DistinctCountAggregator(Object minValue) {
+ valueSet = new RoaringBitmap();
+ if (minValue instanceof BigDecimal) {
+ this.minValue = ((BigDecimal) minValue).doubleValue();
+ } else if (minValue instanceof Long) {
+ this.minValue = ((Long) minValue).doubleValue();
+ } else {
+ this.minValue = (Double) minValue;
+ }
+ }
+
+ public DistinctCountAggregator() {
+ valueSet = new RoaringBitmap();
+ }
+
+ /**
+ * just need to add the unique values to agg set
+ */
+ @Override public void agg(double newVal) {
+ valueSet.add((int) (newVal - minValue));
+ }
+
+ /**
+ * Distinct count Aggregate function which update the Distinct count
+ *
+ * @param newVal new value
+ */
+ @Override public void agg(Object newVal) {
+ if (newVal instanceof byte[]) {
+ byte[] values = (byte[]) newVal;
+ ByteBuffer buffer = ByteBuffer.wrap(values);
+ buffer.rewind();
+ while (buffer.hasRemaining()) {
+ valueSet.add(buffer.getInt());
+ }
+ return;
+ } else {
+ double value = new Double(newVal.toString());
+ agg(value);
+ }
+ }
+
+ @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+ if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+ valueSet.add((int) dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index));
+ }
+ }
+
+ /**
+ * Below method will be used to get the value byte array
+ */
+ @Override public byte[] getByteArray() {
+ if (valueSet.getCardinality() == 0) {
+ return new byte[0];
+ }
+ IntIterator iterator = valueSet.getIntIterator();
+ ByteBuffer buffer = ByteBuffer.allocate(valueSet.getCardinality() * 4 + 8);
+ buffer.putDouble(minValue);
+ while (iterator.hasNext()) {
+ buffer.putInt(iterator.next());
+ }
+ buffer.rewind();
+ return buffer.array();
+ }
+
+ private void agg(RoaringBitmap set2, double minValue) {
+ if (this.minValue == minValue) {
+ valueSet.or(set2);
+ } else {
+ if (this.minValue > minValue) {
+ IntIterator intIterator = valueSet.getIntIterator();
+ while (intIterator.hasNext()) {
+ set2.add((int) ((double) (intIterator.next() + this.minValue) - minValue));
+ }
+ this.minValue = minValue;
+ this.valueSet = set2;
+ } else {
+ IntIterator intIterator = set2.getIntIterator();
+ while (intIterator.hasNext()) {
+ valueSet.add((int) ((double) (intIterator.next() + minValue) - this.minValue));
+ }
+ }
+ }
+ }
+
+ /**
+ * merge the valueset so that we get the count of unique values
+ */
+ @Override public void merge(MeasureAggregator aggregator) {
+ DistinctCountAggregator distinctCountAggregator = (DistinctCountAggregator) aggregator;
+ readData();
+ distinctCountAggregator.readData();
+ if (distinctCountAggregator.valueSet != null) {
+ agg(distinctCountAggregator.valueSet, distinctCountAggregator.minValue);
+ }
+ }
+
+ @Override public Double getDoubleValue() {
+ if (computedFixedValue == null) {
+ readData();
+ return (double) valueSet.getCardinality();
+ }
+ return computedFixedValue;
+ }
+
+ @Override public Long getLongValue() {
+ if (computedFixedValue == null) {
+ readData();
+ return (long) valueSet.getCardinality();
+ }
+ return computedFixedValue.longValue();
+ }
+
+ @Override public BigDecimal getBigDecimalValue() {
+ if (computedFixedValue == null) {
+ readData();
+ return new BigDecimal(valueSet.getCardinality());
+ }
+ return new BigDecimal(computedFixedValue);
+ }
+
+ @Override public Object getValueObject() {
+ return valueSet.getCardinality();
+ }
+
+ @Override public void setNewValue(Object newValue) {
+ computedFixedValue = (Double) newValue;
+ valueSet = null;
+ }
+
+ @Override public boolean isFirstTime() {
+ return false;
+ }
+
+ @Override public void writeData(DataOutput output) throws IOException {
+
+ if (computedFixedValue != null) {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 8);
+ byteBuffer.putInt(-1);
+ byteBuffer.putDouble(computedFixedValue);
+ byteBuffer.flip();
+ output.write(byteBuffer.array());
+ } else {
+ if (valueSet != null) {
+ valueSet.serialize(output);
+ } else {
+ output.write(data);
+ }
+ }
+ }
+
+ @Override public void readData(DataInput inPut) throws IOException {
+ valueSet = new RoaringBitmap();
+ valueSet.deserialize(inPut);
+ }
+
+ private void readData() {
+ if (data != null && (valueSet == null || valueSet.isEmpty())) {
+ ByteArrayInputStream stream = new ByteArrayInputStream(data);
+ DataInputStream outputStream = new DataInputStream(stream);
+ try {
+ readData(outputStream);
+ outputStream.close();
+ data = null;
+ } catch (IOException e) {
+ LOGGER.error(e, e.getMessage());
+ }
+ }
+ }
+
+ @Override public MeasureAggregator getCopy() {
+ DistinctCountAggregator aggr = new DistinctCountAggregator(minValue);
+ aggr.valueSet = valueSet.clone();
+ return aggr;
+ }
+
+ @Override public int compareTo(MeasureAggregator measureAggr) {
+ double compFixedVal = getDoubleValue();
+ double otherVal = measureAggr.getDoubleValue();
+ if (compFixedVal > otherVal) {
+ return 1;
+ }
+ if (compFixedVal < otherVal) {
+ return -1;
+ }
+ return 0;
+ }
+
+ @Override public boolean equals(Object obj) {
+ if(!(obj instanceof DistinctCountAggregator)) {
+ return false;
+ }
+ DistinctCountAggregator o = (DistinctCountAggregator) obj;
+ return getDoubleValue().equals(o.getDoubleValue());
+ }
+
+ @Override public int hashCode() {
+ return getDoubleValue().hashCode();
+ }
+
+ @Override public MeasureAggregator get() {
+ ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+ DataOutputStream outputStream = new DataOutputStream(byteStream);
+ try {
+ writeData(outputStream);
+ } catch (IOException ex) {
+ LOGGER.error(ex, ex.getMessage());
+ }
+ data = byteStream.toByteArray();
+ valueSet = null;
+ return this;
+ }
+
+ public String toString() {
+ if (computedFixedValue == null) {
+ readData();
+ return valueSet.getCardinality() + "";
+ }
+ return computedFixedValue + "";
+ }
+
+ public RoaringBitmap getBitMap() {
+ return valueSet;
+ }
+
+ public double getMinValue() {
+ return minValue;
+ }
+
+ @Override public void merge(byte[] value) {
+ if (0 == value.length) {
+ return;
+ }
+ ByteBuffer buffer = ByteBuffer.wrap(value);
+ buffer.rewind();
+ double currentMinValue = buffer.getDouble();
+ while (buffer.hasRemaining()) {
+ agg(buffer.getInt() + currentMinValue);
+ }
+ }
+
+ @Override public MeasureAggregator getNew() {
+ // TODO Auto-generated method stub
+ return new DistinctCountAggregator();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregatorObjectSet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregatorObjectSet.java b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregatorObjectSet.java
new file mode 100644
index 0000000..3b26e53
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregatorObjectSet.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.distinct;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.query.aggregator.MeasureAggregator;
+
+public class DistinctCountAggregatorObjectSet extends AbstractDistinctCountAggregatorObjectSet {
+
+ private static final long serialVersionUID = 6313463368629960186L;
+
+ /**
+ * just need to add the unique values to agg set
+ */
+ @Override public void agg(double newVal) {
+ valueSetForObj.add(newVal);
+ }
+
+ @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+ if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+ valueSetForObj.add(dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index));
+ }
+ }
+
+ private void agg(Set<Object> set2) {
+ valueSetForObj.addAll(set2);
+ }
+
+ /**
+ * merge the valueset so that we get the count of unique values
+ */
+ @Override public void merge(MeasureAggregator aggregator) {
+ DistinctCountAggregatorObjectSet distinctCountAggregator =
+ (DistinctCountAggregatorObjectSet) aggregator;
+ agg(distinctCountAggregator.valueSetForObj);
+ }
+
+ @Override public MeasureAggregator getCopy() {
+ DistinctCountAggregatorObjectSet aggregator = new DistinctCountAggregatorObjectSet();
+ aggregator.valueSetForObj = new HashSet<Object>(valueSetForObj);
+ return aggregator;
+ }
+
+ @Override public int compareTo(MeasureAggregator measureAggr) {
+ double valueSetForObjSize = getDoubleValue();
+ double otherVal = measureAggr.getDoubleValue();
+ if (valueSetForObjSize > otherVal) {
+ return 1;
+ }
+ if (valueSetForObjSize < otherVal) {
+ return -1;
+ }
+ return 0;
+ }
+
+ @Override public boolean equals(Object obj) {
+ if (!(obj instanceof DistinctCountAggregatorObjectSet)) {
+ return false;
+ }
+ DistinctCountAggregatorObjectSet o = (DistinctCountAggregatorObjectSet) obj;
+ return getDoubleValue().equals(o.getDoubleValue());
+ }
+
+ @Override public int hashCode() {
+ return getDoubleValue().hashCode();
+ }
+
+ @Override public MeasureAggregator get() {
+ return this;
+ }
+
+ @Override public MeasureAggregator getNew() {
+ return new DistinctCountAggregatorObjectSet();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountBigDecimalAggregatorObjectSet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountBigDecimalAggregatorObjectSet.java b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountBigDecimalAggregatorObjectSet.java
new file mode 100644
index 0000000..2f44bca
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountBigDecimalAggregatorObjectSet.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.distinct;
+
+import java.math.BigDecimal;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.query.aggregator.MeasureAggregator;
+
+public class DistinctCountBigDecimalAggregatorObjectSet
+ extends AbstractDistinctCountAggregatorObjectSet {
+
+ private static final long serialVersionUID = 6313463368629960186L;
+
+ @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+ if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+ valueSetForObj.add(dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(index));
+ }
+ }
+
+ private void agg(Set<Object> set2) {
+ valueSetForObj.addAll(set2);
+ }
+
+ /**
+ * merge the valueset so that we get the count of unique values
+ */
+ @Override public void merge(MeasureAggregator aggregator) {
+ DistinctCountBigDecimalAggregatorObjectSet distinctCountBigDecimalAggregatorObjectSet =
+ (DistinctCountBigDecimalAggregatorObjectSet) aggregator;
+ agg(distinctCountBigDecimalAggregatorObjectSet.valueSetForObj);
+ }
+
+ @Override public MeasureAggregator getCopy() {
+ DistinctCountBigDecimalAggregatorObjectSet aggregator =
+ new DistinctCountBigDecimalAggregatorObjectSet();
+ aggregator.valueSetForObj = new HashSet<Object>(valueSetForObj);
+ return aggregator;
+ }
+
+ @Override public int compareTo(MeasureAggregator measureAggr) {
+ BigDecimal valueSetForObjSize = getBigDecimalValue();
+ BigDecimal otherVal = measureAggr.getBigDecimalValue();
+ return valueSetForObjSize.compareTo(otherVal);
+ }
+
+ @Override public boolean equals(Object obj) {
+ if (!(obj instanceof DistinctCountBigDecimalAggregatorObjectSet)) {
+ return false;
+ }
+ DistinctCountBigDecimalAggregatorObjectSet o = (DistinctCountBigDecimalAggregatorObjectSet) obj;
+ return getBigDecimalValue().equals(o.getBigDecimalValue());
+ }
+
+ @Override public int hashCode() {
+ return getBigDecimalValue().hashCode();
+ }
+
+ @Override public MeasureAggregator get() {
+ return this;
+ }
+
+ @Override public MeasureAggregator getNew() {
+ return new DistinctCountBigDecimalAggregatorObjectSet();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountLongAggregatorObjectSet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountLongAggregatorObjectSet.java b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountLongAggregatorObjectSet.java
new file mode 100644
index 0000000..c4f7216
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountLongAggregatorObjectSet.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.distinct;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.query.aggregator.MeasureAggregator;
+
+public class DistinctCountLongAggregatorObjectSet extends AbstractDistinctCountAggregatorObjectSet {
+
+ private static final long serialVersionUID = 6313463368629960186L;
+
+ @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+ if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+ valueSetForObj.add(dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index));
+ }
+ }
+
+ private void agg(Set<Object> set2) {
+ valueSetForObj.addAll(set2);
+ }
+
+ /**
+ * merge the valueset so that we get the count of unique values
+ */
+ @Override public void merge(MeasureAggregator aggregator) {
+ DistinctCountLongAggregatorObjectSet distinctCountAggregator =
+ (DistinctCountLongAggregatorObjectSet) aggregator;
+ agg(distinctCountAggregator.valueSetForObj);
+ }
+
+ @Override public MeasureAggregator getCopy() {
+ DistinctCountLongAggregatorObjectSet aggregator = new DistinctCountLongAggregatorObjectSet();
+ aggregator.valueSetForObj = new HashSet<Object>(valueSetForObj);
+ return aggregator;
+ }
+
+ @Override public int compareTo(MeasureAggregator measureAggr) {
+ long valueSetForObjSize = getLongValue();
+ long otherVal = measureAggr.getLongValue();
+ if (valueSetForObjSize > otherVal) {
+ return 1;
+ }
+ if (valueSetForObjSize < otherVal) {
+ return -1;
+ }
+ return 0;
+ }
+
+ @Override public boolean equals(Object obj) {
+ if (!(obj instanceof DistinctCountLongAggregatorObjectSet)) {
+ return false;
+ }
+ DistinctCountLongAggregatorObjectSet o = (DistinctCountLongAggregatorObjectSet) obj;
+ return getLongValue().equals(o.getLongValue());
+ }
+
+ @Override public int hashCode() {
+ return getLongValue().hashCode();
+ }
+
+ @Override public MeasureAggregator get() {
+ return this;
+ }
+
+ @Override public MeasureAggregator getNew() {
+ return new DistinctCountLongAggregatorObjectSet();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctStringCountAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctStringCountAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctStringCountAggregator.java
new file mode 100644
index 0000000..e3d4623
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctStringCountAggregator.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.distinct;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.query.aggregator.MeasureAggregator;
+
+public class DistinctStringCountAggregator implements MeasureAggregator {
+ private static final long serialVersionUID = 6313463368629960186L;
+
+ private Set<String> valueSetForStr;
+
+ public DistinctStringCountAggregator() {
+ this.valueSetForStr = new HashSet<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ }
+
+ public void agg(double newVal) {
+ }
+
+ public void agg(String newVal) {
+ this.valueSetForStr.add(newVal);
+ }
+
+ private void agg(Set<String> set2) {
+ this.valueSetForStr.addAll(set2);
+ }
+
+ public void merge(MeasureAggregator aggregator) {
+ DistinctStringCountAggregator distinctCountAggregator =
+ (DistinctStringCountAggregator) aggregator;
+ agg(distinctCountAggregator.valueSetForStr);
+ }
+
+ public Double getDoubleValue() {
+ return (double) this.valueSetForStr.size();
+ }
+
+ public Long getLongValue() {
+ return (long) this.valueSetForStr.size();
+ }
+
+ public BigDecimal getBigDecimalValue() {
+ return new BigDecimal(this.valueSetForStr.size());
+ }
+
+ public Object getValueObject() {
+ return Integer.valueOf(this.valueSetForStr.size());
+ }
+
+ public void setNewValue(Object newValue) {
+ }
+
+ public boolean isFirstTime() {
+ return false;
+ }
+
+ public void writeData(DataOutput output) throws IOException {
+ int length = this.valueSetForStr.size() * 8;
+ ByteBuffer byteBuffer = ByteBuffer.allocate(length + 4);
+ byteBuffer.putInt(length);
+ for (String val : this.valueSetForStr) {
+ byte[] b = val.getBytes(Charset.defaultCharset());
+ byteBuffer.putInt(b.length);
+ byteBuffer.put(b);
+ }
+ byteBuffer.flip();
+ output.write(byteBuffer.array());
+ }
+
+ public void readData(DataInput inPut) throws IOException {
+ int length = inPut.readInt();
+ length /= 8;
+ this.valueSetForStr = new HashSet<String>(length + 1, 1.0F);
+ for (int i = 0; i < length; i++) {
+ byte[] b = new byte[inPut.readInt()];
+ inPut.readFully(b);
+ this.valueSetForStr.add(new String(b, Charset.defaultCharset()));
+ }
+ }
+
+ public MeasureAggregator getCopy() {
+ DistinctStringCountAggregator aggregator = new DistinctStringCountAggregator();
+ aggregator.valueSetForStr = new HashSet<String>(this.valueSetForStr);
+ return aggregator;
+ }
+
+ public int compareTo(MeasureAggregator o) {
+ double val = getDoubleValue();
+ double otherVal = o.getDoubleValue();
+ if (val > otherVal) {
+ return 1;
+ }
+ if (val < otherVal) {
+ return -1;
+ }
+ return 0;
+ }
+
+ @Override public boolean equals(Object obj) {
+ if(!(obj instanceof DistinctStringCountAggregator)) {
+ return false;
+ }
+ DistinctStringCountAggregator o = (DistinctStringCountAggregator) obj;
+ return getDoubleValue().equals(o.getDoubleValue());
+ }
+
+ @Override public int hashCode() {
+ return getDoubleValue().hashCode();
+ }
+
+ @Override public void agg(Object newVal) {
+ this.valueSetForStr.add((String) newVal);
+ }
+
+ @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+ }
+
+ @Override public byte[] getByteArray() {
+ return null;
+ }
+
+ @Override public MeasureAggregator get() {
+ return this;
+ }
+
+ public String toString() {
+ return valueSetForStr.size() + "";
+ }
+
+ @Override public void merge(byte[] value) {
+ }
+
+ @Override public MeasureAggregator getNew() {
+ // TODO Auto-generated method stub
+ return new DistinctStringCountAggregator();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctBigDecimalAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctBigDecimalAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctBigDecimalAggregator.java
new file mode 100644
index 0000000..6a59ec9
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctBigDecimalAggregator.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.distinct;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.core.util.DataTypeUtil;
+import org.carbondata.query.aggregator.MeasureAggregator;
+import org.carbondata.query.aggregator.impl.AbstractMeasureAggregatorBasic;
+
+/**
+ * The sum distinct aggregator
+ * Ex:
+ * ID NAME Sales
+ * 1 a 200
+ * 2 a 100
+ * 3 a 200
+ * select sum(distinct sales) # would result 300
+ */
+public class SumDistinctBigDecimalAggregator extends AbstractMeasureAggregatorBasic {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 6313463368629960155L;
+
+ /**
+ * For Spark CARBON to avoid heavy object transfer it better to flatten the
+ * Aggregators. There is no aggregation expected after setting this value.
+ */
+ private BigDecimal computedFixedValue;
+
+ /**
+ *
+ */
+ private Set<BigDecimal> valueSet;
+
+ public SumDistinctBigDecimalAggregator() {
+ valueSet = new HashSet<BigDecimal>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ }
+
+ /**
+ * Distinct Aggregate function which update the Distinct set
+ *
+ * @param newVal new value
+ */
+ @Override public void agg(Object newVal) {
+ valueSet.add(
+ newVal instanceof BigDecimal ? (BigDecimal) newVal : new BigDecimal(newVal.toString()));
+ firstTime = false;
+ }
+
+ @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+ if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+ valueSet.add(dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(index));
+ firstTime = false;
+ }
+ }
+
+ /**
+ * Below method will be used to get the value byte array
+ */
+ @Override public byte[] getByteArray() {
+ Iterator<BigDecimal> iterator = valueSet.iterator();
+ ByteBuffer buffer =
+ ByteBuffer.allocate(valueSet.size() * CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE);
+ while (iterator.hasNext()) {
+ byte[] bytes = DataTypeUtil.bigDecimalToByte(iterator.next());
+ buffer.putInt(bytes.length);
+ buffer.put(bytes);
+ }
+ buffer.rewind();
+ return buffer.array();
+ }
+
+ private void agg(Set<BigDecimal> set2) {
+ valueSet.addAll(set2);
+ }
+
+ /**
+ * merge the valueset so that we get the count of unique values
+ */
+ @Override public void merge(MeasureAggregator aggregator) {
+ SumDistinctBigDecimalAggregator distinctAggregator =
+ (SumDistinctBigDecimalAggregator) aggregator;
+ if (!aggregator.isFirstTime()) {
+ agg(distinctAggregator.valueSet);
+ firstTime = false;
+ }
+ }
+
+ @Override public BigDecimal getBigDecimalValue() {
+ if (computedFixedValue == null) {
+ BigDecimal result = new BigDecimal(0);
+ for (BigDecimal aValue : valueSet) {
+ result = result.add(aValue);
+ }
+ return result;
+ }
+ return computedFixedValue;
+ }
+
+ @Override public Object getValueObject() {
+ return getBigDecimalValue();
+ }
+
+ @Override public void setNewValue(Object newValue) {
+ computedFixedValue = (BigDecimal) newValue;
+ valueSet = null;
+ }
+
+ @Override public boolean isFirstTime() {
+ return firstTime;
+ }
+
+ @Override public void writeData(DataOutput dataOutput) throws IOException {
+ if (computedFixedValue != null) {
+ byte[] bytes = DataTypeUtil.bigDecimalToByte(computedFixedValue);
+ ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 4 + bytes.length);
+ byteBuffer.putInt(-1);
+ byteBuffer.putInt(bytes.length);
+ byteBuffer.put(bytes);
+ byteBuffer.flip();
+ dataOutput.write(byteBuffer.array());
+ } else {
+ int length = valueSet.size() * 8 + valueSet.size() * 4;
+ ByteBuffer byteBuffer = ByteBuffer.allocate(length + 4 + 1);
+ byteBuffer.putInt(length);
+ for (BigDecimal val : valueSet) {
+ byte[] bytes =
+ val.toString().getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+ byteBuffer.putInt(-1);
+ byteBuffer.putInt(bytes.length);
+ byteBuffer.put(bytes);
+ }
+ byteBuffer.flip();
+ dataOutput.write(byteBuffer.array());
+ }
+ }
+
+ @Override public void readData(DataInput inPut) throws IOException {
+ int length = inPut.readInt();
+
+ if (length == -1) {
+ computedFixedValue = new BigDecimal(inPut.readUTF());
+ valueSet = null;
+ } else {
+ length = length / 8;
+ valueSet = new HashSet<BigDecimal>(length + 1, 1.0f);
+ for (int i = 0; i < length; i++) {
+ valueSet.add(new BigDecimal(inPut.readUTF()));
+ }
+ }
+ }
+
+ @Override public void merge(byte[] value) {
+ if (0 == value.length) {
+ return;
+ }
+ ByteBuffer buffer = ByteBuffer.wrap(value);
+ buffer.rewind();
+ while (buffer.hasRemaining()) {
+ byte[] valueByte = new byte[buffer.getInt()];
+ buffer.get(valueByte);
+ BigDecimal valueBigDecimal = DataTypeUtil.byteToBigDecimal(valueByte);
+ agg(valueBigDecimal);
+ }
+ }
+
+ public String toString() {
+ if (computedFixedValue == null) {
+ return valueSet.size() + "";
+ }
+ return computedFixedValue + "";
+ }
+
+ @Override public MeasureAggregator getCopy() {
+ SumDistinctBigDecimalAggregator aggregator = new SumDistinctBigDecimalAggregator();
+ aggregator.valueSet = new HashSet<BigDecimal>(valueSet);
+ return aggregator;
+ }
+
+ @Override public int compareTo(MeasureAggregator msr) {
+ BigDecimal msrValObj = getBigDecimalValue();
+ BigDecimal otherVal = msr.getBigDecimalValue();
+
+ return msrValObj.compareTo(otherVal);
+ }
+
+ @Override public boolean equals(Object obj) {
+ if (!(obj instanceof SumDistinctBigDecimalAggregator)) {
+ return false;
+ }
+ SumDistinctBigDecimalAggregator o = (SumDistinctBigDecimalAggregator) obj;
+ return getBigDecimalValue().equals(o.getBigDecimalValue());
+ }
+
+ @Override public int hashCode() {
+ return getBigDecimalValue().hashCode();
+ }
+
+ @Override public MeasureAggregator getNew() {
+ // TODO Auto-generated method stub
+ return new SumDistinctBigDecimalAggregator();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctDoubleAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctDoubleAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctDoubleAggregator.java
new file mode 100644
index 0000000..0229f24
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctDoubleAggregator.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.distinct;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.query.aggregator.MeasureAggregator;
+import org.carbondata.query.aggregator.impl.AbstractMeasureAggregatorBasic;
+
+/**
+ * The sum distinct aggregator
+ * Ex:
+ * ID NAME Sales
+ * 1 a 200
+ * 2 a 100
+ * 3 a 200
+ * select sum(distinct sales) # would result 300
+ */
+
+public class SumDistinctDoubleAggregator extends AbstractMeasureAggregatorBasic {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 6313463368629960155L;
+
+ /**
+ * For Spark CARBON to avoid heavy object transfer it better to flatten the
+ * Aggregators. There is no aggregation expected after setting this value.
+ */
+ private Double computedFixedValue;
+
+ /**
+ *
+ */
+ private Set<Double> valueSet;
+
+ public SumDistinctDoubleAggregator() {
+ valueSet = new HashSet<Double>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ }
+
+ /**
+ * just need to add the unique values to agg set
+ */
+ @Override public void agg(double newVal) {
+ valueSet.add(newVal);
+ firstTime = false;
+ }
+
+ /**
+ * Distinct Aggregate function which update the Distinct set
+ *
+ * @param newVal new value
+ */
+ @Override public void agg(Object newVal) {
+ valueSet.add(newVal instanceof Double ? (Double) newVal : new Double(newVal.toString()));
+ firstTime = false;
+ }
+
+ @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+ if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+ valueSet.add(dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index));
+ firstTime = false;
+ }
+ }
+
+ /**
+ * Below method will be used to get the value byte array
+ */
+ @Override public byte[] getByteArray() {
+ Iterator<Double> iterator = valueSet.iterator();
+ ByteBuffer buffer =
+ ByteBuffer.allocate(valueSet.size() * CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE);
+ while (iterator.hasNext()) {
+ buffer.putDouble(iterator.next());
+ }
+ buffer.rewind();
+ return buffer.array();
+ }
+
+ private void agg(Set<Double> set2) {
+ valueSet.addAll(set2);
+ }
+
+ /**
+ * merge the valueset so that we get the count of unique values
+ */
+ @Override public void merge(MeasureAggregator aggregator) {
+ SumDistinctDoubleAggregator distinctAggregator = (SumDistinctDoubleAggregator) aggregator;
+ if (!aggregator.isFirstTime()) {
+ agg(distinctAggregator.valueSet);
+ firstTime = false;
+ }
+ }
+
+ @Override public Double getDoubleValue() {
+ if (computedFixedValue == null) {
+ double result = 0;
+ for (Double aValue : valueSet) {
+ result += aValue;
+ }
+ return result;
+ }
+ return computedFixedValue;
+ }
+
+ @Override public Object getValueObject() {
+ return getDoubleValue();
+ }
+
+ @Override public void setNewValue(Object newValue) {
+ computedFixedValue = (Double) newValue;
+ valueSet = null;
+ }
+
+ @Override public boolean isFirstTime() {
+ return firstTime;
+ }
+
+ @Override public void writeData(DataOutput dataOutput) throws IOException {
+ if (computedFixedValue != null) {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 8);
+ byteBuffer.putInt(-1);
+ byteBuffer.putDouble(computedFixedValue);
+ byteBuffer.flip();
+ dataOutput.write(byteBuffer.array());
+ } else {
+ int length = valueSet.size() * 8;
+ ByteBuffer byteBuffer = ByteBuffer.allocate(length + 4 + 1);
+ byteBuffer.putInt(length);
+ for (double val : valueSet) {
+ byteBuffer.putDouble(val);
+ }
+ byteBuffer.flip();
+ dataOutput.write(byteBuffer.array());
+ }
+ }
+
+ @Override public void readData(DataInput inPut) throws IOException {
+ int length = inPut.readInt();
+
+ if (length == -1) {
+ computedFixedValue = inPut.readDouble();
+ valueSet = null;
+ } else {
+ length = length / 8;
+ valueSet = new HashSet<Double>(length + 1, 1.0f);
+ for (int i = 0; i < length; i++) {
+ valueSet.add(inPut.readDouble());
+ }
+ }
+ }
+
+ @Override public void merge(byte[] value) {
+ if (0 == value.length) {
+ return;
+ }
+ ByteBuffer buffer = ByteBuffer.wrap(value);
+ buffer.rewind();
+ while (buffer.hasRemaining()) {
+ agg(buffer.getDouble());
+ }
+ }
+
+ public String toString() {
+ if (computedFixedValue == null) {
+ return valueSet.size() + "";
+ }
+ return computedFixedValue + "";
+ }
+
+ @Override public MeasureAggregator getCopy() {
+ SumDistinctDoubleAggregator aggregator = new SumDistinctDoubleAggregator();
+ aggregator.valueSet = new HashSet<Double>(valueSet);
+ return aggregator;
+ }
+
+ @Override public int compareTo(MeasureAggregator msr) {
+ double msrValObj = getDoubleValue();
+ double otherVal = msr.getDoubleValue();
+ if (msrValObj > otherVal) {
+ return 1;
+ }
+ if (msrValObj < otherVal) {
+ return -1;
+ }
+ return 0;
+ }
+
+ @Override public boolean equals(Object obj) {
+ if (!(obj instanceof SumDistinctDoubleAggregator)) {
+ return false;
+ }
+ SumDistinctDoubleAggregator o = (SumDistinctDoubleAggregator) obj;
+ return getDoubleValue().equals(o.getDoubleValue());
+ }
+
+ @Override public int hashCode() {
+ return getDoubleValue().hashCode();
+ }
+
+ @Override public MeasureAggregator getNew() {
+ // TODO Auto-generated method stub
+ return new SumDistinctDoubleAggregator();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctLongAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctLongAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctLongAggregator.java
new file mode 100644
index 0000000..d57c34b
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctLongAggregator.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.distinct;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.query.aggregator.MeasureAggregator;
+import org.carbondata.query.aggregator.impl.AbstractMeasureAggregatorBasic;
+
+/**
+ * The sum distinct aggregator
+ * Ex:
+ * ID NAME Sales
+ * 1 a 200
+ * 2 a 100
+ * 3 a 200
+ * select sum(distinct sales) # would result 300
+ */
+
+public class SumDistinctLongAggregator extends AbstractMeasureAggregatorBasic {
+
+ private static final long serialVersionUID = 6313463368629960155L;
+
+ /**
+ * For Spark CARBON to avoid heavy object transfer it better to flatten the
+ * Aggregators. There is no aggregation expected after setting this value.
+ */
+ private Long computedFixedValue;
+
+ /**
+ *
+ */
+ private Set<Long> valueSet;
+
+ public SumDistinctLongAggregator() {
+ valueSet = new HashSet<Long>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ }
+
+ /**
+ * Distinct Aggregate function which update the Distinct set
+ *
+ * @param newVal new value
+ */
+ @Override public void agg(Object newVal) {
+ valueSet.add(newVal instanceof Long ? (Long) newVal : Long.valueOf(newVal.toString()));
+ firstTime = false;
+ }
+
+ @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+ if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+ valueSet.add(dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index));
+ firstTime = false;
+ }
+ }
+
+ /**
+ * Below method will be used to get the value byte array
+ */
+ @Override public byte[] getByteArray() {
+ Iterator<Long> iterator = valueSet.iterator();
+ ByteBuffer buffer =
+ ByteBuffer.allocate(valueSet.size() * CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE);
+ while (iterator.hasNext()) {
+ buffer.putLong(iterator.next());
+ }
+ buffer.rewind();
+ return buffer.array();
+ }
+
+ private void agg(Set<Long> set2) {
+ valueSet.addAll(set2);
+ }
+
+ /**
+ * merge the valueset so that we get the count of unique values
+ */
+ @Override public void merge(MeasureAggregator aggregator) {
+ SumDistinctLongAggregator distinctAggregator = (SumDistinctLongAggregator) aggregator;
+ if (!aggregator.isFirstTime()) {
+ agg(distinctAggregator.valueSet);
+ firstTime = false;
+ }
+ }
+
+ @Override public Long getLongValue() {
+ if (computedFixedValue == null) {
+ long result = 0;
+ for (Long aValue : valueSet) {
+ result += aValue;
+ }
+ return result;
+ }
+ return computedFixedValue;
+ }
+
+ @Override public Object getValueObject() {
+ return getLongValue();
+ }
+
+ @Override public void setNewValue(Object newValue) {
+ computedFixedValue = (Long) newValue;
+ valueSet = null;
+ }
+
+ @Override public boolean isFirstTime() {
+ return firstTime;
+ }
+
+ @Override public void writeData(DataOutput dataOutput) throws IOException {
+ if (computedFixedValue != null) {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 8);
+ byteBuffer.putInt(-1);
+ byteBuffer.putLong(computedFixedValue);
+ byteBuffer.flip();
+ dataOutput.write(byteBuffer.array());
+ } else {
+ int length = valueSet.size() * 8;
+ ByteBuffer byteBuffer = ByteBuffer.allocate(length + 4 + 1);
+ byteBuffer.putInt(length);
+ for (long val : valueSet) {
+ byteBuffer.putLong(val);
+ }
+ byteBuffer.flip();
+ dataOutput.write(byteBuffer.array());
+ }
+ }
+
+ @Override public void readData(DataInput inPut) throws IOException {
+ int length = inPut.readInt();
+
+ if (length == -1) {
+ computedFixedValue = inPut.readLong();
+ valueSet = null;
+ } else {
+ length = length / 8;
+ valueSet = new HashSet<Long>(length + 1, 1.0f);
+ for (int i = 0; i < length; i++) {
+ valueSet.add(inPut.readLong());
+ }
+ }
+
+ }
+
+ @Override public void merge(byte[] value) {
+ if (0 == value.length) {
+ return;
+ }
+ ByteBuffer buffer = ByteBuffer.wrap(value);
+ buffer.rewind();
+ while (buffer.hasRemaining()) {
+ agg(buffer.getLong());
+ }
+ }
+
+ public String toString() {
+ if (computedFixedValue == null) {
+ return valueSet.size() + "";
+ }
+ return computedFixedValue + "";
+ }
+
+ @Override public MeasureAggregator getCopy() {
+ SumDistinctLongAggregator aggregator = new SumDistinctLongAggregator();
+ aggregator.valueSet = new HashSet<Long>(valueSet);
+ return aggregator;
+ }
+
+ @Override public int compareTo(MeasureAggregator msr) {
+ long msrValObj = getLongValue();
+ long otherVal = msr.getLongValue();
+ if (msrValObj > otherVal) {
+ return 1;
+ }
+ if (msrValObj < otherVal) {
+ return -1;
+ }
+ return 0;
+ }
+
+ @Override public boolean equals(Object obj) {
+ if (!(obj instanceof SumDistinctLongAggregator)) {
+ return false;
+ }
+ SumDistinctLongAggregator o = (SumDistinctLongAggregator) obj;
+ return getLongValue().equals(o.getLongValue());
+ }
+
+ @Override public int hashCode() {
+ return getLongValue().hashCode();
+ }
+
+ @Override public MeasureAggregator getNew() {
+ // TODO Auto-generated method stub
+ return new SumDistinctLongAggregator();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/AbstractMeasureAggregatorDummy.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/AbstractMeasureAggregatorDummy.java b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/AbstractMeasureAggregatorDummy.java
new file mode 100644
index 0000000..f423085
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/AbstractMeasureAggregatorDummy.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.dummy;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.carbondata.query.aggregator.MeasureAggregator;
+import org.carbondata.query.aggregator.impl.AbstractMeasureAggregatorBasic;
+
+/**
+ * AbstractMeasureAggregatorDummy
+ * Used for custom Carbon Aggregator dummy
+ */
+public abstract class AbstractMeasureAggregatorDummy extends AbstractMeasureAggregatorBasic {
+ private static final long serialVersionUID = 1L;
+
+ @Override public int compareTo(MeasureAggregator o) {
+ if (equals(o)) {
+ return 0;
+ }
+ return -1;
+ }
+
+ @Override public boolean equals(Object arg0) {
+ return super.equals(arg0);
+ }
+
+ @Override public int hashCode() {
+ return super.hashCode();
+ }
+
+ @Override public byte[] getByteArray() {
+ return null;
+ }
+
+ @Override public void merge(MeasureAggregator aggregator) {
+ }
+
+ @Override public MeasureAggregator getCopy() {
+ return null;
+ }
+
+ @Override public void writeData(DataOutput output) throws IOException {
+ }
+
+ @Override public void readData(DataInput inPut) throws IOException {
+ }
+
+ @Override public void merge(byte[] value) {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyBigDecimalAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyBigDecimalAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyBigDecimalAggregator.java
new file mode 100644
index 0000000..8a33fe1
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyBigDecimalAggregator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.dummy;
+
+import java.math.BigDecimal;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.query.aggregator.MeasureAggregator;
+
+public class DummyBigDecimalAggregator extends AbstractMeasureAggregatorDummy {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * aggregate value
+ */
+ private BigDecimal aggVal;
+
+ @Override public void agg(Object newVal) {
+ aggVal = (BigDecimal) newVal;
+ firstTime = false;
+ }
+
+ @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+ if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+ aggVal = dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(index);
+ firstTime = false;
+ }
+ }
+
+ @Override public BigDecimal getBigDecimalValue() {
+ return aggVal;
+ }
+
+ @Override public Object getValueObject() {
+ return aggVal;
+ }
+
+ @Override public void setNewValue(Object newValue) {
+ aggVal = (BigDecimal) newValue;
+ }
+
+ @Override public MeasureAggregator getNew() {
+ // TODO Auto-generated method stub
+ return new DummyBigDecimalAggregator();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyDoubleAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyDoubleAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyDoubleAggregator.java
new file mode 100644
index 0000000..cd4fe56
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyDoubleAggregator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.query.aggregator.impl.dummy;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.query.aggregator.MeasureAggregator;
+
+public class DummyDoubleAggregator extends AbstractMeasureAggregatorDummy {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * aggregate value
+ */
+ private double aggVal;
+
+ @Override public void agg(double newVal) {
+ aggVal = newVal;
+ firstTime = false;
+ }
+
+ @Override public void agg(Object newVal) {
+ aggVal = (Double) newVal;
+ firstTime = false;
+ }
+
+ @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+ if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+ aggVal = dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index);
+ firstTime = false;
+ }
+ }
+
+ @Override public Double getDoubleValue() {
+ return aggVal;
+ }
+
+ @Override public Object getValueObject() {
+ return aggVal;
+ }
+
+ @Override public void setNewValue(Object newValue) {
+ aggVal = (Double) newValue;
+ }
+
+ @Override public MeasureAggregator getNew() {
+ return new DummyDoubleAggregator();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyLongAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyLongAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyLongAggregator.java
new file mode 100644
index 0000000..4131895
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyLongAggregator.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.query.aggregator.impl.dummy;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.query.aggregator.MeasureAggregator;
+
+public class DummyLongAggregator extends AbstractMeasureAggregatorDummy {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * aggregate value
+ */
+ private long aggVal;
+
+ @Override public void agg(Object newVal) {
+ aggVal = (Long) newVal;
+ firstTime = false;
+ }
+
+ @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+ if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+ aggVal = dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index);
+ firstTime = false;
+ }
+ }
+
+ @Override public Long getLongValue() {
+ return aggVal;
+ }
+
+ @Override public Object getValueObject() {
+ return aggVal;
+ }
+
+ @Override public void setNewValue(Object newValue) {
+ aggVal = (Long) newValue;
+ }
+
+ @Override public MeasureAggregator getNew() {
+ return new DummyLongAggregator();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/max/AbstractMaxAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/max/AbstractMaxAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/max/AbstractMaxAggregator.java
new file mode 100644
index 0000000..27b1876
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/max/AbstractMaxAggregator.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.max;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+
+import org.carbondata.common.logging.LogService;
+import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.util.CarbonUtil;
+import org.carbondata.query.aggregator.impl.AbstractMeasureAggregatorMaxMin;
+
+public abstract class AbstractMaxAggregator extends AbstractMeasureAggregatorMaxMin {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(AbstractMaxAggregator.class.getName());
+
+ protected void internalAgg(Object value) {
+ if (value instanceof Comparable) {
+ @SuppressWarnings("unchecked") Comparable<Object> newValue = ((Comparable<Object>) value);
+ aggVal = (aggVal == null || aggVal.compareTo(newValue) < 0) ? newValue : aggVal;
+ }
+ }
+
+ @Override public void merge(byte[] value) {
+ if (0 == value.length) {
+ return;
+ }
+ ByteArrayInputStream bytesInputStream = null;
+ ObjectInput in = null;
+ try {
+ bytesInputStream = new ByteArrayInputStream(value);
+ in = new ObjectInputStream(bytesInputStream);
+ Object newVal = (Comparable<Object>) in.readObject();
+ internalAgg(newVal);
+ firstTime = false;
+ } catch (Exception e) {
+ LOGGER.error(e, "Problem while merging byte array in maxAggregator: " + e.getMessage());
+ } finally {
+ CarbonUtil.closeStreams(bytesInputStream);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxAggregator.java
new file mode 100644
index 0000000..d0d8af0
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxAggregator.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.max;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.query.aggregator.MeasureAggregator;
+
+/**
+ * Class Description :
+ * It will return max of values
+ */
+public class MaxAggregator extends AbstractMaxAggregator {
+
+ /**
+ * serialVersionUID
+ */
+ private static final long serialVersionUID = -5850218739083899419L;
+
+ @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+ if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+ internalAgg(dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index));
+ firstTime = false;
+ }
+ }
+
+ /**
+ * Merge the value, it will update the max aggregate value if aggregator
+ * passed as an argument will have value greater than aggVal
+ *
+ * @param aggregator MaxAggregator
+ */
+ @Override public void merge(MeasureAggregator aggregator) {
+ MaxAggregator maxAggregator = (MaxAggregator) aggregator;
+ if (!aggregator.isFirstTime()) {
+ agg(maxAggregator.aggVal);
+ firstTime = false;
+ }
+
+ }
+
+ @Override public MeasureAggregator getCopy() {
+ MaxAggregator aggregator = new MaxAggregator();
+ aggregator.aggVal = aggVal;
+ aggregator.firstTime = firstTime;
+ return aggregator;
+ }
+
+ @Override public MeasureAggregator getNew() {
+ return new MaxAggregator();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxBigDecimalAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxBigDecimalAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxBigDecimalAggregator.java
new file mode 100644
index 0000000..c4149c6
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxBigDecimalAggregator.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.max;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.query.aggregator.MeasureAggregator;
+
+/**
+ * Class Description :
+ * It will return max of values
+ */
+public class MaxBigDecimalAggregator extends AbstractMaxAggregator {
+
+ /**
+ * serialVersionUID
+ */
+ private static final long serialVersionUID = -5850218739083899419L;
+
+ @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+ if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+ internalAgg(dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(index));
+ firstTime = false;
+ }
+ }
+
+ /**
+ * Merge the value, it will update the max aggregate value if aggregator
+ * passed as an argument will have value greater than aggVal
+ *
+ * @param aggregator MaxAggregator
+ */
+ @Override public void merge(MeasureAggregator aggregator) {
+ MaxAggregator maxAggregator = (MaxAggregator) aggregator;
+ if (!aggregator.isFirstTime()) {
+ agg(maxAggregator.aggVal);
+ firstTime = false;
+ }
+ }
+
+ @Override public MeasureAggregator getCopy() {
+ MaxAggregator aggregator = new MaxAggregator();
+ aggregator.aggVal = aggVal;
+ aggregator.firstTime = firstTime;
+ return aggregator;
+ }
+
+ @Override public MeasureAggregator getNew() {
+ return new MaxBigDecimalAggregator();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxLongAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxLongAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxLongAggregator.java
new file mode 100644
index 0000000..55b3be5
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxLongAggregator.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.max;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.query.aggregator.MeasureAggregator;
+
+/**
+ * Class Description :
+ * It will return max of values
+ */
+public class MaxLongAggregator extends AbstractMaxAggregator {
+
+ /**
+ * serialVersionUID
+ */
+ private static final long serialVersionUID = -5850218739083899419L;
+
+ @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+ if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+ internalAgg(dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index));
+ firstTime = false;
+ }
+ }
+
+ /**
+ * Merge the value, it will update the max aggregate value if aggregator
+ * passed as an argument will have value greater than aggVal
+ *
+ * @param aggregator MaxAggregator
+ */
+ @Override public void merge(MeasureAggregator aggregator) {
+ MaxAggregator maxAggregator = (MaxAggregator) aggregator;
+ if (!aggregator.isFirstTime()) {
+ agg(maxAggregator.aggVal);
+ firstTime = false;
+ }
+ }
+
+ @Override public MeasureAggregator getCopy() {
+ MaxAggregator aggregator = new MaxAggregator();
+ aggregator.aggVal = aggVal;
+ aggregator.firstTime = firstTime;
+ return aggregator;
+ }
+
+ @Override public MeasureAggregator getNew() {
+ return new MaxLongAggregator();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/min/AbstractMinAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/min/AbstractMinAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/min/AbstractMinAggregator.java
new file mode 100644
index 0000000..77aa163
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/min/AbstractMinAggregator.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.min;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+
+import org.carbondata.common.logging.LogService;
+import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.util.CarbonUtil;
+import org.carbondata.query.aggregator.impl.AbstractMeasureAggregatorMaxMin;
+
+public abstract class AbstractMinAggregator extends AbstractMeasureAggregatorMaxMin {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(AbstractMinAggregator.class.getName());
+
+ protected void internalAgg(Object value) {
+ if (value instanceof Comparable) {
+ @SuppressWarnings("unchecked") Comparable<Object> newValue = ((Comparable<Object>) value);
+ aggVal = (aggVal == null || aggVal.compareTo(newValue) > 0) ? newValue : aggVal;
+ }
+ }
+
+ @Override public void merge(byte[] value) {
+ if (0 == value.length) {
+ return;
+ }
+ ByteArrayInputStream bis = null;
+ ObjectInput objectInput = null;
+ try {
+ bis = new ByteArrayInputStream(value);
+ objectInput = new ObjectInputStream(bis);
+ Object newVal = (Comparable<Object>) objectInput.readObject();
+ internalAgg(newVal);
+ firstTime = false;
+ } catch (Exception e) {
+ LOGGER.error(e, "Problem while merging byte array in minAggregator: " + e.getMessage());
+ } finally {
+ CarbonUtil.closeStreams(bis);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinAggregator.java
new file mode 100644
index 0000000..4bb3d73
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinAggregator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.min;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.query.aggregator.MeasureAggregator;
+
+/**
+ * Class Description : It will return min of values
+ */
+public class MinAggregator extends AbstractMinAggregator {
+
+ /**
+ * serialVersionUID
+ */
+ private static final long serialVersionUID = -8077547753784906280L;
+
+ @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+ if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+ internalAgg(dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index));
+ firstTime = false;
+ }
+ }
+
+ /**
+ * Merge the value, it will update the min aggregate value if aggregator
+ * passed as an argument will have value less than aggVal
+ *
+ * @param aggregator MinAggregator
+ */
+ @Override public void merge(MeasureAggregator aggregator) {
+ MinAggregator minAggregator = (MinAggregator) aggregator;
+ if (!aggregator.isFirstTime()) {
+ agg(minAggregator.aggVal);
+ firstTime = false;
+ }
+ }
+
+ @Override public MeasureAggregator getCopy() {
+ MinAggregator aggregator = new MinAggregator();
+ aggregator.aggVal = aggVal;
+ aggregator.firstTime = firstTime;
+ return aggregator;
+ }
+
+ @Override public MeasureAggregator getNew() {
+ return new MinAggregator();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinBigDecimalAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinBigDecimalAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinBigDecimalAggregator.java
new file mode 100644
index 0000000..7347dc1
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinBigDecimalAggregator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.min;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.query.aggregator.MeasureAggregator;
+
+/**
+ * Class Description : It will return min of values
+ */
+public class MinBigDecimalAggregator extends AbstractMinAggregator {
+
+ /**
+ * serialVersionUID
+ */
+ private static final long serialVersionUID = -8077547753784906280L;
+
+ @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+ if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+ internalAgg(dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(index));
+ firstTime = false;
+ }
+ }
+
+ /**
+ * Merge the value, it will update the min aggregate value if aggregator
+ * passed as an argument will have value less than aggVal
+ *
+ * @param aggregator MinAggregator
+ */
+ @Override public void merge(MeasureAggregator aggregator) {
+ MinAggregator minAggregator = (MinAggregator) aggregator;
+ if (!aggregator.isFirstTime()) {
+ agg(minAggregator.aggVal);
+ firstTime = false;
+ }
+ }
+
+ @Override public MeasureAggregator getCopy() {
+ MinAggregator aggregator = new MinAggregator();
+ aggregator.aggVal = aggVal;
+ aggregator.firstTime = firstTime;
+ return aggregator;
+ }
+
+ @Override public MeasureAggregator getNew() {
+ return new MinBigDecimalAggregator();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinLongAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinLongAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinLongAggregator.java
new file mode 100644
index 0000000..af614c9
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinLongAggregator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.min;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.query.aggregator.MeasureAggregator;
+
+/**
+ * Class Description : It will return min of values
+ */
+public class MinLongAggregator extends AbstractMinAggregator {
+
+ /**
+ * serialVersionUID
+ */
+ private static final long serialVersionUID = -8077547753784906280L;
+
+ @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+ if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+ internalAgg(dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index));
+ firstTime = false;
+ }
+ }
+
+ /**
+ * Merge the value, it will update the min aggregate value if aggregator
+ * passed as an argument will have value less than aggVal
+ *
+ * @param aggregator MinAggregator
+ */
+ @Override public void merge(MeasureAggregator aggregator) {
+ MinAggregator minAggregator = (MinAggregator) aggregator;
+ if (!aggregator.isFirstTime()) {
+ agg(minAggregator.aggVal);
+ firstTime = false;
+ }
+ }
+
+ @Override public MeasureAggregator getCopy() {
+ MinAggregator aggregator = new MinAggregator();
+ aggregator.aggVal = aggVal;
+ aggregator.firstTime = firstTime;
+ return aggregator;
+ }
+
+ @Override public MeasureAggregator getNew() {
+ return new MinLongAggregator();
+ }
+}