Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/05/12 13:57:13 UTC

[4/7] carbondata git commit: [CARBONDATA-1015] Refactor write step and add ColumnPage in data load This closes #852

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
new file mode 100644
index 0000000..024c341
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+// Represents a complex column page, e.g. an Array or Struct type column
+public class ComplexColumnPage extends ColumnPage {
+
+  // Holds data for all rows in this page in columnar layout.
+  // After the complex data is expanded, each sub-column is stored as byte[][]; the first-level
+  // array in the byte[][] represents a sub-column of the complex type, and can be retrieved
+  // by giving the depth of the complex type.
+  // TODO: further optimize it to make it more memory efficient
+  private List<ArrayList<byte[]>> complexColumnData;
+
+  // depth is the number of columns after the complex type is expanded. It ranges from 1 to N.
+  private final int depth;
+
+  public ComplexColumnPage(int pageSize, int depth) {
+    super(DataType.BYTE_ARRAY, pageSize);
+    this.depth = depth;
+    complexColumnData = new ArrayList<>(depth);
+    for (int i = 0; i < depth; i++) {
+      complexColumnData.add(new ArrayList<byte[]>(pageSize));
+    }
+  }
+
+  public void putComplexData(int rowId, int depth, List<byte[]> value) {
+    assert (depth < this.depth);
+    ArrayList<byte[]> subColumnPage = complexColumnData.get(depth);
+    subColumnPage.addAll(value);
+  }
+
+  // iterates over the sub-columns after the complex type is expanded, returning the columnar
+  // page of each sub-column
+  public Iterator<byte[][]> iterator() {
+
+    return new CarbonIterator<byte[][]>() {
+      private int index = 0;
+      @Override public boolean hasNext() {
+        return index < depth;
+      }
+
+      @Override public byte[][] next() {
+        // convert the subColumnPage from ArrayList<byte[]> to byte[][]
+        ArrayList<byte[]> subColumnPage = complexColumnData.get(index);
+        index++;
+        return subColumnPage.toArray(new byte[subColumnPage.size()][]);
+      }
+    };
+  }
+
+  public int getDepth() {
+    return depth;
+  }
+}
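
A minimal usage sketch of the new ComplexColumnPage (a hypothetical driver, not part of this patch), assuming a page of 2 rows whose complex column expands into 2 sub-columns; the byte values are invented for illustration:

import java.util.Arrays;
import java.util.Iterator;

import org.apache.carbondata.core.datastore.page.ComplexColumnPage;

// hypothetical example, not part of this commit
public class ComplexColumnPageExample {
  public static void main(String[] args) {
    // 2 rows, complex type expanded into 2 sub-columns
    ComplexColumnPage page = new ComplexColumnPage(2, 2);
    // append the flattened bytes of both rows to sub-column 0 and sub-column 1
    page.putComplexData(0, 0, Arrays.asList(new byte[] { 1 }, new byte[] { 2 }));
    page.putComplexData(0, 1, Arrays.asList(new byte[] { 10, 11 }, new byte[] { 20, 21 }));
    // read back one columnar byte[][] per expanded sub-column
    Iterator<byte[][]> iterator = page.iterator();
    while (iterator.hasNext()) {
      byte[][] subColumnPage = iterator.next();
      System.out.println("sub-column with " + subColumnPage.length + " rows");
    }
  }
}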

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/page/FixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/FixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/FixLengthColumnPage.java
new file mode 100644
index 0000000..a56563e
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/FixLengthColumnPage.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
+// Represents columnar data in one page for one column.
+public class FixLengthColumnPage extends ColumnPage {
+
+  // Only one of the following fields will be used, depending on the data type
+  private byte[] byteData;
+  private short[] shortData;
+  private int[] intData;
+  private long[] longData;
+  private double[] doubleData;
+
+  private byte[][] byteArrayData;
+
+  // Bit at a rowId index is set to 1 if the value of that row is null
+  private BitSet nullBitSet;
+
+  public FixLengthColumnPage(DataType dataType, int pageSize) {
+    super(dataType, pageSize);
+    nullBitSet = new BitSet(pageSize);
+    switch (dataType) {
+      case SHORT:
+      case INT:
+      case LONG:
+        longData = new long[pageSize];
+        break;
+      case DOUBLE:
+        doubleData = new double[pageSize];
+        break;
+      case DECIMAL:
+        byteArrayData = new byte[pageSize][];
+        break;
+      default:
+        throw new RuntimeException("Unsupported data type: " + dataType);
+    }
+  }
+
+  public DataType getDataType() {
+    return dataType;
+  }
+
+  private void putByte(int rowId, byte value) {
+    byteData[rowId] = value;
+  }
+
+  private void putShort(int rowId, short value) {
+    shortData[rowId] = value;
+  }
+
+  private void putInt(int rowId, int value) {
+    intData[rowId] = value;
+  }
+
+  private void putLong(int rowId, long value) {
+    longData[rowId] = value;
+  }
+
+  private void putDouble(int rowId, double value) {
+    doubleData[rowId] = value;
+  }
+
+  // This method will do LV (length-value) encoding of the input bytes
+  private void putDecimalBytes(int rowId, byte[] decimalInBytes) {
+    ByteBuffer byteBuffer = ByteBuffer.allocate(decimalInBytes.length +
+        CarbonCommonConstants.INT_SIZE_IN_BYTE);
+    byteBuffer.putInt(decimalInBytes.length);
+    byteBuffer.put(decimalInBytes);
+    byteBuffer.flip();
+    byteArrayData[rowId] = byteBuffer.array();
+  }
+
+  public void putData(int rowId, Object value) {
+    if (value == null) {
+      putNull(rowId);
+      return;
+    }
+    switch (dataType) {
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+        putLong(rowId, (long) value);
+        break;
+      case DOUBLE:
+        putDouble(rowId, (double) value);
+        break;
+      case DECIMAL:
+        putDecimalBytes(rowId, (byte[]) value);
+        break;
+      default:
+        throw new RuntimeException("unsupported data type: " + dataType);
+    }
+    updateStatistics(value);
+  }
+
+  private void putNull(int rowId) {
+    nullBitSet.set(rowId);
+    switch (dataType) {
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+        putLong(rowId, 0L);
+        break;
+      case DOUBLE:
+        putDouble(rowId, 0.0);
+        break;
+      case DECIMAL:
+        byte[] decimalInBytes = DataTypeUtil.bigDecimalToByte(BigDecimal.ZERO);
+        putDecimalBytes(rowId, decimalInBytes);
+        break;
+    }
+  }
+
+  public long[] getLongPage() {
+    return longData;
+  }
+
+  public double[] getDoublePage() {
+    return doubleData;
+  }
+
+  public byte[][] getDecimalPage() {
+    return byteArrayData;
+  }
+
+  public BitSet getNullBitSet() {
+    return nullBitSet;
+  }
+}
\ No newline at end of file
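
A sketch of how the fixed-length measure page might be used, assuming values arrive already converted to Long/Double/decimal bytes by the loader and that the ColumnPage base class added elsewhere in this change provides updateStatistics; the values below are invented:

import java.util.BitSet;

import org.apache.carbondata.core.datastore.page.FixLengthColumnPage;
import org.apache.carbondata.core.metadata.datatype.DataType;

// hypothetical example, not part of this commit
public class FixLengthColumnPageExample {
  public static void main(String[] args) {
    FixLengthColumnPage page = new FixLengthColumnPage(DataType.LONG, 3);
    page.putData(0, 100L);
    page.putData(1, null);    // recorded in the null bitset and stored as 0
    page.putData(2, 250L);

    long[] longs = page.getLongPage();       // { 100, 0, 250 }
    BitSet nulls = page.getNullBitSet();     // bit 1 is set
    System.out.println(longs[1] + " null=" + nulls.get(1));
  }
}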

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPage.java
new file mode 100644
index 0000000..d5e9ce3
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPage.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+// Represents variable-length columnar data in one page, e.g. for dictionary columns.
+public class VarLengthColumnPage extends ColumnPage {
+
+  // TODO: further optimize it to store length and data separately
+  private byte[][] byteArrayData;
+
+  public VarLengthColumnPage(int pageSize) {
+    super(DataType.BYTE_ARRAY, pageSize);
+    byteArrayData = new byte[pageSize][];
+  }
+
+  public void putByteArray(int rowId, byte[] value) {
+    byteArrayData[rowId] = value;
+    updateStatistics(value);
+  }
+
+  public byte[][] getByteArrayPage() {
+    return byteArrayData;
+  }
+
+}
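
Correspondingly, a small sketch for the variable-length page (hypothetical, with invented values):

import org.apache.carbondata.core.datastore.page.VarLengthColumnPage;

// hypothetical example, not part of this commit
public class VarLengthColumnPageExample {
  public static void main(String[] args) {
    VarLengthColumnPage page = new VarLengthColumnPage(2);
    page.putByteArray(0, new byte[] { 1, 2, 3 });
    page.putByteArray(1, new byte[] { 4 });
    byte[][] data = page.getByteArrayPage();
    System.out.println(data[0].length + ", " + data[1].length);   // 3, 1
  }
}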

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/page/compression/Compression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/compression/Compression.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/compression/Compression.java
new file mode 100644
index 0000000..c954a33
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/compression/Compression.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.compression;
+
+public interface Compression {
+  byte[] compress(byte[] input);
+  byte[] decompress(byte[] input);
+}
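
The Compression contract is symmetric: decompress(compress(x)) should return x. As an illustration only (not part of this patch), a GZIP-backed implementation using the JDK could look like this:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import org.apache.carbondata.core.datastore.page.compression.Compression;

// hypothetical example, not part of this commit
public class GzipCompression implements Compression {
  @Override public byte[] compress(byte[] input) {
    try {
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      GZIPOutputStream gzip = new GZIPOutputStream(bos);
      gzip.write(input);
      gzip.close();
      return bos.toByteArray();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  @Override public byte[] decompress(byte[] input) {
    try {
      GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(input));
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      byte[] buffer = new byte[4096];
      int len;
      while ((len = gzip.read(buffer)) != -1) {
        bos.write(buffer, 0, len);
      }
      return bos.toByteArray();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
}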

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnCodec.java
new file mode 100644
index 0000000..e870ad6
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnCodec.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+
+/**
+ *  Codec for column page data. Implementations should not keep state across pages,
+ *  because the caller will use the same object to encode multiple pages.
+ */
+public interface ColumnCodec {
+
+  /** Codec name will be stored in BlockletHeader (DataChunk3) */
+  String getName();
+
+  byte[] encode(ColumnPage columnPage);
+
+  ColumnPage decode(byte[] encoded);
+
+}
\ No newline at end of file
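
To illustrate the contract (a hypothetical codec, not part of this patch), a codec for a LONG FixLengthColumnPage could serialize the page as 8 bytes per row with a ByteBuffer and rebuild it on decode:

import java.nio.ByteBuffer;

import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.FixLengthColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.ColumnCodec;
import org.apache.carbondata.core.metadata.datatype.DataType;

// hypothetical example, not part of this commit: no compression, 8 bytes per row
public class PlainLongCodec implements ColumnCodec {
  @Override public String getName() {
    return "PlainLongCodec";
  }

  @Override public byte[] encode(ColumnPage columnPage) {
    long[] data = ((FixLengthColumnPage) columnPage).getLongPage();
    ByteBuffer buffer = ByteBuffer.allocate(data.length * 8);
    for (long value : data) {
      buffer.putLong(value);
    }
    return buffer.array();
  }

  @Override public ColumnPage decode(byte[] encoded) {
    ByteBuffer buffer = ByteBuffer.wrap(encoded);
    int pageSize = encoded.length / 8;
    FixLengthColumnPage page = new FixLengthColumnPage(DataType.LONG, pageSize);
    for (int rowId = 0; rowId < pageSize; rowId++) {
      page.putData(rowId, buffer.getLong());
    }
    return page;
  }
}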

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DummyCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DummyCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DummyCodec.java
new file mode 100644
index 0000000..0dd23c7
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DummyCodec.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+
+public class DummyCodec implements ColumnCodec {
+  @Override
+  public String getName() {
+    return "DummyCodec";
+  }
+
+  @Override
+  public byte[] encode(ColumnPage columnPage) {
+    return null;
+  }
+
+  @Override
+  public ColumnPage decode(byte[] encoded) {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PageStatistics.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PageStatistics.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PageStatistics.java
new file mode 100644
index 0000000..3ecf1da
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PageStatistics.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.statistics;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
+/** statistics for one column page */
+public class PageStatistics {
+  private DataType dataType;
+
+  /** min and max value of the measures */
+  private Object min, max;
+
+  /**
+   * the unique value is a value that does not exist in the column,
+   * and will be used as the storage key for null values of measures
+   */
+  private Object uniqueValue;
+
+  /** decimal count (number of digits after the decimal point) of the measure */
+  private int decimal;
+
+  public PageStatistics(DataType dataType) {
+    this.dataType = dataType;
+    switch (dataType) {
+      case SHORT:
+      case INT:
+      case LONG:
+        max = Long.MIN_VALUE;
+        min = Long.MAX_VALUE;
+        uniqueValue = Long.MIN_VALUE;
+        break;
+      case DOUBLE:
+        max = Double.MIN_VALUE;
+        min = Double.MAX_VALUE;
+        uniqueValue = Double.MIN_VALUE;
+        break;
+      case DECIMAL:
+        max = new BigDecimal(Double.MIN_VALUE);
+        min = new BigDecimal(Double.MAX_VALUE);
+        uniqueValue = new BigDecimal(Double.MIN_VALUE);
+        break;
+    }
+    decimal = 0;
+  }
+
+  /**
+   * update the statistics for the input row
+   */
+  public void update(Object value) {
+    switch (dataType) {
+      case SHORT:
+      case INT:
+      case LONG:
+        max = ((long) max > (long) value) ? max : value;
+        min = ((long) min < (long) value) ? min : value;
+        uniqueValue = (long) min - 1;
+        break;
+      case DOUBLE:
+        max = ((double) max > (double) value) ? max : value;
+        min = ((double) min < (double) value) ? min : value;
+        int num = getDecimalCount((double) value);
+        decimal = decimal > num ? decimal : num;
+        uniqueValue = (double) min - 1;
+        break;
+      case DECIMAL:
+        BigDecimal decimalValue = DataTypeUtil.byteToBigDecimal((byte[]) value);
+        decimal = decimalValue.scale();
+        BigDecimal val = (BigDecimal) min;
+        uniqueValue = (val.subtract(new BigDecimal(1.0)));
+        break;
+      case ARRAY:
+      case STRUCT:
+        // for complex type columns, the writer does not use stats, so do nothing
+    }
+  }
+
+  /**
+   * return the number of digits after the decimal point
+   */
+  private int getDecimalCount(double value) {
+    String strValue = BigDecimal.valueOf(Math.abs(value)).toPlainString();
+    int integerPlaces = strValue.indexOf('.');
+    int decimalPlaces = 0;
+    if (-1 != integerPlaces) {
+      decimalPlaces = strValue.length() - integerPlaces - 1;
+    }
+    return decimalPlaces;
+  }
+
+  public Object getMin() {
+    return min;
+  }
+
+  public Object getMax() {
+    return max;
+  }
+
+  public Object getUniqueValue() {
+    return uniqueValue;
+  }
+
+  public int getDecimal() {
+    return decimal;
+  }
+}
\ No newline at end of file
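
A small sketch of how these statistics behave for a LONG column (hypothetical, with invented values):

import org.apache.carbondata.core.datastore.page.statistics.PageStatistics;
import org.apache.carbondata.core.metadata.datatype.DataType;

// hypothetical example, not part of this commit
public class PageStatisticsExample {
  public static void main(String[] args) {
    PageStatistics stats = new PageStatistics(DataType.LONG);
    stats.update(5L);
    stats.update(42L);
    stats.update(7L);
    // min = 5, max = 42, uniqueValue = min - 1 = 4 (storage key for nulls)
    System.out.println(stats.getMin() + " " + stats.getMax() + " " + stats.getUniqueValue());
  }
}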

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/StatisticsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/StatisticsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/StatisticsCollector.java
new file mode 100644
index 0000000..f8b336c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/StatisticsCollector.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.statistics;
+
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+
+/**
+ * Calculate the statistics for a column page and blocklet
+ */
+public interface StatisticsCollector {
+
+  /**
+   * name will be stored in Header
+   */
+  String getName();
+
+  void startPage(int pageID);
+
+  void endPage(int pageID);
+
+  void startBlocklet(int blockletID);
+
+  void endBlocklet(int blockletID);
+
+  void startBlock(int blockID);
+
+  void endBlock(int blockID);
+
+  /**
+   * Update the stats for the input batch
+   */
+  void update(ColumnPage batch);
+
+  /**
+   * Output will be written to DataChunk2 (page header)
+   */
+  byte[] getPageStatistisc();
+
+  /**
+   * Output will be written to DataChunk3 (blocklet header)
+   */
+  byte[] getBlockletStatistics();
+
+  /**
+   * Output will be written to Footer
+   */
+  byte[] getBlockStatistics();
+}
+
+
+
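
The collector is driven by the writer through a block / blocklet / page lifecycle. A no-op skeleton (purely illustrative, not part of this patch) makes the call order and the target of each output explicit:

import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.statistics.StatisticsCollector;

// hypothetical example, not part of this commit
// call order: startBlock -> startBlocklet -> (startPage, update, endPage)* -> endBlocklet -> endBlock
public class NoOpStatisticsCollector implements StatisticsCollector {
  @Override public String getName() { return "NoOpStatisticsCollector"; }
  @Override public void startPage(int pageID) { }
  @Override public void endPage(int pageID) { }
  @Override public void startBlocklet(int blockletID) { }
  @Override public void endBlocklet(int blockletID) { }
  @Override public void startBlock(int blockID) { }
  @Override public void endBlock(int blockID) { }
  @Override public void update(ColumnPage batch) { }
  @Override public byte[] getPageStatistisc() { return new byte[0]; }      // DataChunk2 (page header)
  @Override public byte[] getBlockletStatistics() { return new byte[0]; }  // DataChunk3 (blocklet header)
  @Override public byte[] getBlockStatistics() { return new byte[0]; }     // footer
}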

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/metadata/ValueEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/ValueEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/metadata/ValueEncoderMeta.java
index 4a9007c..741b999 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/ValueEncoderMeta.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/ValueEncoderMeta.java
@@ -19,6 +19,9 @@ package org.apache.carbondata.core.metadata;
 
 import java.io.Serializable;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
 /**
  * DO NOT MODIFY THIS CLASS AND PACKAGE NAME, BECAUSE
  * IT IS SERIALIZE TO STORE
@@ -78,7 +81,20 @@ public class ValueEncoderMeta implements Serializable {
     this.decimal = decimal;
   }
 
-  public char getType() {
+  public DataType getType() {
+    switch (type) {
+      case CarbonCommonConstants.BIG_INT_MEASURE:
+        return DataType.LONG;
+      case CarbonCommonConstants.DOUBLE_MEASURE:
+        return DataType.DOUBLE;
+      case CarbonCommonConstants.BIG_DECIMAL_MEASURE:
+        return DataType.DECIMAL;
+      default:
+        throw new RuntimeException("Unexpected type: " + type);
+    }
+  }
+
+  public char getTypeInChar() {
     return type;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
index d77406c..da13d5c 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
@@ -32,7 +32,11 @@ public enum DataType {
   DECIMAL(8, "DECIMAL"),
   ARRAY(9, "ARRAY"),
   STRUCT(10, "STRUCT"),
-  MAP(11, "MAP");
+  MAP(11, "MAP"),
+  BYTE(12, "BYTE"),
+
+  // internal use only
+  BYTE_ARRAY(13, "BYTE ARRAY");
 
   private int precedenceOrder;
   private String name ;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
index f4ab982..caba75f 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
@@ -39,6 +39,8 @@ public final class ByteUtil {
 
   public static final String UTF8_CSN = StandardCharsets.UTF_8.name();
 
+  public static final byte[] ZERO_IN_BYTES = toBytes(0);
+
   private ByteUtil() {
 
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
index 6398f30..6fe38e2 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
@@ -554,7 +554,7 @@ public class CarbonMetadataUtil {
     Object[] minValue = new Object[encoderMetas.length];
     int[] decimalLength = new int[encoderMetas.length];
     Object[] uniqueValue = new Object[encoderMetas.length];
-    char[] aggType = new char[encoderMetas.length];
+    DataType[] aggType = new DataType[encoderMetas.length];
     byte[] dataTypeSelected = new byte[encoderMetas.length];
     for (int i = 0; i < encoderMetas.length; i++) {
       maxValue[i] = encoderMetas[i].getMaxValue();
@@ -827,25 +827,29 @@ public class CarbonMetadataUtil {
 
   public static byte[] serializeEncodeMetaUsingByteBuffer(ValueEncoderMeta valueEncoderMeta) {
     ByteBuffer buffer = null;
-    if (valueEncoderMeta.getType() == CarbonCommonConstants.DOUBLE_MEASURE) {
-      buffer = ByteBuffer.allocate(
-          (CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE
-              + 3);
-      buffer.putChar(valueEncoderMeta.getType());
-      buffer.putDouble((Double) valueEncoderMeta.getMaxValue());
-      buffer.putDouble((Double) valueEncoderMeta.getMinValue());
-      buffer.putDouble((Double) valueEncoderMeta.getUniqueValue());
-    } else if (valueEncoderMeta.getType() == CarbonCommonConstants.BIG_INT_MEASURE) {
-      buffer = ByteBuffer.allocate(
-          (CarbonCommonConstants.LONG_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE
-              + 3);
-      buffer.putChar(valueEncoderMeta.getType());
-      buffer.putLong((Long) valueEncoderMeta.getMaxValue());
-      buffer.putLong((Long) valueEncoderMeta.getMinValue());
-      buffer.putLong((Long) valueEncoderMeta.getUniqueValue());
-    } else {
-      buffer = ByteBuffer.allocate(CarbonCommonConstants.INT_SIZE_IN_BYTE + 3);
-      buffer.putChar(valueEncoderMeta.getType());
+    switch (valueEncoderMeta.getType()) {
+      case LONG:
+        buffer = ByteBuffer.allocate(
+            (CarbonCommonConstants.LONG_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE
+                + 3);
+        buffer.putChar(valueEncoderMeta.getTypeInChar());
+        buffer.putLong((Long) valueEncoderMeta.getMaxValue());
+        buffer.putLong((Long) valueEncoderMeta.getMinValue());
+        buffer.putLong((Long) valueEncoderMeta.getUniqueValue());
+        break;
+      case DOUBLE:
+        buffer = ByteBuffer.allocate(
+            (CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE
+                + 3);
+        buffer.putChar(valueEncoderMeta.getTypeInChar());
+        buffer.putDouble((Double) valueEncoderMeta.getMaxValue());
+        buffer.putDouble((Double) valueEncoderMeta.getMinValue());
+        buffer.putDouble((Double) valueEncoderMeta.getUniqueValue());
+        break;
+      case DECIMAL:
+        buffer = ByteBuffer.allocate(CarbonCommonConstants.INT_SIZE_IN_BYTE + 3);
+        buffer.putChar(valueEncoderMeta.getTypeInChar());
+        break;
     }
     buffer.putInt(valueEncoderMeta.getDecimal());
     buffer.put(valueEncoderMeta.getDataTypeSelected());
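
For reference, the layout produced above is: a 2-byte type char, then three 8-byte max/min/unique values (for LONG or DOUBLE; nothing for DECIMAL), then a 4-byte decimal count and 1 byte for dataTypeSelected. A hypothetical read-side sketch, only to illustrate the order (the project's own deserialization lives elsewhere):

import java.nio.ByteBuffer;

import org.apache.carbondata.core.constants.CarbonCommonConstants;

// hypothetical example, not part of this commit: mirrors the write order above
public class EncodeMetaLayoutExample {
  public static void read(byte[] serialized) {
    ByteBuffer buffer = ByteBuffer.wrap(serialized);
    char type = buffer.getChar();
    if (type == CarbonCommonConstants.BIG_INT_MEASURE) {
      long max = buffer.getLong();
      long min = buffer.getLong();
      long unique = buffer.getLong();
      System.out.println("long measure: " + min + ".." + max + ", unique=" + unique);
    } else if (type == CarbonCommonConstants.DOUBLE_MEASURE) {
      double max = buffer.getDouble();
      double min = buffer.getDouble();
      double unique = buffer.getDouble();
      System.out.println("double measure: " + min + ".." + max + ", unique=" + unique);
    }
    // DECIMAL stores no min/max/unique in this buffer
    int decimal = buffer.getInt();
    byte dataTypeSelected = buffer.get();
    System.out.println("decimal=" + decimal + ", dataTypeSelected=" + dataTypeSelected);
  }
}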

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 92c85a1..496adff 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -749,5 +749,4 @@ public final class CarbonProperties {
     }
     return numberOfDeltaFilesThreshold;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 99463de..8e4df1a 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -833,7 +833,7 @@ public final class CarbonUtil {
     Object[] minValue = new Object[encodeMetaList.size()];
     Object[] uniqueValue = new Object[encodeMetaList.size()];
     int[] decimal = new int[encodeMetaList.size()];
-    char[] type = new char[encodeMetaList.size()];
+    DataType[] type = new DataType[encodeMetaList.size()];
     byte[] dataTypeSelected = new byte[encodeMetaList.size()];
 
     /*

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/util/CompressionFinder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CompressionFinder.java b/core/src/main/java/org/apache/carbondata/core/util/CompressionFinder.java
index d931af6..732d053 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CompressionFinder.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CompressionFinder.java
@@ -16,8 +16,8 @@
  */
 package org.apache.carbondata.core.util;
 
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ValueCompressionUtil.COMPRESSION_TYPE;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 
 /**
@@ -37,7 +37,7 @@ public class CompressionFinder implements Comparable<CompressionFinder> {
 
   private PRIORITY priority;
 
-  private char measureStoreType;
+  private DataType measureStoreType;
 
   /**
    * CompressionFinder constructor.
@@ -47,7 +47,7 @@ public class CompressionFinder implements Comparable<CompressionFinder> {
    * @param convertedDataType
    */
   CompressionFinder(COMPRESSION_TYPE compType, DataType actualDataType,
-      DataType convertedDataType, char measureStoreType) {
+      DataType convertedDataType, DataType measureStoreType) {
     super();
     this.compType = compType;
     this.actualDataType = actualDataType;
@@ -65,7 +65,7 @@ public class CompressionFinder implements Comparable<CompressionFinder> {
    */
 
   CompressionFinder(COMPRESSION_TYPE compType, DataType actualDataType, DataType convertedDataType,
-      PRIORITY priority, char measureStoreType) {
+      PRIORITY priority, DataType measureStoreType) {
     super();
     this.actualDataType = actualDataType;
     this.convertedDataType = convertedDataType;
@@ -155,7 +155,7 @@ public class CompressionFinder implements Comparable<CompressionFinder> {
     return priority;
   }
 
-  public char getMeasureStoreType() {
+  public DataType getMeasureStoreType() {
     return measureStoreType;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index 80c9e72..e33d198 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -156,9 +156,6 @@ public final class DataTypeUtil {
     }
   }
 
-  // bytes of 0 in BigDecimal
-  public static final byte[] zeroBigDecimalBytes = bigDecimalToByte(BigDecimal.valueOf(0));
-
   /**
    * This method will convert a big decimal value to bytes
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java b/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
index 69ed9f8..a37a9a7 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
@@ -57,16 +57,6 @@ public class NodeHolder {
   private int[] keyLengths;
 
   /**
-   * dataAfterCompression
-   */
-  private short[][] dataAfterCompression;
-
-  /**
-   * indexMap
-   */
-  private short[][] indexMap;
-
-  /**
    * keyIndexBlockLenght
    */
   private int[] keyBlockIndexLength;
@@ -86,11 +76,6 @@ public class NodeHolder {
   private int[] dataIndexMapLength;
 
   /**
-   * dataIndexMap
-   */
-  private int[] dataIndexMapOffsets;
-
-  /**
    * compressedDataIndex
    */
   private byte[][] compressedDataIndex;
@@ -120,19 +105,9 @@ public class NodeHolder {
   private boolean[] aggBlocks;
 
   /**
-   * all columns max value
-   */
-  private byte[][] allMaxValue;
-
-  /**
-   * all column max value
-   */
-  private byte[][] allMinValue;
-
-  /**
    * true if given index is colgroup block
    */
-  private boolean[] colGrpBlock;
+  private boolean[] colGrpBlocks;
 
   /**
    * bit set which will holds the measure
@@ -383,14 +358,14 @@ public class NodeHolder {
    * @return
    */
   public boolean[] getColGrpBlocks() {
-    return this.colGrpBlock;
+    return this.colGrpBlocks;
   }
 
   /**
    * @param colGrpBlock true if block is column group
    */
   public void setColGrpBlocks(boolean[] colGrpBlock) {
-    this.colGrpBlock = colGrpBlock;
+    this.colGrpBlocks = colGrpBlock;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/util/ValueCompressionUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ValueCompressionUtil.java b/core/src/main/java/org/apache/carbondata/core/util/ValueCompressionUtil.java
index c8a9397..5020acb 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/ValueCompressionUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/ValueCompressionUtil.java
@@ -28,10 +28,29 @@ import org.apache.carbondata.core.datastore.compression.MeasureMetaDataModel;
 import org.apache.carbondata.core.datastore.compression.ReaderCompressModel;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
 import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
-import org.apache.carbondata.core.datastore.compression.decimal.*;
-import org.apache.carbondata.core.datastore.compression.nondecimal.*;
-import org.apache.carbondata.core.datastore.compression.none.*;
+import org.apache.carbondata.core.datastore.compression.decimal.CompressByteArray;
+import org.apache.carbondata.core.datastore.compression.decimal.CompressionMaxMinByte;
+import org.apache.carbondata.core.datastore.compression.decimal.CompressionMaxMinDefault;
+import org.apache.carbondata.core.datastore.compression.decimal.CompressionMaxMinInt;
+import org.apache.carbondata.core.datastore.compression.decimal.CompressionMaxMinLong;
+import org.apache.carbondata.core.datastore.compression.decimal.CompressionMaxMinShort;
+import org.apache.carbondata.core.datastore.compression.nondecimal.CompressionNonDecimalByte;
+import org.apache.carbondata.core.datastore.compression.nondecimal.CompressionNonDecimalDefault;
+import org.apache.carbondata.core.datastore.compression.nondecimal.CompressionNonDecimalInt;
+import org.apache.carbondata.core.datastore.compression.nondecimal.CompressionNonDecimalLong;
+import org.apache.carbondata.core.datastore.compression.nondecimal.CompressionNonDecimalMaxMinByte;
+import org.apache.carbondata.core.datastore.compression.nondecimal.CompressionNonDecimalMaxMinDefault;
+import org.apache.carbondata.core.datastore.compression.nondecimal.CompressionNonDecimalMaxMinInt;
+import org.apache.carbondata.core.datastore.compression.nondecimal.CompressionNonDecimalMaxMinLong;
+import org.apache.carbondata.core.datastore.compression.nondecimal.CompressionNonDecimalMaxMinShort;
+import org.apache.carbondata.core.datastore.compression.nondecimal.CompressionNonDecimalShort;
+import org.apache.carbondata.core.datastore.compression.none.CompressionNoneByte;
+import org.apache.carbondata.core.datastore.compression.none.CompressionNoneDefault;
+import org.apache.carbondata.core.datastore.compression.none.CompressionNoneInt;
+import org.apache.carbondata.core.datastore.compression.none.CompressionNoneLong;
+import org.apache.carbondata.core.datastore.compression.none.CompressionNoneShort;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 
 public final class ValueCompressionUtil {
 
@@ -47,29 +66,28 @@ public final class ValueCompressionUtil {
    * @see
    */
   private static DataType getDataType(double value, int mantissa, byte dataTypeSelected) {
-    DataType dataType = DataType.DATA_DOUBLE;
+    DataType dataType = DataType.DOUBLE;
     if (mantissa == 0) {
       if (value <= Byte.MAX_VALUE && value >= Byte.MIN_VALUE) {
-        dataType = DataType.DATA_BYTE;
+        dataType = DataType.BYTE;
       } else if (value <= Short.MAX_VALUE && value >= Short.MIN_VALUE) {
-        dataType = DataType.DATA_SHORT;
+        dataType = DataType.SHORT;
       } else if (value <= Integer.MAX_VALUE && value >= Integer.MIN_VALUE) {
-        dataType = DataType.DATA_INT;
+        dataType = DataType.INT;
       } else if (value <= Long.MAX_VALUE && value >= Long.MIN_VALUE) {
-        dataType = DataType.DATA_LONG;
+        dataType = DataType.LONG;
       }
     } else {
       if (dataTypeSelected == 1) {
         if (value <= Float.MAX_VALUE && value >= Float.MIN_VALUE) {
           float floatValue = (float) value;
           if (floatValue - value != 0) {
-            dataType = DataType.DATA_DOUBLE;
-
+            dataType = DataType.DOUBLE;
           } else {
-            dataType = DataType.DATA_FLOAT;
+            dataType = DataType.FLOAT;
           }
         } else if (value <= Double.MAX_VALUE && value >= Double.MIN_VALUE) {
-          dataType = DataType.DATA_DOUBLE;
+          dataType = DataType.DOUBLE;
         }
       }
     }
@@ -84,14 +102,14 @@ public final class ValueCompressionUtil {
    * @see
    */
   public static int getSize(DataType dataType) {
-
     switch (dataType) {
-      case DATA_BYTE:
+      case BOOLEAN:
+      case BYTE:
         return 1;
-      case DATA_SHORT:
+      case SHORT:
         return 2;
-      case DATA_INT:
-      case DATA_FLOAT:
+      case INT:
+      case FLOAT:
         return 4;
       default:
         return 8;
@@ -110,19 +128,17 @@ public final class ValueCompressionUtil {
    * @see
    */
   public static CompressionFinder getCompressionFinder(Object maxValue, Object minValue,
-      int mantissa, char measureStoreType, byte dataTypeSelected) {
-    // ''l' for long, 'n' for double
+      int mantissa, DataType measureStoreType, byte dataTypeSelected) {
     switch (measureStoreType) {
-      case 'b':
-        return new CompressionFinder(COMPRESSION_TYPE.BIGDECIMAL, DataType.DATA_BYTE,
-            DataType.DATA_BYTE, measureStoreType);
-      case 'd':
+      case DECIMAL:
+        return new CompressionFinder(COMPRESSION_TYPE.BIGDECIMAL, DataType.BYTE,
+            DataType.BYTE, measureStoreType);
+      case SHORT:
+      case INT:
+      case LONG:
         return getLongCompressorFinder(maxValue, minValue, mantissa, dataTypeSelected,
             measureStoreType);
-      case 'l':
-        return new CompressionFinder(COMPRESSION_TYPE.ADAPTIVE,
-            DataType.DATA_BIGINT, DataType.DATA_BIGINT, measureStoreType);
-      case 'n':
+      case DOUBLE:
         return getDoubleCompressorFinder(maxValue, minValue, mantissa, dataTypeSelected,
             measureStoreType);
       default:
@@ -131,7 +147,7 @@ public final class ValueCompressionUtil {
   }
 
   private static CompressionFinder getDoubleCompressorFinder(Object maxValue, Object minValue,
-      int mantissa, byte dataTypeSelected, char measureStoreType) {
+      int mantissa, byte dataTypeSelected, DataType measureStoreType) {
     //Here we should use the Max abs as max to getDatatype, let's say -1 and -10000000, -1 is max,
     //but we can't use -1 to getDatatype, we should use -10000000.
     double absMaxValue = Math.abs((double) maxValue) >= Math.abs((double) minValue) ?
@@ -145,13 +161,13 @@ public final class ValueCompressionUtil {
       int adaptiveSize = getSize(adaptiveDataType);
       int deltaSize = getSize(deltaDataType);
       if (adaptiveSize > deltaSize) {
-        return new CompressionFinder(COMPRESSION_TYPE.DELTA_DOUBLE, DataType.DATA_DOUBLE,
+        return new CompressionFinder(COMPRESSION_TYPE.DELTA_DOUBLE, DataType.DOUBLE,
             deltaDataType, measureStoreType);
       } else if (adaptiveSize < deltaSize) {
-        return new CompressionFinder(COMPRESSION_TYPE.ADAPTIVE, DataType.DATA_DOUBLE,
+        return new CompressionFinder(COMPRESSION_TYPE.ADAPTIVE, DataType.DOUBLE,
             deltaDataType, measureStoreType);
       } else {
-        return new CompressionFinder(COMPRESSION_TYPE.ADAPTIVE, DataType.DATA_DOUBLE,
+        return new CompressionFinder(COMPRESSION_TYPE.ADAPTIVE, DataType.DOUBLE,
             adaptiveDataType, measureStoreType);
       }
     } else {
@@ -178,7 +194,7 @@ public final class ValueCompressionUtil {
   }
 
   private static CompressionFinder getLongCompressorFinder(Object maxValue, Object minValue,
-      int mantissa, byte dataTypeSelected, char measureStoreType) {
+      int mantissa, byte dataTypeSelected, DataType measureStoreType) {
     DataType adaptiveDataType = getDataType((long) maxValue, mantissa, dataTypeSelected);
     int adaptiveSize = getSize(adaptiveDataType);
     DataType deltaDataType = null;
@@ -186,20 +202,20 @@ public final class ValueCompressionUtil {
     // consider the scenario when max and min value are equal to is long max and min value OR
     // when the max and min value are resulting in a value greater than long max value, then
     // it is not possible to determine the compression type.
-    if (adaptiveDataType == DataType.DATA_LONG) {
-      deltaDataType = DataType.DATA_BIGINT;
+    if (adaptiveDataType == DataType.LONG) {
+      deltaDataType = DataType.LONG;
     } else {
       deltaDataType = getDataType((long) maxValue - (long) minValue, mantissa, dataTypeSelected);
     }
     int deltaSize = getSize(deltaDataType);
     if (adaptiveSize > deltaSize) {
-      return new CompressionFinder(COMPRESSION_TYPE.DELTA_DOUBLE, DataType.DATA_BIGINT,
+      return new CompressionFinder(COMPRESSION_TYPE.DELTA_DOUBLE, DataType.LONG,
           deltaDataType, measureStoreType);
     } else if (adaptiveSize < deltaSize) {
-      return new CompressionFinder(COMPRESSION_TYPE.ADAPTIVE, DataType.DATA_BIGINT,
+      return new CompressionFinder(COMPRESSION_TYPE.ADAPTIVE, DataType.LONG,
           deltaDataType, measureStoreType);
     } else {
-      return new CompressionFinder(COMPRESSION_TYPE.ADAPTIVE, DataType.DATA_BIGINT,
+      return new CompressionFinder(COMPRESSION_TYPE.ADAPTIVE, DataType.LONG,
           adaptiveDataType, measureStoreType);
     }
   }
@@ -234,7 +250,9 @@ public final class ValueCompressionUtil {
    */
   public static ValueCompressor getValueCompressor(CompressionFinder compressorFinder) {
     switch (compressorFinder.getMeasureStoreType()) {
-      case 'd':
+      case SHORT:
+      case INT:
+      case LONG:
         return new BigIntCompressor();
       default:
         return new DoubleCompressor();
@@ -295,7 +313,7 @@ public final class ValueCompressionUtil {
   private static Object compressNone(DataType changedDataType, double[] value) {
     int i = 0;
     switch (changedDataType) {
-      case DATA_BYTE:
+      case BYTE:
         byte[] result = new byte[value.length];
 
         for (double a : value) {
@@ -304,7 +322,7 @@ public final class ValueCompressionUtil {
         }
         return result;
 
-      case DATA_SHORT:
+      case SHORT:
         short[] shortResult = new short[value.length];
 
         for (double a : value) {
@@ -313,7 +331,7 @@ public final class ValueCompressionUtil {
         }
         return shortResult;
 
-      case DATA_INT:
+      case INT:
         int[] intResult = new int[value.length];
 
         for (double a : value) {
@@ -322,8 +340,7 @@ public final class ValueCompressionUtil {
         }
         return intResult;
 
-      case DATA_LONG:
-      case DATA_BIGINT:
+      case LONG:
         long[] longResult = new long[value.length];
 
         for (double a : value) {
@@ -332,7 +349,7 @@ public final class ValueCompressionUtil {
         }
         return longResult;
 
-      case DATA_FLOAT:
+      case FLOAT:
         float[] floatResult = new float[value.length];
 
         for (double a : value) {
@@ -353,7 +370,7 @@ public final class ValueCompressionUtil {
   private static Object compressMaxMin(DataType changedDataType, double[] value, double maxValue) {
     int i = 0;
     switch (changedDataType) {
-      case DATA_BYTE:
+      case BYTE:
 
         byte[] result = new byte[value.length];
         for (double a : value) {
@@ -362,7 +379,7 @@ public final class ValueCompressionUtil {
         }
         return result;
 
-      case DATA_SHORT:
+      case SHORT:
 
         short[] shortResult = new short[value.length];
 
@@ -372,7 +389,7 @@ public final class ValueCompressionUtil {
         }
         return shortResult;
 
-      case DATA_INT:
+      case INT:
 
         int[] intResult = new int[value.length];
 
@@ -382,7 +399,7 @@ public final class ValueCompressionUtil {
         }
         return intResult;
 
-      case DATA_LONG:
+      case LONG:
 
         long[] longResult = new long[value.length];
 
@@ -392,7 +409,7 @@ public final class ValueCompressionUtil {
         }
         return longResult;
 
-      case DATA_FLOAT:
+      case FLOAT:
 
         float[] floatResult = new float[value.length];
 
@@ -422,7 +439,7 @@ public final class ValueCompressionUtil {
   private static Object compressNonDecimal(DataType changedDataType, double[] value, int mantissa) {
     int i = 0;
     switch (changedDataType) {
-      case DATA_BYTE:
+      case BYTE:
         byte[] result = new byte[value.length];
 
         for (double a : value) {
@@ -430,7 +447,7 @@ public final class ValueCompressionUtil {
           i++;
         }
         return result;
-      case DATA_SHORT:
+      case SHORT:
         short[] shortResult = new short[value.length];
 
         for (double a : value) {
@@ -438,7 +455,7 @@ public final class ValueCompressionUtil {
           i++;
         }
         return shortResult;
-      case DATA_INT:
+      case INT:
 
         int[] intResult = new int[value.length];
 
@@ -448,7 +465,7 @@ public final class ValueCompressionUtil {
         }
         return intResult;
 
-      case DATA_LONG:
+      case LONG:
 
         long[] longResult = new long[value.length];
 
@@ -458,7 +475,7 @@ public final class ValueCompressionUtil {
         }
         return longResult;
 
-      case DATA_FLOAT:
+      case FLOAT:
 
         float[] floatResult = new float[value.length];
 
@@ -489,7 +506,7 @@ public final class ValueCompressionUtil {
     int i = 0;
     BigDecimal max = BigDecimal.valueOf(maxValue);
     switch (changedDataType) {
-      case DATA_BYTE:
+      case BYTE:
 
         byte[] result = new byte[value.length];
 
@@ -501,7 +518,7 @@ public final class ValueCompressionUtil {
         }
         return result;
 
-      case DATA_SHORT:
+      case SHORT:
 
         short[] shortResult = new short[value.length];
 
@@ -513,7 +530,7 @@ public final class ValueCompressionUtil {
         }
         return shortResult;
 
-      case DATA_INT:
+      case INT:
 
         int[] intResult = new int[value.length];
 
@@ -525,7 +542,7 @@ public final class ValueCompressionUtil {
         }
         return intResult;
 
-      case DATA_LONG:
+      case LONG:
 
         long[] longResult = new long[value.length];
 
@@ -537,7 +554,7 @@ public final class ValueCompressionUtil {
         }
         return longResult;
 
-      case DATA_FLOAT:
+      case FLOAT:
 
         float[] floatResult = new float[value.length];
 
@@ -570,14 +587,13 @@ public final class ValueCompressionUtil {
   public static ValueCompressionHolder getCompressionNone(DataType compDataType,
       DataType actualDataType) {
     switch (compDataType) {
-      case DATA_BYTE:
+      case BYTE:
         return new CompressionNoneByte(actualDataType);
-      case DATA_SHORT:
+      case SHORT:
         return new CompressionNoneShort(actualDataType);
-      case DATA_INT:
+      case INT:
         return new CompressionNoneInt(actualDataType);
-      case DATA_LONG:
-      case DATA_BIGINT:
+      case LONG:
         return new CompressionNoneLong(actualDataType);
       default:
         return new CompressionNoneDefault(actualDataType);
@@ -590,13 +606,13 @@ public final class ValueCompressionUtil {
   public static ValueCompressionHolder getCompressionDecimalMaxMin(
       DataType compDataType, DataType actualDataType) {
     switch (compDataType) {
-      case DATA_BYTE:
+      case BYTE:
         return new CompressionMaxMinByte(actualDataType);
-      case DATA_SHORT:
+      case SHORT:
         return new CompressionMaxMinShort(actualDataType);
-      case DATA_INT:
+      case INT:
         return new CompressionMaxMinInt(actualDataType);
-      case DATA_LONG:
+      case LONG:
         return new CompressionMaxMinLong(actualDataType);
       default:
         return new CompressionMaxMinDefault(actualDataType);
@@ -609,13 +625,13 @@ public final class ValueCompressionUtil {
   public static ValueCompressionHolder getCompressionNonDecimal(
       DataType compDataType) {
     switch (compDataType) {
-      case DATA_BYTE:
+      case BYTE:
         return new CompressionNonDecimalByte();
-      case DATA_SHORT:
+      case SHORT:
         return new CompressionNonDecimalShort();
-      case DATA_INT:
+      case INT:
         return new CompressionNonDecimalInt();
-      case DATA_LONG:
+      case LONG:
         return new CompressionNonDecimalLong();
       default:
         return new CompressionNonDecimalDefault();
@@ -628,13 +644,13 @@ public final class ValueCompressionUtil {
   public static ValueCompressionHolder getCompressionNonDecimalMaxMin(
       DataType compDataType) {
     switch (compDataType) {
-      case DATA_BYTE:
+      case BYTE:
         return new CompressionNonDecimalMaxMinByte();
-      case DATA_SHORT:
+      case SHORT:
         return new CompressionNonDecimalMaxMinShort();
-      case DATA_INT:
+      case INT:
         return new CompressionNonDecimalMaxMinInt();
-      case DATA_LONG:
+      case LONG:
         return new CompressionNonDecimalMaxMinLong();
       default:
         return new CompressionNonDecimalMaxMinDefault();
@@ -645,10 +661,10 @@ public final class ValueCompressionUtil {
    * Create Value compression model for write path
    */
   public static WriterCompressModel getWriterCompressModel(Object[] maxValue, Object[] minValue,
-      int[] mantissa, Object[] uniqueValue, char[] aggType, byte[] dataTypeSelected) {
+      int[] mantissa, Object[] uniqueValue, DataType[] dataType, byte[] dataTypeSelected) {
     MeasureMetaDataModel metaDataModel =
         new MeasureMetaDataModel(minValue, maxValue, mantissa, maxValue.length, uniqueValue,
-            aggType, dataTypeSelected);
+            dataType, dataTypeSelected);
     return getWriterCompressModel(metaDataModel);
   }
 
@@ -661,7 +677,7 @@ public final class ValueCompressionUtil {
     Object[] maxValue = measureMDMdl.getMaxValue();
     Object[] uniqueValue = measureMDMdl.getUniqueValue();
     int[] mantissa = measureMDMdl.getMantissa();
-    char[] type = measureMDMdl.getType();
+    DataType[] type = measureMDMdl.getType();
     byte[] dataTypeSelected = measureMDMdl.getDataTypeSelected();
     WriterCompressModel compressionModel = new WriterCompressModel();
     DataType[] actualType = new DataType[measureCount];
@@ -772,20 +788,4 @@ public final class ValueCompressionUtil {
      */
     BIGDECIMAL
   }
-
-  /**
-   * use to identify the type of data.
-   */
-  public enum DataType {
-    DATA_BYTE(),
-    DATA_SHORT(),
-    DATA_INT(),
-    DATA_FLOAT(),
-    DATA_LONG(),
-    DATA_BIGINT(),
-    DATA_DOUBLE(),
-    DATA_BIGDECIMAL();
-    DataType() {
-    }
-  }
 }
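
With this refactor, callers pass the metadata DataType directly instead of a char code. A hypothetical call for a LONG measure (invented min/max):

import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.util.CompressionFinder;
import org.apache.carbondata.core.util.ValueCompressionUtil;

// hypothetical example, not part of this commit
public class CompressionFinderExample {
  public static void main(String[] args) {
    // previously measureStoreType was a char ('l', 'n', 'b', 'd'); now it is a DataType
    CompressionFinder finder = ValueCompressionUtil.getCompressionFinder(
        100000L, 10L, 0, DataType.LONG, (byte) 0);
    System.out.println(finder.getMeasureStoreType());   // LONG
  }
}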

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
index 2c6c890..ddcc8a4 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
@@ -28,6 +28,8 @@ import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.format.*;
 import org.apache.carbondata.format.BlockletMinMaxIndex;
 import org.apache.carbondata.format.ColumnSchema;
+import org.apache.carbondata.format.DataType;
+
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -169,7 +171,12 @@ public class CarbonMetadataUtilTest {
     long[] longArr = { 1, 2, 3, 4, 5 };
     byte[][] maxByteArr = { { 1, 2 }, { 3, 4 }, { 5, 6 }, { 2, 4 }, { 1, 2 } };
     int[] cardinality = { 1, 2, 3, 4, 5 };
-    char[] charArr = { 'a', 's', 'd', 'g', 'h' };
+    org.apache.carbondata.core.metadata.datatype.DataType[] dataType = {
+        org.apache.carbondata.core.metadata.datatype.DataType.INT,
+        org.apache.carbondata.core.metadata.datatype.DataType.INT,
+        org.apache.carbondata.core.metadata.datatype.DataType.INT,
+        org.apache.carbondata.core.metadata.datatype.DataType.INT,
+        org.apache.carbondata.core.metadata.datatype.DataType.INT };
 
     org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema colSchema =
         new org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema();
@@ -191,7 +198,7 @@ public class CarbonMetadataUtilTest {
     writerCompressModel.setMinValue(objMinArr);
     writerCompressModel.setDataTypeSelected(byteArr);
     writerCompressModel.setMantissa(intArr);
-    writerCompressModel.setType(charArr);
+    writerCompressModel.setType(dataType);
     writerCompressModel.setUniqueValue(objMinArr);
 
     BlockletInfoColumnar blockletInfoColumnar = new BlockletInfoColumnar();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/test/java/org/apache/carbondata/core/util/ValueCompressionUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/ValueCompressionUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/ValueCompressionUtilTest.java
index 6252ca1..3032085 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/ValueCompressionUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/ValueCompressionUtilTest.java
@@ -27,7 +27,7 @@ import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
 import org.apache.carbondata.core.datastore.compression.decimal.*;
 import org.apache.carbondata.core.datastore.compression.nondecimal.*;
 import org.apache.carbondata.core.datastore.compression.none.*;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 
 import org.junit.Test;
 
@@ -36,8 +36,8 @@ public class ValueCompressionUtilTest {
 
   @Test public void testGetSize() {
     DataType[] dataTypes =
-        { DataType.DATA_BIGINT, DataType.DATA_INT, DataType.DATA_BYTE, DataType.DATA_SHORT,
-            DataType.DATA_FLOAT };
+        { DataType.LONG, DataType.INT, DataType.BOOLEAN, DataType.SHORT,
+            DataType.FLOAT };
     int[] expectedSizes = { 8, 4, 1, 2, 4 };
     for (int i = 0; i < dataTypes.length; i++) {
       assertEquals(expectedSizes[i], ValueCompressionUtil.getSize(dataTypes[i]));
@@ -48,7 +48,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 25, 12, 22 };
     int[] result = (int[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.DELTA_DOUBLE, values,
-            DataType.DATA_INT, 22, 0);
+            DataType.INT, 22, 0);
     int[] expectedResult = { -3, 10, 0 };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i]);
@@ -59,7 +59,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20, 21, 22 };
     byte[] result = (byte[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.DELTA_DOUBLE, values,
-            DataType.DATA_BYTE, 22, 0);
+            DataType.BYTE, 22, 0);
     byte[] expectedResult = { 2, 1, 0 };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i]);
@@ -70,7 +70,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 200, 21, 22 };
     short[] result = (short[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.DELTA_DOUBLE, values,
-            DataType.DATA_SHORT, 22, 0);
+            DataType.SHORT, 22, 0);
     short[] expectedResult = { -178, 1, 0 };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i]);
@@ -81,7 +81,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 2000, 2100, 2002 };
     long[] result = (long[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.DELTA_DOUBLE, values,
-            DataType.DATA_LONG, 2125, 0);
+            DataType.LONG, 2125, 0);
     long[] expectedResult = { 125, 25, 123 };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i]);
@@ -92,7 +92,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20.121, 21.223, 22.345 };
     float[] result = (float[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.DELTA_DOUBLE, values,
-            DataType.DATA_FLOAT, 22.345, 3);
+            DataType.FLOAT, 22.345, 3);
     float[] expectedResult = { 2.224f, 1.122f, 0f };
     for (int i = 0; i < result.length; i++) {
     	assertTrue(result[i]-expectedResult[i]==0);
@@ -103,7 +103,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20.121, 21.223, 22.345 };
     double[] result = (double[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.DELTA_DOUBLE, values,
-            DataType.DATA_DOUBLE, 102.345, 3);
+            DataType.DOUBLE, 102.345, 3);
     double[] expectedResult = { 82.224, 81.122, 80.0 };
     for (int i = 0; i < result.length; i++) {
       assertTrue(result[i]-expectedResult[i]==0);
@@ -114,7 +114,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20.121, 21.223, 22.345 };
     long[] result = (long[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.ADAPTIVE, values,
-            DataType.DATA_BIGINT, 22, 0);
+            DataType.LONG, 22, 0);
     long[] expectedResult = { 20, 21, 22 };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i]);
@@ -124,7 +124,7 @@ public class ValueCompressionUtilTest {
   @Test public void testToGetCompressedValuesWithCompressionTypeNoneForDataByte() {
     double[] values = { 20, 21, 22 };
     byte[] result = (byte[]) ValueCompressionUtil
-        .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.ADAPTIVE, values, DataType.DATA_BYTE,
+        .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.ADAPTIVE, values, DataType.BYTE,
             22, 0);
     byte[] expectedResult = { 20, 21, 22 };
     for (int i = 0; i < result.length; i++) {
@@ -136,7 +136,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 200000, 21, 22 };
     short[] result = (short[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.ADAPTIVE, values,
-            DataType.DATA_SHORT, 22, 0);
+            DataType.SHORT, 22, 0);
     short[] expectedResult = { 3392, 21, 22 };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i]);
@@ -146,7 +146,7 @@ public class ValueCompressionUtilTest {
   @Test public void testToGetCompressedValuesWithCompressionTypeNoneForDataInt() {
     double[] values = { 20, 21, 22 };
     int[] result = (int[]) ValueCompressionUtil
-        .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.ADAPTIVE, values, DataType.DATA_INT,
+        .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.ADAPTIVE, values, DataType.INT,
             22, 0);
     int[] expectedResult = { 20, 21, 22 };
     for (int i = 0; i < result.length; i++) {
@@ -157,7 +157,7 @@ public class ValueCompressionUtilTest {
   @Test public void testToGetCompressedValuesWithCompressionTypeNoneForDataLong() {
     double[] values = { 20, 21, 22 };
     long[] result = (long[]) ValueCompressionUtil
-        .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.ADAPTIVE, values, DataType.DATA_LONG,
+        .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.ADAPTIVE, values, DataType.LONG,
             22, 0);
     long[] expectedResult = { 20, 21, 22 };
     for (int i = 0; i < result.length; i++) {
@@ -169,7 +169,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20.121, 21.223, 22.345 };
     float[] result = (float[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.ADAPTIVE, values,
-            DataType.DATA_FLOAT, 22, 3);
+            DataType.FLOAT, 22, 3);
     float[] expectedResult = { 20.121f, 21.223f, 22.345f };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i],3);
@@ -180,7 +180,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20.121, 21.223, 22.345 };
     double[] result = (double[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.ADAPTIVE, values,
-            DataType.DATA_DOUBLE, 22, 3);
+            DataType.DOUBLE, 22, 3);
     double[] expectedResult = { 20.121, 21.223, 22.345 };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i],3);
@@ -191,7 +191,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20.1, 21.2, 22.3 };
     float[] result = (float[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.BIGINT, values,
-            DataType.DATA_FLOAT, 22, 1);
+            DataType.FLOAT, 22, 1);
     float[] expectedResult = { 201f, 212f, 223f };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i],0);
@@ -202,7 +202,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20.1, 21.2, 22.3 };
     byte[] result = (byte[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.BIGINT, values,
-            DataType.DATA_BYTE, 22, 1);
+            DataType.BYTE, 22, 1);
     byte[] expectedResult = { -55, -44, -33 };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i]);
@@ -213,7 +213,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20.1, 21.2, 22.3 };
     short[] result = (short[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.BIGINT, values,
-            DataType.DATA_SHORT, 22, 1);
+            DataType.SHORT, 22, 1);
     short[] expectedResult = { 201, 212, 223 };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i]);
@@ -224,7 +224,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20.1, 21.2, 22.3 };
     int[] result = (int[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.BIGINT, values,
-            DataType.DATA_INT, 22, 1);
+            DataType.INT, 22, 1);
     int[] expectedResult = { 201, 212, 223 };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i]);
@@ -235,7 +235,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20.1, 21.2, 22.3 };
     long[] result = (long[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.BIGINT, values,
-            DataType.DATA_LONG, 22, 1);
+            DataType.LONG, 22, 1);
     long[] expectedResult = { 201, 212, 223 };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i]);
@@ -246,7 +246,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20.1, 21.2, 22.3 };
     double[] result = (double[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.BIGINT, values,
-            DataType.DATA_DOUBLE, 22, 1);
+            DataType.DOUBLE, 22, 1);
     double[] expectedResult = { 201, 212, 223 };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i],0);
@@ -257,7 +257,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20, 21, 22 };
     byte[] result = (byte[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.DELTA_NON_DECIMAL, values,
-            DataType.DATA_BYTE, 22, 1);
+            DataType.BYTE, 22, 1);
     byte[] expectedResult = { 20, 10, 0 };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i]);
@@ -268,7 +268,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20, 21, 22 };
     int[] result = (int[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.DELTA_NON_DECIMAL, values,
-            DataType.DATA_INT, 22, 1);
+            DataType.INT, 22, 1);
     int[] expectedResult = { 20, 10, 0 };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i]);
@@ -279,7 +279,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20, 21, 22 };
     double[] result = (double[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.DELTA_NON_DECIMAL, values,
-            DataType.DATA_DOUBLE, 22, 1);
+            DataType.DOUBLE, 22, 1);
     double[] expectedResult = { 20, 10, 0 };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i],0);
@@ -290,7 +290,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20000, 21, 22 };
     short[] result = (short[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.DELTA_NON_DECIMAL, values,
-            DataType.DATA_SHORT, 22, 1);
+            DataType.SHORT, 22, 1);
     short[] expectedResult = { -3172, 10, 0 };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i]);
@@ -301,7 +301,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20, 21, 22 };
     long[] result = (long[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.DELTA_NON_DECIMAL, values,
-            DataType.DATA_LONG, 22, 1);
+            DataType.LONG, 22, 1);
     long[] expectedResult = { 20, 10, 0 };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i]);
@@ -312,7 +312,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20, 21, 22 };
     float[] result = (float[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.DELTA_NON_DECIMAL, values,
-            DataType.DATA_FLOAT, 22, 1);
+            DataType.FLOAT, 22, 1);
     float[] expectedResult = { 20f, 10f, 0f };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i],0);
@@ -321,127 +321,127 @@ public class ValueCompressionUtilTest {
 
   @Test public void testToUnCompressNone() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionNone(DataType.DATA_BIGINT, DataType.DATA_BIGINT);
+        ValueCompressionUtil.getCompressionNone(DataType.LONG, DataType.LONG);
     assertEquals(result.getClass(), CompressionNoneLong.class);
   }
 
   @Test public void testToUnCompressNoneForByte() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionNone(DataType.DATA_BYTE, DataType.DATA_FLOAT);
+        ValueCompressionUtil.getCompressionNone(DataType.BYTE, DataType.FLOAT);
     assertEquals(result.getClass(), CompressionNoneByte.class);
   }
 
   @Test public void testToUnCompressNoneForLong() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionNone(DataType.DATA_LONG, DataType.DATA_FLOAT);
+        ValueCompressionUtil.getCompressionNone(DataType.LONG, DataType.FLOAT);
     assertEquals(result.getClass(), CompressionNoneLong.class);
   }
 
   @Test public void testToUnCompressNoneForShort() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionNone(DataType.DATA_SHORT, DataType.DATA_FLOAT);
+        ValueCompressionUtil.getCompressionNone(DataType.SHORT, DataType.FLOAT);
     assertEquals(result.getClass(), CompressionNoneShort.class);
   }
 
   @Test public void testToUnCompressNoneForInt() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionNone(DataType.DATA_INT, DataType.DATA_FLOAT);
+        ValueCompressionUtil.getCompressionNone(DataType.INT, DataType.FLOAT);
     assertEquals(result.getClass(), CompressionNoneInt.class);
   }
 
   @Test public void testToUnCompressNoneForDouble() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionNone(DataType.DATA_DOUBLE, DataType.DATA_FLOAT);
+        ValueCompressionUtil.getCompressionNone(DataType.DOUBLE, DataType.FLOAT);
     assertEquals(result.getClass(), CompressionNoneDefault.class);
   }
 
   @Test public void testToUnCompressMaxMinForDouble() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionDecimalMaxMin(DataType.DATA_DOUBLE, null);
+        ValueCompressionUtil.getCompressionDecimalMaxMin(DataType.DOUBLE, null);
     assertEquals(result.getClass(), CompressionMaxMinDefault.class);
   }
 
   @Test public void testToUnCompressMaxMinForInt() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionDecimalMaxMin(DataType.DATA_INT, null);
+        ValueCompressionUtil.getCompressionDecimalMaxMin(DataType.INT, null);
     assertEquals(result.getClass(), CompressionMaxMinInt.class);
   }
 
   @Test public void testToUnCompressMaxMinForLong() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionDecimalMaxMin(DataType.DATA_LONG, null);
+        ValueCompressionUtil.getCompressionDecimalMaxMin(DataType.LONG, null);
     assertEquals(result.getClass(), CompressionMaxMinLong.class);
   }
 
   @Test public void testToUnCompressMaxMinForByte() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionDecimalMaxMin(DataType.DATA_BYTE, null);
+        ValueCompressionUtil.getCompressionDecimalMaxMin(DataType.BYTE, null);
     assertEquals(result.getClass(), CompressionMaxMinByte.class);
   }
 
   @Test public void testToUnCompressMaxMinForShort() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionDecimalMaxMin(DataType.DATA_SHORT, null);
+        ValueCompressionUtil.getCompressionDecimalMaxMin(DataType.SHORT, null);
     assertEquals(result.getClass(), CompressionMaxMinShort.class);
   }
 
   @Test public void testToUnCompressNonDecimalForDouble() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionNonDecimal(DataType.DATA_DOUBLE);
+        ValueCompressionUtil.getCompressionNonDecimal(DataType.DOUBLE);
     assertEquals(result.getClass(), CompressionNonDecimalDefault.class);
   }
 
   @Test public void testToUnCompressNonDecimalForInt() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionNonDecimal(DataType.DATA_INT);
+        ValueCompressionUtil.getCompressionNonDecimal(DataType.INT);
     assertEquals(result.getClass(), CompressionNonDecimalInt.class);
   }
 
   @Test public void testToUnCompressNonDecimalForLong() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionNonDecimal(DataType.DATA_LONG);
+        ValueCompressionUtil.getCompressionNonDecimal(DataType.LONG);
     assertEquals(result.getClass(), CompressionNonDecimalLong.class);
   }
 
   @Test public void testToUnCompressNonDecimalForByte() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionNonDecimal(DataType.DATA_BYTE);
+        ValueCompressionUtil.getCompressionNonDecimal(DataType.BYTE);
     assertEquals(result.getClass(), CompressionNonDecimalByte.class);
   }
 
   @Test public void testToUnCompressNonDecimalForShort() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionNonDecimal(DataType.DATA_SHORT);
+        ValueCompressionUtil.getCompressionNonDecimal(DataType.SHORT);
     assertEquals(result.getClass(), CompressionNonDecimalShort.class);
   }
 
   @Test public void testToUnCompressNonDecimalMaxMinForDouble() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionNonDecimalMaxMin(DataType.DATA_DOUBLE);
+        ValueCompressionUtil.getCompressionNonDecimalMaxMin(DataType.DOUBLE);
     assertEquals(result.getClass(), CompressionNonDecimalMaxMinDefault.class);
   }
 
   @Test public void testToUnCompressNonDecimalMaxMinForInt() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionNonDecimalMaxMin(DataType.DATA_INT);
+        ValueCompressionUtil.getCompressionNonDecimalMaxMin(DataType.INT);
     assertEquals(result.getClass(), CompressionNonDecimalMaxMinInt.class);
   }
 
   @Test public void testToUnCompressNonDecimalMaxMinForLong() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionNonDecimalMaxMin(DataType.DATA_LONG);
+        ValueCompressionUtil.getCompressionNonDecimalMaxMin(DataType.LONG);
     assertEquals(result.getClass(), CompressionNonDecimalMaxMinLong.class);
   }
 
   @Test public void testToUnCompressNonDecimalMaxMinForByte() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionNonDecimalMaxMin(DataType.DATA_BYTE);
+        ValueCompressionUtil.getCompressionNonDecimalMaxMin(DataType.BYTE);
     assertEquals(result.getClass(), CompressionNonDecimalMaxMinByte.class);
   }
 
   @Test public void testToUnCompressNonDecimalMaxMinForShort() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionNonDecimalMaxMin(DataType.DATA_SHORT);
+        ValueCompressionUtil.getCompressionNonDecimalMaxMin(DataType.SHORT);
     assertEquals(result.getClass(), CompressionNonDecimalMaxMinShort.class);
   }
 
@@ -490,7 +490,7 @@ public class ValueCompressionUtilTest {
     Object[] minValues = { 1L, 2L, 3L };
     int[] decimalLength = { 0, 0, 0 };
     Object[] uniqueValues = { 5, new Long[]{2L,4L}, 2L};
-    char[] types = { 'l', 'l', 'l' };
+    DataType[] types = { DataType.LONG, DataType.LONG, DataType.LONG };
     byte[] dataTypeSelected = { 1, 2, 4 };
     MeasureMetaDataModel measureMetaDataModel =
         new MeasureMetaDataModel(maxValues, minValues, decimalLength, 3, uniqueValues, types,
@@ -510,7 +510,7 @@ public class ValueCompressionUtilTest {
     Object[] minValues = { 1.0 };
     int[] decimalLength = { 0 };
     Object[] uniqueValues = { 5 };
-    char[] types = { 'n' };
+    DataType[] types = { DataType.DOUBLE };
     byte[] dataTypeSelected = { 1 };
     MeasureMetaDataModel measureMetaDataModel =
         new MeasureMetaDataModel(maxValues, minValues, decimalLength, 1, uniqueValues, types,
@@ -526,7 +526,7 @@ public class ValueCompressionUtilTest {
     Object[] minValues = { 32500.00 };
     int[] decimalLength = { 0 };
     Object[] uniqueValues = { 5 };
-    char[] types = { 'n' };
+    DataType[] types = { DataType.DOUBLE };
     byte[] dataTypeSelected = { 1 };
     MeasureMetaDataModel measureMetaDataModel =
         new MeasureMetaDataModel(maxValues, minValues, decimalLength, 1, uniqueValues, types,
@@ -542,7 +542,7 @@ public class ValueCompressionUtilTest {
     Object[] minValues = { 1111078433.0 };
     int[] decimalLength = { 0 };
     Object[] uniqueValues = { 5 };
-    char[] types = { 'n' };
+    DataType[] types = { DataType.DOUBLE };
     byte[] dataTypeSelected = { 1 };
     MeasureMetaDataModel measureMetaDataModel =
         new MeasureMetaDataModel(maxValues, minValues, decimalLength, 1, uniqueValues, types,
@@ -558,7 +558,7 @@ public class ValueCompressionUtilTest {
     Object[] minValues = { 32744.0 };
     int[] decimalLength = { 0 };
     Object[] uniqueValues = { 5 };
-    char[] types = { 'n' };
+    DataType[] types = { DataType.DOUBLE};
     byte[] dataTypeSelected = { 1 };
     MeasureMetaDataModel measureMetaDataModel =
         new MeasureMetaDataModel(maxValues, minValues, decimalLength, 1, uniqueValues, types,
@@ -574,7 +574,7 @@ public class ValueCompressionUtilTest {
     Object[] minValues = { 32744.0 };
     int[] decimalLength = { 1 };
     Object[] uniqueValues = { 5 };
-    char[] types = { 'n' };
+    DataType[] types = { DataType.DOUBLE };
     byte[] dataTypeSelected = { 1 };
     MeasureMetaDataModel measureMetaDataModel =
         new MeasureMetaDataModel(maxValues, minValues, decimalLength, 1, uniqueValues, types,
@@ -590,7 +590,7 @@ public class ValueCompressionUtilTest {
     Object[] minValues = { 32744.0 };
     int[] decimalLength = { 1 };
     Object[] uniqueValues = { 5 };
-    char[] types = { 'n' };
+    DataType[] types = { DataType.DOUBLE };
     byte[] dataTypeSelected = { 0 };
     MeasureMetaDataModel measureMetaDataModel =
         new MeasureMetaDataModel(maxValues, minValues, decimalLength, 1, uniqueValues, types,
@@ -606,7 +606,7 @@ public class ValueCompressionUtilTest {
     Object[] minValues = { 32744.0 };
     int[] decimalLength = { 1 };
     Object[] uniqueValues = { 5 };
-    char[] types = { 'n' };
+    DataType[] types = { DataType.DOUBLE };
     byte[] dataTypeSelected = { 1 };
     MeasureMetaDataModel measureMetaDataModel =
         new MeasureMetaDataModel(maxValues, minValues, decimalLength, 1, uniqueValues, types,

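As a usage note for the refactored write path, here is a minimal sketch of calling getWriterCompressModel with the new DataType[] parameter in place of the old char[] codes. The class name is illustrative, the measure values are made up, and the import path for WriterCompressModel is an assumption (it is not shown in these hunks); the method signature itself matches the hunk above.

import org.apache.carbondata.core.datastore.compression.WriterCompressModel; // assumed package
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.util.ValueCompressionUtil;

public final class WriterCompressModelSketch {
  public static void main(String[] args) {
    // One double measure; the per-measure type is now expressed with the shared enum.
    Object[] maxValue = { 44d };
    Object[] minValue = { 0d };
    int[] mantissa = { 0 };
    Object[] uniqueValue = { 0d };
    DataType[] dataType = { DataType.DOUBLE };
    byte[] dataTypeSelected = { 1 };

    WriterCompressModel model = ValueCompressionUtil.getWriterCompressModel(
        maxValue, minValue, mantissa, uniqueValue, dataType, dataTypeSelected);
    System.out.println(model);
  }
}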
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/test/java/org/apache/carbondata/core/writer/CarbonFooterWriterTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/writer/CarbonFooterWriterTest.java b/core/src/test/java/org/apache/carbondata/core/writer/CarbonFooterWriterTest.java
index d02e25f..480ed04 100644
--- a/core/src/test/java/org/apache/carbondata/core/writer/CarbonFooterWriterTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/writer/CarbonFooterWriterTest.java
@@ -184,7 +184,7 @@ public class CarbonFooterWriterTest extends TestCase{
     compressionModel.setMaxValue(new Object[] { 44d, 55d });
     compressionModel.setMinValue(new Object[] { 0d, 0d });
     compressionModel.setMantissa(new int[] { 0, 0 });
-    compressionModel.setType(new char[] { 'n', 'n' });
+    compressionModel.setType(new DataType[] { DataType.DOUBLE, DataType.DOUBLE });
     compressionModel.setUniqueValue(new Object[] { 0d, 0d });
     compressionModel.setDataTypeSelected(new byte[2]);
     infoColumnar.setCompressionModel(compressionModel);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
index 371b9bb..9ae01b8 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
@@ -43,6 +43,7 @@ object CarbonSessionExample {
       .master("local")
       .appName("CarbonSessionExample")
       .config("spark.sql.warehouse.dir", warehouse)
+      .config("spark.driver.host", "localhost")
       .getOrCreateCarbonSession(storeLocation, metastoredb)
 
     spark.sparkContext.setLogLevel("WARN")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/format/src/main/thrift/carbondata.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/carbondata.thrift b/format/src/main/thrift/carbondata.thrift
index 937108c..b4cbc4e 100644
--- a/format/src/main/thrift/carbondata.thrift
+++ b/format/src/main/thrift/carbondata.thrift
@@ -114,8 +114,12 @@ struct DataChunk{
 }
 
 /**
- * Represents a chunk of data. The chunk can be a single column stored in Column Major format or a group of columns stored in Row Major Format.
- * For V2 format.
+ * Represents the metadata of a data chunk.
+ * The chunk can be a single column stored in Column Major format or a group of columns stored
+ * in Row Major format.
+ *
+ * For V3, one data chunk holds one page of data (32K rows).
+ * For V2 & V1, one data chunk holds the data of one blocklet.
  */
 struct DataChunk2{
     1: required ChunkCompressionMeta chunk_meta; // The metadata of a chunk

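To make the V3 note above concrete, the following small arithmetic sketch derives how many per-column pages (DataChunk2 entries) a blocklet produces. The class is illustrative and not part of the format module, and the 32,000-rows-per-page figure is an assumption based on the "32K rows" wording in the comment.

// Illustrative only: number of DataChunk2 pages per column in a V3 blocklet,
// assuming "32K rows" means 32,000 rows per page.
public final class V3PageCountSketch {
  private static final int ROWS_PER_PAGE = 32_000; // assumed value of "32K"

  static int pagesPerColumn(int rowsInBlocklet) {
    return (rowsInBlocklet + ROWS_PER_PAGE - 1) / ROWS_PER_PAGE; // ceiling division
  }

  public static void main(String[] args) {
    // e.g. a blocklet holding 120,000 rows yields 4 pages for each column
    System.out.println(pagesPerColumn(120_000));
  }
}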
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestEmptyRows.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestEmptyRows.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestEmptyRows.scala
index f31d434..5d6c07a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestEmptyRows.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestEmptyRows.scala
@@ -28,7 +28,7 @@ class TestEmptyRows extends QueryTest with BeforeAndAfterAll {
   override def beforeAll {
     sql("drop table if exists emptyRowCarbonTable")
     sql("drop table if exists emptyRowHiveTable")
-    //eid,ename,sal,presal,comm,deptno,Desc
+
     sql(
       "create table if not exists emptyRowCarbonTable (eid int,ename String,sal decimal,presal " +
         "decimal,comm decimal" +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNoMeasure.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNoMeasure.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNoMeasure.scala
index 3d85814..fa7b970 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNoMeasure.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNoMeasure.scala
@@ -28,7 +28,7 @@ import org.scalatest.BeforeAndAfterAll
 class TestLoadDataWithNoMeasure extends QueryTest with BeforeAndAfterAll {
 
   override def beforeAll {
-    sql("DROP TABLE IF EXISTS nomeasureTest_sd")
+    sql("DROP TABLE IF EXISTS nomeasureTest")
     sql(
       "CREATE TABLE nomeasureTest (empno String, doj String) STORED BY 'org.apache.carbondata" +
         ".format'"
@@ -106,8 +106,8 @@ class TestLoadDataWithNoMeasure extends QueryTest with BeforeAndAfterAll {
   }
 
   override def afterAll {
-    sql("drop table nomeasureTest")
-    sql("drop table nomeasureTest_sd")
-    sql("drop table nomeasureTest_scd")
+    sql("drop table if exists nomeasureTest")
+    sql("drop table if exists nomeasureTest_sd")
+    sql("drop table if exists nomeasureTest_scd")
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/ColumnGroupDataTypesTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/ColumnGroupDataTypesTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/ColumnGroupDataTypesTestCase.scala
index f1c1d69..ab003c0 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/ColumnGroupDataTypesTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/ColumnGroupDataTypesTestCase.scala
@@ -27,6 +27,12 @@ import org.scalatest.BeforeAndAfterAll
 class ColumnGroupDataTypesTestCase extends QueryTest with BeforeAndAfterAll {
 
   override def beforeAll {
+    sql("drop table if exists colgrp")
+    sql("drop table if exists normal")
+    sql("drop table if exists colgrp_dictexclude_before")
+    sql("drop table if exists colgrp_dictexclude_after")
+    sql("drop table if exists colgrp_disorder")
+
     sql("create table colgrp (column1 string,column2 string,column3 string,column4 string,column5 string,column6 string,column7 string,column8 string,column9 string,column10 string,measure1 int,measure2 int,measure3 int,measure4 int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES (\"COLUMN_GROUPS\"=\"(column2,column3,column4),(column7,column8,column9)\")")
     sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/10dim_4msr.csv' INTO table colgrp options('FILEHEADER'='column1,column2,column3,column4,column5,column6,column7,column8,column9,column10,measure1,measure2,measure3,measure4')");
     sql("create table normal (column1 string,column2 string,column3 string,column4 string,column5 string,column6 string,column7 string,column8 string,column9 string,column10 string,measure1 int,measure2 int,measure3 int,measure4 int) STORED BY 'org.apache.carbondata.format'")