You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/30 09:22:02 UTC

[12/35] carbondata git commit: [CARBONDATA-1539] Change data type from enum to class

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
index ce16ad5..d78d144 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.Direc
 import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.HighCardDictDimensionIndexCodec;
 import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
 
 /**
@@ -111,21 +112,21 @@ public class DefaultEncodingFactory extends EncodingFactory {
 
   private ColumnPageEncoder createEncoderForMeasure(ColumnPage columnPage) {
     SimpleStatsResult stats = columnPage.getStatistics();
-    switch (stats.getDataType()) {
-      case BYTE:
-      case SHORT:
-      case INT:
-      case LONG:
-        return selectCodecByAlgorithmForIntegral(stats).createEncoder(null);
-      case DECIMAL:
-        return createEncoderForDecimalDataTypeMeasure(columnPage);
-      case FLOAT:
-      case DOUBLE:
-        return selectCodecByAlgorithmForFloating(stats).createEncoder(null);
-      case BYTE_ARRAY:
-        return new DirectCompressCodec(columnPage.getDataType()).createEncoder(null);
-      default:
-        throw new RuntimeException("unsupported data type: " + stats.getDataType());
+    DataType dataType = stats.getDataType();
+    if (dataType == DataTypes.BYTE ||
+        dataType == DataTypes.SHORT ||
+        dataType == DataTypes.INT ||
+        dataType == DataTypes.LONG) {
+      return selectCodecByAlgorithmForIntegral(stats).createEncoder(null);
+    } else if (dataType == DataTypes.DECIMAL) {
+      return createEncoderForDecimalDataTypeMeasure(columnPage);
+    } else if (dataType == DataTypes.FLOAT ||
+        dataType == DataTypes.DOUBLE) {
+      return selectCodecByAlgorithmForFloating(stats).createEncoder(null);
+    } else if (dataType == DataTypes.BYTE_ARRAY) {
+      return new DirectCompressCodec(columnPage.getDataType()).createEncoder(null);
+    } else {
+      throw new RuntimeException("unsupported data type: " + stats.getDataType());
     }
   }
 
@@ -144,32 +145,31 @@ public class DefaultEncodingFactory extends EncodingFactory {
 
   private static DataType fitLongMinMax(long max, long min) {
     if (max <= Byte.MAX_VALUE && min >= Byte.MIN_VALUE) {
-      return DataType.BYTE;
+      return DataTypes.BYTE;
     } else if (max <= Short.MAX_VALUE && min >= Short.MIN_VALUE) {
-      return DataType.SHORT;
+      return DataTypes.SHORT;
     } else if (max <= THREE_BYTES_MAX && min >= THREE_BYTES_MIN) {
-      return DataType.SHORT_INT;
+      return DataTypes.SHORT_INT;
     } else if (max <= Integer.MAX_VALUE && min >= Integer.MIN_VALUE) {
-      return DataType.INT;
+      return DataTypes.INT;
     } else {
-      return DataType.LONG;
+      return DataTypes.LONG;
     }
   }
 
   private static DataType fitMinMax(DataType dataType, Object max, Object min) {
-    switch (dataType) {
-      case BYTE:
-        return fitLongMinMax((byte) max, (byte) min);
-      case SHORT:
-        return fitLongMinMax((short) max, (short) min);
-      case INT:
-        return fitLongMinMax((int) max, (int) min);
-      case LONG:
-        return fitLongMinMax((long) max, (long) min);
-      case DOUBLE:
-        return fitLongMinMax((long) (double) max, (long) (double) min);
-      default:
-        throw new RuntimeException("internal error: " + dataType);
+    if (dataType == DataTypes.BYTE) {
+      return fitLongMinMax((byte) max, (byte) min);
+    } else if (dataType == DataTypes.SHORT) {
+      return fitLongMinMax((short) max, (short) min);
+    } else if (dataType == DataTypes.INT) {
+      return fitLongMinMax((int) max, (int) min);
+    } else if (dataType == DataTypes.LONG) {
+      return fitLongMinMax((long) max, (long) min);
+    } else if (dataType == DataTypes.DOUBLE) {
+      return fitLongMinMax((long) (double) max, (long) (double) min);
+    } else {
+      throw new RuntimeException("internal error: " + dataType);
     }
   }
 
@@ -196,7 +196,7 @@ public class DefaultEncodingFactory extends EncodingFactory {
         long value = maxValue - minValue;
         return compareMinMaxAndSelectDataType(value);
       case DECIMAL_LONG:
-        return DataType.LONG;
+        return DataTypes.LONG;
       default:
         throw new RuntimeException("internal error: " + dataType);
     }
@@ -206,38 +206,34 @@ public class DefaultEncodingFactory extends EncodingFactory {
   private static DataType fitDelta(DataType dataType, Object max, Object min) {
     // use long data type to calculate delta to avoid overflow
     long value;
-    switch (dataType) {
-      case BYTE:
-        value = (long)(byte) max - (long)(byte) min;
-        break;
-      case SHORT:
-        value = (long)(short) max - (long)(short) min;
-        break;
-      case INT:
-        value = (long)(int) max - (long)(int) min;
-        break;
-      case LONG:
-        // TODO: add overflow detection and return delta type
-        return DataType.LONG;
-      case DOUBLE:
-        return DataType.LONG;
-      default:
-        throw new RuntimeException("internal error: " + dataType);
+    if (dataType == DataTypes.BYTE) {
+      value = (long) (byte) max - (long) (byte) min;
+    } else if (dataType == DataTypes.SHORT) {
+      value = (long) (short) max - (long) (short) min;
+    } else if (dataType == DataTypes.INT) {
+      value = (long) (int) max - (long) (int) min;
+    } else if (dataType == DataTypes.LONG) {
+      // TODO: add overflow detection and return delta type
+      return DataTypes.LONG;
+    } else if (dataType == DataTypes.DOUBLE) {
+      return DataTypes.LONG;
+    } else {
+      throw new RuntimeException("internal error: " + dataType);
     }
     return compareMinMaxAndSelectDataType(value);
   }
 
   private static DataType compareMinMaxAndSelectDataType(long value) {
     if (value <= Byte.MAX_VALUE && value >= Byte.MIN_VALUE) {
-      return DataType.BYTE;
+      return DataTypes.BYTE;
     } else if (value <= Short.MAX_VALUE && value >= Short.MIN_VALUE) {
-      return DataType.SHORT;
+      return DataTypes.SHORT;
     } else if (value <= THREE_BYTES_MAX && value >= THREE_BYTES_MIN) {
-      return DataType.SHORT_INT;
+      return DataTypes.SHORT_INT;
     } else if (value <= Integer.MAX_VALUE && value >= Integer.MIN_VALUE) {
-      return DataType.INT;
+      return DataTypes.INT;
     } else {
-      return DataType.LONG;
+      return DataTypes.LONG;
     }
   }
 
@@ -250,8 +246,8 @@ public class DefaultEncodingFactory extends EncodingFactory {
     DataType adaptiveDataType = fitMinMax(stats.getDataType(), stats.getMax(), stats.getMin());
     DataType deltaDataType;
 
-    if (adaptiveDataType == DataType.LONG) {
-      deltaDataType = DataType.LONG;
+    if (adaptiveDataType == DataTypes.LONG) {
+      deltaDataType = DataTypes.LONG;
     } else {
       deltaDataType = fitDelta(stats.getDataType(), stats.getMax(), stats.getMin());
     }
@@ -287,15 +283,15 @@ public class DefaultEncodingFactory extends EncodingFactory {
       // short, int, long
       return selectCodecByAlgorithmForIntegral(stats);
     } else if (decimalCount < 0) {
-      return new DirectCompressCodec(DataType.DOUBLE);
+      return new DirectCompressCodec(DataTypes.DOUBLE);
     } else {
       // double
       long max = (long) (Math.pow(10, decimalCount) * absMaxValue);
       DataType adaptiveDataType = fitLongMinMax(max, 0);
-      if (adaptiveDataType.getSizeInBytes() < DataType.DOUBLE.getSizeInBytes()) {
+      if (adaptiveDataType.getSizeInBytes() < DataTypes.DOUBLE.getSizeInBytes()) {
         return new AdaptiveFloatingCodec(srcDataType, adaptiveDataType, stats);
       } else {
-        return new DirectCompressCodec(DataType.DOUBLE);
+        return new DirectCompressCodec(DataTypes.DOUBLE);
       }
     }
   }
@@ -312,8 +308,8 @@ public class DefaultEncodingFactory extends EncodingFactory {
             decimalConverterType);
     DataType deltaDataType;
 
-    if (adaptiveDataType == DataType.LONG) {
-      deltaDataType = DataType.LONG;
+    if (adaptiveDataType == DataTypes.LONG) {
+      deltaDataType = DataTypes.LONG;
     } else {
       deltaDataType = fitDeltaForDecimalType(stats.getDataType(), stats.getMax(), stats.getMin(),
           decimalConverterType);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
index 1cb1613..180228a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
@@ -36,6 +36,7 @@ import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCo
 import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.Encoding;
 
@@ -108,60 +109,60 @@ public abstract class EncodingFactory {
     TableSpec.ColumnSpec spec = new TableSpec.ColumnSpec("legacy", stats.getDataType(),
         ColumnType.MEASURE);
     String compressor = "snappy";
-    switch (DataType.getDataType(metadata.getType())) {
-      case BYTE:
-      case SHORT:
-      case INT:
-      case LONG:
-        // create the codec based on algorithm and create decoder by recovering the metadata
-        ColumnPageCodec codec = DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(stats);
-        if (codec instanceof AdaptiveIntegralCodec) {
-          AdaptiveIntegralCodec adaptiveCodec = (AdaptiveIntegralCodec) codec;
-          ColumnPageEncoderMeta meta = new ColumnPageEncoderMeta(spec,
-              adaptiveCodec.getTargetDataType(), stats, compressor);
-          return codec.createDecoder(meta);
-        } else if (codec instanceof AdaptiveDeltaIntegralCodec) {
-          AdaptiveDeltaIntegralCodec adaptiveCodec = (AdaptiveDeltaIntegralCodec) codec;
-          ColumnPageEncoderMeta meta = new ColumnPageEncoderMeta(spec,
-              adaptiveCodec.getTargetDataType(), stats, compressor);
-          return codec.createDecoder(meta);
-        } else if (codec instanceof DirectCompressCodec) {
-          ColumnPageEncoderMeta meta = new ColumnPageEncoderMeta(spec,
-              DataType.getDataType(metadata.getType()), stats, compressor);
-          return codec.createDecoder(meta);
-        } else {
-          throw new RuntimeException("internal error");
-        }
-      case FLOAT:
-      case DOUBLE:
-        // create the codec based on algorithm and create decoder by recovering the metadata
-        codec = DefaultEncodingFactory.selectCodecByAlgorithmForFloating(stats);
-        if (codec instanceof AdaptiveFloatingCodec) {
-          AdaptiveFloatingCodec adaptiveCodec = (AdaptiveFloatingCodec) codec;
-          ColumnPageEncoderMeta meta = new ColumnPageEncoderMeta(spec,
-              adaptiveCodec.getTargetDataType(), stats, compressor);
-          return codec.createDecoder(meta);
-        } else if (codec instanceof DirectCompressCodec) {
-          ColumnPageEncoderMeta meta = new ColumnPageEncoderMeta(spec,
-              DataType.getDataType(metadata.getType()), stats, compressor);
-          return codec.createDecoder(meta);
-        } else {
-          throw new RuntimeException("internal error");
-        }
-      case DECIMAL:
-      case BYTE_ARRAY:
-        // no dictionary dimension
-        return new DirectCompressCodec(stats.getDataType()).createDecoder(
-            new ColumnPageEncoderMeta(spec, stats.getDataType(), stats, compressor));
-        // In case of older versions like in V1 format it has special datatype to handle
-      case LEGACY_LONG:
-        AdaptiveIntegralCodec adaptiveCodec =
-            new AdaptiveIntegralCodec(DataType.LONG, DataType.LONG, stats);
-        ColumnPageEncoderMeta meta = new ColumnPageEncoderMeta(spec,
-            adaptiveCodec.getTargetDataType(), stats, compressor);
-        return adaptiveCodec.createDecoder(meta);
-      default:
-        throw new RuntimeException("unsupported data type: " + stats.getDataType());
+    DataType dataType = DataType.getDataType(metadata.getType());
+    if (dataType == DataTypes.BYTE ||
+        dataType == DataTypes.SHORT ||
+        dataType == DataTypes.INT ||
+        dataType == DataTypes.LONG) {
+      // create the codec based on algorithm and create decoder by recovering the metadata
+      ColumnPageCodec codec = DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(stats);
+      if (codec instanceof AdaptiveIntegralCodec) {
+        AdaptiveIntegralCodec adaptiveCodec = (AdaptiveIntegralCodec) codec;
+        ColumnPageEncoderMeta meta =
+            new ColumnPageEncoderMeta(spec, adaptiveCodec.getTargetDataType(), stats, compressor);
+        return codec.createDecoder(meta);
+      } else if (codec instanceof AdaptiveDeltaIntegralCodec) {
+        AdaptiveDeltaIntegralCodec adaptiveCodec = (AdaptiveDeltaIntegralCodec) codec;
+        ColumnPageEncoderMeta meta =
+            new ColumnPageEncoderMeta(spec, adaptiveCodec.getTargetDataType(), stats, compressor);
+        return codec.createDecoder(meta);
+      } else if (codec instanceof DirectCompressCodec) {
+        ColumnPageEncoderMeta meta =
+            new ColumnPageEncoderMeta(spec, DataType.getDataType(metadata.getType()), stats,
+                compressor);
+        return codec.createDecoder(meta);
+      } else {
+        throw new RuntimeException("internal error");
+      }
+    } else if (dataType == DataTypes.FLOAT || dataType == DataTypes.DOUBLE) {
+      // create the codec based on algorithm and create decoder by recovering the metadata
+      ColumnPageCodec codec = DefaultEncodingFactory.selectCodecByAlgorithmForFloating(stats);
+      if (codec instanceof AdaptiveFloatingCodec) {
+        AdaptiveFloatingCodec adaptiveCodec = (AdaptiveFloatingCodec) codec;
+        ColumnPageEncoderMeta meta =
+            new ColumnPageEncoderMeta(spec, adaptiveCodec.getTargetDataType(), stats, compressor);
+        return codec.createDecoder(meta);
+      } else if (codec instanceof DirectCompressCodec) {
+        ColumnPageEncoderMeta meta =
+            new ColumnPageEncoderMeta(spec, DataType.getDataType(metadata.getType()), stats,
+                compressor);
+        return codec.createDecoder(meta);
+      } else {
+        throw new RuntimeException("internal error");
+      }
+    } else if (dataType == DataTypes.DECIMAL || dataType == DataTypes.BYTE_ARRAY) {
+      // no dictionary dimension
+      return new DirectCompressCodec(stats.getDataType())
+          .createDecoder(new ColumnPageEncoderMeta(spec, stats.getDataType(), stats, compressor));
+    } else if (dataType == DataTypes.LEGACY_LONG) {
+      // Older versions such as the V1 format use a special legacy data type, handled here
+      AdaptiveIntegralCodec adaptiveCodec =
+          new AdaptiveIntegralCodec(DataTypes.LONG, DataTypes.LONG, stats);
+      ColumnPageEncoderMeta meta =
+          new ColumnPageEncoderMeta(spec, adaptiveCodec.getTargetDataType(), stats, compressor);
+      return adaptiveCodec.createDecoder(meta);
+    } else {
+      throw new RuntimeException("unsupported data type: " + stats.getDataType());
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
index 383670a..96f7b16 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
@@ -34,10 +34,9 @@ import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.format.Encoding;
 
-import static org.apache.carbondata.core.metadata.datatype.DataType.DECIMAL;
-
 /**
  * Codec for integer (byte, short, int, long) data type and floating data type (in case of
  * scale is 0).
@@ -52,29 +51,22 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
   public AdaptiveDeltaIntegralCodec(DataType srcDataType, DataType targetDataType,
       SimpleStatsResult stats) {
     super(srcDataType, targetDataType, stats);
-    switch (srcDataType) {
-      case BYTE:
-        this.max = (byte) stats.getMax();
-        break;
-      case SHORT:
-        this.max = (short) stats.getMax();
-        break;
-      case INT:
-        this.max = (int) stats.getMax();
-        break;
-      case LONG:
-        this.max = (long) stats.getMax();
-        break;
-      case DOUBLE:
-        this.max = (long) (double) stats.getMax();
-        break;
-      case DECIMAL:
-        this.max = ((BigDecimal) stats.getMax()).unscaledValue().longValue();
-        break;
-      default:
-        // this codec is for integer type only
-        throw new UnsupportedOperationException(
-            "unsupported data type for Delta compress: " + srcDataType);
+    if (srcDataType == DataTypes.BYTE) {
+      this.max = (byte) stats.getMax();
+    } else if (srcDataType == DataTypes.SHORT) {
+      this.max = (short) stats.getMax();
+    } else if (srcDataType == DataTypes.INT) {
+      this.max = (int) stats.getMax();
+    } else if (srcDataType == DataTypes.LONG) {
+      this.max = (long) stats.getMax();
+    } else if (srcDataType == DataTypes.DOUBLE) {
+      this.max = (long) (double) stats.getMax();
+    } else if (srcDataType == DataTypes.DECIMAL) {
+      this.max = ((BigDecimal) stats.getMax()).unscaledValue().longValue();
+    } else {
+      // this codec is for integer type only
+      throw new UnsupportedOperationException(
+          "unsupported data type for Delta compress: " + srcDataType);
     }
   }
 
@@ -122,12 +114,10 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
       @Override public ColumnPage decode(byte[] input, int offset, int length)
           throws MemoryException, IOException {
         ColumnPage page = null;
-        switch (meta.getSchemaDataType()) {
-          case DECIMAL:
-            page = ColumnPage.decompressDecimalPage(meta, input, offset, length);
-            break;
-          default:
-            page = ColumnPage.decompress(meta, input, offset, length);
+        if (meta.getSchemaDataType() == DataTypes.DECIMAL) {
+          page = ColumnPage.decompressDecimalPage(meta, input, offset, length);
+        } else {
+          page = ColumnPage.decompress(meta, input, offset, length);
         }
         return LazyColumnPage.newPage(page, converter);
       }
@@ -137,115 +127,87 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
   private ColumnPageValueConverter converter = new ColumnPageValueConverter() {
     @Override
     public void encode(int rowId, byte value) {
-      switch (targetDataType) {
-        case BYTE:
-          encodedPage.putByte(rowId, (byte)(max - value));
-          break;
-        default:
-          throw new RuntimeException("internal error");
+      if (targetDataType == DataTypes.BYTE) {
+        encodedPage.putByte(rowId, (byte) (max - value));
+      } else {
+        throw new RuntimeException("internal error");
       }
     }
 
     @Override
     public void encode(int rowId, short value) {
-      switch (targetDataType) {
-        case BYTE:
-          encodedPage.putByte(rowId, (byte)(max - value));
-          break;
-        case SHORT:
-          encodedPage.putShort(rowId, (short)(max - value));
-          break;
-        default:
-          throw new RuntimeException("internal error");
+      if (targetDataType == DataTypes.BYTE) {
+        encodedPage.putByte(rowId, (byte) (max - value));
+      } else if (targetDataType == DataTypes.SHORT) {
+        encodedPage.putShort(rowId, (short) (max - value));
+      } else {
+        throw new RuntimeException("internal error");
       }
     }
 
     @Override
     public void encode(int rowId, int value) {
-      switch (targetDataType) {
-        case BYTE:
-          encodedPage.putByte(rowId, (byte)(max - value));
-          break;
-        case SHORT:
-          encodedPage.putShort(rowId, (short)(max - value));
-          break;
-        case SHORT_INT:
-          encodedPage.putShortInt(rowId, (int)(max - value));
-          break;
-        case INT:
-          encodedPage.putInt(rowId, (int)(max - value));
-          break;
-        default:
-          throw new RuntimeException("internal error");
+      if (targetDataType == DataTypes.BYTE) {
+        encodedPage.putByte(rowId, (byte) (max - value));
+      } else if (targetDataType == DataTypes.SHORT) {
+        encodedPage.putShort(rowId, (short) (max - value));
+      } else if (targetDataType == DataTypes.SHORT_INT) {
+        encodedPage.putShortInt(rowId, (int) (max - value));
+      } else if (targetDataType == DataTypes.INT) {
+        encodedPage.putInt(rowId, (int) (max - value));
+      } else {
+        throw new RuntimeException("internal error");
       }
     }
 
     @Override
     public void encode(int rowId, long value) {
-      switch (targetDataType) {
-        case BYTE:
-          encodedPage.putByte(rowId, (byte)(max - value));
-          break;
-        case SHORT:
-          encodedPage.putShort(rowId, (short)(max - value));
-          break;
-        case SHORT_INT:
-          encodedPage.putShortInt(rowId, (int)(max - value));
-          break;
-        case INT:
-          encodedPage.putInt(rowId, (int)(max - value));
-          break;
-        case LONG:
-          encodedPage.putLong(rowId, max - value);
-          break;
-        default:
-          throw new RuntimeException("internal error");
+      if (targetDataType == DataTypes.BYTE) {
+        encodedPage.putByte(rowId, (byte) (max - value));
+      } else if (targetDataType == DataTypes.SHORT) {
+        encodedPage.putShort(rowId, (short) (max - value));
+      } else if (targetDataType == DataTypes.SHORT_INT) {
+        encodedPage.putShortInt(rowId, (int) (max - value));
+      } else if (targetDataType == DataTypes.INT) {
+        encodedPage.putInt(rowId, (int) (max - value));
+      } else if (targetDataType == DataTypes.LONG) {
+        encodedPage.putLong(rowId, max - value);
+      } else {
+        throw new RuntimeException("internal error");
       }
     }
 
     @Override
     public void encode(int rowId, float value) {
-      switch (targetDataType) {
-        case BYTE:
-          encodedPage.putByte(rowId, (byte)(max - value));
-          break;
-        case SHORT:
-          encodedPage.putShort(rowId, (short)(max - value));
-          break;
-        case SHORT_INT:
-          encodedPage.putShortInt(rowId, (int)(max - value));
-          break;
-        case INT:
-          encodedPage.putInt(rowId, (int)(max - value));
-          break;
-        case LONG:
-          encodedPage.putLong(rowId, (long)(max - value));
-          break;
-        default:
-          throw new RuntimeException("internal error");
+      if (targetDataType == DataTypes.BYTE) {
+        encodedPage.putByte(rowId, (byte) (max - value));
+      } else if (targetDataType == DataTypes.SHORT) {
+        encodedPage.putShort(rowId, (short) (max - value));
+      } else if (targetDataType == DataTypes.SHORT_INT) {
+        encodedPage.putShortInt(rowId, (int) (max - value));
+      } else if (targetDataType == DataTypes.INT) {
+        encodedPage.putInt(rowId, (int) (max - value));
+      } else if (targetDataType == DataTypes.LONG) {
+        encodedPage.putLong(rowId, (long) (max - value));
+      } else {
+        throw new RuntimeException("internal error");
       }
     }
 
     @Override
     public void encode(int rowId, double value) {
-      switch (targetDataType) {
-        case BYTE:
-          encodedPage.putByte(rowId, (byte)(max - value));
-          break;
-        case SHORT:
-          encodedPage.putShort(rowId, (short)(max - value));
-          break;
-        case SHORT_INT:
-          encodedPage.putShortInt(rowId, (int)(max - value));
-          break;
-        case INT:
-          encodedPage.putInt(rowId, (int)(max - value));
-          break;
-        case LONG:
-          encodedPage.putLong(rowId, (long)(max - value));
-          break;
-        default:
-          throw new RuntimeException("internal error");
+      if (targetDataType == DataTypes.BYTE) {
+        encodedPage.putByte(rowId, (byte) (max - value));
+      } else if (targetDataType == DataTypes.SHORT) {
+        encodedPage.putShort(rowId, (short) (max - value));
+      } else if (targetDataType == DataTypes.SHORT_INT) {
+        encodedPage.putShortInt(rowId, (int) (max - value));
+      } else if (targetDataType == DataTypes.INT) {
+        encodedPage.putInt(rowId, (int) (max - value));
+      } else if (targetDataType == DataTypes.LONG) {
+        encodedPage.putLong(rowId, (long) (max - value));
+      } else {
+        throw new RuntimeException("internal error");
       }
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
index c238245..fb3e248 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.format.Encoding;
 
 /**
@@ -135,50 +136,37 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec {
 
     @Override
     public void encode(int rowId, float value) {
-      switch (targetDataType) {
-        case BYTE:
-          encodedPage.putByte(rowId, (byte) (value * factor));
-          break;
-        case SHORT:
-          encodedPage.putShort(rowId, (short) (value * factor));
-          break;
-        case SHORT_INT:
-          encodedPage.putShortInt(rowId, (int) (value * factor));
-          break;
-        case INT:
-          encodedPage.putInt(rowId, (int) (value * factor));
-          break;
-        case LONG:
-          encodedPage.putLong(rowId, (long) (value * factor));
-          break;
-        default:
-          throw new RuntimeException("internal error: " + debugInfo());
+      if (targetDataType == DataTypes.BYTE) {
+        encodedPage.putByte(rowId, (byte) (value * factor));
+      } else if (targetDataType == DataTypes.SHORT) {
+        encodedPage.putShort(rowId, (short) (value * factor));
+      } else if (targetDataType == DataTypes.SHORT_INT) {
+        encodedPage.putShortInt(rowId, (int) (value * factor));
+      } else if (targetDataType == DataTypes.INT) {
+        encodedPage.putInt(rowId, (int) (value * factor));
+      } else if (targetDataType == DataTypes.LONG) {
+        encodedPage.putLong(rowId, (long) (value * factor));
+      } else {
+        throw new RuntimeException("internal error: " + debugInfo());
       }
     }
 
     @Override
     public void encode(int rowId, double value) {
-      switch (targetDataType) {
-        case BYTE:
-          encodedPage.putByte(rowId, (byte) (value * factor));
-          break;
-        case SHORT:
-          encodedPage.putShort(rowId, (short) (value * factor));
-          break;
-        case SHORT_INT:
-          encodedPage.putShortInt(rowId, (int) (value * factor));
-          break;
-        case INT:
-          encodedPage.putInt(rowId, (int) (value * factor));
-          break;
-        case LONG:
-          encodedPage.putLong(rowId, (long) (value * factor));
-          break;
-        case DOUBLE:
-          encodedPage.putDouble(rowId, value);
-          break;
-        default:
-          throw new RuntimeException("internal error: " + debugInfo());
+      if (targetDataType == DataTypes.BYTE) {
+        encodedPage.putByte(rowId, (byte) (value * factor));
+      } else if (targetDataType == DataTypes.SHORT) {
+        encodedPage.putShort(rowId, (short) (value * factor));
+      } else if (targetDataType == DataTypes.SHORT_INT) {
+        encodedPage.putShortInt(rowId, (int) (value * factor));
+      } else if (targetDataType == DataTypes.INT) {
+        encodedPage.putInt(rowId, (int) (value * factor));
+      } else if (targetDataType == DataTypes.LONG) {
+        encodedPage.putLong(rowId, (long) (value * factor));
+      } else if (targetDataType == DataTypes.DOUBLE) {
+        encodedPage.putDouble(rowId, value);
+      } else {
+        throw new RuntimeException("internal error: " + debugInfo());
       }
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
index bbc28a6..907649d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.format.Encoding;
 
 /**
@@ -93,12 +94,10 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
       public ColumnPage decode(byte[] input, int offset, int length)
           throws MemoryException, IOException {
         ColumnPage page = null;
-        switch (meta.getSchemaDataType()) {
-          case DECIMAL:
-            page = ColumnPage.decompressDecimalPage(meta, input, offset, length);
-            break;
-          default:
-            page = ColumnPage.decompress(meta, input, offset, length);
+        if (meta.getSchemaDataType() == DataTypes.DECIMAL) {
+          page = ColumnPage.decompressDecimalPage(meta, input, offset, length);
+        } else {
+          page = ColumnPage.decompress(meta, input, offset, length);
         }
         return LazyColumnPage.newPage(page, converter);
       }
@@ -109,106 +108,85 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
   private ColumnPageValueConverter converter = new ColumnPageValueConverter() {
     @Override
     public void encode(int rowId, byte value) {
-      switch (targetDataType) {
-        default:
-          throw new RuntimeException("internal error: " + debugInfo());
+      if (targetDataType == DataTypes.BYTE) {
+        encodedPage.putByte(rowId, value);
+      } else {
+        throw new RuntimeException("internal error: " + debugInfo());
       }
     }
 
     @Override
     public void encode(int rowId, short value) {
-      switch (targetDataType) {
-        case BYTE:
-          encodedPage.putByte(rowId, (byte) value);
-          break;
-        default:
-          throw new RuntimeException("internal error: " + debugInfo());
+      if (targetDataType == DataTypes.BYTE) {
+        encodedPage.putByte(rowId, (byte) value);
+      } else if (targetDataType == DataTypes.SHORT) {
+        encodedPage.putShort(rowId, value);
+      } else {
+        throw new RuntimeException("internal error: " + debugInfo());
       }
     }
 
     @Override
     public void encode(int rowId, int value) {
-      switch (targetDataType) {
-        case BYTE:
-          encodedPage.putByte(rowId, (byte) value);
-          break;
-        case SHORT:
-          encodedPage.putShort(rowId, (short) value);
-          break;
-        case SHORT_INT:
-          encodedPage.putShortInt(rowId, value);
-          break;
-        case INT:
-          encodedPage.putInt(rowId, value);
-          break;
-        default:
-          throw new RuntimeException("internal error: " + debugInfo());
+      if (targetDataType == DataTypes.BYTE) {
+        encodedPage.putByte(rowId, (byte) value);
+      } else if (targetDataType == DataTypes.SHORT) {
+        encodedPage.putShort(rowId, (short) value);
+      } else if (targetDataType == DataTypes.SHORT_INT) {
+        encodedPage.putShortInt(rowId, value);
+      } else if (targetDataType == DataTypes.INT) {
+        encodedPage.putInt(rowId, value);
+      } else {
+        throw new RuntimeException("internal error: " + debugInfo());
       }
     }
 
     @Override
     public void encode(int rowId, long value) {
-      switch (targetDataType) {
-        case BYTE:
-          encodedPage.putByte(rowId, (byte) value);
-          break;
-        case SHORT:
-          encodedPage.putShort(rowId, (short) value);
-          break;
-        case SHORT_INT:
-          encodedPage.putShortInt(rowId, (int) value);
-          break;
-        case INT:
-          encodedPage.putInt(rowId, (int) value);
-          break;
-        case LONG:
-          encodedPage.putLong(rowId, (long) value);
-          break;
-        default:
-          throw new RuntimeException("internal error: " + debugInfo());
+      if (targetDataType == DataTypes.BYTE) {
+        encodedPage.putByte(rowId, (byte) value);
+      } else if (targetDataType == DataTypes.SHORT) {
+        encodedPage.putShort(rowId, (short) value);
+      } else if (targetDataType == DataTypes.SHORT_INT) {
+        encodedPage.putShortInt(rowId, (int) value);
+      } else if (targetDataType == DataTypes.INT) {
+        encodedPage.putInt(rowId, (int) value);
+      } else if (targetDataType == DataTypes.LONG) {
+        encodedPage.putLong(rowId, (long) value);
+      } else {
+        throw new RuntimeException("internal error: " + debugInfo());
       }
     }
 
     @Override
     public void encode(int rowId, float value) {
-      switch (targetDataType) {
-        case BYTE:
-          encodedPage.putByte(rowId, (byte) value);
-          break;
-        case SHORT:
-          encodedPage.putShort(rowId, (short) value);
-          break;
-        case SHORT_INT:
-          encodedPage.putShortInt(rowId, (int) value);
-          break;
-        case INT:
-          encodedPage.putInt(rowId, (int) value);
-          break;
-        default:
-          throw new RuntimeException("internal error: " + debugInfo());
+      if (targetDataType == DataTypes.BYTE) {
+        encodedPage.putByte(rowId, (byte) value);
+      } else if (targetDataType == DataTypes.SHORT) {
+        encodedPage.putShort(rowId, (short) value);
+      } else if (targetDataType == DataTypes.SHORT_INT) {
+        encodedPage.putShortInt(rowId, (int) value);
+      } else if (targetDataType == DataTypes.INT) {
+        encodedPage.putInt(rowId, (int) value);
+      } else {
+        throw new RuntimeException("internal error: " + debugInfo());
       }
     }
 
     @Override
     public void encode(int rowId, double value) {
-      switch (targetDataType) {
-        case BYTE:
-          encodedPage.putByte(rowId, (byte) value);
-          break;
-        case SHORT:
-          encodedPage.putShort(rowId, (short) value);
-          break;
-        case SHORT_INT:
-          encodedPage.putShortInt(rowId, (int) value);
-          break;
-        case INT:
-          encodedPage.putInt(rowId, (int) value);
-          break;
-        case LONG:
-          encodedPage.putLong(rowId, (long) value);
-          break;
-        default:
-          throw new RuntimeException("internal error: " + debugInfo());
+      if (targetDataType == DataTypes.BYTE) {
+        encodedPage.putByte(rowId, (byte) value);
+      } else if (targetDataType == DataTypes.SHORT) {
+        encodedPage.putShort(rowId, (short) value);
+      } else if (targetDataType == DataTypes.SHORT_INT) {
+        encodedPage.putShortInt(rowId, (int) value);
+      } else if (targetDataType == DataTypes.INT) {
+        encodedPage.putInt(rowId, (int) value);
+      } else if (targetDataType == DataTypes.LONG) {
+        encodedPage.putLong(rowId, (long) value);
+      } else {
+        throw new RuntimeException("internal error: " + debugInfo());
       }
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
index 13879b9..b3d282e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.format.Encoding;
 
 /**
@@ -102,7 +103,7 @@ public class DirectCompressCodec implements ColumnPageCodec {
     @Override
     public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
       ColumnPage decodedPage;
-      if (dataType == DataType.DECIMAL) {
+      if (dataType == DataTypes.DECIMAL) {
         decodedPage = ColumnPage.decompressDecimalPage(meta, input, offset, length);
       } else {
         decodedPage = ColumnPage.decompress(meta, input, offset, length);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
index 419b589..809bac0 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.format.Encoding;
 
 /**
@@ -70,14 +71,9 @@ public class RLECodec implements ColumnPageCodec {
 
   // This codec supports integral type only
   private void validateDataType(DataType dataType) {
-    switch (dataType) {
-      case BYTE:
-      case SHORT:
-      case INT:
-      case LONG:
-        break;
-      default:
-        throw new UnsupportedOperationException(dataType + " is not supported for RLE");
+    if (! (dataType == DataTypes.BYTE || dataType == DataTypes.SHORT || dataType == DataTypes.INT ||
+        dataType == DataTypes.LONG)) {
+      throw new UnsupportedOperationException(dataType + " is not supported for RLE");
     }
   }
 
@@ -117,34 +113,29 @@ public class RLECodec implements ColumnPageCodec {
     protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
       validateDataType(input.getDataType());
       this.dataType = input.getDataType();
-      switch (dataType) {
-        case BYTE:
-          byte[] bytePage = input.getBytePage();
-          for (int i = 0; i < bytePage.length; i++) {
-            putValue(bytePage[i]);
-          }
-          break;
-        case SHORT:
-          short[] shortPage = input.getShortPage();
-          for (int i = 0; i < shortPage.length; i++) {
-            putValue(shortPage[i]);
-          }
-          break;
-        case INT:
-          int[] intPage = input.getIntPage();
-          for (int i = 0; i < intPage.length; i++) {
-            putValue(intPage[i]);
-          }
-          break;
-        case LONG:
-          long[] longPage = input.getLongPage();
-          for (int i = 0; i < longPage.length; i++) {
-            putValue(longPage[i]);
-          }
-          break;
-        default:
-          throw new UnsupportedOperationException(input.getDataType() +
-              " does not support RLE encoding");
+      if (dataType == DataTypes.BYTE) {
+        byte[] bytePage = input.getBytePage();
+        for (int i = 0; i < bytePage.length; i++) {
+          putValue(bytePage[i]);
+        }
+      } else if (dataType == DataTypes.SHORT) {
+        short[] shortPage = input.getShortPage();
+        for (int i = 0; i < shortPage.length; i++) {
+          putValue(shortPage[i]);
+        }
+      } else if (dataType == DataTypes.INT) {
+        int[] intPage = input.getIntPage();
+        for (int i = 0; i < intPage.length; i++) {
+          putValue(intPage[i]);
+        }
+      } else if (dataType == DataTypes.LONG) {
+        long[] longPage = input.getLongPage();
+        for (int i = 0; i < longPage.length; i++) {
+          putValue(longPage[i]);
+        }
+      } else {
+        throw new UnsupportedOperationException(input.getDataType() +
+            " does not support RLE encoding");
       }
       return collectResult();
     }
@@ -200,21 +191,16 @@ public class RLECodec implements ColumnPageCodec {
     }
 
     private void writeRunValue(Object value) throws IOException {
-      switch (dataType) {
-        case BYTE:
-          stream.writeByte((byte) value);
-          break;
-        case SHORT:
-          stream.writeShort((short) value);
-          break;
-        case INT:
-          stream.writeInt((int) value);
-          break;
-        case LONG:
-          stream.writeLong((long) value);
-          break;
-        default:
-          throw new RuntimeException("internal error");
+      if (dataType == DataTypes.BYTE) {
+        stream.writeByte((byte) value);
+      } else if (dataType == DataTypes.SHORT) {
+        stream.writeShort((short) value);
+      } else if (dataType == DataTypes.INT) {
+        stream.writeInt((int) value);
+      } else if (dataType == DataTypes.LONG) {
+        stream.writeLong((long) value);
+      } else {
+        throw new RuntimeException("internal error");
       }
     }
 
@@ -307,21 +293,16 @@ public class RLECodec implements ColumnPageCodec {
       DataType dataType = columnSpec.getSchemaDataType();
       DataInputStream in = new DataInputStream(new ByteArrayInputStream(input, offset, length));
       ColumnPage resultPage = ColumnPage.newPage(columnSpec, dataType, pageSize);
-      switch (dataType) {
-        case BYTE:
-          decodeBytePage(in, resultPage);
-          break;
-        case SHORT:
-          decodeShortPage(in, resultPage);
-          break;
-        case INT:
-          decodeIntPage(in, resultPage);
-          break;
-        case LONG:
-          decodeLongPage(in, resultPage);
-          break;
-        default:
-          throw new RuntimeException("unsupported datatype:" + dataType);
+      if (dataType == DataTypes.BYTE) {
+        decodeBytePage(in, resultPage);
+      } else if (dataType == DataTypes.SHORT) {
+        decodeShortPage(in, resultPage);
+      } else if (dataType == DataTypes.INT) {
+        decodeIntPage(in, resultPage);
+      } else if (dataType == DataTypes.LONG) {
+        decodeLongPage(in, resultPage);
+      } else {
+        throw new RuntimeException("unsupported datatype:" + dataType);
       }
       return resultPage;
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java
index 1b26a60..20e10b8 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.datastore.page.statistics;
 import java.math.BigDecimal;
 
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.ByteUtil;
 
 public class LVStringStatsCollector implements ColumnPageStatsCollector {
@@ -115,7 +116,7 @@ public class LVStringStatsCollector implements ColumnPageStatsCollector {
       }
 
       @Override public DataType getDataType() {
-        return DataType.STRING;
+        return DataTypes.STRING;
       }
 
       @Override public int getScale() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
index 9490b93..304d998 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
@@ -22,6 +22,7 @@ import java.math.BigDecimal;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 
 /** statics for primitive column page */
 public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, SimpleStatsResult {
@@ -43,10 +44,7 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
   // this is for encode flow
   public static PrimitivePageStatsCollector newInstance(DataType dataType,
       int scale, int precision) {
-    switch (dataType) {
-      default:
-        return new PrimitivePageStatsCollector(dataType, scale, precision);
-    }
+    return new PrimitivePageStatsCollector(dataType, scale, precision);
   }
 
   // this is for decode flow, create stats from encoder meta in carbondata file
@@ -54,38 +52,32 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
     PrimitivePageStatsCollector instance = new PrimitivePageStatsCollector(meta.getSchemaDataType(),
         meta.getScale(), meta.getPrecision());
     // set min max from meta
-    switch (meta.getSchemaDataType()) {
-      case BYTE:
-        instance.minByte = (byte) meta.getMinValue();
-        instance.maxByte = (byte) meta.getMaxValue();
-        break;
-      case SHORT:
-        instance.minShort = (short) meta.getMinValue();
-        instance.maxShort = (short) meta.getMaxValue();
-        break;
-      case INT:
-        instance.minInt = (int) meta.getMinValue();
-        instance.maxInt = (int) meta.getMaxValue();
-        break;
-      case LONG:
-        instance.minLong = (long) meta.getMinValue();
-        instance.maxLong = (long) meta.getMaxValue();
-        break;
-      case DOUBLE:
-        instance.minDouble = (double) meta.getMinValue();
-        instance.maxDouble = (double) meta.getMaxValue();
-        instance.decimal = meta.getDecimal();
-        break;
-      case DECIMAL:
-        instance.minDecimal = (BigDecimal) meta.getMinValue();
-        instance.maxDecimal = (BigDecimal) meta.getMaxValue();
-        instance.decimal = meta.getDecimal();
-        instance.scale = meta.getScale();
-        instance.precision = meta.getPrecision();
-        break;
-      default:
-        throw new UnsupportedOperationException(
-            "unsupported data type for stats collection: " + meta.getSchemaDataType());
+    DataType dataType = meta.getSchemaDataType();
+    if (dataType == DataTypes.BYTE) {
+      instance.minByte = (byte) meta.getMinValue();
+      instance.maxByte = (byte) meta.getMaxValue();
+    } else if (dataType == DataTypes.SHORT) {
+      instance.minShort = (short) meta.getMinValue();
+      instance.maxShort = (short) meta.getMaxValue();
+    } else if (dataType == DataTypes.INT) {
+      instance.minInt = (int) meta.getMinValue();
+      instance.maxInt = (int) meta.getMaxValue();
+    } else if (dataType == DataTypes.LONG) {
+      instance.minLong = (long) meta.getMinValue();
+      instance.maxLong = (long) meta.getMaxValue();
+    } else if (dataType == DataTypes.DOUBLE) {
+      instance.minDouble = (double) meta.getMinValue();
+      instance.maxDouble = (double) meta.getMaxValue();
+      instance.decimal = meta.getDecimal();
+    } else if (dataType == DataTypes.DECIMAL) {
+      instance.minDecimal = (BigDecimal) meta.getMinValue();
+      instance.maxDecimal = (BigDecimal) meta.getMaxValue();
+      instance.decimal = meta.getDecimal();
+      instance.scale = meta.getScale();
+      instance.precision = meta.getPrecision();
+    } else {
+      throw new UnsupportedOperationException(
+          "unsupported data type for stats collection: " + meta.getSchemaDataType());
     }
     return instance;
   }
@@ -94,112 +86,90 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
     PrimitivePageStatsCollector instance =
         new PrimitivePageStatsCollector(DataType.getDataType(meta.getType()), -1, -1);
     // set min max from meta
-    switch (DataType.getDataType(meta.getType())) {
-      case BYTE:
-        instance.minByte = (byte) meta.getMinValue();
-        instance.maxByte = (byte) meta.getMaxValue();
-        break;
-      case SHORT:
-        instance.minShort = (short) meta.getMinValue();
-        instance.maxShort = (short) meta.getMaxValue();
-        break;
-      case INT:
-        instance.minInt = (int) meta.getMinValue();
-        instance.maxInt = (int) meta.getMaxValue();
-        break;
-      case LEGACY_LONG:
-      case LONG:
-        instance.minLong = (long) meta.getMinValue();
-        instance.maxLong = (long) meta.getMaxValue();
-        break;
-      case DOUBLE:
-        instance.minDouble = (double) meta.getMinValue();
-        instance.maxDouble = (double) meta.getMaxValue();
-        instance.decimal = meta.getDecimal();
-        break;
-      case DECIMAL:
-        instance.minDecimal = (BigDecimal) meta.getMinValue();
-        instance.maxDecimal = (BigDecimal) meta.getMaxValue();
-        instance.decimal = meta.getDecimal();
-        instance.scale = -1;
-        instance.precision = -1;
-        break;
-      default:
-        throw new UnsupportedOperationException(
-            "unsupported data type for Stats collection: " + meta.getType());
+    DataType dataType = DataType.getDataType(meta.getType());
+    if (dataType == DataTypes.BYTE) {
+      instance.minByte = (byte) meta.getMinValue();
+      instance.maxByte = (byte) meta.getMaxValue();
+    } else if (dataType == DataTypes.SHORT) {
+      instance.minShort = (short) meta.getMinValue();
+      instance.maxShort = (short) meta.getMaxValue();
+    } else if (dataType == DataTypes.INT) {
+      instance.minInt = (int) meta.getMinValue();
+      instance.maxInt = (int) meta.getMaxValue();
+    } else if (dataType == DataTypes.LEGACY_LONG || dataType == DataTypes.LONG) {
+      instance.minLong = (long) meta.getMinValue();
+      instance.maxLong = (long) meta.getMaxValue();
+    } else if (dataType == DataTypes.DOUBLE) {
+      instance.minDouble = (double) meta.getMinValue();
+      instance.maxDouble = (double) meta.getMaxValue();
+      instance.decimal = meta.getDecimal();
+    } else if (dataType == DataTypes.DECIMAL) {
+      instance.minDecimal = (BigDecimal) meta.getMinValue();
+      instance.maxDecimal = (BigDecimal) meta.getMaxValue();
+      instance.decimal = meta.getDecimal();
+      instance.scale = -1;
+      instance.precision = -1;
+    } else {
+      throw new UnsupportedOperationException(
+          "unsupported data type for Stats collection: " + meta.getType());
     }
     return instance;
   }
 
   private PrimitivePageStatsCollector(DataType dataType, int scale, int precision) {
     this.dataType = dataType;
-    switch (dataType) {
-      case BYTE:
-        minByte = Byte.MAX_VALUE;
-        maxByte = Byte.MIN_VALUE;
-        break;
-      case SHORT:
-        minShort = Short.MAX_VALUE;
-        maxShort = Short.MIN_VALUE;
-        break;
-      case INT:
-        minInt = Integer.MAX_VALUE;
-        maxInt = Integer.MIN_VALUE;
-        break;
-      case LEGACY_LONG:
-      case LONG:
-        minLong = Long.MAX_VALUE;
-        maxLong = Long.MIN_VALUE;
-        break;
-      case DOUBLE:
-        minDouble = Double.POSITIVE_INFINITY;
-        maxDouble = Double.NEGATIVE_INFINITY;
-        decimal = 0;
-        break;
-      case DECIMAL:
-        this.zeroDecimal = BigDecimal.ZERO;
-        decimal = scale;
-        this.scale = scale;
-        this.precision = precision;
-        break;
-      default:
-        throw new UnsupportedOperationException(
-            "unsupported data type for Stats collection: " + dataType);
+    if (dataType == DataTypes.BYTE) {
+      minByte = Byte.MAX_VALUE;
+      maxByte = Byte.MIN_VALUE;
+    } else if (dataType == DataTypes.SHORT) {
+      minShort = Short.MAX_VALUE;
+      maxShort = Short.MIN_VALUE;
+    } else if (dataType == DataTypes.INT) {
+      minInt = Integer.MAX_VALUE;
+      maxInt = Integer.MIN_VALUE;
+    } else if (dataType == DataTypes.LEGACY_LONG || dataType == DataTypes.LONG) {
+      minLong = Long.MAX_VALUE;
+      maxLong = Long.MIN_VALUE;
+    } else if (dataType == DataTypes.DOUBLE) {
+      minDouble = Double.POSITIVE_INFINITY;
+      maxDouble = Double.NEGATIVE_INFINITY;
+      decimal = 0;
+    } else if (dataType == DataTypes.DECIMAL) {
+      this.zeroDecimal = BigDecimal.ZERO;
+      decimal = scale;
+      this.scale = scale;
+      this.precision = precision;
+    } else {
+      throw new UnsupportedOperationException(
+          "unsupported data type for Stats collection: " + dataType);
     }
   }
 
   @Override
   public void updateNull(int rowId) {
     long value = 0;
-    switch (dataType) {
-      case BYTE:
-        update((byte) value);
-        break;
-      case SHORT:
-        update((short) value);
-        break;
-      case INT:
-        update((int) value);
-        break;
-      case LONG:
-        update(value);
-        break;
-      case DOUBLE:
-        update(0d);
-        break;
-      case DECIMAL:
-        if (isFirst) {
-          maxDecimal = zeroDecimal;
-          minDecimal = zeroDecimal;
-          isFirst = false;
-        } else {
-          maxDecimal = (maxDecimal.compareTo(zeroDecimal) > 0) ? maxDecimal : zeroDecimal;
-          minDecimal = (minDecimal.compareTo(zeroDecimal) < 0) ? minDecimal : zeroDecimal;
-        }
-        break;
-      default:
-        throw new UnsupportedOperationException(
-            "unsupported data type for Stats collection: " + dataType);
+    if (dataType == DataTypes.BYTE) {
+      update((byte) value);
+    } else if (dataType == DataTypes.SHORT) {
+      update((short) value);
+    } else if (dataType == DataTypes.INT) {
+      update((int) value);
+    } else if (dataType == DataTypes.LONG) {
+      update(value);
+    } else if (dataType == DataTypes.DOUBLE) {
+      update(0d);
+    } else if (dataType == DataTypes.DECIMAL) {
+      if (isFirst) {
+        maxDecimal = zeroDecimal;
+        minDecimal = zeroDecimal;
+        isFirst = false;
+      } else {
+        maxDecimal = (maxDecimal.compareTo(zeroDecimal) > 0) ? maxDecimal : zeroDecimal;
+        minDecimal = (minDecimal.compareTo(zeroDecimal) < 0) ? minDecimal : zeroDecimal;
+      }
+    } else {
+      throw new UnsupportedOperationException(
+          "unsupported data type for Stats collection: " + dataType);
     }
   }
 
@@ -300,55 +270,52 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
 
   @Override
   public String toString() {
-    switch (dataType) {
-      case BYTE:
-        return String.format("min: %s, max: %s, decimal: %s ", minByte, maxByte, decimal);
-      case SHORT:
-        return String.format("min: %s, max: %s, decimal: %s ", minShort, maxShort, decimal);
-      case INT:
-        return String.format("min: %s, max: %s, decimal: %s ", minInt, maxInt, decimal);
-      case LONG:
-        return String.format("min: %s, max: %s, decimal: %s ", minLong, maxLong, decimal);
-      case DOUBLE:
-        return String.format("min: %s, max: %s, decimal: %s ", minDouble, maxDouble, decimal);
+    if (dataType == DataTypes.BYTE) {
+      return String.format("min: %s, max: %s, decimal: %s ", minByte, maxByte, decimal);
+    } else if (dataType == DataTypes.SHORT) {
+      return String.format("min: %s, max: %s, decimal: %s ", minShort, maxShort, decimal);
+    } else if (dataType == DataTypes.INT) {
+      return String.format("min: %s, max: %s, decimal: %s ", minInt, maxInt, decimal);
+    } else if (dataType == DataTypes.LONG) {
+      return String.format("min: %s, max: %s, decimal: %s ", minLong, maxLong, decimal);
+    } else if (dataType == DataTypes.DOUBLE) {
+      return String.format("min: %s, max: %s, decimal: %s ", minDouble, maxDouble, decimal);
     }
     return super.toString();
   }
 
   @Override
   public Object getMin() {
-    switch (dataType) {
-      case BYTE:
-        return minByte;
-      case SHORT:
-        return minShort;
-      case INT:
-        return minInt;
-      case LONG:
-        return minLong;
-      case DOUBLE:
-        return minDouble;
-      case DECIMAL:
-        return minDecimal;
+    if (dataType == DataTypes.BYTE) {
+      return minByte;
+    } else if (dataType == DataTypes.SHORT) {
+      return minShort;
+    } else if (dataType == DataTypes.INT) {
+      return minInt;
+    } else if (dataType == DataTypes.LONG) {
+      return minLong;
+    } else if (dataType == DataTypes.DOUBLE) {
+      return minDouble;
+    } else if (dataType == DataTypes.DECIMAL) {
+      return minDecimal;
     }
     return null;
   }
 
   @Override
   public Object getMax() {
-    switch (dataType) {
-      case BYTE:
-        return maxByte;
-      case SHORT:
-        return maxShort;
-      case INT:
-        return maxInt;
-      case LONG:
-        return maxLong;
-      case DOUBLE:
-        return maxDouble;
-      case DECIMAL:
-        return maxDecimal;
+    if (dataType == DataTypes.BYTE) {
+      return maxByte;
+    } else if (dataType == DataTypes.SHORT) {
+      return maxShort;
+    } else if (dataType == DataTypes.INT) {
+      return maxInt;
+    } else if (dataType == DataTypes.LONG) {
+      return maxLong;
+    } else if (dataType == DataTypes.DOUBLE) {
+      return maxDouble;
+    } else if (dataType == DataTypes.DECIMAL) {
+      return maxDecimal;
     }
     return null;
   }
@@ -363,11 +330,13 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
     return dataType;
   }
 
-  @Override public int getScale() {
+  @Override
+  public int getScale() {
     return scale;
   }
 
-  @Override public int getPrecision() {
+  @Override
+  public int getPrecision() {
     return precision;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
index 5d17426..bf1678a 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
@@ -22,6 +22,8 @@ import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
 import org.apache.carbondata.core.memory.MemoryBlock;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
 
 import static org.apache.carbondata.core.memory.CarbonUnsafe.BYTE_ARRAY_OFFSET;
@@ -102,52 +104,45 @@ public class UnsafeMemoryDMStore {
   private void addToUnsafe(DataMapSchema schema, DataMapRow row, int index) {
     switch (schema.getSchemaType()) {
       case FIXED:
-        switch (schema.getDataType()) {
-          case BYTE:
-            getUnsafe()
-                .putByte(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
-                    row.getByte(index));
-            runningLength += row.getSizeInBytes(index);
-            break;
-          case SHORT:
-            getUnsafe()
-                .putShort(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
-                    row.getShort(index));
-            runningLength += row.getSizeInBytes(index);
-            break;
-          case INT:
-            getUnsafe()
-                .putInt(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
-                    row.getInt(index));
-            runningLength += row.getSizeInBytes(index);
-            break;
-          case LONG:
-            getUnsafe()
-                .putLong(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
-                    row.getLong(index));
-            runningLength += row.getSizeInBytes(index);
-            break;
-          case FLOAT:
-            getUnsafe()
-                .putFloat(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
-                    row.getFloat(index));
-            runningLength += row.getSizeInBytes(index);
-            break;
-          case DOUBLE:
-            getUnsafe()
-                .putDouble(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
-                    row.getDouble(index));
-            runningLength += row.getSizeInBytes(index);
-            break;
-          case BYTE_ARRAY:
-            byte[] data = row.getByteArray(index);
-            getUnsafe().copyMemory(data, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(),
-                memoryBlock.getBaseOffset() + runningLength, data.length);
-            runningLength += row.getSizeInBytes(index);
-            break;
-          default:
-            throw new UnsupportedOperationException(
-                "unsupported data type for unsafe storage: " + schema.getDataType());
+        DataType dataType = schema.getDataType();
+        if (dataType == DataTypes.BYTE) {
+          getUnsafe()
+              .putByte(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+                  row.getByte(index));
+          runningLength += row.getSizeInBytes(index);
+        } else if (dataType == DataTypes.SHORT) {
+          getUnsafe()
+              .putShort(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+                  row.getShort(index));
+          runningLength += row.getSizeInBytes(index);
+        } else if (dataType == DataTypes.INT) {
+          getUnsafe()
+              .putInt(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+                  row.getInt(index));
+          runningLength += row.getSizeInBytes(index);
+        } else if (dataType == DataTypes.LONG) {
+          getUnsafe()
+              .putLong(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+                  row.getLong(index));
+          runningLength += row.getSizeInBytes(index);
+        } else if (dataType == DataTypes.FLOAT) {
+          getUnsafe()
+              .putFloat(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+                  row.getFloat(index));
+          runningLength += row.getSizeInBytes(index);
+        } else if (dataType == DataTypes.DOUBLE) {
+          getUnsafe()
+              .putDouble(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+                  row.getDouble(index));
+          runningLength += row.getSizeInBytes(index);
+        } else if (dataType == DataTypes.BYTE_ARRAY) {
+          byte[] data = row.getByteArray(index);
+          getUnsafe().copyMemory(data, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(),
+              memoryBlock.getBaseOffset() + runningLength, data.length);
+          runningLength += row.getSizeInBytes(index);
+        } else {
+          throw new UnsupportedOperationException(
+              "unsupported data type for unsafe storage: " + schema.getDataType());
         }
         break;
       case VARIABLE:

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 66d07dc..0d7bb71 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -51,6 +51,7 @@ import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
@@ -187,30 +188,25 @@ public class BlockletDataMap implements DataMap, Cacheable {
       ByteBuffer buffer = ByteBuffer.allocate(8);
       for (int i = 0; i < measures.size(); i++) {
         buffer.rewind();
-        switch (measures.get(i).getDataType()) {
-          case BYTE:
-            buffer.putLong(Byte.MIN_VALUE);
-            updatedValues[minValues.length + i] = buffer.array().clone();
-            break;
-          case SHORT:
-            buffer.putLong(Short.MIN_VALUE);
-            updatedValues[minValues.length + i] = buffer.array().clone();
-            break;
-          case INT:
-            buffer.putLong(Integer.MIN_VALUE);
-            updatedValues[minValues.length + i] = buffer.array().clone();
-            break;
-          case LONG:
-            buffer.putLong(Long.MIN_VALUE);
-            updatedValues[minValues.length + i] = buffer.array().clone();
-            break;
-          case DECIMAL:
-            updatedValues[minValues.length + i] =
-                DataTypeUtil.bigDecimalToByte(BigDecimal.valueOf(Long.MIN_VALUE));
-            break;
-          default:
-            buffer.putDouble(Double.MIN_VALUE);
-            updatedValues[minValues.length + i] = buffer.array().clone();
+        DataType dataType = measures.get(i).getDataType();
+        if (dataType == DataTypes.BYTE) {
+          buffer.putLong(Byte.MIN_VALUE);
+          updatedValues[minValues.length + i] = buffer.array().clone();
+        } else if (dataType == DataTypes.SHORT) {
+          buffer.putLong(Short.MIN_VALUE);
+          updatedValues[minValues.length + i] = buffer.array().clone();
+        } else if (dataType == DataTypes.INT) {
+          buffer.putLong(Integer.MIN_VALUE);
+          updatedValues[minValues.length + i] = buffer.array().clone();
+        } else if (dataType == DataTypes.LONG) {
+          buffer.putLong(Long.MIN_VALUE);
+          updatedValues[minValues.length + i] = buffer.array().clone();
+        } else if (dataType == DataTypes.DECIMAL) {
+          updatedValues[minValues.length + i] =
+              DataTypeUtil.bigDecimalToByte(BigDecimal.valueOf(Long.MIN_VALUE));
+        } else {
+          buffer.putDouble(Double.MIN_VALUE);
+          updatedValues[minValues.length + i] = buffer.array().clone();
         }
       }
     }
@@ -230,30 +226,25 @@ public class BlockletDataMap implements DataMap, Cacheable {
       ByteBuffer buffer = ByteBuffer.allocate(8);
       for (int i = 0; i < measures.size(); i++) {
         buffer.rewind();
-        switch (measures.get(i).getDataType()) {
-          case BYTE:
-            buffer.putLong(Byte.MAX_VALUE);
-            updatedValues[maxValues.length + i] = buffer.array().clone();
-            break;
-          case SHORT:
-            buffer.putLong(Short.MAX_VALUE);
-            updatedValues[maxValues.length + i] = buffer.array().clone();
-            break;
-          case INT:
-            buffer.putLong(Integer.MAX_VALUE);
-            updatedValues[maxValues.length + i] = buffer.array().clone();
-            break;
-          case LONG:
-            buffer.putLong(Long.MAX_VALUE);
-            updatedValues[maxValues.length + i] = buffer.array().clone();
-            break;
-          case DECIMAL:
-            updatedValues[maxValues.length + i] =
-                DataTypeUtil.bigDecimalToByte(BigDecimal.valueOf(Long.MAX_VALUE));
-            break;
-          default:
-            buffer.putDouble(Double.MAX_VALUE);
-            updatedValues[maxValues.length + i] = buffer.array().clone();
+        DataType dataType = measures.get(i).getDataType();
+        if (dataType == DataTypes.BYTE) {
+          buffer.putLong(Byte.MAX_VALUE);
+          updatedValues[maxValues.length + i] = buffer.array().clone();
+        } else if (dataType == DataTypes.SHORT) {
+          buffer.putLong(Short.MAX_VALUE);
+          updatedValues[maxValues.length + i] = buffer.array().clone();
+        } else if (dataType == DataTypes.INT) {
+          buffer.putLong(Integer.MAX_VALUE);
+          updatedValues[maxValues.length + i] = buffer.array().clone();
+        } else if (dataType == DataTypes.LONG) {
+          buffer.putLong(Long.MAX_VALUE);
+          updatedValues[maxValues.length + i] = buffer.array().clone();
+        } else if (dataType == DataTypes.DECIMAL) {
+          updatedValues[maxValues.length + i] =
+              DataTypeUtil.bigDecimalToByte(BigDecimal.valueOf(Long.MAX_VALUE));
+        } else {
+          buffer.putDouble(Double.MAX_VALUE);
+          updatedValues[maxValues.length + i] = buffer.array().clone();
         }
       }
     }
@@ -276,39 +267,39 @@ public class BlockletDataMap implements DataMap, Cacheable {
     List<DataMapSchema> indexSchemas = new ArrayList<>();
 
     // Index key
-    indexSchemas.add(new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY));
+    indexSchemas.add(new DataMapSchema.VariableDataMapSchema(DataTypes.BYTE_ARRAY));
     int[] minMaxLen = segmentProperties.getColumnsValueSize();
     // do it 2 times, one for min and one for max.
     for (int k = 0; k < 2; k++) {
       DataMapSchema[] mapSchemas = new DataMapSchema[minMaxLen.length];
       for (int i = 0; i < minMaxLen.length; i++) {
         if (minMaxLen[i] <= 0) {
-          mapSchemas[i] = new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY);
+          mapSchemas[i] = new DataMapSchema.VariableDataMapSchema(DataTypes.BYTE_ARRAY);
         } else {
-          mapSchemas[i] = new DataMapSchema.FixedDataMapSchema(DataType.BYTE_ARRAY, minMaxLen[i]);
+          mapSchemas[i] = new DataMapSchema.FixedDataMapSchema(DataTypes.BYTE_ARRAY, minMaxLen[i]);
         }
       }
-      DataMapSchema mapSchema = new DataMapSchema.StructDataMapSchema(DataType.STRUCT, mapSchemas);
+      DataMapSchema mapSchema = new DataMapSchema.StructDataMapSchema(DataTypes.STRUCT, mapSchemas);
       indexSchemas.add(mapSchema);
     }
 
     // for number of rows.
-    indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataType.INT));
+    indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataTypes.INT));
 
     // for table block path
-    indexSchemas.add(new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY));
+    indexSchemas.add(new DataMapSchema.VariableDataMapSchema(DataTypes.BYTE_ARRAY));
 
     // for number of pages.
-    indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataType.SHORT));
+    indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataTypes.SHORT));
 
     // for version number.
-    indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataType.SHORT));
+    indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataTypes.SHORT));
 
     // for schema updated time.
-    indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataType.LONG));
+    indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataTypes.LONG));
 
     //for blocklet info
-    indexSchemas.add(new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY));
+    indexSchemas.add(new DataMapSchema.VariableDataMapSchema(DataTypes.BYTE_ARRAY));
 
     unsafeMemoryDMStore =
         new UnsafeMemoryDMStore(indexSchemas.toArray(new DataMapSchema[indexSchemas.size()]));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
index 32d15d3..bc55e74 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
@@ -17,7 +17,7 @@
 package org.apache.carbondata.core.indexstore.row;
 
 import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
-import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 
 /**
  * Data map row.
@@ -44,7 +44,7 @@ public class DataMapRowImpl extends DataMapRow {
   }
 
   @Override public void setByteArray(byte[] byteArray, int ordinal) {
-    assert (schemas[ordinal].getDataType() == DataType.BYTE_ARRAY);
+    assert (schemas[ordinal].getDataType() == DataTypes.BYTE_ARRAY);
     data[ordinal] = byteArray;
   }
 
@@ -53,12 +53,12 @@ public class DataMapRowImpl extends DataMapRow {
   }
 
   @Override public void setInt(int value, int ordinal) {
-    assert (schemas[ordinal].getDataType() == DataType.INT);
+    assert (schemas[ordinal].getDataType() == DataTypes.INT);
     data[ordinal] = value;
   }
 
   @Override public void setByte(byte value, int ordinal) {
-    assert (schemas[ordinal].getDataType() == DataType.BYTE);
+    assert (schemas[ordinal].getDataType() == DataTypes.BYTE);
     data[ordinal] = value;
   }
 
@@ -67,7 +67,7 @@ public class DataMapRowImpl extends DataMapRow {
   }
 
   @Override public void setShort(short value, int ordinal) {
-    assert (schemas[ordinal].getDataType() == DataType.SHORT);
+    assert (schemas[ordinal].getDataType() == DataTypes.SHORT);
     data[ordinal] = value;
   }
 
@@ -76,7 +76,7 @@ public class DataMapRowImpl extends DataMapRow {
   }
 
   @Override public void setLong(long value, int ordinal) {
-    assert (schemas[ordinal].getDataType() == DataType.LONG);
+    assert (schemas[ordinal].getDataType() == DataTypes.LONG);
     data[ordinal] = value;
   }
 
@@ -85,7 +85,7 @@ public class DataMapRowImpl extends DataMapRow {
   }
 
   @Override public void setFloat(float value, int ordinal) {
-    assert (schemas[ordinal].getDataType() == DataType.FLOAT);
+    assert (schemas[ordinal].getDataType() == DataTypes.FLOAT);
     data[ordinal] = value;
   }
 
@@ -94,12 +94,12 @@ public class DataMapRowImpl extends DataMapRow {
   }
 
   @Override public void setDouble(double value, int ordinal) {
-    assert (schemas[ordinal].getDataType() == DataType.DOUBLE);
+    assert (schemas[ordinal].getDataType() == DataTypes.DOUBLE);
     data[ordinal] = value;
   }
 
   @Override public void setRow(DataMapRow row, int ordinal) {
-    assert (schemas[ordinal].getDataType() == DataType.STRUCT);
+    assert (schemas[ordinal].getDataType() == DataTypes.STRUCT);
     data[ordinal] = row;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/DirectDictionaryKeyGeneratorFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/DirectDictionaryKeyGeneratorFactory.java b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/DirectDictionaryKeyGeneratorFactory.java
index e20ebb2..9ce6748 100644
--- a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/DirectDictionaryKeyGeneratorFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/DirectDictionaryKeyGeneratorFactory.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.keygenerator.directdictionary;
 import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator;
 import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.TimeStampDirectDictionaryGenerator;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonUtil;
 
 /**
@@ -42,15 +43,10 @@ public final class DirectDictionaryKeyGeneratorFactory {
   public static DirectDictionaryGenerator getDirectDictionaryGenerator(DataType dataType,
       String dateFormat) {
     DirectDictionaryGenerator directDictionaryGenerator = null;
-    switch (dataType) {
-      case DATE:
-        directDictionaryGenerator = new DateDirectDictionaryGenerator(dateFormat);
-        break;
-      case TIMESTAMP:
-        directDictionaryGenerator = new TimeStampDirectDictionaryGenerator(dateFormat);
-        break;
-      default:
-
+    if (dataType == DataTypes.DATE) {
+      directDictionaryGenerator = new DateDirectDictionaryGenerator(dateFormat);
+    } else if (dataType == DataTypes.TIMESTAMP) {
+      directDictionaryGenerator = new TimeStampDirectDictionaryGenerator(dateFormat);
     }
     return directDictionaryGenerator;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
index 5a6e03d..cd32e97 100644
--- a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
@@ -26,7 +26,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 
 /**
  * The class provides the method to generate dictionary key and getting the actual value from
@@ -53,11 +53,6 @@ public class DateDirectDictionaryGenerator implements DirectDictionaryGenerator
     initialize();
   }
 
-  public DateDirectDictionaryGenerator() {
-    this(CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
-        CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT));
-  }
-
   /**
    * The method take member String as input and converts
    * and returns the dictionary key
@@ -159,6 +154,6 @@ public class DateDirectDictionaryGenerator implements DirectDictionaryGenerator
   }
 
   @Override public DataType getReturnType() {
-    return DataType.INT;
+    return DataTypes.INT;
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
index e0f5d41..d218e99 100644
--- a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
@@ -25,16 +25,13 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonProperties;
 
-import static org.apache.carbondata.core.keygenerator.directdictionary.timestamp
-    .TimeStampGranularityConstants.TIME_GRAN_DAY;
-import static org.apache.carbondata.core.keygenerator.directdictionary.timestamp
-    .TimeStampGranularityConstants.TIME_GRAN_HOUR;
-import static org.apache.carbondata.core.keygenerator.directdictionary.timestamp
-    .TimeStampGranularityConstants.TIME_GRAN_MIN;
-import static org.apache.carbondata.core.keygenerator.directdictionary.timestamp
-    .TimeStampGranularityConstants.TIME_GRAN_SEC;
+import static org.apache.carbondata.core.keygenerator.directdictionary.timestamp.TimeStampGranularityConstants.TIME_GRAN_DAY;
+import static org.apache.carbondata.core.keygenerator.directdictionary.timestamp.TimeStampGranularityConstants.TIME_GRAN_HOUR;
+import static org.apache.carbondata.core.keygenerator.directdictionary.timestamp.TimeStampGranularityConstants.TIME_GRAN_MIN;
+import static org.apache.carbondata.core.keygenerator.directdictionary.timestamp.TimeStampGranularityConstants.TIME_GRAN_SEC;
 
 /**
  * The class provides the method to generate dictionary key and getting the actual value from
@@ -226,7 +223,7 @@ public class TimeStampDirectDictionaryGenerator implements DirectDictionaryGener
   }
 
   @Override public DataType getReturnType() {
-    return DataType.LONG;
+    return DataTypes.LONG;
   }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java b/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java
index 5862933..2203b3b 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java
@@ -17,11 +17,11 @@
 
 package org.apache.carbondata.core.memory;
 
+import javax.annotation.concurrent.GuardedBy;
 import java.lang.ref.WeakReference;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
-import javax.annotation.concurrent.GuardedBy;
 
 /**
  * Code ported from Apache Spark {org.apache.spark.unsafe.memory} package