You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by jb...@apache.org on 2016/06/23 14:15:53 UTC

[05/56] [abbrv] incubator-carbondata git commit: [Issue 618]Supported Spark 1.6 in Carbondata (#670)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/AbstractDistinctCountAggregatorObjectSet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/AbstractDistinctCountAggregatorObjectSet.java b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/AbstractDistinctCountAggregatorObjectSet.java
new file mode 100644
index 0000000..0629007
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/AbstractDistinctCountAggregatorObjectSet.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.distinct;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.query.aggregator.MeasureAggregator;
+
+public abstract class AbstractDistinctCountAggregatorObjectSet implements MeasureAggregator {
+
+  private static final long serialVersionUID = 6313463368629960186L;
+
+  protected Set<Object> valueSetForObj;
+
+  public AbstractDistinctCountAggregatorObjectSet() {
+    valueSetForObj = new HashSet<Object>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+  }
+
+  /**
+   * just need to add the unique values to agg set
+   */
+  @Override public void agg(double newVal) {
+  }
+
+  /**
+   * Distinct count Aggregate function which update the Distinct count
+   *
+   * @param newVal new value
+   */
+  @Override public void agg(Object newVal) {
+    valueSetForObj.add(newVal);
+  }
+
+  /**
+   * Below method will be used to get the value byte array
+   */
+  @Override public byte[] getByteArray() {
+    return null;
+  }
+
+  @Override public Double getDoubleValue() {
+    return (double) valueSetForObj.size();
+  }
+
+  @Override public Long getLongValue() {
+    return (long) valueSetForObj.size();
+  }
+
+  @Override public BigDecimal getBigDecimalValue() {
+    return new BigDecimal(valueSetForObj.size());
+  }
+
+  @Override public Object getValueObject() {
+    return valueSetForObj.size();
+  }
+
+  @Override public void setNewValue(Object newValue) {
+    valueSetForObj.add(newValue);
+  }
+
+  @Override public boolean isFirstTime() {
+    return false;
+  }
+
+  @Override public void writeData(DataOutput output) throws IOException {
+
+  }
+
+  @Override public void readData(DataInput inPut) throws IOException {
+
+  }
+
+  public String toString() {
+    return valueSetForObj.size() + "";
+  }
+
+  @Override public void merge(byte[] value) {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregator.java
new file mode 100644
index 0000000..1b2b33d
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregator.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.distinct;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.carbondata.common.logging.LogService;
+import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.query.aggregator.MeasureAggregator;
+
+import org.roaringbitmap.IntIterator;
+import org.roaringbitmap.RoaringBitmap;
+
+/**
+ * * The distinct count aggregator
+ * Ex:
+ * ID NAME Sales
+ * <p>1 a 200
+ * <p>2 a 100
+ * <p>3 a 200
+ * select count(distinct sales) # would result 2
+ * select count(sales) # would result 3
+ */
+public class DistinctCountAggregator implements MeasureAggregator {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DistinctCountAggregator.class.getName());
+  /**
+   *
+   */
+  private static final long serialVersionUID = 6313463368629960186L;
+  /**
+   * For Spark CARBON to avoid heavy object transfer it better to flatten
+   * the Aggregators. There is no aggregation expected after setting this value.
+   */
+  private Double computedFixedValue;
+  /**
+   *
+   */
+  //    private Set<Double> valueSet;
+  private RoaringBitmap valueSet;
+
+  private byte[] data;
+
+  private double minValue;
+
+  public DistinctCountAggregator(Object minValue) {
+    valueSet = new RoaringBitmap();
+    if (minValue instanceof BigDecimal) {
+      this.minValue = ((BigDecimal) minValue).doubleValue();
+    } else if (minValue instanceof Long) {
+      this.minValue = ((Long) minValue).doubleValue();
+    } else {
+      this.minValue = (Double) minValue;
+    }
+  }
+
+  public DistinctCountAggregator() {
+    valueSet = new RoaringBitmap();
+  }
+
+  /**
+   * just need to add the unique values to agg set
+   */
+  @Override public void agg(double newVal) {
+    valueSet.add((int) (newVal - minValue));
+  }
+
+  /**
+   * Distinct count Aggregate function which update the Distinct count
+   *
+   * @param newVal new value
+   */
+  @Override public void agg(Object newVal) {
+    if (newVal instanceof byte[]) {
+      byte[] values = (byte[]) newVal;
+      ByteBuffer buffer = ByteBuffer.wrap(values);
+      buffer.rewind();
+      while (buffer.hasRemaining()) {
+        valueSet.add(buffer.getInt());
+      }
+      return;
+    } else {
+      double value = new Double(newVal.toString());
+      agg(value);
+    }
+  }
+
+  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+    if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+      valueSet.add((int) dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index));
+    }
+  }
+
+  /**
+   * Below method will be used to get the value byte array
+   */
+  @Override public byte[] getByteArray() {
+    if (valueSet.getCardinality() == 0) {
+      return new byte[0];
+    }
+    IntIterator iterator = valueSet.getIntIterator();
+    ByteBuffer buffer = ByteBuffer.allocate(valueSet.getCardinality() * 4 + 8);
+    buffer.putDouble(minValue);
+    while (iterator.hasNext()) {
+      buffer.putInt(iterator.next());
+    }
+    buffer.rewind();
+    return buffer.array();
+  }
+
+  private void agg(RoaringBitmap set2, double minValue) {
+    if (this.minValue == minValue) {
+      valueSet.or(set2);
+    } else {
+      if (this.minValue > minValue) {
+        IntIterator intIterator = valueSet.getIntIterator();
+        while (intIterator.hasNext()) {
+          set2.add((int) ((double) (intIterator.next() + this.minValue) - minValue));
+        }
+        this.minValue = minValue;
+        this.valueSet = set2;
+      } else {
+        IntIterator intIterator = set2.getIntIterator();
+        while (intIterator.hasNext()) {
+          valueSet.add((int) ((double) (intIterator.next() + minValue) - this.minValue));
+        }
+      }
+    }
+  }
+
+  /**
+   * merge the valueset so that we get the count of unique values
+   */
+  @Override public void merge(MeasureAggregator aggregator) {
+    DistinctCountAggregator distinctCountAggregator = (DistinctCountAggregator) aggregator;
+    readData();
+    distinctCountAggregator.readData();
+    if (distinctCountAggregator.valueSet != null) {
+      agg(distinctCountAggregator.valueSet, distinctCountAggregator.minValue);
+    }
+  }
+
+  @Override public Double getDoubleValue() {
+    if (computedFixedValue == null) {
+      readData();
+      return (double) valueSet.getCardinality();
+    }
+    return computedFixedValue;
+  }
+
+  @Override public Long getLongValue() {
+    if (computedFixedValue == null) {
+      readData();
+      return (long) valueSet.getCardinality();
+    }
+    return computedFixedValue.longValue();
+  }
+
+  @Override public BigDecimal getBigDecimalValue() {
+    if (computedFixedValue == null) {
+      readData();
+      return new BigDecimal(valueSet.getCardinality());
+    }
+    return new BigDecimal(computedFixedValue);
+  }
+
+  @Override public Object getValueObject() {
+    return valueSet.getCardinality();
+  }
+
+  @Override public void setNewValue(Object newValue) {
+    computedFixedValue = (Double) newValue;
+    valueSet = null;
+  }
+
+  @Override public boolean isFirstTime() {
+    return false;
+  }
+
+  @Override public void writeData(DataOutput output) throws IOException {
+
+    if (computedFixedValue != null) {
+      ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 8);
+      byteBuffer.putInt(-1);
+      byteBuffer.putDouble(computedFixedValue);
+      byteBuffer.flip();
+      output.write(byteBuffer.array());
+    } else {
+      if (valueSet != null) {
+        valueSet.serialize(output);
+      } else {
+        output.write(data);
+      }
+    }
+  }
+
+  @Override public void readData(DataInput inPut) throws IOException {
+    valueSet = new RoaringBitmap();
+    valueSet.deserialize(inPut);
+  }
+
+  private void readData() {
+    if (data != null && (valueSet == null || valueSet.isEmpty())) {
+      ByteArrayInputStream stream = new ByteArrayInputStream(data);
+      DataInputStream outputStream = new DataInputStream(stream);
+      try {
+        readData(outputStream);
+        outputStream.close();
+        data = null;
+      } catch (IOException e) {
+        LOGGER.error(e, e.getMessage());
+      }
+    }
+  }
+
+  @Override public MeasureAggregator getCopy() {
+    DistinctCountAggregator aggr = new DistinctCountAggregator(minValue);
+    aggr.valueSet = valueSet.clone();
+    return aggr;
+  }
+
+  @Override public int compareTo(MeasureAggregator measureAggr) {
+    double compFixedVal = getDoubleValue();
+    double otherVal = measureAggr.getDoubleValue();
+    if (compFixedVal > otherVal) {
+      return 1;
+    }
+    if (compFixedVal < otherVal) {
+      return -1;
+    }
+    return 0;
+  }
+
+  @Override public boolean equals(Object obj) {
+    if(!(obj instanceof DistinctCountAggregator)) {
+      return false;
+    }
+    DistinctCountAggregator o = (DistinctCountAggregator) obj;
+    return getDoubleValue().equals(o.getDoubleValue());
+  }
+
+  @Override public int hashCode() {
+    return getDoubleValue().hashCode();
+  }
+
+  @Override public MeasureAggregator get() {
+    ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+    DataOutputStream outputStream = new DataOutputStream(byteStream);
+    try {
+      writeData(outputStream);
+    } catch (IOException ex) {
+      LOGGER.error(ex, ex.getMessage());
+    }
+    data = byteStream.toByteArray();
+    valueSet = null;
+    return this;
+  }
+
+  public String toString() {
+    if (computedFixedValue == null) {
+      readData();
+      return valueSet.getCardinality() + "";
+    }
+    return computedFixedValue + "";
+  }
+
+  public RoaringBitmap getBitMap() {
+    return valueSet;
+  }
+
+  public double getMinValue() {
+    return minValue;
+  }
+
+  @Override public void merge(byte[] value) {
+    if (0 == value.length) {
+      return;
+    }
+    ByteBuffer buffer = ByteBuffer.wrap(value);
+    buffer.rewind();
+    double currentMinValue = buffer.getDouble();
+    while (buffer.hasRemaining()) {
+      agg(buffer.getInt() + currentMinValue);
+    }
+  }
+
+  @Override public MeasureAggregator getNew() {
+    // TODO Auto-generated method stub
+    return new DistinctCountAggregator();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregatorObjectSet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregatorObjectSet.java b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregatorObjectSet.java
new file mode 100644
index 0000000..3b26e53
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregatorObjectSet.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.distinct;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.query.aggregator.MeasureAggregator;
+
+public class DistinctCountAggregatorObjectSet extends AbstractDistinctCountAggregatorObjectSet {
+
+  private static final long serialVersionUID = 6313463368629960186L;
+
+  /**
+   * just need to add the unique values to agg set
+   */
+  @Override public void agg(double newVal) {
+    valueSetForObj.add(newVal);
+  }
+
+  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+    if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+      valueSetForObj.add(dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index));
+    }
+  }
+
+  private void agg(Set<Object> set2) {
+    valueSetForObj.addAll(set2);
+  }
+
+  /**
+   * merge the valueset so that we get the count of unique values
+   */
+  @Override public void merge(MeasureAggregator aggregator) {
+    DistinctCountAggregatorObjectSet distinctCountAggregator =
+        (DistinctCountAggregatorObjectSet) aggregator;
+    agg(distinctCountAggregator.valueSetForObj);
+  }
+
+  @Override public MeasureAggregator getCopy() {
+    DistinctCountAggregatorObjectSet aggregator = new DistinctCountAggregatorObjectSet();
+    aggregator.valueSetForObj = new HashSet<Object>(valueSetForObj);
+    return aggregator;
+  }
+
+  @Override public int compareTo(MeasureAggregator measureAggr) {
+    double valueSetForObjSize = getDoubleValue();
+    double otherVal = measureAggr.getDoubleValue();
+    if (valueSetForObjSize > otherVal) {
+      return 1;
+    }
+    if (valueSetForObjSize < otherVal) {
+      return -1;
+    }
+    return 0;
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (!(obj instanceof DistinctCountAggregatorObjectSet)) {
+      return false;
+    }
+    DistinctCountAggregatorObjectSet o = (DistinctCountAggregatorObjectSet) obj;
+    return getDoubleValue().equals(o.getDoubleValue());
+  }
+
+  @Override public int hashCode() {
+    return getDoubleValue().hashCode();
+  }
+
+  @Override public MeasureAggregator get() {
+    return this;
+  }
+
+  @Override public MeasureAggregator getNew() {
+    return new DistinctCountAggregatorObjectSet();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountBigDecimalAggregatorObjectSet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountBigDecimalAggregatorObjectSet.java b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountBigDecimalAggregatorObjectSet.java
new file mode 100644
index 0000000..2f44bca
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountBigDecimalAggregatorObjectSet.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.distinct;
+
+import java.math.BigDecimal;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.query.aggregator.MeasureAggregator;
+
+public class DistinctCountBigDecimalAggregatorObjectSet
+    extends AbstractDistinctCountAggregatorObjectSet {
+
+  private static final long serialVersionUID = 6313463368629960186L;
+
+  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+    if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+      valueSetForObj.add(dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(index));
+    }
+  }
+
+  private void agg(Set<Object> set2) {
+    valueSetForObj.addAll(set2);
+  }
+
+  /**
+   * merge the valueset so that we get the count of unique values
+   */
+  @Override public void merge(MeasureAggregator aggregator) {
+    DistinctCountBigDecimalAggregatorObjectSet distinctCountBigDecimalAggregatorObjectSet =
+        (DistinctCountBigDecimalAggregatorObjectSet) aggregator;
+    agg(distinctCountBigDecimalAggregatorObjectSet.valueSetForObj);
+  }
+
+  @Override public MeasureAggregator getCopy() {
+    DistinctCountBigDecimalAggregatorObjectSet aggregator =
+        new DistinctCountBigDecimalAggregatorObjectSet();
+    aggregator.valueSetForObj = new HashSet<Object>(valueSetForObj);
+    return aggregator;
+  }
+
+  @Override public int compareTo(MeasureAggregator measureAggr) {
+    BigDecimal valueSetForObjSize = getBigDecimalValue();
+    BigDecimal otherVal = measureAggr.getBigDecimalValue();
+    return valueSetForObjSize.compareTo(otherVal);
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (!(obj instanceof DistinctCountBigDecimalAggregatorObjectSet)) {
+      return false;
+    }
+    DistinctCountBigDecimalAggregatorObjectSet o = (DistinctCountBigDecimalAggregatorObjectSet) obj;
+    return getBigDecimalValue().equals(o.getBigDecimalValue());
+  }
+
+  @Override public int hashCode() {
+    return getBigDecimalValue().hashCode();
+  }
+
+  @Override public MeasureAggregator get() {
+    return this;
+  }
+
+  @Override public MeasureAggregator getNew() {
+    return new DistinctCountBigDecimalAggregatorObjectSet();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountLongAggregatorObjectSet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountLongAggregatorObjectSet.java b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountLongAggregatorObjectSet.java
new file mode 100644
index 0000000..c4f7216
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountLongAggregatorObjectSet.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.distinct;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.query.aggregator.MeasureAggregator;
+
+public class DistinctCountLongAggregatorObjectSet extends AbstractDistinctCountAggregatorObjectSet {
+
+  private static final long serialVersionUID = 6313463368629960186L;
+
+  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+    if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+      valueSetForObj.add(dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index));
+    }
+  }
+
+  private void agg(Set<Object> set2) {
+    valueSetForObj.addAll(set2);
+  }
+
+  /**
+   * merge the valueset so that we get the count of unique values
+   */
+  @Override public void merge(MeasureAggregator aggregator) {
+    DistinctCountLongAggregatorObjectSet distinctCountAggregator =
+        (DistinctCountLongAggregatorObjectSet) aggregator;
+    agg(distinctCountAggregator.valueSetForObj);
+  }
+
+  @Override public MeasureAggregator getCopy() {
+    DistinctCountLongAggregatorObjectSet aggregator = new DistinctCountLongAggregatorObjectSet();
+    aggregator.valueSetForObj = new HashSet<Object>(valueSetForObj);
+    return aggregator;
+  }
+
+  @Override public int compareTo(MeasureAggregator measureAggr) {
+    long valueSetForObjSize = getLongValue();
+    long otherVal = measureAggr.getLongValue();
+    if (valueSetForObjSize > otherVal) {
+      return 1;
+    }
+    if (valueSetForObjSize < otherVal) {
+      return -1;
+    }
+    return 0;
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (!(obj instanceof DistinctCountLongAggregatorObjectSet)) {
+      return false;
+    }
+    DistinctCountLongAggregatorObjectSet o = (DistinctCountLongAggregatorObjectSet) obj;
+    return getLongValue().equals(o.getLongValue());
+  }
+
+  @Override public int hashCode() {
+    return getLongValue().hashCode();
+  }
+
+  @Override public MeasureAggregator get() {
+    return this;
+  }
+
+  @Override public MeasureAggregator getNew() {
+    return new DistinctCountLongAggregatorObjectSet();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctStringCountAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctStringCountAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctStringCountAggregator.java
new file mode 100644
index 0000000..e3d4623
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctStringCountAggregator.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.distinct;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.query.aggregator.MeasureAggregator;
+
+public class DistinctStringCountAggregator implements MeasureAggregator {
+  private static final long serialVersionUID = 6313463368629960186L;
+
+  private Set<String> valueSetForStr;
+
+  public DistinctStringCountAggregator() {
+    this.valueSetForStr = new HashSet<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+  }
+
+  public void agg(double newVal) {
+  }
+
+  public void agg(String newVal) {
+    this.valueSetForStr.add(newVal);
+  }
+
+  private void agg(Set<String> set2) {
+    this.valueSetForStr.addAll(set2);
+  }
+
+  public void merge(MeasureAggregator aggregator) {
+    DistinctStringCountAggregator distinctCountAggregator =
+        (DistinctStringCountAggregator) aggregator;
+    agg(distinctCountAggregator.valueSetForStr);
+  }
+
+  public Double getDoubleValue() {
+    return (double) this.valueSetForStr.size();
+  }
+
+  public Long getLongValue() {
+    return (long) this.valueSetForStr.size();
+  }
+
+  public BigDecimal getBigDecimalValue() {
+    return new BigDecimal(this.valueSetForStr.size());
+  }
+
+  public Object getValueObject() {
+    return Integer.valueOf(this.valueSetForStr.size());
+  }
+
+  public void setNewValue(Object newValue) {
+  }
+
+  public boolean isFirstTime() {
+    return false;
+  }
+
+  public void writeData(DataOutput output) throws IOException {
+    int length = this.valueSetForStr.size() * 8;
+    ByteBuffer byteBuffer = ByteBuffer.allocate(length + 4);
+    byteBuffer.putInt(length);
+    for (String val : this.valueSetForStr) {
+      byte[] b = val.getBytes(Charset.defaultCharset());
+      byteBuffer.putInt(b.length);
+      byteBuffer.put(b);
+    }
+    byteBuffer.flip();
+    output.write(byteBuffer.array());
+  }
+
+  public void readData(DataInput inPut) throws IOException {
+    int length = inPut.readInt();
+    length /= 8;
+    this.valueSetForStr = new HashSet<String>(length + 1, 1.0F);
+    for (int i = 0; i < length; i++) {
+      byte[] b = new byte[inPut.readInt()];
+      inPut.readFully(b);
+      this.valueSetForStr.add(new String(b, Charset.defaultCharset()));
+    }
+  }
+
+  public MeasureAggregator getCopy() {
+    DistinctStringCountAggregator aggregator = new DistinctStringCountAggregator();
+    aggregator.valueSetForStr = new HashSet<String>(this.valueSetForStr);
+    return aggregator;
+  }
+
+  public int compareTo(MeasureAggregator o) {
+    double val = getDoubleValue();
+    double otherVal = o.getDoubleValue();
+    if (val > otherVal) {
+      return 1;
+    }
+    if (val < otherVal) {
+      return -1;
+    }
+    return 0;
+  }
+
+  @Override public boolean equals(Object obj) {
+    if(!(obj instanceof DistinctStringCountAggregator)) {
+      return false;
+    }
+    DistinctStringCountAggregator o = (DistinctStringCountAggregator) obj;
+    return getDoubleValue().equals(o.getDoubleValue());
+  }
+
+  @Override public int hashCode() {
+    return getDoubleValue().hashCode();
+  }
+
+  @Override public void agg(Object newVal) {
+    this.valueSetForStr.add((String) newVal);
+  }
+
+  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+  }
+
+  @Override public byte[] getByteArray() {
+    return null;
+  }
+
+  @Override public MeasureAggregator get() {
+    return this;
+  }
+
+  public String toString() {
+    return valueSetForStr.size() + "";
+  }
+
+  @Override public void merge(byte[] value) {
+  }
+
+  @Override public MeasureAggregator getNew() {
+    // TODO Auto-generated method stub
+    return new DistinctStringCountAggregator();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctBigDecimalAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctBigDecimalAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctBigDecimalAggregator.java
new file mode 100644
index 0000000..6a59ec9
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctBigDecimalAggregator.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.distinct;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.core.util.DataTypeUtil;
+import org.carbondata.query.aggregator.MeasureAggregator;
+import org.carbondata.query.aggregator.impl.AbstractMeasureAggregatorBasic;
+
+/**
+ * The sum distinct aggregator
+ * Ex:
+ * ID NAME Sales
+ * 1 a 200
+ * 2 a 100
+ * 3 a 200
+ * select sum(distinct sales) # would result 300
+ */
+public class SumDistinctBigDecimalAggregator extends AbstractMeasureAggregatorBasic {
+
+  /**
+   *
+   */
+  private static final long serialVersionUID = 6313463368629960155L;
+
+  /**
+   * For Spark CARBON to avoid heavy object transfer it better to flatten the
+   * Aggregators. There is no aggregation expected after setting this value.
+   */
+  private BigDecimal computedFixedValue;
+
+  /**
+   *
+   */
+  private Set<BigDecimal> valueSet;
+
+  public SumDistinctBigDecimalAggregator() {
+    valueSet = new HashSet<BigDecimal>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+  }
+
+  /**
+   * Distinct Aggregate function which update the Distinct set
+   *
+   * @param newVal new value
+   */
+  @Override public void agg(Object newVal) {
+    valueSet.add(
+        newVal instanceof BigDecimal ? (BigDecimal) newVal : new BigDecimal(newVal.toString()));
+    firstTime = false;
+  }
+
+  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+    if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+      valueSet.add(dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(index));
+      firstTime = false;
+    }
+  }
+
+  /**
+   * Below method will be used to get the value byte array
+   */
+  @Override public byte[] getByteArray() {
+    Iterator<BigDecimal> iterator = valueSet.iterator();
+    ByteBuffer buffer =
+        ByteBuffer.allocate(valueSet.size() * CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE);
+    while (iterator.hasNext()) {
+      byte[] bytes = DataTypeUtil.bigDecimalToByte(iterator.next());
+      buffer.putInt(bytes.length);
+      buffer.put(bytes);
+    }
+    buffer.rewind();
+    return buffer.array();
+  }
+
+  private void agg(Set<BigDecimal> set2) {
+    valueSet.addAll(set2);
+  }
+
+  /**
+   * merge the valueset so that we get the count of unique values
+   */
+  @Override public void merge(MeasureAggregator aggregator) {
+    SumDistinctBigDecimalAggregator distinctAggregator =
+        (SumDistinctBigDecimalAggregator) aggregator;
+    if (!aggregator.isFirstTime()) {
+      agg(distinctAggregator.valueSet);
+      firstTime = false;
+    }
+  }
+
+  @Override public BigDecimal getBigDecimalValue() {
+    if (computedFixedValue == null) {
+      BigDecimal result = new BigDecimal(0);
+      for (BigDecimal aValue : valueSet) {
+        result = result.add(aValue);
+      }
+      return result;
+    }
+    return computedFixedValue;
+  }
+
+  @Override public Object getValueObject() {
+    return getBigDecimalValue();
+  }
+
+  @Override public void setNewValue(Object newValue) {
+    computedFixedValue = (BigDecimal) newValue;
+    valueSet = null;
+  }
+
+  @Override public boolean isFirstTime() {
+    return firstTime;
+  }
+
+  @Override public void writeData(DataOutput dataOutput) throws IOException {
+    if (computedFixedValue != null) {
+      byte[] bytes = DataTypeUtil.bigDecimalToByte(computedFixedValue);
+      ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 4 + bytes.length);
+      byteBuffer.putInt(-1);
+      byteBuffer.putInt(bytes.length);
+      byteBuffer.put(bytes);
+      byteBuffer.flip();
+      dataOutput.write(byteBuffer.array());
+    } else {
+      int length = valueSet.size() * 8 + valueSet.size() * 4;
+      ByteBuffer byteBuffer = ByteBuffer.allocate(length + 4 + 1);
+      byteBuffer.putInt(length);
+      for (BigDecimal val : valueSet) {
+        byte[] bytes =
+            val.toString().getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+        byteBuffer.putInt(-1);
+        byteBuffer.putInt(bytes.length);
+        byteBuffer.put(bytes);
+      }
+      byteBuffer.flip();
+      dataOutput.write(byteBuffer.array());
+    }
+  }
+
+  @Override public void readData(DataInput inPut) throws IOException {
+    int length = inPut.readInt();
+
+    if (length == -1) {
+      computedFixedValue = new BigDecimal(inPut.readUTF());
+      valueSet = null;
+    } else {
+      length = length / 8;
+      valueSet = new HashSet<BigDecimal>(length + 1, 1.0f);
+      for (int i = 0; i < length; i++) {
+        valueSet.add(new BigDecimal(inPut.readUTF()));
+      }
+    }
+  }
+
+  @Override public void merge(byte[] value) {
+    if (0 == value.length) {
+      return;
+    }
+    ByteBuffer buffer = ByteBuffer.wrap(value);
+    buffer.rewind();
+    while (buffer.hasRemaining()) {
+      byte[] valueByte = new byte[buffer.getInt()];
+      buffer.get(valueByte);
+      BigDecimal valueBigDecimal = DataTypeUtil.byteToBigDecimal(valueByte);
+      agg(valueBigDecimal);
+    }
+  }
+
+  public String toString() {
+    if (computedFixedValue == null) {
+      return valueSet.size() + "";
+    }
+    return computedFixedValue + "";
+  }
+
+  @Override public MeasureAggregator getCopy() {
+    SumDistinctBigDecimalAggregator aggregator = new SumDistinctBigDecimalAggregator();
+    aggregator.valueSet = new HashSet<BigDecimal>(valueSet);
+    return aggregator;
+  }
+
+  @Override public int compareTo(MeasureAggregator msr) {
+    BigDecimal msrValObj = getBigDecimalValue();
+    BigDecimal otherVal = msr.getBigDecimalValue();
+
+    return msrValObj.compareTo(otherVal);
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (!(obj instanceof SumDistinctBigDecimalAggregator)) {
+      return false;
+    }
+    SumDistinctBigDecimalAggregator o = (SumDistinctBigDecimalAggregator) obj;
+    return getBigDecimalValue().equals(o.getBigDecimalValue());
+  }
+
+  @Override public int hashCode() {
+    return getBigDecimalValue().hashCode();
+  }
+
+  @Override public MeasureAggregator getNew() {
+    // TODO Auto-generated method stub
+    return new SumDistinctBigDecimalAggregator();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctDoubleAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctDoubleAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctDoubleAggregator.java
new file mode 100644
index 0000000..0229f24
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctDoubleAggregator.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.distinct;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.query.aggregator.MeasureAggregator;
+import org.carbondata.query.aggregator.impl.AbstractMeasureAggregatorBasic;
+
+/**
+ * The sum distinct aggregator
+ * Ex:
+ * ID NAME Sales
+ * 1 a 200
+ * 2 a 100
+ * 3 a 200
+ * select sum(distinct sales) # would result 300
+ */
+
+public class SumDistinctDoubleAggregator extends AbstractMeasureAggregatorBasic {
+
+  /**
+   *
+   */
+  private static final long serialVersionUID = 6313463368629960155L;
+
+  /**
+   * For Spark CARBON to avoid heavy object transfer it better to flatten the
+   * Aggregators. There is no aggregation expected after setting this value.
+   */
+  private Double computedFixedValue;
+
+  /**
+   *
+   */
+  private Set<Double> valueSet;
+
+  public SumDistinctDoubleAggregator() {
+    valueSet = new HashSet<Double>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+  }
+
+  /**
+   * just need to add the unique values to agg set
+   */
+  @Override public void agg(double newVal) {
+    valueSet.add(newVal);
+    firstTime = false;
+  }
+
+  /**
+   * Distinct Aggregate function which update the Distinct set
+   *
+   * @param newVal new value
+   */
+  @Override public void agg(Object newVal) {
+    valueSet.add(newVal instanceof Double ? (Double) newVal : new Double(newVal.toString()));
+    firstTime = false;
+  }
+
+  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+    if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+      valueSet.add(dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index));
+      firstTime = false;
+    }
+  }
+
+  /**
+   * Below method will be used to get the value byte array
+   */
+  @Override public byte[] getByteArray() {
+    Iterator<Double> iterator = valueSet.iterator();
+    ByteBuffer buffer =
+        ByteBuffer.allocate(valueSet.size() * CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE);
+    while (iterator.hasNext()) {
+      buffer.putDouble(iterator.next());
+    }
+    buffer.rewind();
+    return buffer.array();
+  }
+
+  private void agg(Set<Double> set2) {
+    valueSet.addAll(set2);
+  }
+
+  /**
+   * merge the valueset so that we get the count of unique values
+   */
+  @Override public void merge(MeasureAggregator aggregator) {
+    SumDistinctDoubleAggregator distinctAggregator = (SumDistinctDoubleAggregator) aggregator;
+    if (!aggregator.isFirstTime()) {
+      agg(distinctAggregator.valueSet);
+      firstTime = false;
+    }
+  }
+
+  @Override public Double getDoubleValue() {
+    if (computedFixedValue == null) {
+      double result = 0;
+      for (Double aValue : valueSet) {
+        result += aValue;
+      }
+      return result;
+    }
+    return computedFixedValue;
+  }
+
+  @Override public Object getValueObject() {
+    return getDoubleValue();
+  }
+
+  @Override public void setNewValue(Object newValue) {
+    computedFixedValue = (Double) newValue;
+    valueSet = null;
+  }
+
+  @Override public boolean isFirstTime() {
+    return firstTime;
+  }
+
+  @Override public void writeData(DataOutput dataOutput) throws IOException {
+    if (computedFixedValue != null) {
+      ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 8);
+      byteBuffer.putInt(-1);
+      byteBuffer.putDouble(computedFixedValue);
+      byteBuffer.flip();
+      dataOutput.write(byteBuffer.array());
+    } else {
+      int length = valueSet.size() * 8;
+      ByteBuffer byteBuffer = ByteBuffer.allocate(length + 4 + 1);
+      byteBuffer.putInt(length);
+      for (double val : valueSet) {
+        byteBuffer.putDouble(val);
+      }
+      byteBuffer.flip();
+      dataOutput.write(byteBuffer.array());
+    }
+  }
+
+  @Override public void readData(DataInput inPut) throws IOException {
+    int length = inPut.readInt();
+
+    if (length == -1) {
+      computedFixedValue = inPut.readDouble();
+      valueSet = null;
+    } else {
+      length = length / 8;
+      valueSet = new HashSet<Double>(length + 1, 1.0f);
+      for (int i = 0; i < length; i++) {
+        valueSet.add(inPut.readDouble());
+      }
+    }
+  }
+
+  @Override public void merge(byte[] value) {
+    if (0 == value.length) {
+      return;
+    }
+    ByteBuffer buffer = ByteBuffer.wrap(value);
+    buffer.rewind();
+    while (buffer.hasRemaining()) {
+      agg(buffer.getDouble());
+    }
+  }
+
+  public String toString() {
+    if (computedFixedValue == null) {
+      return valueSet.size() + "";
+    }
+    return computedFixedValue + "";
+  }
+
+  @Override public MeasureAggregator getCopy() {
+    SumDistinctDoubleAggregator aggregator = new SumDistinctDoubleAggregator();
+    aggregator.valueSet = new HashSet<Double>(valueSet);
+    return aggregator;
+  }
+
+  @Override public int compareTo(MeasureAggregator msr) {
+    double msrValObj = getDoubleValue();
+    double otherVal = msr.getDoubleValue();
+    if (msrValObj > otherVal) {
+      return 1;
+    }
+    if (msrValObj < otherVal) {
+      return -1;
+    }
+    return 0;
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (!(obj instanceof SumDistinctDoubleAggregator)) {
+      return false;
+    }
+    SumDistinctDoubleAggregator o = (SumDistinctDoubleAggregator) obj;
+    return getDoubleValue().equals(o.getDoubleValue());
+  }
+
+  @Override public int hashCode() {
+    return getDoubleValue().hashCode();
+  }
+
+  @Override public MeasureAggregator getNew() {
+    // TODO Auto-generated method stub
+    return new SumDistinctDoubleAggregator();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctLongAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctLongAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctLongAggregator.java
new file mode 100644
index 0000000..d57c34b
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctLongAggregator.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.distinct;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.query.aggregator.MeasureAggregator;
+import org.carbondata.query.aggregator.impl.AbstractMeasureAggregatorBasic;
+
+/**
+ * The sum distinct aggregator
+ * Ex:
+ * ID NAME Sales
+ * 1 a 200
+ * 2 a 100
+ * 3 a 200
+ * select sum(distinct sales) # would result 300
+ */
+
+public class SumDistinctLongAggregator extends AbstractMeasureAggregatorBasic {
+
+  private static final long serialVersionUID = 6313463368629960155L;
+
+  /**
+   * For Spark CARBON to avoid heavy object transfer it better to flatten the
+   * Aggregators. There is no aggregation expected after setting this value.
+   */
+  private Long computedFixedValue;
+
+  /**
+   *
+   */
+  private Set<Long> valueSet;
+
+  public SumDistinctLongAggregator() {
+    valueSet = new HashSet<Long>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+  }
+
+  /**
+   * Distinct Aggregate function which update the Distinct set
+   *
+   * @param newVal new value
+   */
+  @Override public void agg(Object newVal) {
+    valueSet.add(newVal instanceof Long ? (Long) newVal : Long.valueOf(newVal.toString()));
+    firstTime = false;
+  }
+
+  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+    if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+      valueSet.add(dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index));
+      firstTime = false;
+    }
+  }
+
+  /**
+   * Below method will be used to get the value byte array
+   */
+  @Override public byte[] getByteArray() {
+    Iterator<Long> iterator = valueSet.iterator();
+    ByteBuffer buffer =
+        ByteBuffer.allocate(valueSet.size() * CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE);
+    while (iterator.hasNext()) {
+      buffer.putLong(iterator.next());
+    }
+    buffer.rewind();
+    return buffer.array();
+  }
+
+  private void agg(Set<Long> set2) {
+    valueSet.addAll(set2);
+  }
+
+  /**
+   * merge the valueset so that we get the count of unique values
+   */
+  @Override public void merge(MeasureAggregator aggregator) {
+    SumDistinctLongAggregator distinctAggregator = (SumDistinctLongAggregator) aggregator;
+    if (!aggregator.isFirstTime()) {
+      agg(distinctAggregator.valueSet);
+      firstTime = false;
+    }
+  }
+
+  @Override public Long getLongValue() {
+    if (computedFixedValue == null) {
+      long result = 0;
+      for (Long aValue : valueSet) {
+        result += aValue;
+      }
+      return result;
+    }
+    return computedFixedValue;
+  }
+
+  @Override public Object getValueObject() {
+    return getLongValue();
+  }
+
+  @Override public void setNewValue(Object newValue) {
+    computedFixedValue = (Long) newValue;
+    valueSet = null;
+  }
+
+  @Override public boolean isFirstTime() {
+    return firstTime;
+  }
+
+  @Override public void writeData(DataOutput dataOutput) throws IOException {
+    if (computedFixedValue != null) {
+      ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 8);
+      byteBuffer.putInt(-1);
+      byteBuffer.putLong(computedFixedValue);
+      byteBuffer.flip();
+      dataOutput.write(byteBuffer.array());
+    } else {
+      int length = valueSet.size() * 8;
+      ByteBuffer byteBuffer = ByteBuffer.allocate(length + 4 + 1);
+      byteBuffer.putInt(length);
+      for (long val : valueSet) {
+        byteBuffer.putLong(val);
+      }
+      byteBuffer.flip();
+      dataOutput.write(byteBuffer.array());
+    }
+  }
+
+  @Override public void readData(DataInput inPut) throws IOException {
+    int length = inPut.readInt();
+
+    if (length == -1) {
+      computedFixedValue = inPut.readLong();
+      valueSet = null;
+    } else {
+      length = length / 8;
+      valueSet = new HashSet<Long>(length + 1, 1.0f);
+      for (int i = 0; i < length; i++) {
+        valueSet.add(inPut.readLong());
+      }
+    }
+
+  }
+
+  @Override public void merge(byte[] value) {
+    if (0 == value.length) {
+      return;
+    }
+    ByteBuffer buffer = ByteBuffer.wrap(value);
+    buffer.rewind();
+    while (buffer.hasRemaining()) {
+      agg(buffer.getLong());
+    }
+  }
+
+  public String toString() {
+    if (computedFixedValue == null) {
+      return valueSet.size() + "";
+    }
+    return computedFixedValue + "";
+  }
+
+  @Override public MeasureAggregator getCopy() {
+    SumDistinctLongAggregator aggregator = new SumDistinctLongAggregator();
+    aggregator.valueSet = new HashSet<Long>(valueSet);
+    return aggregator;
+  }
+
+  @Override public int compareTo(MeasureAggregator msr) {
+    long msrValObj = getLongValue();
+    long otherVal = msr.getLongValue();
+    if (msrValObj > otherVal) {
+      return 1;
+    }
+    if (msrValObj < otherVal) {
+      return -1;
+    }
+    return 0;
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (!(obj instanceof SumDistinctLongAggregator)) {
+      return false;
+    }
+    SumDistinctLongAggregator o = (SumDistinctLongAggregator) obj;
+    return getLongValue().equals(o.getLongValue());
+  }
+
+  @Override public int hashCode() {
+    return getLongValue().hashCode();
+  }
+
+  @Override public MeasureAggregator getNew() {
+    // TODO Auto-generated method stub
+    return new SumDistinctLongAggregator();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/AbstractMeasureAggregatorDummy.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/AbstractMeasureAggregatorDummy.java b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/AbstractMeasureAggregatorDummy.java
new file mode 100644
index 0000000..f423085
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/AbstractMeasureAggregatorDummy.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.dummy;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.carbondata.query.aggregator.MeasureAggregator;
+import org.carbondata.query.aggregator.impl.AbstractMeasureAggregatorBasic;
+
+/**
+ * AbstractMeasureAggregatorDummy
+ * Used for custom Carbon Aggregator dummy
+ */
+public abstract class AbstractMeasureAggregatorDummy extends AbstractMeasureAggregatorBasic {
+  private static final long serialVersionUID = 1L;
+
+  @Override public int compareTo(MeasureAggregator o) {
+    if (equals(o)) {
+      return 0;
+    }
+    return -1;
+  }
+
+  @Override public boolean equals(Object arg0) {
+    return super.equals(arg0);
+  }
+
+  @Override public int hashCode() {
+    return super.hashCode();
+  }
+
+  @Override public byte[] getByteArray() {
+    return null;
+  }
+
+  @Override public void merge(MeasureAggregator aggregator) {
+  }
+
+  @Override public MeasureAggregator getCopy() {
+    return null;
+  }
+
+  @Override public void writeData(DataOutput output) throws IOException {
+  }
+
+  @Override public void readData(DataInput inPut) throws IOException {
+  }
+
+  @Override public void merge(byte[] value) {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyBigDecimalAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyBigDecimalAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyBigDecimalAggregator.java
new file mode 100644
index 0000000..8a33fe1
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyBigDecimalAggregator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.dummy;
+
+import java.math.BigDecimal;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.query.aggregator.MeasureAggregator;
+
+public class DummyBigDecimalAggregator extends AbstractMeasureAggregatorDummy {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * aggregate value
+   */
+  private BigDecimal aggVal;
+
+  @Override public void agg(Object newVal) {
+    aggVal = (BigDecimal) newVal;
+    firstTime = false;
+  }
+
+  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+    if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+      aggVal = dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(index);
+      firstTime = false;
+    }
+  }
+
+  @Override public BigDecimal getBigDecimalValue() {
+    return aggVal;
+  }
+
+  @Override public Object getValueObject() {
+    return aggVal;
+  }
+
+  @Override public void setNewValue(Object newValue) {
+    aggVal = (BigDecimal) newValue;
+  }
+
+  @Override public MeasureAggregator getNew() {
+    // TODO Auto-generated method stub
+    return new DummyBigDecimalAggregator();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyDoubleAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyDoubleAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyDoubleAggregator.java
new file mode 100644
index 0000000..cd4fe56
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyDoubleAggregator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.query.aggregator.impl.dummy;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.query.aggregator.MeasureAggregator;
+
+public class DummyDoubleAggregator extends AbstractMeasureAggregatorDummy {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * aggregate value
+   */
+  private double aggVal;
+
+  @Override public void agg(double newVal) {
+    aggVal = newVal;
+    firstTime = false;
+  }
+
+  @Override public void agg(Object newVal) {
+    aggVal = (Double) newVal;
+    firstTime = false;
+  }
+
+  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+    if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+      aggVal = dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index);
+      firstTime = false;
+    }
+  }
+
+  @Override public Double getDoubleValue() {
+    return aggVal;
+  }
+
+  @Override public Object getValueObject() {
+    return aggVal;
+  }
+
+  @Override public void setNewValue(Object newValue) {
+    aggVal = (Double) newValue;
+  }
+
+  @Override public MeasureAggregator getNew() {
+    return new DummyDoubleAggregator();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyLongAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyLongAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyLongAggregator.java
new file mode 100644
index 0000000..4131895
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyLongAggregator.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.query.aggregator.impl.dummy;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.query.aggregator.MeasureAggregator;
+
+public class DummyLongAggregator extends AbstractMeasureAggregatorDummy {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * aggregate value
+   */
+  private long aggVal;
+
+  @Override public void agg(Object newVal) {
+    aggVal = (Long) newVal;
+    firstTime = false;
+  }
+
+  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+    if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+      aggVal = dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index);
+      firstTime = false;
+    }
+  }
+
+  @Override public Long getLongValue() {
+    return aggVal;
+  }
+
+  @Override public Object getValueObject() {
+    return aggVal;
+  }
+
+  @Override public void setNewValue(Object newValue) {
+    aggVal = (Long) newValue;
+  }
+
+  @Override public MeasureAggregator getNew() {
+    return new DummyLongAggregator();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/max/AbstractMaxAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/max/AbstractMaxAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/max/AbstractMaxAggregator.java
new file mode 100644
index 0000000..27b1876
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/max/AbstractMaxAggregator.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.max;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+
+import org.carbondata.common.logging.LogService;
+import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.util.CarbonUtil;
+import org.carbondata.query.aggregator.impl.AbstractMeasureAggregatorMaxMin;
+
+public abstract class AbstractMaxAggregator extends AbstractMeasureAggregatorMaxMin {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(AbstractMaxAggregator.class.getName());
+
+  protected void internalAgg(Object value) {
+    if (value instanceof Comparable) {
+      @SuppressWarnings("unchecked") Comparable<Object> newValue = ((Comparable<Object>) value);
+      aggVal = (aggVal == null || aggVal.compareTo(newValue) < 0) ? newValue : aggVal;
+    }
+  }
+
+  @Override public void merge(byte[] value) {
+    if (0 == value.length) {
+      return;
+    }
+    ByteArrayInputStream bytesInputStream = null;
+    ObjectInput in = null;
+    try {
+      bytesInputStream = new ByteArrayInputStream(value);
+      in = new ObjectInputStream(bytesInputStream);
+      Object newVal = (Comparable<Object>) in.readObject();
+      internalAgg(newVal);
+      firstTime = false;
+    } catch (Exception e) {
+      LOGGER.error(e, "Problem while merging byte array in maxAggregator: " + e.getMessage());
+    } finally {
+      CarbonUtil.closeStreams(bytesInputStream);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxAggregator.java
new file mode 100644
index 0000000..d0d8af0
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxAggregator.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.max;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.query.aggregator.MeasureAggregator;
+
+/**
+ * Class Description :
+ * It will return max of values
+ */
+public class MaxAggregator extends AbstractMaxAggregator {
+
+  /**
+   * serialVersionUID
+   */
+  private static final long serialVersionUID = -5850218739083899419L;
+
+  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+    if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+      internalAgg(dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index));
+      firstTime = false;
+    }
+  }
+
+  /**
+   * Merge the value, it will update the max aggregate value if aggregator
+   * passed as an argument will have value greater than aggVal
+   *
+   * @param aggregator MaxAggregator
+   */
+  @Override public void merge(MeasureAggregator aggregator) {
+    MaxAggregator maxAggregator = (MaxAggregator) aggregator;
+    if (!aggregator.isFirstTime()) {
+      agg(maxAggregator.aggVal);
+      firstTime = false;
+    }
+
+  }
+
+  @Override public MeasureAggregator getCopy() {
+    MaxAggregator aggregator = new MaxAggregator();
+    aggregator.aggVal = aggVal;
+    aggregator.firstTime = firstTime;
+    return aggregator;
+  }
+
+  @Override public MeasureAggregator getNew() {
+    return new MaxAggregator();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxBigDecimalAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxBigDecimalAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxBigDecimalAggregator.java
new file mode 100644
index 0000000..c4149c6
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxBigDecimalAggregator.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.max;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.query.aggregator.MeasureAggregator;
+
+/**
+ * Class Description :
+ * It will return max of values
+ */
+public class MaxBigDecimalAggregator extends AbstractMaxAggregator {
+
+  /**
+   * serialVersionUID
+   */
+  private static final long serialVersionUID = -5850218739083899419L;
+
+  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+    if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+      internalAgg(dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(index));
+      firstTime = false;
+    }
+  }
+
+  /**
+   * Merge the value, it will update the max aggregate value if aggregator
+   * passed as an argument will have value greater than aggVal
+   *
+   * @param aggregator MaxAggregator
+   */
+  @Override public void merge(MeasureAggregator aggregator) {
+    MaxAggregator maxAggregator = (MaxAggregator) aggregator;
+    if (!aggregator.isFirstTime()) {
+      agg(maxAggregator.aggVal);
+      firstTime = false;
+    }
+  }
+
+  @Override public MeasureAggregator getCopy() {
+    MaxAggregator aggregator = new MaxAggregator();
+    aggregator.aggVal = aggVal;
+    aggregator.firstTime = firstTime;
+    return aggregator;
+  }
+
+  @Override public MeasureAggregator getNew() {
+    return new MaxBigDecimalAggregator();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxLongAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxLongAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxLongAggregator.java
new file mode 100644
index 0000000..55b3be5
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxLongAggregator.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.max;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.query.aggregator.MeasureAggregator;
+
+/**
+ * Class Description :
+ * It will return max of values
+ */
+public class MaxLongAggregator extends AbstractMaxAggregator {
+
+  /**
+   * serialVersionUID
+   */
+  private static final long serialVersionUID = -5850218739083899419L;
+
+  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+    if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+      internalAgg(dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index));
+      firstTime = false;
+    }
+  }
+
+  /**
+   * Merge the value, it will update the max aggregate value if aggregator
+   * passed as an argument will have value greater than aggVal
+   *
+   * @param aggregator MaxAggregator
+   */
+  @Override public void merge(MeasureAggregator aggregator) {
+    MaxAggregator maxAggregator = (MaxAggregator) aggregator;
+    if (!aggregator.isFirstTime()) {
+      agg(maxAggregator.aggVal);
+      firstTime = false;
+    }
+  }
+
+  @Override public MeasureAggregator getCopy() {
+    MaxAggregator aggregator = new MaxAggregator();
+    aggregator.aggVal = aggVal;
+    aggregator.firstTime = firstTime;
+    return aggregator;
+  }
+
+  @Override public MeasureAggregator getNew() {
+    return new MaxLongAggregator();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/min/AbstractMinAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/min/AbstractMinAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/min/AbstractMinAggregator.java
new file mode 100644
index 0000000..77aa163
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/min/AbstractMinAggregator.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.min;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+
+import org.carbondata.common.logging.LogService;
+import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.util.CarbonUtil;
+import org.carbondata.query.aggregator.impl.AbstractMeasureAggregatorMaxMin;
+
+public abstract class AbstractMinAggregator extends AbstractMeasureAggregatorMaxMin {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(AbstractMinAggregator.class.getName());
+
+  protected void internalAgg(Object value) {
+    if (value instanceof Comparable) {
+      @SuppressWarnings("unchecked") Comparable<Object> newValue = ((Comparable<Object>) value);
+      aggVal = (aggVal == null || aggVal.compareTo(newValue) > 0) ? newValue : aggVal;
+    }
+  }
+
+  @Override public void merge(byte[] value) {
+    if (0 == value.length) {
+      return;
+    }
+    ByteArrayInputStream bis = null;
+    ObjectInput objectInput = null;
+    try {
+      bis = new ByteArrayInputStream(value);
+      objectInput = new ObjectInputStream(bis);
+      Object newVal = (Comparable<Object>) objectInput.readObject();
+      internalAgg(newVal);
+      firstTime = false;
+    } catch (Exception e) {
+      LOGGER.error(e, "Problem while merging byte array in minAggregator: " + e.getMessage());
+    } finally {
+      CarbonUtil.closeStreams(bis);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinAggregator.java
new file mode 100644
index 0000000..4bb3d73
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinAggregator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.min;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.query.aggregator.MeasureAggregator;
+
+/**
+ * Class Description : It will return min of values
+ */
+public class MinAggregator extends AbstractMinAggregator {
+
+  /**
+   * serialVersionUID
+   */
+  private static final long serialVersionUID = -8077547753784906280L;
+
+  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+    if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+      internalAgg(dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index));
+      firstTime = false;
+    }
+  }
+
+  /**
+   * Merge the value, it will update the min aggregate value if aggregator
+   * passed as an argument will have value less than aggVal
+   *
+   * @param aggregator MinAggregator
+   */
+  @Override public void merge(MeasureAggregator aggregator) {
+    MinAggregator minAggregator = (MinAggregator) aggregator;
+    if (!aggregator.isFirstTime()) {
+      agg(minAggregator.aggVal);
+      firstTime = false;
+    }
+  }
+
+  @Override public MeasureAggregator getCopy() {
+    MinAggregator aggregator = new MinAggregator();
+    aggregator.aggVal = aggVal;
+    aggregator.firstTime = firstTime;
+    return aggregator;
+  }
+
+  @Override public MeasureAggregator getNew() {
+    return new MinAggregator();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinBigDecimalAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinBigDecimalAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinBigDecimalAggregator.java
new file mode 100644
index 0000000..7347dc1
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinBigDecimalAggregator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.min;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.query.aggregator.MeasureAggregator;
+
+/**
+ * Class Description : It will return min of values
+ */
+public class MinBigDecimalAggregator extends AbstractMinAggregator {
+
+  /**
+   * serialVersionUID
+   */
+  private static final long serialVersionUID = -8077547753784906280L;
+
+  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+    if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+      internalAgg(dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(index));
+      firstTime = false;
+    }
+  }
+
+  /**
+   * Merge the value, it will update the min aggregate value if aggregator
+   * passed as an argument will have value less than aggVal
+   *
+   * @param aggregator MinAggregator
+   */
+  @Override public void merge(MeasureAggregator aggregator) {
+    MinAggregator minAggregator = (MinAggregator) aggregator;
+    if (!aggregator.isFirstTime()) {
+      agg(minAggregator.aggVal);
+      firstTime = false;
+    }
+  }
+
+  @Override public MeasureAggregator getCopy() {
+    MinAggregator aggregator = new MinAggregator();
+    aggregator.aggVal = aggVal;
+    aggregator.firstTime = firstTime;
+    return aggregator;
+  }
+
+  @Override public MeasureAggregator getNew() {
+    return new MinBigDecimalAggregator();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinLongAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinLongAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinLongAggregator.java
new file mode 100644
index 0000000..af614c9
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinLongAggregator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.min;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.query.aggregator.MeasureAggregator;
+
+/**
+ * Class Description : It will return min of values
+ */
+public class MinLongAggregator extends AbstractMinAggregator {
+
+  /**
+   * serialVersionUID
+   */
+  private static final long serialVersionUID = -8077547753784906280L;
+
+  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+    if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+      internalAgg(dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index));
+      firstTime = false;
+    }
+  }
+
+  /**
+   * Merge the value, it will update the min aggregate value if aggregator
+   * passed as an argument will have value less than aggVal
+   *
+   * @param aggregator MinAggregator
+   */
+  @Override public void merge(MeasureAggregator aggregator) {
+    MinAggregator minAggregator = (MinAggregator) aggregator;
+    if (!aggregator.isFirstTime()) {
+      agg(minAggregator.aggVal);
+      firstTime = false;
+    }
+  }
+
+  @Override public MeasureAggregator getCopy() {
+    MinAggregator aggregator = new MinAggregator();
+    aggregator.aggVal = aggVal;
+    aggregator.firstTime = firstTime;
+    return aggregator;
+  }
+
+  @Override public MeasureAggregator getNew() {
+    return new MinLongAggregator();
+  }
+}