You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/12/13 07:35:33 UTC

[1/2] incubator-carbondata git commit: rebased code. fixed review comment.

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 00c479463 -> 05497d0d1


rebased code.
fixed review comment.

BigDecimal compression


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/63d66264
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/63d66264
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/63d66264

Branch: refs/heads/master
Commit: 63d66264cb60338845c87f7f627019bb843844aa
Parents: 00c4794
Author: ashok.blend <as...@gmail.com>
Authored: Sat Dec 3 13:05:38 2016 -0800
Committer: jackylk <ja...@huawei.com>
Committed: Tue Dec 13 15:34:38 2016 +0800

----------------------------------------------------------------------
 .../core/compression/BigDecimalCompressor.java  |  76 +++++
 .../core/compression/BigIntCompressor.java      |  26 ++
 .../core/compression/ValueCompressor.java       |   9 +
 .../store/compression/WriterCompressModel.java  |  11 +
 .../compression/type/UnCompressBigDecimal.java  | 112 +++++++
 .../type/UnCompressBigDecimalByte.java          | 134 +++++++++
 .../store/dataholder/CarbonReadDataHolder.java  |   3 +
 .../store/dataholder/CarbonWriteDataHolder.java |  50 ++++
 ...ractHeavyCompressedDoubleArrayDataStore.java |  10 +-
 .../core/util/BigDecimalCompressionFinder.java  |  96 ++++++
 .../carbondata/core/util/CompressionFinder.java | 145 +++++++++
 .../core/util/ValueCompressionUtil.java         | 291 +++++++------------
 ...mpressedMeasureChunkFileBasedReaderTest.java |  98 +++----
 .../core/util/ValueCompressionUtilTest.java     |   2 +-
 .../store/CarbonFactDataHandlerColumnar.java    |  59 +++-
 15 files changed, 863 insertions(+), 259 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63d66264/core/src/main/java/org/apache/carbondata/core/compression/BigDecimalCompressor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/compression/BigDecimalCompressor.java b/core/src/main/java/org/apache/carbondata/core/compression/BigDecimalCompressor.java
new file mode 100644
index 0000000..85c9927
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/compression/BigDecimalCompressor.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.compression;
+
+import org.apache.carbondata.core.datastorage.store.dataholder.CarbonWriteDataHolder;
+import org.apache.carbondata.core.util.BigDecimalCompressionFinder;
+import org.apache.carbondata.core.util.CompressionFinder;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+/**
+ * Bigdecimal data type compressor: the left and the right part of every value are
+ * compressed separately. Instances keep per-call state; do not share them across threads.
+ */
+public class BigDecimalCompressor extends BigIntCompressor {
+
+  /** Part read by compressMaxMin/compressNone: true = left, false = right. */
+  private boolean readLeft = true;
+
+  /**
+   * Compresses both parts; compressionFinder must be a BigDecimalCompressionFinder.
+   * @param maxValue Long[] holding the left maximum at 0 and the right one at 1
+   * @param decimal not used here; both parts are compressed with decimal 0
+   * @return Object[] { compressed left part, compressed right part }
+   */
+  @Override
+  public Object getCompressedValues(CompressionFinder compressionFinder,
+      CarbonWriteDataHolder dataHolder, Object maxValue, int decimal) {
+    BigDecimalCompressionFinder finder = (BigDecimalCompressionFinder) compressionFinder;
+    Long[] maxValues = (Long[]) maxValue;
+    try {
+      Object left = getCompressedValues(finder.getLeftCompType(), dataHolder,
+          finder.getLeftChangedDataType(), maxValues[0], 0);
+      readLeft = false;
+      Object right = getCompressedValues(finder.getRightCompType(), dataHolder,
+          finder.getRightChangedDataType(), maxValues[1], 0);
+      return new Object[] { left, right };
+    } finally {
+      // restore the default so a reused (or failed) instance starts with the left part
+      readLeft = true;
+    }
+  }
+
+  @Override
+  protected Object compressMaxMin(DataType changedDataType,
+      CarbonWriteDataHolder dataHolder, Object max) {
+    return compressMaxMin(changedDataType, (long) max, selectPart(dataHolder));
+  }
+
+  @Override
+  protected Object compressNone(DataType changedDataType,
+      CarbonWriteDataHolder dataHolder) {
+    return compressNone(changedDataType, selectPart(dataHolder));
+  }
+
+  /** Returns the holder's left or right values, as selected by readLeft. */
+  private long[] selectPart(CarbonWriteDataHolder dataHolder) {
+    long[][] parts = dataHolder.getWritableBigDecimalValues();
+    return readLeft ? parts[0] : parts[1];
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63d66264/core/src/main/java/org/apache/carbondata/core/compression/BigIntCompressor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/compression/BigIntCompressor.java b/core/src/main/java/org/apache/carbondata/core/compression/BigIntCompressor.java
index 7b9e52f..315b28f 100644
--- a/core/src/main/java/org/apache/carbondata/core/compression/BigIntCompressor.java
+++ b/core/src/main/java/org/apache/carbondata/core/compression/BigIntCompressor.java
@@ -44,6 +44,19 @@ public class BigIntCompressor extends ValueCompressor {
       Object max) {
     long maxValue = (long) max;
     long[] value = dataHolder.getWritableLongValues();
+    return compressMaxMin(changedDataType, maxValue, value);
+  }
+
+  /**
+   * 1. Computes the delta, i.e. the difference between the maximum value and the actual value
+   * 2. Converts the delta computed above to changedDataType
+   * @param changedDataType
+   * @param maxValue
+   * @param value
+   * @return
+   */
+  protected Object compressMaxMin(DataType changedDataType, long maxValue,
+      long[] value) {
     int i = 0;
     switch (changedDataType) {
       case DATA_BYTE:
@@ -80,6 +93,19 @@ public class BigIntCompressor extends ValueCompressor {
   @Override
   protected Object compressNone(DataType changedDataType, CarbonWriteDataHolder dataHolder) {
     long[] value = dataHolder.getWritableLongValues();
+    return compressNone(changedDataType, value);
+  }
+
+  /**
+   * Converts the values to the changed datatype.
+   * The changed datatype is computed from the list of values.
+   * For instance, if the values are 2, 10, 12, 45,
+   * they easily fit in a byte, so this method converts them to bytes and stores them.
+   * @param changedDataType
+   * @param value
+   * @return
+   */
+  protected Object compressNone(DataType changedDataType, long[] value) {
     int i = 0;
     switch (changedDataType) {
       case DATA_BYTE:

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63d66264/core/src/main/java/org/apache/carbondata/core/compression/ValueCompressor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/compression/ValueCompressor.java b/core/src/main/java/org/apache/carbondata/core/compression/ValueCompressor.java
index 370c8d5..9e7862c 100644
--- a/core/src/main/java/org/apache/carbondata/core/compression/ValueCompressor.java
+++ b/core/src/main/java/org/apache/carbondata/core/compression/ValueCompressor.java
@@ -19,6 +19,7 @@
 package org.apache.carbondata.core.compression;
 
 import org.apache.carbondata.core.datastorage.store.dataholder.CarbonWriteDataHolder;
+import org.apache.carbondata.core.util.CompressionFinder;
 import org.apache.carbondata.core.util.ValueCompressionUtil.COMPRESSION_TYPE;
 import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
@@ -27,6 +28,14 @@ import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
  */
 public abstract class ValueCompressor {
 
+  public Object getCompressedValues(CompressionFinder compressionFinder,
+      CarbonWriteDataHolder dataHolder, Object maxValue, int decimal) {
+    // unpack the finder and delegate to the compType/changedDataType overload
+    COMPRESSION_TYPE compType = compressionFinder.getCompType();
+    DataType changedDataType = compressionFinder.getChangedDataType();
+    return getCompressedValues(compType, dataHolder, changedDataType, maxValue, decimal);
+  }
+
   /**
    *
    * @param compType

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63d66264/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/WriterCompressModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/WriterCompressModel.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/WriterCompressModel.java
index ac21c8e..a9c2666 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/WriterCompressModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/WriterCompressModel.java
@@ -19,6 +19,7 @@
 
 package org.apache.carbondata.core.datastorage.store.compression;
 
+import org.apache.carbondata.core.util.CompressionFinder;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
 
 public class WriterCompressModel {
@@ -68,6 +69,8 @@ public class WriterCompressModel {
    */
   private ValueCompressonHolder.UnCompressValue[] unCompressValues;
 
+  private CompressionFinder[] compressionFinders;
+
   /**
    * @return the compType
    */
@@ -218,4 +221,12 @@ public class WriterCompressModel {
     this.uniqueValue = uniqueValue;
   }
 
+
+  public void setCompressionFinders(CompressionFinder[] compressionFinders) {
+    this.compressionFinders = compressionFinders; // stored by reference, not copied
+  }
+
+  public CompressionFinder[] getCompressionFinder() {
+    return this.compressionFinders; // the stored array itself, not a copy
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63d66264/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/type/UnCompressBigDecimal.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/type/UnCompressBigDecimal.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/type/UnCompressBigDecimal.java
new file mode 100644
index 0000000..bf2c11b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/type/UnCompressBigDecimal.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastorage.store.compression.type;
+
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder.UnCompressValue;
+import org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
+import org.apache.carbondata.core.util.BigDecimalCompressionFinder;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+/**
+ * Big decimal compression/uncompression
+ */
+public class UnCompressBigDecimal<T> implements UnCompressValue<T> {
+
+  private BigDecimalCompressionFinder compressionFinder;
+
+  /**
+   * holder of the part before the decimal point
+   */
+  private UnCompressValue leftPart;
+
+  /**
+   * holder of the part after the decimal point
+   */
+  private UnCompressValue rightPart;
+
+  public UnCompressBigDecimal(BigDecimalCompressionFinder compressionFinder,
+      UnCompressValue leftPart, UnCompressValue rightPart) {
+    this.compressionFinder = compressionFinder;
+    this.leftPart = leftPart;
+    this.rightPart = rightPart;
+  }
+
+  @Override
+  public void setValue(T value) {
+    Object[] values = (Object[]) value; // values[0] = left part, values[1] = right part
+    leftPart.setValue(values[0]);
+    rightPart.setValue(values[1]);
+  }
+
+  @Override
+  public void setValueInBytes(byte[] value) {
+    // TODO not implemented: the given bytes are currently ignored
+
+  }
+
+  @Override
+  public UnCompressValue<T> getNew() {
+    UnCompressValue leftUnCompressClone = leftPart.getNew();
+    UnCompressValue rightUnCompressClone = rightPart.getNew();
+    return new UnCompressBigDecimal(compressionFinder, leftUnCompressClone,
+        rightUnCompressClone);
+  }
+
+  @Override
+  public UnCompressValue compress() {
+    UnCompressBigDecimal byt = new UnCompressBigDecimal<>(compressionFinder,
+        leftPart.compress(), rightPart.compress());
+    return byt;
+  }
+
+  @Override
+  public UnCompressValue uncompress(DataType dataType) {
+    // TODO not implemented: callers currently always receive null
+    return null;
+  }
+
+  @Override
+  public byte[] getBackArrayData() {
+    byte[] leftdata = leftPart.getBackArrayData();
+    byte[] rightdata = rightPart.getBackArrayData();
+    ByteBuffer byteBuffer = ByteBuffer
+        .allocate(CarbonCommonConstants.INT_SIZE_IN_BYTE + leftdata.length
+            + rightdata.length);
+    byteBuffer.putInt(leftdata.length); // prefix: byte length of the left part
+    byteBuffer.put(leftdata);
+    byteBuffer.put(rightdata);
+    byteBuffer.flip(); // no effect on the result: array() returns the whole backing array
+    return byteBuffer.array();
+  }
+
+  @Override
+  public UnCompressValue getCompressorObject() {
+    return new UnCompressBigDecimalByte<>(compressionFinder,
+        leftPart.getCompressorObject(), rightPart.getCompressorObject());
+  }
+
+  @Override
+  public CarbonReadDataHolder getValues(int decimal, Object maxValue) {
+    // TODO not implemented: callers currently always receive null
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63d66264/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/type/UnCompressBigDecimalByte.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/type/UnCompressBigDecimalByte.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/type/UnCompressBigDecimalByte.java
new file mode 100644
index 0000000..97e0d38
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/type/UnCompressBigDecimalByte.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastorage.store.compression.type;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
+import org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder.UnCompressValue;
+import org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
+import org.apache.carbondata.core.util.BigDecimalCompressionFinder;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+/**
+ * Big decimal compression/uncompression
+ */
+public class UnCompressBigDecimalByte<T> implements UnCompressValue<T> {
+
+  private BigDecimalCompressionFinder compressionFinder;
+  private UnCompressValue leftPart;
+  private UnCompressValue rightPart;
+
+  public UnCompressBigDecimalByte(
+      BigDecimalCompressionFinder compressionFinder, UnCompressValue leftPart,
+      UnCompressValue rightPart) {
+    this.compressionFinder = compressionFinder;
+    this.leftPart = leftPart;
+    this.rightPart = rightPart;
+  }
+
+  /** Splits [int: left length][left bytes][right bytes] and gives each part its slice. */
+  @Override
+  public void setValue(T value) {
+    byte[] values = (byte[]) value;
+    ByteBuffer buffer = ByteBuffer.wrap(values);
+    int leftPartLen = buffer.getInt();
+    int rightPartLen = values.length - leftPartLen
+        - CarbonCommonConstants.INT_SIZE_IN_BYTE;
+    byte[] leftValue = new byte[leftPartLen];
+    byte[] rightValue = new byte[rightPartLen];
+    buffer.get(leftValue);
+    buffer.get(rightValue);
+    leftPart.setValue(leftValue);
+    rightPart.setValue(rightValue);
+  }
+
+  @Override
+  public void setValueInBytes(byte[] value) {
+    // not implemented: the call is ignored
+  }
+
+  @Override
+  public UnCompressValue<T> getNew() {
+    return new UnCompressBigDecimal(compressionFinder, leftPart.getNew(), rightPart.getNew());
+  }
+
+  @Override
+  public UnCompressValue compress() {
+    return new UnCompressBigDecimal<>(compressionFinder,
+        leftPart.compress(), rightPart.compress());
+  }
+
+  @Override
+  public UnCompressValue uncompress(DataType dataType) {
+    ValueCompressonHolder.UnCompressValue left = leftPart
+        .uncompress(compressionFinder.getLeftChangedDataType());
+    ValueCompressonHolder.UnCompressValue right = rightPart
+        .uncompress(compressionFinder.getRightChangedDataType());
+    return new UnCompressBigDecimalByte<>(compressionFinder, left, right);
+  }
+
+  @Override
+  public byte[] getBackArrayData() {
+    return null;
+  }
+
+  @Override
+  public UnCompressValue getCompressorObject() {
+    return new UnCompressBigDecimal<>(compressionFinder,
+        leftPart.getCompressorObject(), rightPart.getCompressorObject());
+  }
+
+  /**
+   * Rebuilds every value as left + POINT + the fraction digits of right / 10^decimal.
+   * @param decimal power of ten the right part is divided by; right is skipped if <= 0
+   * @param maxValue Long[] holding the left maximum at 0 and the right one at 1
+   */
+  @Override
+  public CarbonReadDataHolder getValues(int decimal, Object maxValue) {
+    Long[] maxValues = (Long[]) maxValue;
+    long[] leftVals = leftPart.getValues(decimal, maxValues[0]).getReadableLongValue();
+    int size = leftVals.length;
+    long[] rightVals = new long[size];
+    if (decimal > 0) {
+      rightVals = rightPart.getValues(decimal, maxValues[1]).getReadableLongValue();
+    }
+    BigDecimal[] values = new BigDecimal[size];
+    for (int i = 0; i < size; i++) {
+      values[i] = new BigDecimal(Long.toString(leftVals[i]) + CarbonCommonConstants.POINT
+          + fractionDigits(rightVals[i], decimal));
+    }
+    CarbonReadDataHolder dataHolder = new CarbonReadDataHolder();
+    dataHolder.setReadableBigDecimalValues(values);
+    return dataHolder;
+  }
+
+  /**
+   * Returns the digits after the point of right / 10^decimal, computed exactly.
+   * Double.toString would print E-notation below 1e-3 and lose digits of large longs.
+   */
+  private static String fractionDigits(long right, int decimal) {
+    String plain = right == 0 ? "0"
+        : BigDecimal.valueOf(right, decimal).stripTrailingZeros().toPlainString();
+    int point = plain.indexOf('.');
+    return point < 0 ? "0" : plain.substring(point + 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63d66264/core/src/main/java/org/apache/carbondata/core/datastorage/store/dataholder/CarbonReadDataHolder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/dataholder/CarbonReadDataHolder.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/dataholder/CarbonReadDataHolder.java
index 1cfc7a2..7bf3dbe 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/dataholder/CarbonReadDataHolder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/store/dataholder/CarbonReadDataHolder.java
@@ -72,6 +72,9 @@ public class CarbonReadDataHolder {
     return this.intValues[index];
   }
 
+  public long[] getReadableLongValue() {
+    return this.longValues; // the backing array itself, not a copy
+  }
   public long getReadableLongValueByIndex(int index) {
     return this.longValues[index];
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63d66264/core/src/main/java/org/apache/carbondata/core/datastorage/store/dataholder/CarbonWriteDataHolder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/dataholder/CarbonWriteDataHolder.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/dataholder/CarbonWriteDataHolder.java
index 08f7786..c87b266 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/dataholder/CarbonWriteDataHolder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/store/dataholder/CarbonWriteDataHolder.java
@@ -31,6 +31,15 @@ public class CarbonWriteDataHolder {
   private long[] longValues;
 
   /**
+   * bigDecimal left part
+   */
+  private long[] bigDecimalLeftValues;
+
+  /**
+   * bigDecimal right part
+   */
+  private long[] bigDecimalRightValues;
+  /**
    * byteValues
    */
   private byte[][] byteValues;
@@ -120,6 +129,14 @@ public class CarbonWriteDataHolder {
     longValues = new long[size];
   }
 
+  public void initialiseBigDecimalValues(int size) {
+    if (size < 1) { // zero and negative capacities are rejected
+      throw new IllegalArgumentException("Invalid array size");
+    }
+    bigDecimalLeftValues = new long[size]; // both parts get the same capacity
+    bigDecimalRightValues = new long[size];
+  }
+
   /**
    * set double value by index
    *
@@ -143,6 +160,17 @@ public class CarbonWriteDataHolder {
   }
 
   /**
+   * set bigdecimal value by index
+   *
+   * @param index
+   * @param value
+   */
+  public void setWritableBigDecimalValueByIndex(int index, long[] value) {
+    bigDecimalLeftValues[index] = value[0]; // value[0] = left part
+    bigDecimalRightValues[index] = value[1]; // value[1] = right part
+    size++; // getWritableBigDecimalValues trims both arrays to this count
+  }
+  /**
    * set byte array value by index
    *
    * @param index
@@ -227,4 +255,26 @@ public class CarbonWriteDataHolder {
     }
     return longValues;
   }
+
+  /**
+   * Get Writable bigdecimal Values
+   *
+   * @return
+   */
+  public long[][] getWritableBigDecimalValues() {
+    // size is incremented on every setWritableBigDecimalValueByIndex call;
+    // drop the unused tail of each part before handing the arrays out
+    if (size < bigDecimalLeftValues.length) {
+      long[] trimmedLeft = new long[size];
+      System.arraycopy(bigDecimalLeftValues, 0, trimmedLeft, 0, size);
+      bigDecimalLeftValues = trimmedLeft;
+    }
+    if (size < bigDecimalRightValues.length) {
+      long[] trimmedRight = new long[size];
+      System.arraycopy(bigDecimalRightValues, 0, trimmedRight, 0, size);
+      bigDecimalRightValues = trimmedRight;
+    }
+    // index 0 = left part, index 1 = right part
+    return new long[][] { bigDecimalLeftValues, bigDecimalRightValues };
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63d66264/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java
index 9b6ce9f..3adc888 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java
@@ -62,13 +62,11 @@ public abstract class AbstractHeavyCompressedDoubleArrayDataStore
   @Override public byte[][] getWritableMeasureDataArray(CarbonWriteDataHolder[] dataHolder) {
     for (int i = 0; i < compressionModel.getUnCompressValues().length; i++) {
       values[i] = compressionModel.getUnCompressValues()[i].getNew();
-      if (type[i] != CarbonCommonConstants.BYTE_VALUE_MEASURE
-          && type[i] != CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-
+      if (type[i] != CarbonCommonConstants.BYTE_VALUE_MEASURE) {
         values[i].setValue(
-            ValueCompressionUtil.getValueCompressor(compressionModel.getActualDataType()[i])
-                .getCompressedValues(compressionModel.getCompType()[i], dataHolder[i],
-                    compressionModel.getChangedDataType()[i], compressionModel.getMaxValue()[i],
+            ValueCompressionUtil.getValueCompressor(compressionModel.getCompressionFinder()[i])
+                .getCompressedValues(compressionModel.getCompressionFinder()[i], dataHolder[i],
+                    compressionModel.getMaxValue()[i],
                     compressionModel.getMantissa()[i]));
       } else {
         values[i].setValue(dataHolder[i].getWritableByteArrayValues());

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63d66264/core/src/main/java/org/apache/carbondata/core/util/BigDecimalCompressionFinder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BigDecimalCompressionFinder.java b/core/src/main/java/org/apache/carbondata/core/util/BigDecimalCompressionFinder.java
new file mode 100644
index 0000000..9a7ae13
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/BigDecimalCompressionFinder.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.util;
+
+import org.apache.carbondata.core.util.ValueCompressionUtil.COMPRESSION_TYPE;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+public class BigDecimalCompressionFinder extends CompressionFinder {
+
+  /**
+   * compression type of the non decimal (left) part
+   */
+  private COMPRESSION_TYPE leftCompType;
+
+  /**
+   * compression type of the decimal (right) part
+   */
+  private COMPRESSION_TYPE rightCompType;
+
+  /**
+   * actual data type of the non decimal (left) part
+   */
+  private DataType leftActualDataType;
+
+  /**
+   * actual data type of the decimal (right) part
+   */
+  private DataType rightActualDataType;
+
+  /**
+   * changed data type of the non decimal (left) part
+   */
+  private DataType leftChangedDataType;
+
+  /**
+   * changed data type of the decimal (right) part
+   */
+  private DataType rightChangedDataType;
+
+  public BigDecimalCompressionFinder(COMPRESSION_TYPE compType,
+      DataType actualDataType, DataType changedDataType, char measureStoreType) {
+    super(compType, actualDataType, changedDataType, measureStoreType); // left/right stay null
+  }
+
+  public BigDecimalCompressionFinder(COMPRESSION_TYPE[] compType,
+      DataType[] actualDataType, DataType[] changedDataType, char measureStoreType) {
+    super(null, null, null, measureStoreType); // base compType and data types stay null
+    this.leftCompType = compType[0]; // index 0 = left part in all three arrays
+    this.rightCompType = compType[1]; // index 1 = right part in all three arrays
+    this.leftActualDataType = actualDataType[0];
+    this.rightActualDataType = actualDataType[1];
+    this.leftChangedDataType = changedDataType[0];
+    this.rightChangedDataType = changedDataType[1];
+  }
+
+  public COMPRESSION_TYPE getLeftCompType() {
+    return leftCompType;
+  }
+
+  public COMPRESSION_TYPE getRightCompType() {
+    return rightCompType;
+  }
+
+  public DataType getLeftActualDataType() {
+    return leftActualDataType;
+  }
+
+  public DataType getRightActualDataType() {
+    return rightActualDataType;
+  }
+
+  public DataType getLeftChangedDataType() {
+    return leftChangedDataType;
+  }
+
+  public DataType getRightChangedDataType() {
+    return rightChangedDataType;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63d66264/core/src/main/java/org/apache/carbondata/core/util/CompressionFinder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CompressionFinder.java b/core/src/main/java/org/apache/carbondata/core/util/CompressionFinder.java
new file mode 100644
index 0000000..b642a20
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/CompressionFinder.java
@@ -0,0 +1,145 @@
+package org.apache.carbondata.core.util;
+
+import org.apache.carbondata.core.util.ValueCompressionUtil.COMPRESSION_TYPE;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+
+/**
+ * Candidate compression choice; candidates are ordered by the size of the changed
+ * data type and then by priority so that the best compression type can be selected.
+ */
+public class CompressionFinder implements Comparable<CompressionFinder> {
+  private COMPRESSION_TYPE compType;
+
+  private DataType actualDataType;
+
+  private DataType changedDataType;
+  /**
+   * the size of changed data
+   */
+  private int size;
+
+  private PRIORITY priority;
+
+  private char measureStoreType;
+
+  /**
+   * CompressionFinder constructor.
+   *
+   * @param compType
+   * @param actualDataType
+   * @param changedDataType
+   */
+  CompressionFinder(COMPRESSION_TYPE compType, DataType actualDataType,
+      DataType changedDataType, char measureStoreType) {
+    super();
+    this.compType = compType;
+    this.actualDataType = actualDataType;
+    this.changedDataType = changedDataType;
+    this.measureStoreType = measureStoreType;
+  }
+
+  /**
+   * CompressionFinder overloaded constructor.
+   *
+   * @param compType
+   * @param actualDataType
+   * @param changedDataType
+   * @param priority
+   */
+
+  CompressionFinder(COMPRESSION_TYPE compType, DataType actualDataType, DataType changedDataType,
+      PRIORITY priority, char measureStoreType) {
+    super();
+    this.actualDataType = actualDataType;
+    this.changedDataType = changedDataType;
+    this.size = ValueCompressionUtil.getSize(changedDataType);
+    this.priority = priority;
+    this.compType = compType;
+    this.measureStoreType = measureStoreType;
+  }
+
+  @Override public boolean equals(Object obj) {
+    boolean equals = false;
+    if (obj instanceof CompressionFinder) {
+      CompressionFinder cf = (CompressionFinder) obj;
+
+      if (this.size == cf.size && this.priority == cf.priority) {
+        equals = true;
+      }
+
+    }
+    return equals;
+  }
+
+  @Override public int hashCode() {
+    final int code = 31;
+    int result = 1;
+
+    result = code * result + this.size;
+    result = code * result + ((priority == null) ? 0 : priority.hashCode());
+    return result;
+  }
+
+  @Override public int compareTo(CompressionFinder o) {
+    int returnVal = 0;
+    // smaller size sorts first; ties on size are broken by priority
+    if (this.equals(o)) {
+      returnVal = 0;
+    } else if (this.size == o.size) {
+      // the compression type priority
+      if (priority.priority > o.priority.priority) {
+        returnVal = 1;
+      } else if (priority.priority < o.priority.priority) {
+        returnVal = -1;
+      }
+
+    } else if (this.size > o.size) {
+      returnVal = 1;
+    } else {
+      returnVal = -1;
+    }
+    return returnVal;
+  }
+
+  /**
+   * Compression type priority.
+   * ACTUAL is the highest priority and DIFFNONDECIMAL is the lowest
+   * priority
+   */
+  enum PRIORITY {
+    ACTUAL(0),
+    DIFFSIZE(1),
+    MAXNONDECIMAL(2),
+    DIFFNONDECIMAL(3);
+    private int priority;
+
+    private PRIORITY(int priority) {
+      this.priority = priority;
+    }
+  }
+
+  public COMPRESSION_TYPE getCompType() {
+    return compType;
+  }
+
+  public DataType getActualDataType() {
+    return actualDataType;
+  }
+
+  public DataType getChangedDataType() {
+    return changedDataType;
+  }
+
+  public int getSize() {
+    return size;
+  }
+
+  public PRIORITY getPriority() {
+    return priority;
+  }
+
+  public char getMeasureStoreType() {
+    return measureStoreType;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63d66264/core/src/main/java/org/apache/carbondata/core/util/ValueCompressionUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ValueCompressionUtil.java b/core/src/main/java/org/apache/carbondata/core/util/ValueCompressionUtil.java
index d01b30a..6210c30 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/ValueCompressionUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/ValueCompressionUtil.java
@@ -23,12 +23,14 @@ import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 
+import org.apache.carbondata.core.compression.BigDecimalCompressor;
 import org.apache.carbondata.core.compression.BigIntCompressor;
 import org.apache.carbondata.core.compression.DoubleCompressor;
 import org.apache.carbondata.core.compression.ValueCompressor;
 import org.apache.carbondata.core.datastorage.store.compression.MeasureMetaDataModel;
 import org.apache.carbondata.core.datastorage.store.compression.ReaderCompressModel;
 import org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
+import org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder.UnCompressValue;
 import org.apache.carbondata.core.datastorage.store.compression.WriterCompressModel;
 import org.apache.carbondata.core.datastorage.store.compression.decimal.UnCompressByteArray;
 import org.apache.carbondata.core.datastorage.store.compression.decimal.UnCompressMaxMinByte;
@@ -55,6 +57,7 @@ import org.apache.carbondata.core.datastorage.store.compression.none.UnCompressN
 import org.apache.carbondata.core.datastorage.store.compression.none.UnCompressNoneInt;
 import org.apache.carbondata.core.datastorage.store.compression.none.UnCompressNoneLong;
 import org.apache.carbondata.core.datastorage.store.compression.none.UnCompressNoneShort;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressBigDecimal;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 
 public final class ValueCompressionUtil {
@@ -138,19 +141,21 @@ public final class ValueCompressionUtil {
     // ''b' for decimal, 'l' for long, 'n' for double
     switch (measureStoreType) {
       case 'b':
-        return new CompressionFinder(COMPRESSION_TYPE.BIGDECIMAL, DataType.DATA_BYTE,
-            DataType.DATA_BYTE);
+        return getBigDecimalCompressorFinder(maxValue, minValue, 0, dataTypeSelected,
+            measureStoreType);
       case 'l':
-        return getLongCompressorFinder(maxValue, minValue, mantissa, dataTypeSelected);
+        return getLongCompressorFinder(maxValue, minValue, mantissa, dataTypeSelected,
+            measureStoreType);
       case 'n':
-        return getDoubleCompressorFinder(maxValue, minValue, mantissa, dataTypeSelected);
+        return getDoubleCompressorFinder(maxValue, minValue, mantissa, dataTypeSelected,
+            measureStoreType);
       default:
         throw new IllegalArgumentException("unsupported measure type");
     }
   }
 
   private static CompressionFinder getDoubleCompressorFinder(Object maxValue, Object minValue,
-      int mantissa, byte dataTypeSelected) {
+      int mantissa, byte dataTypeSelected, char measureStoreType) {
     //Here we should use the Max abs as max to getDatatype, let's say -1 and -10000000, -1 is max,
     //but we can't use -1 to getDatatype, we should use -10000000.
     double absMaxValue = Math.abs((double) maxValue) >= Math.abs((double) minValue) ?
@@ -165,13 +170,13 @@ public final class ValueCompressionUtil {
       int deltaSize = getSize(deltaDataType);
       if (adaptiveSize > deltaSize) {
         return new CompressionFinder(COMPRESSION_TYPE.DELTA_DOUBLE, DataType.DATA_DOUBLE,
-            deltaDataType);
+            deltaDataType, measureStoreType);
       } else if (adaptiveSize < deltaSize) {
         return new CompressionFinder(COMPRESSION_TYPE.ADAPTIVE, DataType.DATA_DOUBLE,
-            deltaDataType);
+            deltaDataType, measureStoreType);
       } else {
         return new CompressionFinder(COMPRESSION_TYPE.ADAPTIVE, DataType.DATA_DOUBLE,
-            adaptiveDataType);
+            adaptiveDataType, measureStoreType);
       }
     } else {
       // double
@@ -183,21 +188,47 @@ public final class ValueCompressionUtil {
 
       CompressionFinder[] finders = new CompressionFinder[] {
           new CompressionFinder(COMPRESSION_TYPE.ADAPTIVE, adaptiveDataType, adaptiveDataType,
-              CompressionFinder.PRIORITY.ACTUAL),
+              CompressionFinder.PRIORITY.ACTUAL, measureStoreType),
           new CompressionFinder(COMPRESSION_TYPE.DELTA_DOUBLE, adaptiveDataType, deltaDataType,
-              CompressionFinder.PRIORITY.DIFFSIZE),
+              CompressionFinder.PRIORITY.DIFFSIZE, measureStoreType),
           new CompressionFinder(COMPRESSION_TYPE.BIGINT, adaptiveDataType, maxNonDecDataType,
-              CompressionFinder.PRIORITY.MAXNONDECIMAL),
+              CompressionFinder.PRIORITY.MAXNONDECIMAL, measureStoreType),
           new CompressionFinder(COMPRESSION_TYPE.DELTA_NON_DECIMAL, adaptiveDataType,
-              diffNonDecDataType, CompressionFinder.PRIORITY.DIFFNONDECIMAL) };
+              diffNonDecDataType, CompressionFinder.PRIORITY.DIFFNONDECIMAL, measureStoreType) };
       // sort the compressionFinder.The top have the highest priority
       Arrays.sort(finders);
       return finders[0];
     }
   }
 
+  private static CompressionFinder getBigDecimalCompressorFinder(
+      Object maxValue, Object minValue, int mantissa,
+      byte dataTypeSelected, char measureStoreType) {
+    Long[] maxValues = (Long[])maxValue;
+    Long[] minValues = (Long[])minValue;
+    CompressionFinder leftCompressionFinder = getLongCompressorFinder(maxValues[0], minValues[0],
+        mantissa, dataTypeSelected, measureStoreType);
+    CompressionFinder rightCompressionFinder = getLongCompressorFinder(maxValues[1], minValues[1],
+        mantissa, dataTypeSelected, measureStoreType);
+    COMPRESSION_TYPE[] compressionTypes = new COMPRESSION_TYPE[2];
+    DataType[] actualDataTypes = new DataType[2];
+    DataType[] changedDataTypes = new DataType[2];
+    compressionTypes[0] = leftCompressionFinder.getCompType();
+    compressionTypes[1] = rightCompressionFinder.getCompType();
+
+    actualDataTypes[0] = leftCompressionFinder.getActualDataType();
+    actualDataTypes[1] = rightCompressionFinder.getActualDataType();
+
+    changedDataTypes[0] = leftCompressionFinder.getChangedDataType();
+    changedDataTypes[1] = rightCompressionFinder.getChangedDataType();
+
+    CompressionFinder bigdCompressionFinder = new BigDecimalCompressionFinder(
+        compressionTypes, actualDataTypes, changedDataTypes, measureStoreType);
+    return bigdCompressionFinder;
+  }
+
   private static CompressionFinder getLongCompressorFinder(Object maxValue, Object minValue,
-      int mantissa, byte dataTypeSelected) {
+      int mantissa, byte dataTypeSelected, char measureStoreType) {
     DataType adaptiveDataType = getDataType((long) maxValue, mantissa, dataTypeSelected);
     int adaptiveSize = getSize(adaptiveDataType);
     DataType deltaDataType = getDataType((long) maxValue - (long) minValue, mantissa,
@@ -205,13 +236,13 @@ public final class ValueCompressionUtil {
     int deltaSize = getSize(deltaDataType);
     if (adaptiveSize > deltaSize) {
       return new CompressionFinder(COMPRESSION_TYPE.DELTA_DOUBLE, DataType.DATA_BIGINT,
-          deltaDataType);
+          deltaDataType, measureStoreType);
     } else if (adaptiveSize < deltaSize) {
       return new CompressionFinder(COMPRESSION_TYPE.ADAPTIVE, DataType.DATA_BIGINT,
-          deltaDataType);
+          deltaDataType, measureStoreType);
     } else {
       return new CompressionFinder(COMPRESSION_TYPE.ADAPTIVE, DataType.DATA_BIGINT,
-          adaptiveDataType);
+          adaptiveDataType, measureStoreType);
     }
   }
 
@@ -240,49 +271,62 @@ public final class ValueCompressionUtil {
 
   /**
    * It returns Compressor for given datatype
-   * @param actualDataType
+   * @param compressorFinder finder whose measure store type selects the compressor
    * @return compressor based on actualdatatype
    */
-  public static ValueCompressor getValueCompressor(DataType actualDataType) {
-    switch (actualDataType) {
-      case DATA_BIGINT:
-        return new BigIntCompressor();
-      default:
-        return new DoubleCompressor();
+  public static ValueCompressor getValueCompressor(CompressionFinder compressorFinder) {
+    switch(compressorFinder.getMeasureStoreType()) {
+      case 'b': return new BigDecimalCompressor();
+      case 'l': return new BigIntCompressor();
+      default : return new DoubleCompressor();
     }
   }
 
-  private static ValueCompressonHolder.UnCompressValue[] getUncompressedValues(
-      COMPRESSION_TYPE[] compType, DataType[] actualDataType, DataType[] changedDataType) {
-
-    ValueCompressonHolder.UnCompressValue[] compressValue =
-        new ValueCompressonHolder.UnCompressValue[changedDataType.length];
-    for (int i = 0; i < changedDataType.length; i++) {
-      switch (compType[i]) {
-        case ADAPTIVE:
-          compressValue[i] = getUnCompressNone(changedDataType[i], actualDataType[i]);
-          break;
-
-        case DELTA_DOUBLE:
-          compressValue[i] = getUnCompressDecimalMaxMin(changedDataType[i], actualDataType[i]);
-          break;
-
-        case BIGINT:
-          compressValue[i] = getUnCompressNonDecimal(changedDataType[i]);
-          break;
-
-        case BIGDECIMAL:
-          compressValue[i] = new UnCompressByteArray(UnCompressByteArray.ByteArrayType.BIG_DECIMAL);
-          break;
+  /**
+   * get uncompressed object
+   * @param compressionFinders : Compression types for measures
+   * @return
+   */
+  private static UnCompressValue[] getUncompressedValues(
+      CompressionFinder[] compressionFinders) {
+    UnCompressValue[] unCompressedValues = new UnCompressValue[compressionFinders.length];
+    for (int i=0; i< compressionFinders.length; i++) {
+      unCompressedValues[i] = getUncompressedValue(compressionFinders[i]);
+    }
+    return unCompressedValues;
+  }
+  /**
+   *
+   * @param compressionFinder bigdecimal compression finder
+   * @return Uncompressed measure object
+   */
+  private static UnCompressValue getUncompressedValue(
+      BigDecimalCompressionFinder compressionFinder) {
+    UnCompressValue leftPart = getUncompressedValue(compressionFinder.getLeftCompType(),
+        compressionFinder.getLeftActualDataType(), compressionFinder.getLeftChangedDataType());
+    UnCompressValue rightPart = getUncompressedValue(compressionFinder.getRightCompType(),
+        compressionFinder.getRightActualDataType(), compressionFinder.getRightChangedDataType());
+    return new UnCompressBigDecimal<>(compressionFinder, leftPart, rightPart);
+  }
 
-        default:
-          compressValue[i] = getUnCompressNonDecimalMaxMin(changedDataType[i]);
-      }
+  /**
+   *
+   * @param compressionFinder finder for any measure; bigdecimal ('b') finders are delegated
+   * @return
+   */
+  private static UnCompressValue getUncompressedValue(
+      CompressionFinder compressionFinder) {
+    switch(compressionFinder.getMeasureStoreType()) {
+      case 'b':
+        return getUncompressedValue(
+              (BigDecimalCompressionFinder) compressionFinder);
+      default:
+        return getUncompressedValue(compressionFinder.getCompType(),
+            compressionFinder.getActualDataType(), compressionFinder.getChangedDataType());
     }
-    return compressValue;
   }
 
-  private static ValueCompressonHolder.UnCompressValue getUncompressedValues(
+  private static ValueCompressonHolder.UnCompressValue getUncompressedValue(
       COMPRESSION_TYPE compType, DataType actualDataType, DataType changedDataType) {
     switch (compType) {
       case ADAPTIVE:
@@ -685,14 +729,17 @@ public final class ValueCompressionUtil {
     DataType[] actualType = new DataType[measureCount];
     DataType[] changedType = new DataType[measureCount];
     COMPRESSION_TYPE[] compType = new COMPRESSION_TYPE[measureCount];
+    CompressionFinder[] compressionFinders = new CompressionFinder[measureCount];
     for (int i = 0; i < measureCount; i++) {
       CompressionFinder compresssionFinder =
           ValueCompressionUtil.getCompressionFinder(maxValue[i],
               minValue[i], mantissa[i], type[i], dataTypeSelected[i]);
-      actualType[i] = compresssionFinder.actualDataType;
-      changedType[i] = compresssionFinder.changedDataType;
-      compType[i] = compresssionFinder.compType;
+      compressionFinders[i] = compresssionFinder;
+      actualType[i] = compresssionFinder.getActualDataType();
+      changedType[i] = compresssionFinder.getChangedDataType();
+      compType[i] = compresssionFinder.getCompType();
     }
+    compressionModel.setCompressionFinders(compressionFinders);
     compressionModel.setMaxValue(maxValue);
     compressionModel.setMantissa(mantissa);
     compressionModel.setChangedDataType(changedType);
@@ -703,8 +750,7 @@ public final class ValueCompressionUtil {
     compressionModel.setType(type);
     compressionModel.setDataTypeSelected(dataTypeSelected);
     ValueCompressonHolder.UnCompressValue[] values = ValueCompressionUtil
-        .getUncompressedValues(compressionModel.getCompType(), compressionModel.getActualDataType(),
-            compressionModel.getChangedDataType());
+        .getUncompressedValues(compressionFinders);
     compressionModel.setUnCompressValues(values);
     return compressionModel;
   }
@@ -718,11 +764,8 @@ public final class ValueCompressionUtil {
         getCompressionFinder(meta.getMaxValue(), meta.getMinValue(), meta.getMantissa(),
             meta.getType(), meta.getDataTypeSelected());
     compressModel.setUnCompressValues(
-        ValueCompressionUtil.getUncompressedValues(
-            compressFinder.compType,
-            compressFinder.actualDataType,
-            compressFinder.changedDataType));
-    compressModel.setChangedDataType(compressFinder.changedDataType);
+          ValueCompressionUtil.getUncompressedValue(compressFinder));
+    compressModel.setChangedDataType(compressFinder.getChangedDataType());
     compressModel.setValueEncoderMeta(meta);
     return compressModel;
   }
@@ -858,133 +901,5 @@ public final class ValueCompressionUtil {
     DATA_DOUBLE();
     private DataType() {
     }
-
-  }
-
-  /**
-   * through the size of data type,priority and compression type, select the
-   * best compression type
-   */
-  private static class CompressionFinder implements Comparable<CompressionFinder> {
-
-    private COMPRESSION_TYPE compType;
-
-    private DataType actualDataType;
-
-    private DataType changedDataType;
-    /**
-     * the size of changed data
-     */
-    private int size;
-
-    private PRIORITY priority;
-
-    /**
-     * CompressionFinder constructor.
-     *
-     * @param compType
-     * @param actualDataType
-     * @param changedDataType
-     */
-    CompressionFinder(COMPRESSION_TYPE compType, DataType actualDataType,
-        DataType changedDataType) {
-      super();
-      this.compType = compType;
-      this.actualDataType = actualDataType;
-      this.changedDataType = changedDataType;
-    }
-
-    /**
-     * CompressionFinder overloaded constructor.
-     *
-     * @param compType
-     * @param actualDataType
-     * @param changedDataType
-     * @param priority
-     */
-
-    CompressionFinder(COMPRESSION_TYPE compType, DataType actualDataType, DataType changedDataType,
-        PRIORITY priority) {
-      super();
-      this.actualDataType = actualDataType;
-      this.changedDataType = changedDataType;
-      this.size = getSize(changedDataType);
-      this.priority = priority;
-      this.compType = compType;
-    }
-
-    @Override public boolean equals(Object obj) {
-      boolean equals = false;
-      if (obj instanceof CompressionFinder) {
-        CompressionFinder cf = (CompressionFinder) obj;
-
-        if (this.size == cf.size && this.priority == cf.priority) {
-          equals = true;
-        }
-
-      }
-      return equals;
-    }
-
-    @Override public int hashCode() {
-      final int code = 31;
-      int result = 1;
-
-      result = code * result + this.size;
-      result = code * result + ((priority == null) ? 0 : priority.hashCode());
-      return result;
-    }
-
-    @Override public int compareTo(CompressionFinder o) {
-      int returnVal = 0;
-      // the big size have high priority
-      if (this.equals(o)) {
-        returnVal = 0;
-      } else if (this.size == o.size) {
-        // the compression type priority
-        if (priority.priority > o.priority.priority) {
-          returnVal = 1;
-        } else if (priority.priority < o.priority.priority) {
-          returnVal = -1;
-        }
-
-      } else if (this.size > o.size) {
-        returnVal = 1;
-      } else {
-        returnVal = -1;
-      }
-      return returnVal;
-    }
-
-    /**
-     * Compression type priority.
-     * ACTUAL is the highest priority and DIFFNONDECIMAL is the lowest
-     * priority
-     */
-    enum PRIORITY {
-      /**
-       *
-       */
-      ACTUAL(0), /**
-       *
-       */
-      DIFFSIZE(1), /**
-       *
-       */
-      MAXNONDECIMAL(2), /**
-       *
-       */
-      DIFFNONDECIMAL(3);
-
-      /**
-       * priority.
-       */
-      private int priority;
-
-      private PRIORITY(int priority) {
-        this.priority = priority;
-      }
-    }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63d66264/core/src/test/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/CompressedMeasureChunkFileBasedReaderTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/CompressedMeasureChunkFileBasedReaderTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/CompressedMeasureChunkFileBasedReaderTest.java
index 36c8ec3..c2a5d5d 100644
--- a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/CompressedMeasureChunkFileBasedReaderTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/CompressedMeasureChunkFileBasedReaderTest.java
@@ -2,6 +2,7 @@ package org.apache.carbondata.core.carbon.datastore.chunk.reader.measure;
 
 import static junit.framework.TestCase.assertEquals;
 
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -13,10 +14,10 @@ import org.apache.carbondata.core.carbon.datastore.chunk.reader.measure.v1.Compr
 import org.apache.carbondata.core.carbon.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.carbon.metadata.blocklet.datachunk.DataChunk;
 import org.apache.carbondata.core.datastorage.store.FileHolder;
+import org.apache.carbondata.core.datastorage.store.compression.MeasureMetaDataModel;
 import org.apache.carbondata.core.datastorage.store.compression.WriterCompressModel;
-import org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
-import org.apache.carbondata.core.datastorage.store.compression.decimal.UnCompressByteArray;
-import org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
+import org.apache.carbondata.core.datastorage.store.dataholder.CarbonWriteDataHolder;
+import org.apache.carbondata.core.datastorage.util.StoreFactory;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
 import org.junit.BeforeClass;
@@ -25,28 +26,27 @@ import org.junit.Test;
 public class CompressedMeasureChunkFileBasedReaderTest {
 
   static CompressedMeasureChunkFileBasedReaderV1 compressedMeasureChunkFileBasedReader;
+  static CarbonWriteDataHolder[] dataHolder = new CarbonWriteDataHolder[1];
 
+  static WriterCompressModel writerCompressModel;
   @BeforeClass public static void setup() {
     List<DataChunk> dataChunkList = new ArrayList<>();
     dataChunkList.add(new DataChunk());
 
-    WriterCompressModel writerCompressModel = new WriterCompressModel();
+    writerCompressModel = new WriterCompressModel();
+    Object maxValue[] = new Object[]{new Long[]{8L, 0L}};
+    Object minValue[] = new Object[]{new Long[]{1L,0L}};
+    byte[] dataTypeSelected = new byte[1];
+    char[] aggType = new char[]{'b'};
+    MeasureMetaDataModel measureMDMdl =
+                new MeasureMetaDataModel(minValue, maxValue, new int[]{1}, maxValue.length, null,
+                    aggType, dataTypeSelected);
+    writerCompressModel = ValueCompressionUtil.getWriterCompressModel(measureMDMdl);
+    
 
-    ValueCompressonHolder.UnCompressValue unCompressValue[] =
-        { new UnCompressByteArray(UnCompressByteArray.ByteArrayType.BYTE_ARRAY) };
-    byte valueInByte[] = { 1, 5, 4, 8, 7 };
-    unCompressValue[0].setValueInBytes(valueInByte);
-    ValueCompressionUtil.DataType dataType[] = { ValueCompressionUtil.DataType.DATA_BYTE };
-
-    writerCompressModel.setUnCompressValues(unCompressValue);
-    writerCompressModel.setChangedDataType(dataType);
-    int decimal[] = { 5, 8, 2 };
-    writerCompressModel.setMantissa(decimal);
-    Object maxValue[] = { 8 };
-    writerCompressModel.setMaxValue(maxValue);
     ValueEncoderMeta meta = new ValueEncoderMeta();
-    meta.setMaxValue(8.0);
-    meta.setMinValue(1.0);
+    meta.setMaxValue(new Long[]{8L,0L});
+    meta.setMinValue(new Long[]{1L,0L});
     meta.setMantissa(1);
     meta.setType('b');
     List<ValueEncoderMeta> valueEncoderMetaList = new ArrayList<>();
@@ -61,63 +61,45 @@ public class CompressedMeasureChunkFileBasedReaderTest {
   @Test public void readMeasureChunkTest() {
     FileHolder fileHolder = new MockUp<FileHolder>() {
       @Mock public byte[] readByteArray(String filePath, long offset, int length) {
-        byte mockedValue[] = { 1, 5, 4, 8, 7 };
-        return mockedValue;
+        dataHolder[0] = new CarbonWriteDataHolder();
+        dataHolder[0].initialiseBigDecimalValues(1);
+        dataHolder[0].setWritableBigDecimalValueByIndex(0, new long[]{2L,1L});
+        byte[][] writableMeasureDataArray =
+            StoreFactory.createDataStore(writerCompressModel).getWritableMeasureDataArray(dataHolder)
+                .clone();
+        return writableMeasureDataArray[0];
       }
     }.getMockInstance();
 
-    new MockUp<UnCompressByteArray>() {
-      @Mock public CarbonReadDataHolder getValues(int decimal, Object maxValueObject) {
-        List<byte[]> valsList = new ArrayList<byte[]>();
-        byte mockedValue[] = { 3, 7, 9 };
-        valsList.add(mockedValue);
-        CarbonReadDataHolder holder = new CarbonReadDataHolder();
-        byte[][] value = new byte[valsList.size()][];
-        valsList.toArray(value);
-        holder.setReadableByteValues(value);
-        return holder;
-      }
-    };
-
     MeasureColumnDataChunk measureColumnDataChunks =
         compressedMeasureChunkFileBasedReader.readMeasureChunk(fileHolder, 0);
 
-    byte expectedValue[] = { 3, 7, 9 };
-    for (int i = 0; i < 3; i++) {
-      assertEquals(expectedValue[i],
-          measureColumnDataChunks.getMeasureDataHolder().getReadableByteArrayValueByIndex(0)[i]);
-    }
+    BigDecimal bigD = new BigDecimal("2.1");
+    assertEquals(bigD,
+        measureColumnDataChunks.getMeasureDataHolder().getReadableBigDecimalValueByIndex(0));
+      
   }
 
   @Test public void readMeasureChunksTest() {
     FileHolder fileHolder = new MockUp<FileHolder>() {
       @Mock public byte[] readByteArray(String filePath, long offset, int length) {
-        byte mockedValue[] = { 1, 5, 4, 8, 7 };
-        return mockedValue;
+        dataHolder[0] = new CarbonWriteDataHolder();
+        dataHolder[0].initialiseBigDecimalValues(1);
+        dataHolder[0].setWritableBigDecimalValueByIndex(0, new long[]{2L,1L});
+        byte[][] writableMeasureDataArray =
+            StoreFactory.createDataStore(writerCompressModel).getWritableMeasureDataArray(dataHolder)
+                .clone();
+        return writableMeasureDataArray[0];
       }
     }.getMockInstance();
 
-    new MockUp<UnCompressByteArray>() {
-      @Mock public CarbonReadDataHolder getValues(int decimal, Object maxValueObject) {
-        List<byte[]> valsList = new ArrayList<byte[]>();
-        byte mockedValue[] = { 3, 7, 9 };
-        valsList.add(mockedValue);
-        CarbonReadDataHolder holder = new CarbonReadDataHolder();
-        byte[][] value = new byte[valsList.size()][];
-        valsList.toArray(value);
-        holder.setReadableByteValues(value);
-        return holder;
-      }
-    };
-
     int[][] blockIndexes = {{0,0}};
     MeasureColumnDataChunk measureColumnDataChunks[] =
         compressedMeasureChunkFileBasedReader.readMeasureChunks(fileHolder, blockIndexes);
 
-    byte expectedValue[] = { 3, 7, 9 };
-    for (int i = 0; i < 3; i++) {
-      assertEquals(expectedValue[i],
-          measureColumnDataChunks[0].getMeasureDataHolder().getReadableByteArrayValueByIndex(0)[i]);
-    }
+    BigDecimal bigD = new BigDecimal("2.1");
+    assertEquals(bigD,
+        measureColumnDataChunks[0].getMeasureDataHolder().getReadableBigDecimalValueByIndex(0));
+
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63d66264/core/src/test/java/org/apache/carbondata/core/util/ValueCompressionUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/ValueCompressionUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/ValueCompressionUtilTest.java
index a1bb6f0..739670c 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/ValueCompressionUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/ValueCompressionUtilTest.java
@@ -590,7 +590,7 @@ public class ValueCompressionUtilTest {
     Object[] maxValues = { 10l, 20l, 30l };
     Object[] minValues = { 1l, 2l, 3l };
     int[] decimalLength = { 0, 0, 0 };
-    Object[] uniqueValues = { 5, 3, 2l };
+    Object[] uniqueValues = { 5, new Long[]{2l,4l}, 2l };
     char[] types = { 'l', 'l', 'l' };
     byte[] dataTypeSelected = { 1, 2, 4 };
     MeasureMetaDataModel measureMetaDataModel =

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63d66264/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 0398cd7..cde19bd 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -493,7 +493,11 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       } else if (type[i] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
         max[i] = -Double.MAX_VALUE;
       } else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-        max[i] = new BigDecimal(0.0);
+        //max[i] = new BigDecimal(0.0);
+        Long[] bigdMinVal = new Long[2];
+        bigdMinVal[0] = Long.MIN_VALUE;
+        bigdMinVal[1] = Long.MIN_VALUE;
+        max[i] = bigdMinVal;
       } else {
         max[i] = 0.0;
       }
@@ -506,8 +510,16 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
         min[i] = Double.MAX_VALUE;
         uniqueValue[i] = Double.MIN_VALUE;
       } else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-        min[i] = new BigDecimal(Double.MAX_VALUE);
+        Long[] bigdMaxVal = new Long[2];
+        bigdMaxVal[0] = Long.MAX_VALUE;
+        bigdMaxVal[1] = Long.MAX_VALUE;
+        //min[i] = new BigDecimal(Double.MAX_VALUE);
+        min[i] = bigdMaxVal;
         uniqueValue[i] = new BigDecimal(Double.MIN_VALUE);
+        Long[] bigdUniqueVal = new Long[2];
+        bigdUniqueVal[0] = Long.MIN_VALUE;
+        bigdUniqueVal[1] = Long.MIN_VALUE;
+        uniqueValue[i] = bigdUniqueVal;
       } else {
         min[i] = 0.0;
         uniqueValue[i] = 0.0;
@@ -588,11 +600,23 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
             b = (byte[]) row[customMeasureIndex[i]];
           }
         }
+        BigDecimal value = DataTypeUtil.byteToBigDecimal(b);
+        String[] bigdVals = value.toPlainString().split("\\.");
+        long[] bigDvalue = new long[2];
+        if (bigdVals.length == 2) {
+          bigDvalue[0] = Long.parseLong(bigdVals[0]);
+          BigDecimal bd = new BigDecimal(CarbonCommonConstants.POINT+bigdVals[1]);
+          bigDvalue[1] = (long)(bd.doubleValue()*Math.pow(10, value.scale()));
+          //bigDvalue[1] = Long.parseLong(bigdVals[1]);
+        } else {
+          bigDvalue[0] = Long.parseLong(bigdVals[0]);
+        }
         byteBuffer = ByteBuffer.allocate(b.length + CarbonCommonConstants.INT_SIZE_IN_BYTE);
         byteBuffer.putInt(b.length);
         byteBuffer.put(b);
         byteBuffer.flip();
         b = byteBuffer.array();
+        dataHolder[customMeasureIndex[i]].setWritableBigDecimalValueByIndex(count, bigDvalue);
         dataHolder[customMeasureIndex[i]].setWritableByteArrayValueByIndex(count, b);
       }
       calculateMaxMin(max, min, decimal, customMeasureIndex, row);
@@ -1207,8 +1231,28 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
             buff = (byte[]) row[count];
           }
           BigDecimal value = DataTypeUtil.byteToBigDecimal(buff);
-          BigDecimal minVal = (BigDecimal) min[count];
-          min[count] = minVal.min(value);
+          decimal[count] = value.scale();
+          String[] bigdVals = value.toPlainString().split("\\.");
+          Long[] maxVal = (Long[])max[count];
+          Long[] minVal = (Long[])min[count];
+          long maxLeftVal = (long)maxVal[0];
+          long maxRightVal = (long)maxVal[1];
+          long minLeftVal = (long) minVal[0];
+          long minRightVal = (long)minVal[1];
+          if (bigdVals.length == 2) {
+            long leftPart = Long.parseLong(bigdVals[0]);
+            BigDecimal bd = new BigDecimal(CarbonCommonConstants.POINT+bigdVals[1]);
+            long rightPart = (long)(bd.doubleValue()*Math.pow(10, value.scale()));
+            //Long.parseLong(bigdVals[1]);
+            maxVal[0] = (maxLeftVal > leftPart ? maxLeftVal : leftPart);
+            maxVal[1] = (maxRightVal > rightPart ? maxRightVal : rightPart);
+            minVal[0] = (minLeftVal < leftPart ? minLeftVal : leftPart);
+            minVal[1] = (minRightVal < rightPart ? minRightVal : rightPart);
+          } else {
+            long leftPart = Long.parseLong(bigdVals[0]);
+            maxVal[0] = (maxLeftVal > leftPart ? maxLeftVal : leftPart);
+            minVal[0] = (minLeftVal < leftPart ? minLeftVal : leftPart);
+          }
         }
       }
     }
@@ -1226,8 +1270,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       if (type[i] == CarbonCommonConstants.BIG_INT_MEASURE) {
         uniqueValue[i] = (long) minValue[i] - 1;
       } else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-        BigDecimal val = (BigDecimal) minValue[i];
-        uniqueValue[i] = (val.subtract(new BigDecimal(1.0)));
+        Long[] bigdMinVal = (Long[])minValue[i];
+        Long[] bigdUniqVal = (Long[])uniqueValue[i];
+        bigdUniqVal[0] = bigdMinVal[0] -1;
+        bigdUniqVal[1] = bigdMinVal[1] -1;
       } else {
         uniqueValue[i] = (double) minValue[i] - 1;
       }
@@ -1372,6 +1418,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     for (int i = 0; i < customMeasureIndex.length; i++) {
       dataHolder[customMeasureIndex[i]] = new CarbonWriteDataHolder();
       dataHolder[customMeasureIndex[i]].initialiseByteArrayValues(size);
+      dataHolder[customMeasureIndex[i]].initialiseBigDecimalValues(size);
     }
     return dataHolder;
   }


[2/2] incubator-carbondata git commit: [CARBONDATA-431] BigDecimal compression. This closes #388

Posted by ja...@apache.org.
[CARBONDATA-431] BigDecimal compression. This closes #388


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/05497d0d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/05497d0d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/05497d0d

Branch: refs/heads/master
Commit: 05497d0d1b9bfc61cb6ebdd488f07aa772b7d0b9
Parents: 00c4794 63d6626
Author: jackylk <ja...@huawei.com>
Authored: Tue Dec 13 15:35:21 2016 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Tue Dec 13 15:35:21 2016 +0800

----------------------------------------------------------------------
 .../core/compression/BigDecimalCompressor.java  |  76 +++++
 .../core/compression/BigIntCompressor.java      |  26 ++
 .../core/compression/ValueCompressor.java       |   9 +
 .../store/compression/WriterCompressModel.java  |  11 +
 .../compression/type/UnCompressBigDecimal.java  | 112 +++++++
 .../type/UnCompressBigDecimalByte.java          | 134 +++++++++
 .../store/dataholder/CarbonReadDataHolder.java  |   3 +
 .../store/dataholder/CarbonWriteDataHolder.java |  50 ++++
 ...ractHeavyCompressedDoubleArrayDataStore.java |  10 +-
 .../core/util/BigDecimalCompressionFinder.java  |  96 ++++++
 .../carbondata/core/util/CompressionFinder.java | 145 +++++++++
 .../core/util/ValueCompressionUtil.java         | 291 +++++++------------
 ...mpressedMeasureChunkFileBasedReaderTest.java |  98 +++----
 .../core/util/ValueCompressionUtilTest.java     |   2 +-
 .../store/CarbonFactDataHandlerColumnar.java    |  59 +++-
 15 files changed, 863 insertions(+), 259 deletions(-)
----------------------------------------------------------------------