You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/04/05 13:39:40 UTC

[11/13] incubator-carbondata git commit: refactor write step

refactor write step


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/8cca0afc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/8cca0afc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/8cca0afc

Branch: refs/heads/12-dev
Commit: 8cca0afc5db16557146dfaa33e14c2823d895966
Parents: bd044c2
Author: jackylk <ja...@huawei.com>
Authored: Thu Mar 30 11:21:21 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Wed Apr 5 14:36:47 2017 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   2 +-
 .../core/datastore/NodeMeasureDataStore.java    |  32 --
 ...ractHeavyCompressedDoubleArrayDataStore.java |  89 -----
 ...yCompressedDoubleArrayDataInMemoryStore.java |  28 --
 .../HeavyCompressedDoubleArrayDataStore.java    |  57 ++++
 .../core/util/CarbonMetadataUtil.java           |   2 +-
 .../apache/carbondata/core/util/CarbonUtil.java |   2 +-
 .../carbondata/core/util/DataTypeUtil.java      |   5 +-
 .../sort/unsafe/UnsafeCarbonRowPage.java        |   6 +-
 .../holder/UnsafeSortTempFileChunkHolder.java   |   2 +-
 .../merger/UnsafeIntermediateFileMerger.java    |   2 +-
 .../sortdata/IntermediateFileMerger.java        |   2 +-
 .../sortandgroupby/sortdata/SortDataRows.java   |   2 +-
 .../sortdata/SortTempFileChunkHolder.java       |   2 +-
 .../store/CarbonFactDataHandlerColumnar.java    | 328 +++++++++++--------
 .../processing/store/StoreFactory.java          |  33 --
 .../store/writer/CarbonFactDataWriter.java      |   4 +-
 .../writer/v1/CarbonFactDataWriterImplV1.java   |  13 +-
 .../writer/v3/CarbonFactDataWriterImplV3.java   |  14 +-
 19 files changed, 287 insertions(+), 338 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 789c321..b82d53c 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -631,7 +631,7 @@ public final class CarbonCommonConstants {
   /**
    * DOUBLE_VALUE_MEASURE
    */
-  public static final char SUM_COUNT_VALUE_MEASURE = 'n';
+  public static final char DOUBLE_MEASURE = 'n';
   /**
    * BYTE_VALUE_MEASURE
    */

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/core/src/main/java/org/apache/carbondata/core/datastore/NodeMeasureDataStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/NodeMeasureDataStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/NodeMeasureDataStore.java
deleted file mode 100644
index 2f54847..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/NodeMeasureDataStore.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore;
-
-import org.apache.carbondata.core.datastore.dataholder.CarbonWriteDataHolder;
-
-public interface NodeMeasureDataStore {
-  /**
-   * This method will be used to get the writable key array.
-   * writable measure data array will hold below information:
-   * <size of measure data array><measure data array>
-   * total length will be 4 bytes for size + measure data array length
-   *
-   * @return writable array (compressed or normal)
-   */
-  byte[][] getWritableMeasureDataArray(CarbonWriteDataHolder[] dataHolderArray);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java
deleted file mode 100644
index b274b21..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.impl.data.compressed;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.NodeMeasureDataStore;
-import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
-import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
-import org.apache.carbondata.core.datastore.dataholder.CarbonWriteDataHolder;
-import org.apache.carbondata.core.util.ValueCompressionUtil;
-
-public abstract class AbstractHeavyCompressedDoubleArrayDataStore
-    implements NodeMeasureDataStore //NodeMeasureDataStore<double[]>
-{
-
-  private LogService LOGGER =
-      LogServiceFactory.getLogService(AbstractHeavyCompressedDoubleArrayDataStore.class.getName());
-
-  /**
-   * values.
-   */
-  protected ValueCompressionHolder[] values;
-
-  /**
-   * compressionModel.
-   */
-  protected WriterCompressModel compressionModel;
-
-  /**
-   * type
-   */
-  private char[] type;
-
-  /**
-   * AbstractHeavyCompressedDoubleArrayDataStore constructor.
-   *
-   * @param compressionModel
-   */
-  public AbstractHeavyCompressedDoubleArrayDataStore(WriterCompressModel compressionModel) {
-    this.compressionModel = compressionModel;
-    if (null != compressionModel) {
-      this.type = compressionModel.getType();
-      values =
-          new ValueCompressionHolder[compressionModel.getValueCompressionHolder().length];
-    }
-  }
-
-  // this method first invokes encoding routine to encode the data chunk,
-  // followed by invoking compression routine for preparing the data chunk for writing.
-  @Override public byte[][] getWritableMeasureDataArray(CarbonWriteDataHolder[] dataHolder) {
-    byte[][] returnValue = new byte[values.length][];
-    for (int i = 0; i < compressionModel.getValueCompressionHolder().length; i++) {
-      values[i] = compressionModel.getValueCompressionHolder()[i];
-      if (type[i] != CarbonCommonConstants.BYTE_VALUE_MEASURE
-          && type[i] != CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-        // first perform encoding of the data chunk
-        values[i].setValue(
-            ValueCompressionUtil.getValueCompressor(compressionModel.getCompressionFinders()[i])
-                .getCompressedValues(compressionModel.getCompressionFinders()[i], dataHolder[i],
-                    compressionModel.getMaxValue()[i],
-                    compressionModel.getMantissa()[i]));
-      } else {
-        values[i].setValue(dataHolder[i].getWritableByteArrayValues());
-      }
-      values[i].compress();
-      returnValue[i] = values[i].getCompressedData();
-    }
-
-    return returnValue;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/HeavyCompressedDoubleArrayDataInMemoryStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/HeavyCompressedDoubleArrayDataInMemoryStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/HeavyCompressedDoubleArrayDataInMemoryStore.java
deleted file mode 100644
index a484b8f..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/HeavyCompressedDoubleArrayDataInMemoryStore.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.impl.data.compressed;
-
-import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
-
-public class HeavyCompressedDoubleArrayDataInMemoryStore
-    extends AbstractHeavyCompressedDoubleArrayDataStore {
-
-  public HeavyCompressedDoubleArrayDataInMemoryStore(WriterCompressModel compressionModel) {
-    super(compressionModel);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/HeavyCompressedDoubleArrayDataStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/HeavyCompressedDoubleArrayDataStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/HeavyCompressedDoubleArrayDataStore.java
new file mode 100644
index 0000000..d3d67fd
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/HeavyCompressedDoubleArrayDataStore.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.impl.data.compressed;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
+import org.apache.carbondata.core.datastore.dataholder.CarbonWriteDataHolder;
+import org.apache.carbondata.core.util.ValueCompressionUtil;
+
+public class HeavyCompressedDoubleArrayDataStore {
+
+  // For each measure i: encode dataHolder[i] via the value compressor chosen by the model
+  // (BYTE_VALUE/BIG_DECIMAL types use the holder's writable bytes as-is), then compress it.
+  public static byte[][] encodeMeasureDataArray(
+      WriterCompressModel compressionModel,
+      CarbonWriteDataHolder[] dataHolder) {
+    char[] type = compressionModel.getType();
+    ValueCompressionHolder[] values =
+        new ValueCompressionHolder[compressionModel.getValueCompressionHolder().length];
+    byte[][] returnValue = new byte[values.length][];
+    for (int i = 0; i < compressionModel.getValueCompressionHolder().length; i++) {
+      values[i] = compressionModel.getValueCompressionHolder()[i];
+      if (type[i] != CarbonCommonConstants.BYTE_VALUE_MEASURE
+          && type[i] != CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
+        // other measure types: encode the chunk first (uses the model's max value and mantissa)
+        values[i].setValue(
+            ValueCompressionUtil.getValueCompressor(compressionModel.getCompressionFinders()[i])
+                .getCompressedValues(compressionModel.getCompressionFinders()[i], dataHolder[i],
+                    compressionModel.getMaxValue()[i],
+                    compressionModel.getMantissa()[i]));
+      } else {
+        values[i].setValue(dataHolder[i].getWritableByteArrayValues());
+      }
+      values[i].compress();
+      returnValue[i] = values[i].getCompressedData();
+    }
+
+    return returnValue;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
index f134f0c..e60d675 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
@@ -824,7 +824,7 @@ public class CarbonMetadataUtil {
 
   public static byte[] serializeEncodeMetaUsingByteBuffer(ValueEncoderMeta valueEncoderMeta) {
     ByteBuffer buffer = null;
-    if (valueEncoderMeta.getType() == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+    if (valueEncoderMeta.getType() == CarbonCommonConstants.DOUBLE_MEASURE) {
       buffer = ByteBuffer.allocate(
           (CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE
               + 3);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index bf8c03b..a442087 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -1427,7 +1427,7 @@ public final class CarbonUtil {
     ValueEncoderMeta valueEncoderMeta = new ValueEncoderMeta();
     valueEncoderMeta.setType(measureType);
     switch (measureType) {
-      case CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE:
+      case CarbonCommonConstants.DOUBLE_MEASURE:
         valueEncoderMeta.setMaxValue(buffer.getDouble());
         valueEncoderMeta.setMinValue(buffer.getDouble());
         valueEncoderMeta.setUniqueValue(buffer.getDouble());

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index 10411b0..e437405 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -152,10 +152,13 @@ public final class DataTypeUtil {
       case LONG:
         return CarbonCommonConstants.BIG_INT_MEASURE;
       default:
-        return CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE;
+        return CarbonCommonConstants.DOUBLE_MEASURE;
     }
   }
 
+  // byte representation of BigDecimal zero, as produced by bigDecimalToByte
+  public static final byte[] zeroBigDecimalBytes = bigDecimalToByte(BigDecimal.valueOf(0));
+
   /**
    * This method will convert a big decimal value to bytes
    *

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
index e468028..e682263 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
@@ -112,7 +112,7 @@ public class UnsafeCarbonRowPage {
     for (int mesCount = 0; mesCount < measureSize; mesCount++) {
       Object value = row[mesCount + dimensionSize];
       if (null != value) {
-        if (aggType[mesCount] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+        if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
           Double val = (Double) value;
           CarbonUnsafe.unsafe.putDouble(baseObject, address + size, val);
           size += 8;
@@ -183,7 +183,7 @@ public class UnsafeCarbonRowPage {
 
     for (int mesCount = 0; mesCount < measureSize; mesCount++) {
       if (isSet(nullSetWords, mesCount)) {
-        if (aggType[mesCount] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+        if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
           Double val = CarbonUnsafe.unsafe.getDouble(baseObject, address + size);
           size += 8;
           rowToFill[dimensionSize + mesCount] = val;
@@ -254,7 +254,7 @@ public class UnsafeCarbonRowPage {
 
     for (int mesCount = 0; mesCount < measureSize; mesCount++) {
       if (isSet(nullSetWords, mesCount)) {
-        if (aggType[mesCount] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+        if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
           double val = CarbonUnsafe.unsafe.getDouble(baseObject, address + size);
           size += 8;
           stream.writeDouble(val);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
index 60f259e..de2b874 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -324,7 +324,7 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
 
       for (int mesCount = 0; mesCount < measureCount; mesCount++) {
         if (UnsafeCarbonRowPage.isSet(words, mesCount)) {
-          if (aggType[mesCount] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+          if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
             row[dimensionCount + mesCount] = stream.readDouble();
           } else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) {
             row[dimensionCount + mesCount] = stream.readLong();

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
index 7862a95..e52dc8a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
@@ -310,7 +310,7 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
     for (int mesCount = 0; mesCount < measureSize; mesCount++) {
       Object value = row[mesCount + dimensionSize];
       if (null != value) {
-        if (aggType[mesCount] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+        if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
           Double val = (Double) value;
           rowData.putDouble(size, val);
           size += 8;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
index 0ac2d5c..5487593 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
@@ -341,7 +341,7 @@ public class IntermediateFileMerger implements Callable<Void> {
           if (aggType[counter] == CarbonCommonConstants.BYTE_VALUE_MEASURE) {
             Double val = (Double) NonDictionaryUtil.getMeasure(fieldIndex, row);
             stream.writeDouble(val);
-          } else if (aggType[counter] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+          } else if (aggType[counter] == CarbonCommonConstants.DOUBLE_MEASURE) {
             Double val = (Double) NonDictionaryUtil.getMeasure(fieldIndex, row);
             stream.writeDouble(val);
           } else if (aggType[counter] == CarbonCommonConstants.BIG_INT_MEASURE) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
index 9b5a850..3a7a579 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
@@ -285,7 +285,7 @@ public class SortDataRows {
           Object value = row[mesCount + dimColCount];
           if (null != value) {
             stream.write((byte) 1);
-            if (aggType[mesCount] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+            if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
               Double val = (Double) value;
               stream.writeDouble(val);
             } else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
index ae01404..b4ccc6f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
@@ -332,7 +332,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
       // read measure values
       for (int i = 0; i < this.measureCount; i++) {
         if (stream.readByte() == 1) {
-          if (aggType[i] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+          if (aggType[i] == CarbonCommonConstants.DOUBLE_MEASURE) {
             measures[index++] = stream.readDouble();
           } else if (aggType[i] == CarbonCommonConstants.BIG_INT_MEASURE) {
             measures[index++] = stream.readLong();

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 2affa03..da75428 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -50,6 +50,7 @@ import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
 import org.apache.carbondata.core.datastore.columnar.IndexStorage;
 import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
 import org.apache.carbondata.core.datastore.dataholder.CarbonWriteDataHolder;
+import org.apache.carbondata.core.datastore.impl.data.compressed.HeavyCompressedDoubleArrayDataStore;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter;
@@ -478,155 +479,224 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     }
   }
 
-  private NodeHolder processDataRows(List<Object[]> dataRows)
-      throws CarbonDataWriterException {
-    Object[] max = new Object[measureCount];
-    Object[] min = new Object[measureCount];
-    int[] decimal = new int[measureCount];
-    Object[] uniqueValue = new Object[measureCount];
-    // to store index of the measure columns which are null
-    BitSet[] nullValueIndexBitSet = getMeasureNullValueIndexBitSet(measureCount);
-    for (int i = 0; i < max.length; i++) {
-      if (type[i] == CarbonCommonConstants.BIG_INT_MEASURE) {
-        max[i] = Long.MIN_VALUE;
-      } else if (type[i] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
-        max[i] = -Double.MAX_VALUE;
-      } else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-        max[i] = new BigDecimal(0.0);
-      } else {
-        max[i] = 0.0;
+  /** statistics for one blocklet/page */
+  class Statistics {
+    /** min and max value of the measures */
+    Object[] min, max;
+
+    /**
+     * the unique value is a value that does not exist in the rows,
+     * and it is used as the storage key for null values of measures
+     */
+    Object[] uniqueValue;
+
+    /** decimal count of the measures */
+    int[] decimal;
+
+    Statistics(int measureCount) {
+      max = new Object[measureCount];
+      min = new Object[measureCount];
+      uniqueValue = new Object[measureCount];
+      decimal = new int[measureCount];
+      for (int i = 0; i < measureCount; i++) {
+        if (type[i] == CarbonCommonConstants.BIG_INT_MEASURE) {
+          max[i] = Long.MIN_VALUE;
+          min[i] = Long.MAX_VALUE;
+          uniqueValue[i] = Long.MIN_VALUE;
+        } else if (type[i] == CarbonCommonConstants.DOUBLE_MEASURE) {
+          max[i] = Double.MIN_VALUE;
+          min[i] = Double.MAX_VALUE;
+          uniqueValue[i] = Double.MIN_VALUE;
+        } else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
+          max[i] = new BigDecimal(Double.MIN_VALUE);
+          min[i] = new BigDecimal(Double.MAX_VALUE);
+          uniqueValue[i] = new BigDecimal(Double.MIN_VALUE);
+        } else {
+          max[i] = 0.0;
+          min[i] = 0.0;
+          uniqueValue[i] = 0.0;
+        }
+        decimal[i] = 0;
       }
     }
-    for (int i = 0; i < min.length; i++) {
-      if (type[i] == CarbonCommonConstants.BIG_INT_MEASURE) {
-        min[i] = Long.MAX_VALUE;
-        uniqueValue[i] = Long.MIN_VALUE;
-      } else if (type[i] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
-        min[i] = Double.MAX_VALUE;
-        uniqueValue[i] = Double.MIN_VALUE;
-      } else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-        min[i] = new BigDecimal(Double.MAX_VALUE);
-        uniqueValue[i] = new BigDecimal(Double.MIN_VALUE);
-      } else {
-        min[i] = 0.0;
-        uniqueValue[i] = 0.0;
+
+    /**
+     * update the statistics for the input row
+     */
+    void update(int[] msrIndex, Object[] row, boolean compactionFlow) {
+      // Update row level min max
+      for (int i = 0; i < msrIndex.length; i++) {
+        int count = msrIndex[i];
+        if (row[count] != null) {
+          if (type[count] == CarbonCommonConstants.DOUBLE_MEASURE) {
+            double value = (double) row[count];
+            double maxVal = (double) max[count];
+            double minVal = (double) min[count];
+            max[count] = (maxVal > value ? max[count] : value);
+            min[count] = (minVal < value ? min[count] : value);
+            int num = getDecimalCount(value);
+            decimal[count] = (decimal[count] > num ? decimal[count] : num);
+            uniqueValue[count] = (double) min[count] - 1;
+          } else if (type[count] == CarbonCommonConstants.BIG_INT_MEASURE) {
+            long value = (long) row[count];
+            long maxVal = (long) max[count];
+            long minVal = (long) min[count];
+            max[count] = (maxVal > value ? max[count] : value);
+            min[count] = (minVal < value ? min[count] : value);
+            uniqueValue[count] = (long) min[count] - 1;
+          } else if (type[count] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
+            byte[] buff = null;
+            // in compaction flow the measure with decimal type will come as spark decimal.
+            // need to convert it to byte array.
+            if (compactionFlow) {
+              BigDecimal bigDecimal = ((Decimal) row[count]).toJavaBigDecimal();
+              buff = DataTypeUtil.bigDecimalToByte(bigDecimal);
+            } else {
+              buff = (byte[]) row[count];
+            }
+            BigDecimal value = DataTypeUtil.byteToBigDecimal(buff);
+            decimal[count] = value.scale();
+            BigDecimal val = (BigDecimal) min[count];
+            uniqueValue[count] = (val.subtract(new BigDecimal(1.0)));
+          }
+        }
       }
     }
-    for (int i = 0; i < decimal.length; i++) {
-      decimal[i] = 0;
-    }
+  }
 
+  class IndexKey {
+    byte[] currentMDKey = null;
+    byte[][] currentNoDictionaryKey = null;
     byte[] startKey = null;
     byte[] endKey = null;
     byte[][] noDictStartKey = null;
     byte[][] noDictEndKey = null;
-    CarbonWriteDataHolder[] dataHolder = initialiseDataHolder(dataRows.size());
-    CarbonWriteDataHolder keyDataHolder = initialiseKeyBlockHolder(dataRows.size());
-    CarbonWriteDataHolder noDictionaryKeyDataHolder = null;
+
+    /** update all keys based on the input row */
+    void update(Object[] row, boolean firstRow) {
+      currentMDKey = (byte[]) row[mdKeyIndex];
+      if (noDictionaryCount > 0 || complexIndexMap.size() > 0) {
+        currentNoDictionaryKey = (byte[][]) row[mdKeyIndex - 1];
+      }
+      if (firstRow) {
+        startKey = currentMDKey;
+        noDictStartKey = currentNoDictionaryKey;
+      }
+      endKey = currentMDKey;
+      noDictEndKey = currentNoDictionaryKey;
+    }
+  }
+
+  /** generate the NodeHolder from the input rows */
+  private NodeHolder processDataRows(List<Object[]> dataRows)
+      throws CarbonDataWriterException {
+    // to store index of the measure columns which are null
+    BitSet[] nullValueIndexBitSet = getMeasureNullValueIndexBitSet(measureCount);
+    // statistics for one blocklet/page
+    Statistics stats = new Statistics(measureCount);
+    IndexKey keys = new IndexKey();
+
+    // initialize measureHolder, mdKeyHolder and noDictionaryHolder, these three Holders
+    // are the input for final encoding
+    CarbonWriteDataHolder[] measureHolder = initialiseDataHolder(dataRows.size());
+    CarbonWriteDataHolder mdKeyHolder = initialiseKeyBlockHolder(dataRows.size());
+    CarbonWriteDataHolder noDictionaryHolder = null;
     if ((noDictionaryCount + complexColCount) > 0) {
-      noDictionaryKeyDataHolder = initialiseKeyBlockHolderForNonDictionary(dataRows.size());
+      noDictionaryHolder = initialiseKeyBlockHolderForNonDictionary(dataRows.size());
     }
 
+    // loop on the input rows, fill measureHolder, mdKeyHolder and noDictionaryHolder
     for (int count = 0; count < dataRows.size(); count++) {
       Object[] row = dataRows.get(count);
-      byte[] mdKey = (byte[]) row[this.mdKeyIndex];
-      byte[][] noDictionaryKey = null;
-      if (noDictionaryCount > 0 || complexIndexMap.size() > 0) {
-        noDictionaryKey = (byte[][]) row[this.mdKeyIndex - 1];
-      }
-      ByteBuffer byteBuffer = null;
-      byte[] b = null;
-      if (count == 0) {
-        startKey = mdKey;
-        noDictStartKey = noDictionaryKey;
-      }
-      endKey = mdKey;
-      noDictEndKey = noDictionaryKey;
-      // add to key store
-      if (mdKey.length > 0) {
-        keyDataHolder.setWritableByteArrayValueByIndex(count, mdKey);
+      keys.update(row, (count == 0));
+      if (keys.currentMDKey.length > 0) {
+        mdKeyHolder.setWritableByteArrayValueByIndex(count, keys.currentMDKey);
       }
-      // for storing the byte [] for high card.
       if (noDictionaryCount > 0 || complexIndexMap.size() > 0) {
-        noDictionaryKeyDataHolder.setWritableNonDictByteArrayValueByIndex(count, noDictionaryKey);
+        noDictionaryHolder.setWritableNonDictByteArrayValueByIndex(count,
+            keys.currentNoDictionaryKey);
       }
+      fillMeasureHolder(row, count, measureHolder, nullValueIndexBitSet);
+      stats.update(otherMeasureIndex, row, compactionFlow);
+      stats.update(customMeasureIndex, row, compactionFlow);
+    }
 
-      for (int k = 0; k < otherMeasureIndex.length; k++) {
-        if (type[otherMeasureIndex[k]] == CarbonCommonConstants.BIG_INT_MEASURE) {
-          if (null == row[otherMeasureIndex[k]]) {
-            nullValueIndexBitSet[otherMeasureIndex[k]].set(count);
-            dataHolder[otherMeasureIndex[k]].setWritableLongValueByIndex(count, 0L);
-          } else {
-            dataHolder[otherMeasureIndex[k]]
-                .setWritableLongValueByIndex(count, row[otherMeasureIndex[k]]);
-          }
+    // generate encoded byte array for 3 holders
+    // for measure columns: encode and compress the measureHolder
+    WriterCompressModel compressionModel =
+        ValueCompressionUtil.getWriterCompressModel(
+            stats.max, stats.min, stats.decimal, stats.uniqueValue, type, new byte[measureCount]);
+    byte[][] encodedMeasureArray =
+        HeavyCompressedDoubleArrayDataStore.encodeMeasureDataArray(
+            compressionModel, measureHolder);
+
+    // mdkey and noDictionary data are already in bytes, so just get the arrays from the holders
+    byte[][] mdKeyArray = mdKeyHolder.getByteArrayValues();
+    byte[][][] noDictionaryArray = null;
+    if ((noDictionaryCount + complexColCount) > 0) {
+      noDictionaryArray = noDictionaryHolder.getNonDictByteArrayValues();
+    }
+
+    // create NodeHolder using these encoded byte arrays
+    NodeHolder nodeHolder =
+        createNodeHolderObjectWithOutKettle(
+            encodedMeasureArray, mdKeyArray, noDictionaryArray, dataRows.size(),
+            keys.startKey, keys.endKey, compressionModel, keys.noDictStartKey, keys.noDictEndKey,
+            nullValueIndexBitSet);
+    LOGGER.info("Number Of records processed: " + dataRows.size());
+    return nodeHolder;
+  }
+
+  private void fillMeasureHolder(Object[] row, int count, CarbonWriteDataHolder[] measureHolder,
+      BitSet[] nullValueIndexBitSet) {
+    for (int k = 0; k < otherMeasureIndex.length; k++) {
+      if (type[otherMeasureIndex[k]] == CarbonCommonConstants.BIG_INT_MEASURE) {
+        if (null == row[otherMeasureIndex[k]]) {
+          nullValueIndexBitSet[otherMeasureIndex[k]].set(count);
+          measureHolder[otherMeasureIndex[k]].setWritableLongValueByIndex(count, 0L);
         } else {
-          if (null == row[otherMeasureIndex[k]]) {
-            nullValueIndexBitSet[otherMeasureIndex[k]].set(count);
-            dataHolder[otherMeasureIndex[k]].setWritableDoubleValueByIndex(count, 0.0);
-          } else {
-            dataHolder[otherMeasureIndex[k]]
-                .setWritableDoubleValueByIndex(count, row[otherMeasureIndex[k]]);
-          }
+          measureHolder[otherMeasureIndex[k]]
+              .setWritableLongValueByIndex(count, row[otherMeasureIndex[k]]);
         }
-      }
-      calculateMaxMin(max, min, decimal, otherMeasureIndex, row);
-      for (int i = 0; i < customMeasureIndex.length; i++) {
-        if (null == row[customMeasureIndex[i]]
-            && type[customMeasureIndex[i]] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-          BigDecimal val = BigDecimal.valueOf(0);
-          b = DataTypeUtil.bigDecimalToByte(val);
-          nullValueIndexBitSet[customMeasureIndex[i]].set(count);
+      } else {
+        if (null == row[otherMeasureIndex[k]]) {
+          nullValueIndexBitSet[otherMeasureIndex[k]].set(count);
+          measureHolder[otherMeasureIndex[k]].setWritableDoubleValueByIndex(count, 0.0);
         } else {
-          if (this.compactionFlow) {
-            BigDecimal bigDecimal = ((Decimal) row[customMeasureIndex[i]]).toJavaBigDecimal();
-            b = DataTypeUtil.bigDecimalToByte(bigDecimal);
-          } else {
-            b = (byte[]) row[customMeasureIndex[i]];
-          }
+          measureHolder[otherMeasureIndex[k]]
+              .setWritableDoubleValueByIndex(count, row[otherMeasureIndex[k]]);
         }
-        BigDecimal value = DataTypeUtil.byteToBigDecimal(b);
-        String[] bigdVals = value.toPlainString().split("\\.");
-        long[] bigDvalue = new long[2];
-        if (bigdVals.length == 2) {
-          bigDvalue[0] = Long.parseLong(bigdVals[0]);
-          BigDecimal bd = new BigDecimal(CarbonCommonConstants.POINT + bigdVals[1]);
-          bigDvalue[1] = (long) (bd.doubleValue() * Math.pow(10, value.scale()));
+      }
+    }
+    ByteBuffer byteBuffer = null;
+    byte[] measureBytes = null;
+    for (int i = 0; i < customMeasureIndex.length; i++) {
+      if (null == row[customMeasureIndex[i]]
+          && type[customMeasureIndex[i]] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
+        measureBytes = DataTypeUtil.zeroBigDecimalBytes;
+        nullValueIndexBitSet[customMeasureIndex[i]].set(count);
+      } else {
+        if (this.compactionFlow) {
+          BigDecimal bigDecimal = ((Decimal) row[customMeasureIndex[i]]).toJavaBigDecimal();
+          measureBytes = DataTypeUtil.bigDecimalToByte(bigDecimal);
         } else {
-          bigDvalue[0] = Long.parseLong(bigdVals[0]);
+          measureBytes = (byte[]) row[customMeasureIndex[i]];
         }
-        byteBuffer = ByteBuffer.allocate(b.length + CarbonCommonConstants.INT_SIZE_IN_BYTE);
-        byteBuffer.putInt(b.length);
-        byteBuffer.put(b);
-        byteBuffer.flip();
-        b = byteBuffer.array();
-        dataHolder[customMeasureIndex[i]].setWritableByteArrayValueByIndex(count, b);
       }
-      calculateMaxMin(max, min, decimal, customMeasureIndex, row);
-    }
-    calculateUniqueValue(min, uniqueValue);
-    byte[][] byteArrayValues = keyDataHolder.getByteArrayValues().clone();
-    byte[][][] noDictionaryValueHolder = null;
-    if ((noDictionaryCount + complexColCount) > 0) {
-      noDictionaryValueHolder = noDictionaryKeyDataHolder.getNonDictByteArrayValues();
+      byteBuffer = ByteBuffer.allocate(measureBytes.length +
+          CarbonCommonConstants.INT_SIZE_IN_BYTE);
+      byteBuffer.putInt(measureBytes.length);
+      byteBuffer.put(measureBytes);
+      byteBuffer.flip();
+      measureBytes = byteBuffer.array();
+      measureHolder[customMeasureIndex[i]].setWritableByteArrayValueByIndex(count, measureBytes);
     }
-    WriterCompressModel compressionModel = ValueCompressionUtil
-        .getWriterCompressModel(max, min, decimal, uniqueValue, type, new byte[max.length]);
-    byte[][] writableMeasureDataArray =
-        StoreFactory.createDataStore(compressionModel).getWritableMeasureDataArray(dataHolder)
-            .clone();
-    NodeHolder nodeHolder =
-        getNodeHolderObject(writableMeasureDataArray, byteArrayValues, dataRows.size(),
-            startKey, endKey, compressionModel, noDictionaryValueHolder, noDictStartKey,
-            noDictEndKey, nullValueIndexBitSet);
-    LOGGER.info("Number Of records processed: " + dataRows.size());
-    return nodeHolder;
   }
 
-  private NodeHolder getNodeHolderObject(byte[][] dataHolderLocal,
-      byte[][] byteArrayValues, int entryCountLocal, byte[] startkeyLocal, byte[] endKeyLocal,
-      WriterCompressModel compressionModel, byte[][][] noDictionaryData,
-      byte[][] noDictionaryStartKey, byte[][] noDictionaryEndKey, BitSet[] nullValueIndexBitSet)
+  private NodeHolder createNodeHolderObjectWithOutKettle(byte[][] measureArray, byte[][] mdKeyArray,
+      byte[][][] noDictionaryArray, int entryCountLocal, byte[] startkeyLocal, byte[] endKeyLocal,
+      WriterCompressModel compressionModel, byte[][] noDictionaryStartKey,
+      byte[][] noDictionaryEndKey, BitSet[] nullValueIndexBitSet)
       throws CarbonDataWriterException {
     byte[][][] noDictionaryColumnsData = null;
     List<ArrayList<byte[]>> colsAndValues = new ArrayList<ArrayList<byte[]>>();
@@ -636,19 +706,19 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       colsAndValues.add(new ArrayList<byte[]>());
     }
     int noOfColumn = colGrpModel.getNoOfColumnStore();
-    DataHolder[] dataHolders = getDataHolders(noOfColumn, byteArrayValues.length);
-    for (int i = 0; i < byteArrayValues.length; i++) {
-      byte[][] splitKey = columnarSplitter.splitKey(byteArrayValues[i]);
+    DataHolder[] dataHolders = getDataHolders(noOfColumn, mdKeyArray.length);
+    for (int i = 0; i < mdKeyArray.length; i++) {
+      byte[][] splitKey = columnarSplitter.splitKey(mdKeyArray[i]);
 
       for (int j = 0; j < splitKey.length; j++) {
         dataHolders[j].addData(splitKey[j], i);
       }
     }
     if (noDictionaryCount > 0 || complexIndexMap.size() > 0) {
-      noDictionaryColumnsData = new byte[noDictionaryCount][noDictionaryData.length][];
-      for (int i = 0; i < noDictionaryData.length; i++) {
+      noDictionaryColumnsData = new byte[noDictionaryCount][noDictionaryArray.length][];
+      for (int i = 0; i < noDictionaryArray.length; i++) {
         int complexColumnIndex = primitiveDimLens.length + noDictionaryCount;
-        byte[][] splitKey = noDictionaryData[i];
+        byte[][] splitKey = noDictionaryArray[i];
 
         int complexTypeIndex = 0;
         for (int j = 0; j < splitKey.length; j++) {
@@ -754,7 +824,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
           NonDictionaryUtil.packByteBufferIntoSingleByteArray(noDictionaryEndKey);
     }
     return this.dataWriter
-        .buildDataNodeHolder(blockStorage, dataHolderLocal, entryCountLocal, startkeyLocal,
+        .buildDataNodeHolder(blockStorage, measureArray, entryCountLocal, startkeyLocal,
             endKeyLocal, compressionModel, composedNonDictStartKey, composedNonDictEndKey,
             nullValueIndexBitSet);
   }
@@ -914,7 +984,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     for (int i = 0; i < msrIndex.length; i++) {
       int count = msrIndex[i];
       if (row[count] != null) {
-        if (type[count] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+        if (type[count] == CarbonCommonConstants.DOUBLE_MEASURE) {
           double value = (double) row[count];
           double maxVal = (double) max[count];
           double minVal = (double) min[count];

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/processing/src/main/java/org/apache/carbondata/processing/store/StoreFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/StoreFactory.java b/processing/src/main/java/org/apache/carbondata/processing/store/StoreFactory.java
deleted file mode 100644
index 0097483..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/StoreFactory.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.store;
-
-import org.apache.carbondata.core.datastore.NodeMeasureDataStore;
-import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
-import org.apache.carbondata.core.datastore.impl.data.compressed.HeavyCompressedDoubleArrayDataInMemoryStore;
-
-public final class StoreFactory {
-
-  private StoreFactory() {
-  }
-
-  public static NodeMeasureDataStore createDataStore(WriterCompressModel compressionModel) {
-    return new HeavyCompressedDoubleArrayDataInMemoryStore(compressionModel);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
index 227f92b..c8f740b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
@@ -31,7 +31,7 @@ public interface CarbonFactDataWriter<T> {
    * file format
    * <key><measure1><measure2>....
    *
-   * @param dataArray            measure array
+   * @param measureArray            measure array
    * @param entryCount           number of entries
    * @param startKey             start key of leaf
    * @param endKey               end key of leaf
@@ -40,7 +40,7 @@ public interface CarbonFactDataWriter<T> {
    * @throws CarbonDataWriterException throws new CarbonDataWriterException if any problem
    */
 
-  NodeHolder buildDataNodeHolder(IndexStorage<T>[] keyStorageArray, byte[][] dataArray,
+  NodeHolder buildDataNodeHolder(IndexStorage<T>[] keyStorageArray, byte[][] measureArray,
       int entryCount, byte[] startKey, byte[] endKey, WriterCompressModel compressionModel,
       byte[] noDictionaryStartKey, byte[] noDictionaryEndKey, BitSet[] nullValueIndexBitSet)
       throws CarbonDataWriterException;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
index cf5311c..64077e2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
@@ -47,9 +47,10 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> {
   }
 
   @Override
-  public NodeHolder buildDataNodeHolder(IndexStorage<int[]>[] keyStorageArray, byte[][] dataArray,
-      int entryCount, byte[] startKey, byte[] endKey, WriterCompressModel compressionModel,
-      byte[] noDictionaryStartKey, byte[] noDictionaryEndKey, BitSet[] nullValueIndexBitSet)
+  public NodeHolder buildDataNodeHolder(IndexStorage<int[]>[] keyStorageArray,
+      byte[][] measureArray, int entryCount, byte[] startKey, byte[] endKey,
+      WriterCompressModel compressionModel, byte[] noDictionaryStartKey,
+      byte[] noDictionaryEndKey, BitSet[] nullValueIndexBitSet)
       throws CarbonDataWriterException {
     // if there are no NO-Dictionary column present in the table then
     // set the empty byte array
@@ -143,13 +144,13 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> {
     int[] msrLength = new int[dataWriterVo.getMeasureCount()];
     // calculate the total size required for all the measure and get the
     // each measure size
-    for (int i = 0; i < dataArray.length; i++) {
-      currentMsrLenght = dataArray[i].length;
+    for (int i = 0; i < measureArray.length; i++) {
+      currentMsrLenght = measureArray[i].length;
       totalMsrArrySize += currentMsrLenght;
       msrLength[i] = currentMsrLenght;
     }
     NodeHolder holder = new NodeHolder();
-    holder.setDataArray(dataArray);
+    holder.setDataArray(measureArray);
     holder.setKeyArray(keyBlockData);
     holder.setMeasureNullValueIndex(nullValueIndexBitSet);
     // end key format will be <length of dictionary key><length of no

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
index 3b7c763..aa54a4b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
@@ -83,7 +83,7 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]>
    * be written in carbon data file
    */
   @Override public NodeHolder buildDataNodeHolder(IndexStorage<short[]>[] keyStorageArray,
-      byte[][] dataArray, int entryCount, byte[] startKey, byte[] endKey,
+      byte[][] measureArray, int entryCount, byte[] startKey, byte[] endKey,
       WriterCompressModel compressionModel, byte[] noDictionaryStartKey, byte[] noDictionaryEndKey,
       BitSet[] nullValueIndexBitSet) throws CarbonDataWriterException {
     // if there are no NO-Dictionary column present in the table then
@@ -113,8 +113,8 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]>
     byte[][] dimensionMinValue = new byte[keyStorageArray.length][];
     byte[][] dimensionMaxValue = new byte[keyStorageArray.length][];
 
-    byte[][] measureMinValue = new byte[dataArray.length][];
-    byte[][] measureMaxValue = new byte[dataArray.length][];
+    byte[][] measureMinValue = new byte[measureArray.length][];
+    byte[][] measureMaxValue = new byte[measureArray.length][];
 
     byte[][] keyBlockData = fillAndCompressedKeyBlockData(keyStorageArray, entryCount);
     boolean[] colGrpBlock = new boolean[keyStorageArray.length];
@@ -137,7 +137,7 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]>
         colGrpBlock[i] = true;
       }
     }
-    for (int i = 0; i < dataArray.length; i++) {
+    for (int i = 0; i < measureArray.length; i++) {
       measureMaxValue[i] = CarbonMetadataUtil
           .getByteValueForMeasure(compressionModel.getMaxValue()[i],
               dataWriterVo.getSegmentProperties().getMeasures().get(i).getDataType());
@@ -176,13 +176,13 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]>
     int[] msrLength = new int[dataWriterVo.getMeasureCount()];
     // calculate the total size required for all the measure and get the
     // each measure size
-    for (int i = 0; i < dataArray.length; i++) {
-      currentMsrLenght = dataArray[i].length;
+    for (int i = 0; i < measureArray.length; i++) {
+      currentMsrLenght = measureArray[i].length;
       totalMsrArrySize += currentMsrLenght;
       msrLength[i] = currentMsrLenght;
     }
     NodeHolder holder = new NodeHolder();
-    holder.setDataArray(dataArray);
+    holder.setDataArray(measureArray);
     holder.setKeyArray(keyBlockData);
     holder.setMeasureNullValueIndex(nullValueIndexBitSet);
     // end key format will be <length of dictionary key><length of no