You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/05/12 13:57:12 UTC

[3/7] carbondata git commit: [CARBONDATA-1015] Refactory write step and add ColumnPage in data load This closes #852

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 059c734..a515f0b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -141,7 +141,7 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
         match {
           case parser.Success(field, _) => field.asInstanceOf[Field]
           case failureOrError => throw new MalformedCarbonCommandException(
-            s"Unsupported data type: $col.getType")
+            s"Unsupported data type: $col.getDataType")
         }
         // the data type of the decimal type will be like decimal(10,0)
         // so checking the start of the string and taking the precision and scale.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 81ee408..690f6ef 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
@@ -43,6 +44,8 @@ import org.apache.carbondata.processing.store.SingleThreadFinalSortFilesMerger;
 import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
+import org.apache.spark.sql.types.Decimal;
+
 /**
  * This class will process the query result and convert the data
  * into a format compatible for data load
@@ -89,7 +92,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
   /**
    * agg type defined for measures
    */
-  private char[] aggType;
+  private DataType[] aggType;
   /**
    * segment id
    */
@@ -243,14 +246,14 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
    * This method will convert the spark decimal to java big decimal type
    *
    * @param value
-   * @param aggType
+   * @param type
    * @return
    */
-  private Object getConvertedMeasureValue(Object value, char aggType) {
-    switch (aggType) {
-      case CarbonCommonConstants.BIG_DECIMAL_MEASURE:
+  private Object getConvertedMeasureValue(Object value, DataType type) {
+    switch (type) {
+      case DECIMAL:
         if (value != null) {
-          value = ((org.apache.spark.sql.types.Decimal) value).toJavaBigDecimal();
+          value = ((Decimal) value).toJavaBigDecimal();
         }
         return value;
       default:
@@ -404,6 +407,6 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
    * initialise aggregation type for measures for their storage format
    */
   private void initAggType() {
-    aggType = CarbonDataProcessorUtil.initAggType(carbonTable, tableName, measureCount);
+    aggType = CarbonDataProcessorUtil.initDataType(carbonTable, tableName, measureCount);
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
index 0e14660..c1aafcd 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
@@ -82,7 +82,7 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter {
         new SingleThreadFinalSortFilesMerger(dataFolderLocation, sortParameters.getTableName(),
             sortParameters.getDimColCount(),
             sortParameters.getComplexDimColCount(), sortParameters.getMeasureColCount(),
-            sortParameters.getNoDictionaryCount(), sortParameters.getAggType(),
+            sortParameters.getNoDictionaryCount(), sortParameters.getMeasureDataType(),
             sortParameters.getNoDictionaryDimnesionColumn(),
             sortParameters.getNoDictionarySortColumn());
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
index 60231c5..c8977ac 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
@@ -142,7 +142,7 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte
     return new SingleThreadFinalSortFilesMerger(dataFolderLocation, sortParameters.getTableName(),
             sortParameters.getDimColCount(), sortParameters.getComplexDimColCount(),
             sortParameters.getMeasureColCount(), sortParameters.getNoDictionaryCount(),
-            sortParameters.getAggType(), sortParameters.getNoDictionaryDimnesionColumn(),
+            sortParameters.getMeasureDataType(), sortParameters.getNoDictionaryDimnesionColumn(),
             this.sortParameters.getNoDictionarySortColumn());
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
index 44f11f7..24109e4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
@@ -22,9 +22,9 @@ import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.Arrays;
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.memory.CarbonUnsafe;
 import org.apache.carbondata.core.memory.MemoryBlock;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
 /**
@@ -40,7 +40,7 @@ public class UnsafeCarbonRowPage {
 
   private int measureSize;
 
-  private char[] aggType;
+  private DataType[] measureDataType;
 
   private long[] nullSetWords;
 
@@ -55,13 +55,13 @@ public class UnsafeCarbonRowPage {
   private boolean saveToDisk;
 
   public UnsafeCarbonRowPage(boolean[] noDictionaryDimensionMapping,
-      boolean[] noDictionarySortColumnMapping, int dimensionSize, int measureSize, char[] aggType,
+      boolean[] noDictionarySortColumnMapping, int dimensionSize, int measureSize, DataType[] type,
       MemoryBlock memoryBlock, boolean saveToDisk) {
     this.noDictionaryDimensionMapping = noDictionaryDimensionMapping;
     this.noDictionarySortColumnMapping = noDictionarySortColumnMapping;
     this.dimensionSize = dimensionSize;
     this.measureSize = measureSize;
-    this.aggType = aggType;
+    this.measureDataType = type;
     this.saveToDisk = saveToDisk;
     this.nullSetWords = new long[((measureSize - 1) >> 6) + 1];
     buffer = new IntPointerBuffer(memoryBlock);
@@ -116,24 +116,30 @@ public class UnsafeCarbonRowPage {
     for (int mesCount = 0; mesCount < measureSize; mesCount++) {
       Object value = row[mesCount + dimensionSize];
       if (null != value) {
-        if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
-          Double val = (Double) value;
-          CarbonUnsafe.unsafe.putDouble(baseObject, address + size, val);
-          size += 8;
-        } else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) {
-          Long val = (Long) value;
-          CarbonUnsafe.unsafe.putLong(baseObject, address + size, val);
-          size += 8;
-        } else if (aggType[mesCount] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-          BigDecimal val = (BigDecimal) value;
-          byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
-          CarbonUnsafe.unsafe.putShort(baseObject, address + size,
-              (short) bigDecimalInBytes.length);
-          size += 2;
-          CarbonUnsafe.unsafe
-              .copyMemory(bigDecimalInBytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
-                  address + size, bigDecimalInBytes.length);
-          size += bigDecimalInBytes.length;
+        switch (measureDataType[mesCount]) {
+          case SHORT:
+          case INT:
+          case LONG:
+            Long val = (Long) value;
+            CarbonUnsafe.unsafe.putLong(baseObject, address + size, val);
+            size += 8;
+            break;
+          case DOUBLE:
+            Double doubleVal = (Double) value;
+            CarbonUnsafe.unsafe.putDouble(baseObject, address + size, doubleVal);
+            size += 8;
+            break;
+          case DECIMAL:
+            BigDecimal decimalVal = (BigDecimal) value;
+            byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(decimalVal);
+            CarbonUnsafe.unsafe.putShort(baseObject, address + size,
+                (short) bigDecimalInBytes.length);
+            size += 2;
+            CarbonUnsafe.unsafe
+                .copyMemory(bigDecimalInBytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
+                    address + size, bigDecimalInBytes.length);
+            size += bigDecimalInBytes.length;
+            break;
         }
         set(nullSetWords, mesCount);
       } else {
@@ -187,22 +193,28 @@ public class UnsafeCarbonRowPage {
 
     for (int mesCount = 0; mesCount < measureSize; mesCount++) {
       if (isSet(nullSetWords, mesCount)) {
-        if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
-          Double val = CarbonUnsafe.unsafe.getDouble(baseObject, address + size);
-          size += 8;
-          rowToFill[dimensionSize + mesCount] = val;
-        } else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) {
-          Long val = CarbonUnsafe.unsafe.getLong(baseObject, address + size);
-          size += 8;
-          rowToFill[dimensionSize + mesCount] = val;
-        } else if (aggType[mesCount] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-          short aShort = CarbonUnsafe.unsafe.getShort(baseObject, address + size);
-          byte[] bigDecimalInBytes = new byte[aShort];
-          size += 2;
-          CarbonUnsafe.unsafe.copyMemory(baseObject, address + size, bigDecimalInBytes,
-              CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
-          size += bigDecimalInBytes.length;
-          rowToFill[dimensionSize + mesCount] = bigDecimalInBytes;
+        switch (measureDataType[mesCount]) {
+          case SHORT:
+          case INT:
+          case LONG:
+            Long val = CarbonUnsafe.unsafe.getLong(baseObject, address + size);
+            size += 8;
+            rowToFill[dimensionSize + mesCount] = val;
+            break;
+          case DOUBLE:
+            Double doubleVal = CarbonUnsafe.unsafe.getDouble(baseObject, address + size);
+            size += 8;
+            rowToFill[dimensionSize + mesCount] = doubleVal;
+            break;
+          case DECIMAL:
+            short aShort = CarbonUnsafe.unsafe.getShort(baseObject, address + size);
+            byte[] bigDecimalInBytes = new byte[aShort];
+            size += 2;
+            CarbonUnsafe.unsafe.copyMemory(baseObject, address + size, bigDecimalInBytes,
+                CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
+            size += bigDecimalInBytes.length;
+            rowToFill[dimensionSize + mesCount] = bigDecimalInBytes;
+            break;
         }
       } else {
         rowToFill[dimensionSize + mesCount] = null;
@@ -258,33 +270,34 @@ public class UnsafeCarbonRowPage {
 
     for (int mesCount = 0; mesCount < measureSize; mesCount++) {
       if (isSet(nullSetWords, mesCount)) {
-        if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
-          double val = CarbonUnsafe.unsafe.getDouble(baseObject, address + size);
-          size += 8;
-          stream.writeDouble(val);
-        } else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) {
-          long val = CarbonUnsafe.unsafe.getLong(baseObject, address + size);
-          size += 8;
-          stream.writeLong(val);
-        } else if (aggType[mesCount] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-          short aShort = CarbonUnsafe.unsafe.getShort(baseObject, address + size);
-          byte[] bigDecimalInBytes = new byte[aShort];
-          size += 2;
-          CarbonUnsafe.unsafe.copyMemory(baseObject, address + size, bigDecimalInBytes,
-              CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
-          size += bigDecimalInBytes.length;
-          stream.writeShort(aShort);
-          stream.write(bigDecimalInBytes);
+        switch (measureDataType[mesCount]) {
+          case SHORT:
+          case INT:
+          case LONG:
+            long val = CarbonUnsafe.unsafe.getLong(baseObject, address + size);
+            size += 8;
+            stream.writeLong(val);
+            break;
+          case DOUBLE:
+            double doubleVal = CarbonUnsafe.unsafe.getDouble(baseObject, address + size);
+            size += 8;
+            stream.writeDouble(doubleVal);
+            break;
+          case DECIMAL:
+            short aShort = CarbonUnsafe.unsafe.getShort(baseObject, address + size);
+            byte[] bigDecimalInBytes = new byte[aShort];
+            size += 2;
+            CarbonUnsafe.unsafe.copyMemory(baseObject, address + size, bigDecimalInBytes,
+                CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
+            size += bigDecimalInBytes.length;
+            stream.writeShort(aShort);
+            stream.write(bigDecimalInBytes);
+            break;
         }
       }
     }
   }
 
-  private Object[] getRow(long address) {
-    Object[] row = new Object[dimensionSize + measureSize];
-    return getRow(address, row);
-  }
-
   public void freeMemory() {
     buffer.freeMemory();
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
index 40608fa..a9c0cb7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
@@ -115,7 +115,7 @@ public class UnsafeSortDataRows {
     this.rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(),
         parameters.getNoDictionarySortColumn(),
         parameters.getDimColCount() + parameters.getComplexDimColCount(),
-        parameters.getMeasureColCount(), parameters.getAggType(), baseBlock,
+        parameters.getMeasureColCount(), parameters.getMeasureDataType(), baseBlock,
         !UnsafeMemoryManager.INSTANCE.isMemoryAvailable());
     // Delete if any older file exists in sort temp folder
     deleteSortLocationIfExists();
@@ -178,10 +178,14 @@ public class UnsafeSortDataRows {
             dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage));
             MemoryBlock memoryBlock = getMemoryBlock(inMemoryChunkSize);
             boolean saveToDisk = !UnsafeMemoryManager.INSTANCE.isMemoryAvailable();
-            rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(),
+            rowPage = new UnsafeCarbonRowPage(
+                parameters.getNoDictionaryDimnesionColumn(),
                 parameters.getNoDictionarySortColumn(),
                 parameters.getDimColCount() + parameters.getComplexDimColCount(),
-                parameters.getMeasureColCount(), parameters.getAggType(), memoryBlock, saveToDisk);
+                parameters.getMeasureColCount(),
+                parameters.getMeasureDataType(),
+                memoryBlock,
+                saveToDisk);
             bytesAdded += rowPage.addRow(rowBatch[i]);
           } catch (Exception e) {
             LOGGER.error(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
index cfdb69a..aee4e51 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -32,6 +32,7 @@ import java.util.concurrent.Future;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
@@ -122,7 +123,7 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
 
   private int noDictionaryCount;
 
-  private char[] aggType;
+  private DataType[] measureDataType;
 
   private int numberOfObjectRead;
   /**
@@ -150,7 +151,7 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
     // set mdkey length
     this.fileBufferSize = parameters.getFileBufferSize();
     this.executorService = Executors.newFixedThreadPool(1);
-    this.aggType = parameters.getAggType();
+    this.measureDataType = parameters.getMeasureDataType();
     this.isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
     this.nullSetWordsLength = ((measureCount - 1) >> 6) + 1;
     comparator = new NewRowComparator(parameters.getNoDictionarySortColumn());
@@ -323,15 +324,21 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
 
       for (int mesCount = 0; mesCount < measureCount; mesCount++) {
         if (UnsafeCarbonRowPage.isSet(words, mesCount)) {
-          if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
-            row[dimensionCount + mesCount] = stream.readDouble();
-          } else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) {
-            row[dimensionCount + mesCount] = stream.readLong();
-          } else if (aggType[mesCount] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-            short aShort = stream.readShort();
-            byte[] bigDecimalInBytes = new byte[aShort];
-            stream.readFully(bigDecimalInBytes);
-            row[dimensionCount + mesCount] = bigDecimalInBytes;
+          switch (measureDataType[mesCount]) {
+            case SHORT:
+            case INT:
+            case LONG:
+              row[dimensionCount + mesCount] = stream.readLong();
+              break;
+            case DOUBLE:
+              row[dimensionCount + mesCount] = stream.readDouble();
+              break;
+            case DECIMAL:
+              short aShort = stream.readShort();
+              byte[] bigDecimalInBytes = new byte[aShort];
+              stream.readFully(bigDecimalInBytes);
+              row[dimensionCount + mesCount] = bigDecimalInBytes;
+              break;
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
index e52dc8a..90c3b69 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
@@ -31,7 +31,7 @@ import java.util.concurrent.Callable;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
 import org.apache.carbondata.processing.newflow.sort.unsafe.holder.SortTempChunkHolder;
@@ -278,7 +278,7 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
   private void writeDataTofile(Object[] row) throws CarbonSortKeyAndGroupByException, IOException {
     int dimCount = 0;
     int size = 0;
-    char[] aggType = mergerParameters.getAggType();
+    DataType[] type = mergerParameters.getMeasureDataType();
     for (; dimCount < noDictionarycolumnMapping.length; dimCount++) {
       if (noDictionarycolumnMapping[dimCount]) {
         byte[] col = (byte[]) row[dimCount];
@@ -310,21 +310,25 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
     for (int mesCount = 0; mesCount < measureSize; mesCount++) {
       Object value = row[mesCount + dimensionSize];
       if (null != value) {
-        if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
-          Double val = (Double) value;
-          rowData.putDouble(size, val);
-          size += 8;
-        } else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) {
-          Long val = (Long) value;
-          rowData.putLong(size, val);
-          size += 8;
-        } else if (aggType[mesCount] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-          byte[] bigDecimalInBytes = (byte[]) value;
-          rowData.putShort(size, (short)bigDecimalInBytes.length);
-          size += 2;
-          for (int i = 0; i < bigDecimalInBytes.length; i++) {
-            rowData.put(size++, bigDecimalInBytes[i]);
-          }
+        switch (type[mesCount]) {
+          case SHORT:
+          case INT:
+          case LONG:
+            rowData.putLong(size, (Long) value);
+            size += 8;
+            break;
+          case DOUBLE:
+            rowData.putDouble(size, (Double) value);
+            size += 8;
+            break;
+          case DECIMAL:
+            byte[] bigDecimalInBytes = (byte[]) value;
+            rowData.putShort(size, (short)bigDecimalInBytes.length);
+            size += 2;
+            for (int i = 0; i < bigDecimalInBytes.length; i++) {
+              rowData.put(size++, bigDecimalInBytes[i]);
+            }
+            break;
         }
         UnsafeCarbonRowPage.set(nullSetWords, mesCount);
       } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java
index c50f335..0f0a5b0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -26,11 +26,11 @@ import java.util.concurrent.Future;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
@@ -64,7 +64,7 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
 
   private boolean[] isNoDictionaryDimensionColumn;
 
-  private char[] aggType;
+  private DataType[] measureDataType;
 
   private int dimensionCount;
 
@@ -115,8 +115,8 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
       dimensionCount = configuration.getDimensionCount() - noDictWithComplextCount;
       isNoDictionaryDimensionColumn =
           CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields());
-      aggType = CarbonDataProcessorUtil
-          .getAggType(configuration.getMeasureCount(), configuration.getMeasureFields());
+      measureDataType = CarbonDataProcessorUtil
+          .getMeasureDataType(configuration.getMeasureCount(), configuration.getMeasureFields());
 
       CarbonFactDataHandlerModel dataHandlerModel = CarbonFactDataHandlerModel
           .createCarbonFactDataHandlerModel(configuration,
@@ -266,7 +266,7 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
     for (; l < this.measureCount; l++) {
       Object value = row.getObject(l + this.dimensionWithComplexCount);
       if (null != value) {
-        if (aggType[l] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
+        if (measureDataType[l] == DataType.DECIMAL) {
           BigDecimal val = (BigDecimal) value;
           outputRow[l] = DataTypeUtil.bigDecimalToByte(val);
         } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
index a9e762d..d20292c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
@@ -29,7 +29,7 @@ import java.util.concurrent.Callable;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
 import org.apache.carbondata.processing.util.NonDictionaryUtil;
@@ -251,7 +251,8 @@ public class IntermediateFileMerger implements Callable<Void> {
           new SortTempFileChunkHolder(tempFile, mergerParameters.getDimColCount(),
               mergerParameters.getComplexDimColCount(), mergerParameters.getMeasureColCount(),
               mergerParameters.getFileBufferSize(), mergerParameters.getNoDictionaryCount(),
-              mergerParameters.getAggType(), mergerParameters.getNoDictionaryDimnesionColumn(),
+              mergerParameters.getMeasureDataType(),
+              mergerParameters.getNoDictionaryDimnesionColumn(),
               mergerParameters.getNoDictionarySortColumn());
 
       // initialize
@@ -319,7 +320,7 @@ public class IntermediateFileMerger implements Callable<Void> {
       return;
     }
     try {
-      char[] aggType = mergerParameters.getAggType();
+      DataType[] aggType = mergerParameters.getMeasureDataType();
       int[] mdkArray = (int[]) row[0];
       byte[][] nonDictArray = (byte[][]) row[1];
       int mdkIndex = 0;
@@ -339,27 +340,27 @@ public class IntermediateFileMerger implements Callable<Void> {
       for (int counter = 0; counter < mergerParameters.getMeasureColCount(); counter++) {
         if (null != NonDictionaryUtil.getMeasure(fieldIndex, row)) {
           stream.write((byte) 1);
-          if (aggType[counter] == CarbonCommonConstants.BYTE_VALUE_MEASURE) {
-            Double val = (Double) NonDictionaryUtil.getMeasure(fieldIndex, row);
-            stream.writeDouble(val);
-          } else if (aggType[counter] == CarbonCommonConstants.DOUBLE_MEASURE) {
-            Double val = (Double) NonDictionaryUtil.getMeasure(fieldIndex, row);
-            stream.writeDouble(val);
-          } else if (aggType[counter] == CarbonCommonConstants.BIG_INT_MEASURE) {
-            Long val = (Long) NonDictionaryUtil.getMeasure(fieldIndex, row);
-            stream.writeLong(val);
-          } else if (aggType[counter] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-            byte[] bigDecimalInBytes = (byte[]) NonDictionaryUtil.getMeasure(fieldIndex, row);
-            stream.writeInt(bigDecimalInBytes.length);
-            stream.write(bigDecimalInBytes);
+          switch (aggType[counter]) {
+            case SHORT:
+            case INT:
+            case LONG:
+              Long val = (Long) NonDictionaryUtil.getMeasure(fieldIndex, row);
+              stream.writeLong(val);
+              break;
+            case DOUBLE:
+              stream.writeDouble((Double) NonDictionaryUtil.getMeasure(fieldIndex, row));
+              break;
+            case DECIMAL:
+              byte[] bigDecimalInBytes = (byte[]) NonDictionaryUtil.getMeasure(fieldIndex, row);
+              stream.writeInt(bigDecimalInBytes.length);
+              stream.write(bigDecimalInBytes);
+              break;
           }
         } else {
           stream.write((byte) 0);
         }
-
         fieldIndex++;
       }
-
     } catch (IOException e) {
       throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
index eba5433..af654e2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
@@ -256,7 +257,7 @@ public class SortDataRows {
       stream.writeInt(entryCountLocal);
       int complexDimColCount = parameters.getComplexDimColCount();
       int dimColCount = parameters.getDimColCount() + complexDimColCount;
-      char[] aggType = parameters.getAggType();
+      DataType[] type = parameters.getMeasureDataType();
       boolean[] noDictionaryDimnesionMapping = parameters.getNoDictionaryDimnesionColumn();
       Object[] row = null;
       for (int i = 0; i < entryCountLocal; i++) {
@@ -285,17 +286,21 @@ public class SortDataRows {
           Object value = row[mesCount + dimColCount];
           if (null != value) {
             stream.write((byte) 1);
-            if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
-              Double val = (Double) value;
-              stream.writeDouble(val);
-            } else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) {
-              Long val = (Long) value;
-              stream.writeLong(val);
-            } else if (aggType[mesCount] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-              BigDecimal val = (BigDecimal) value;
-              byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
-              stream.writeInt(bigDecimalInBytes.length);
-              stream.write(bigDecimalInBytes);
+            switch (type[mesCount]) {
+              case SHORT:
+              case INT:
+              case LONG:
+                stream.writeLong((Long) value);
+                break;
+              case DOUBLE:
+                stream.writeDouble((Double) value);
+                break;
+              case DECIMAL:
+                BigDecimal val = (BigDecimal) value;
+                byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
+                stream.writeInt(bigDecimalInBytes.length);
+                stream.write(bigDecimalInBytes);
+                break;
             }
           } else {
             stream.write((byte) 0);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
index 7ef8f8e..8ac1491 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
@@ -22,6 +22,7 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.schema.metadata.SortObserver;
@@ -88,7 +89,7 @@ public class SortParameters {
 
   private String tableName;
 
-  private char[] aggType;
+  private DataType[] measureDataType;
 
   /**
    * To know how many columns are of high cardinality.
@@ -137,7 +138,7 @@ public class SortParameters {
     parameters.bufferSize = bufferSize;
     parameters.databaseName = databaseName;
     parameters.tableName = tableName;
-    parameters.aggType = aggType;
+    parameters.measureDataType = measureDataType;
     parameters.noDictionaryCount = noDictionaryCount;
     parameters.partitionID = partitionID;
     parameters.segmentId = segmentId;
@@ -270,12 +271,12 @@ public class SortParameters {
     this.tableName = tableName;
   }
 
-  public char[] getAggType() {
-    return aggType;
+  public DataType[] getMeasureDataType() {
+    return measureDataType;
   }
 
-  public void setAggType(char[] aggType) {
-    this.aggType = aggType;
+  public void setMeasureDataType(DataType[] measureDataType) {
+    this.measureDataType = measureDataType;
   }
 
   public int getNoDictionaryCount() {
@@ -458,9 +459,9 @@ public class SortParameters {
         CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
         CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT)));
 
-    char[] aggType = CarbonDataProcessorUtil
-        .getAggType(configuration.getMeasureCount(), configuration.getMeasureFields());
-    parameters.setAggType(aggType);
+    DataType[] measureDataType = CarbonDataProcessorUtil
+        .getMeasureDataType(configuration.getMeasureCount(), configuration.getMeasureFields());
+    parameters.setMeasureDataType(measureDataType);
     return parameters;
   }
 
@@ -560,10 +561,10 @@ public class SortParameters {
         CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
         CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT)));
 
-    char[] aggType = CarbonDataProcessorUtil
-        .getAggType(parameters.getMeasureColCount(), parameters.getDatabaseName(),
+    DataType[] type = CarbonDataProcessorUtil
+        .getMeasureDataType(parameters.getMeasureColCount(), parameters.getDatabaseName(),
             parameters.getTableName());
-    parameters.setAggType(aggType);
+    parameters.setMeasureDataType(type);
     return parameters;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
index 6695a5b..a4fdec1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
@@ -31,6 +31,7 @@ import java.util.concurrent.Future;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
@@ -125,7 +126,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
 
   private int noDictionaryCount;
 
-  private char[] aggType;
+  private DataType[] aggType;
 
   /**
    * to store whether dimension is of dictionary type or not
@@ -150,7 +151,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
    * @param isNoDictionaryDimensionColumn
    */
   public SortTempFileChunkHolder(File tempFile, int dimensionCount, int complexDimensionCount,
-      int measureCount, int fileBufferSize, int noDictionaryCount, char[] aggType,
+      int measureCount, int fileBufferSize, int noDictionaryCount, DataType[] aggType,
       boolean[] isNoDictionaryDimensionColumn, boolean[] isNoDictionarySortColumn) {
     // set temp file
     this.tempFile = tempFile;
@@ -338,15 +339,21 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
       // read measure values
       for (int i = 0; i < this.measureCount; i++) {
         if (stream.readByte() == 1) {
-          if (aggType[i] == CarbonCommonConstants.DOUBLE_MEASURE) {
-            measures[index++] = stream.readDouble();
-          } else if (aggType[i] == CarbonCommonConstants.BIG_INT_MEASURE) {
-            measures[index++] = stream.readLong();
-          } else {
-            int len = stream.readInt();
-            byte[] buff = new byte[len];
-            stream.readFully(buff);
-            measures[index++] = buff;
+          switch (aggType[i]) {
+            case SHORT:
+            case INT:
+            case LONG:
+              measures[index++] = stream.readLong();
+              break;
+            case DOUBLE:
+              measures[index++] = stream.readDouble();
+              break;
+            case DECIMAL:
+              int len = stream.readInt();
+              byte[] buff = new byte[len];
+              stream.readFully(buff);
+              measures[index++] = buff;
+              break;
           }
         } else {
           measures[index++] = null;