Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/11/21 06:49:54 UTC

[1/3] carbondata git commit: [CARBONDATA-3112] Optimise decompressing while filling the vector during conversion of primitive types

Repository: carbondata
Updated Branches:
  refs/heads/master 51b10ba70 -> bed51ba77


http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index b4dd1b1..16763d3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -303,6 +303,10 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
     // applying the filter in spark's side. So we should disable vectorPushRowFilters option
     // in case of filters on global dictionary.
     val hasDictionaryFilterCols = hasFilterOnDictionaryColumn(filterSet, table)
+
+    // When there are many dictionary columns, Spark codegen generates a lot of code and
+    // that slows down the query, so we limit direct fill when there are many dictionary columns.
+    val hasMoreDictionaryCols = hasMoreDictionaryColumnsOnProjection(projectSet, table)
     val vectorPushRowFilters = CarbonProperties.getInstance().isPushRowFiltersForVector
     if (projects.map(_.toAttribute) == projects &&
         projectSet.size == projects.size &&
@@ -342,7 +346,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         updateRequestedColumns.asInstanceOf[Seq[Attribute]])
       // Check whether spark should handle row filters in case of vector flow.
       if (!vectorPushRowFilters && scan.isInstanceOf[CarbonDataSourceScan]
-          && !hasDictionaryFilterCols) {
+          && !hasDictionaryFilterCols && !hasMoreDictionaryCols) {
         // Here carbon only do page pruning and row level pruning will be done by spark.
         scan.inputRDDs().head match {
           case rdd: CarbonScanRDD[InternalRow] =>
@@ -386,7 +390,8 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq ++ newProjectList
 
       var updateRequestedColumns =
-        if (!vectorPushRowFilters && !implictsExisted && !hasDictionaryFilterCols) {
+        if (!vectorPushRowFilters && !implictsExisted && !hasDictionaryFilterCols
+            && !hasMoreDictionaryCols) {
           updateRequestedColumnsFunc(
             (projectSet ++ filterSet).map(relation.attributeMap).toSeq,
             table,
@@ -398,7 +403,8 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         supportBatchedDataSource(relation.relation.sqlContext,
           updateRequestedColumns.asInstanceOf[Seq[Attribute]]) &&
         needDecoder.isEmpty
-      if (!vectorPushRowFilters && !supportBatch && !implictsExisted && !hasDictionaryFilterCols) {
+      if (!vectorPushRowFilters && !supportBatch && !implictsExisted && !hasDictionaryFilterCols
+          && !hasMoreDictionaryCols) {
         // revert for row scan
         updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, table, needDecoder)
       }
@@ -414,7 +420,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         updateRequestedColumns.asInstanceOf[Seq[Attribute]])
       // Check whether spark should handle row filters in case of vector flow.
       if (!vectorPushRowFilters && scan.isInstanceOf[CarbonDataSourceScan]
-          && !implictsExisted && !hasDictionaryFilterCols) {
+          && !implictsExisted && !hasDictionaryFilterCols && !hasMoreDictionaryCols) {
         // Here carbon only do page pruning and row level pruning will be done by spark.
         scan.inputRDDs().head match {
           case rdd: CarbonScanRDD[InternalRow] =>
@@ -518,6 +524,18 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
     filterColumns.exists(c => map.get(c.name).getOrElse(false))
   }
 
+  private def hasMoreDictionaryColumnsOnProjection(projectColumns: AttributeSet,
+      relation: CarbonDatasourceHadoopRelation): Boolean = {
+    val map = relation.carbonRelation.metaData.dictionaryMap
+    var count = 0
+    projectColumns.foreach{c =>
+      if (map.get(c.name).getOrElse(false)) {
+        count += 1
+      }
+    }
+    count > CarbonCommonConstants.CARBON_ALLOW_DIRECT_FILL_DICT_COLS_LIMIT
+  }
+
   private def getPartitioning(carbonTable: CarbonTable,
       output: Seq[Attribute]): Partitioning = {
     val info: BucketingInfo = carbonTable.getBucketingInfo(carbonTable.getTableName)


[3/3] carbondata git commit: [CARBONDATA-3112] Optimise decompressing while filling the vector during conversion of primitive types

Posted by ma...@apache.org.
[CARBONDATA-3112] Optimise decompressing while filling the vector during conversion of primitive types

The following optimizations are done in this PR:

1. Optimise decompression while filling the vector during conversion of primitive types. This avoids creating an intermediate buffer during decompression (see the sketch after this list).
2. Refactor the global dictionary decoder codegen to minimise the amount of generated code, which reduces query time.
3. Disable lazy load for full scan queries as it is unnecessary.
4. Refactor the compressor interface and create an abstract class. All primitive datatype conversions now happen in little-endian
order, as Snappy does that conversion while compressing. This might break compatibility for ZSTD data written by the previous version.
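
A minimal sketch of the direct-fill idea from item 1, assuming a hypothetical IntSink as a stand-in for CarbonColumnVector: the codec decodes little-endian values straight out of the uncompressed page bytes, instead of first materializing a ColumnPage (or a primitive array) and copying from it.

    // Hypothetical sketch of "direct fill": decode little-endian ints straight
    // from the uncompressed page bytes into the vector, with no intermediate
    // int[] or ColumnPage allocation.
    public final class DirectFillSketch {
      interface IntSink { void putInt(int rowId, int value); } // stand-in for CarbonColumnVector

      static void fillFromPage(byte[] pageData, int pageSize, IntSink vector) {
        for (int i = 0, rowId = 0; rowId < pageSize; i += Integer.BYTES, rowId++) {
          int v = (pageData[i] & 0xFF)
              | ((pageData[i + 1] & 0xFF) << 8)
              | ((pageData[i + 2] & 0xFF) << 16)
              | ((pageData[i + 3] & 0xFF) << 24);
          vector.putInt(rowId, v);
        }
      }
    }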

This closes #2863


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/bed51ba7
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/bed51ba7
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/bed51ba7

Branch: refs/heads/master
Commit: bed51ba772cf0e8c5c648f620b62d2c9ba4ef9e8
Parents: 51b10ba
Author: ravipesala <ra...@gmail.com>
Authored: Fri Oct 26 20:50:53 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Wed Nov 21 12:23:57 2018 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   6 +
 ...mpressedDimensionChunkFileBasedReaderV3.java |   2 +-
 ...CompressedMeasureChunkFileBasedReaderV3.java |   2 +-
 .../safe/AbstractNonDictionaryVectorFiller.java |  47 +++--
 ...feVariableLengthDimensionDataChunkStore.java |   2 +-
 .../compression/AbstractCompressor.java         | 123 ++++++++++++
 .../datastore/compression/SnappyCompressor.java |   4 +-
 .../datastore/compression/ZstdCompressor.java   |  95 +--------
 .../page/ColumnPageValueConverter.java          |   6 +-
 .../datastore/page/VarLengthColumnPageBase.java |   2 +-
 .../page/encoding/ColumnPageDecoder.java        |   2 +-
 .../adaptive/AdaptiveDeltaFloatingCodec.java    |  74 ++++---
 .../adaptive/AdaptiveDeltaIntegralCodec.java    | 164 ++++++++-------
 .../adaptive/AdaptiveFloatingCodec.java         |  73 +++----
 .../adaptive/AdaptiveIntegralCodec.java         | 137 +++++++------
 .../encoding/compress/DirectCompressCodec.java  | 146 ++++++++------
 .../datastore/page/encoding/rle/RLECodec.java   |   2 +-
 .../statistics/PrimitivePageStatsCollector.java |   7 +
 .../page/statistics/StatisticsCollector.java    |  66 ------
 .../datatype/DecimalConverterFactory.java       |  53 +++--
 .../scan/result/vector/CarbonColumnVector.java  |   4 +
 .../scan/result/vector/CarbonDictionary.java    |   2 +
 .../vector/impl/CarbonColumnVectorImpl.java     |  35 +++-
 .../vector/impl/CarbonDictionaryImpl.java       |  37 ++++
 .../AbstractCarbonColumnarVector.java           |  10 +
 ...umnarVectorWrapperDirectWithDeleteDelta.java |  10 +-
 ...erDirectWithDeleteDeltaAndInvertedIndex.java |  34 +++-
 ...narVectorWrapperDirectWithInvertedIndex.java |   9 +-
 .../apache/carbondata/core/util/ByteUtil.java   |  28 ++-
 .../presto/CarbonColumnVectorWrapper.java       |   9 +
 .../src/test/resources/IUD/negativevalue.csv    |   7 +
 .../iud/UpdateCarbonTableTestCase.scala         |  17 +-
 .../vectorreader/ColumnarVectorWrapper.java     |  10 +
 .../ColumnarVectorWrapperDirect.java            |   8 +
 .../VectorizedCarbonRecordReader.java           |  31 ++-
 .../datasources/SparkCarbonFileFormat.scala     |  10 +-
 .../org/apache/spark/sql/CarbonVectorProxy.java | 156 ++++++++++-----
 .../org/apache/spark/sql/CarbonVectorProxy.java | 200 ++++++++++++++-----
 .../stream/CarbonStreamRecordReader.java        |   5 +-
 .../spark/sql/CarbonDictionaryDecoder.scala     | 195 ++++++++++++------
 .../strategy/CarbonLateDecodeStrategy.scala     |  26 ++-
 41 files changed, 1193 insertions(+), 663 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index b75648e..094e552 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1949,6 +1949,12 @@ public final class CarbonCommonConstants {
    */
   public static final String CARBON_WRITTEN_BY_APPNAME = "carbon.writtenby.app.name";
 
+  /**
+   * When there are many global dictionary columns, the generated code for them becomes very
+   * large and slows down the query, so we limit it to 100 for now.
+   */
+  public static final int CARBON_ALLOW_DIRECT_FILL_DICT_COLS_LIMIT = 100;
+
   //////////////////////////////////////////////////////////////////////////////////////////
   // Unused constants and parameters start here
   //////////////////////////////////////////////////////////////////////////////////////////
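
A hedged, standalone illustration of how this limit is consulted, mirroring hasMoreDictionaryColumnsOnProjection in the CarbonLateDecodeStrategy diff above (the Map-based dictionaryMap lookup and the class name are assumptions for the sketch):

    import java.util.Map;

    // Hypothetical sketch: direct vector fill is skipped when the projection
    // touches more dictionary columns than CARBON_ALLOW_DIRECT_FILL_DICT_COLS_LIMIT,
    // so Spark codegen stays small.
    final class DirectFillLimitSketch {
      static final int CARBON_ALLOW_DIRECT_FILL_DICT_COLS_LIMIT = 100;

      static boolean hasMoreDictionaryColumns(String[] projection,
          Map<String, Boolean> dictionaryMap) {
        int count = 0;
        for (String column : projection) {
          if (Boolean.TRUE.equals(dictionaryMap.get(column))) {
            count++;
          }
        }
        return count > CARBON_ALLOW_DIRECT_FILL_DICT_COLS_LIMIT;
      }
    }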

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
index 9df5bc1..c85c9ee 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
@@ -252,7 +252,7 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
     if (vectorInfo != null) {
       decoder
           .decodeAndFillVector(pageData.array(), offset, pageMetadata.data_page_length, vectorInfo,
-              nullBitSet, isLocalDictEncodedPage);
+              nullBitSet, isLocalDictEncodedPage, pageMetadata.numberOfRowsInpage);
       return null;
     } else {
       return decoder

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
index a754cf2..2d3979a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
@@ -242,7 +242,7 @@ public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChun
     if (vectorInfo != null) {
       codec
           .decodeAndFillVector(pageData.array(), offset, pageMetadata.data_page_length, vectorInfo,
-              nullBitSet, false);
+              nullBitSet, false, pageMetadata.numberOfRowsInpage);
       return null;
     } else {
       return codec.decode(pageData.array(), offset, pageMetadata.data_page_length);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
index f2e91be..5e0dfdf 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
@@ -23,6 +23,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectWithInvertedIndex;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
@@ -88,10 +89,11 @@ class StringVectorFiller extends AbstractNonDictionaryVectorFiller {
           CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, data, localOffset, length)) {
         vector.putNull(i);
       } else {
-        vector.putByteArray(i, localOffset, length, data);
+        vector.putArray(i, localOffset, length);
       }
       localOffset += length;
     }
+    vector.putAllByteArray(data, 0, data.length);
   }
 }
 
@@ -100,23 +102,40 @@ class LongStringVectorFiller extends AbstractNonDictionaryVectorFiller {
     super(numberOfRows);
   }
 
-  @Override
-  public void fillVector(byte[] data, CarbonColumnVector vector) {
+  @Override public void fillVector(byte[] data, CarbonColumnVector vector) {
     // start position will be used to store the current data position
+    boolean invertedIndex = vector instanceof ColumnarVectorWrapperDirectWithInvertedIndex;
     int localOffset = 0;
     ByteUtil.UnsafeComparer comparator = ByteUtil.UnsafeComparer.INSTANCE;
-    for (int i = 0; i < numberOfRows; i++) {
-      int length =
-          (((data[localOffset] & 0xFF) << 24) | ((data[localOffset + 1] & 0xFF) << 16) | (
-              (data[localOffset + 2] & 0xFF) << 8) | (data[localOffset + 3] & 0xFF));
-      localOffset += 4;
-      if (comparator.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0,
-          CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, data, localOffset, length)) {
-        vector.putNull(i);
-      } else {
-        vector.putByteArray(i, localOffset, length, data);
+    if (invertedIndex) {
+      for (int i = 0; i < numberOfRows; i++) {
+        int length =
+            (((data[localOffset] & 0xFF) << 24) | ((data[localOffset + 1] & 0xFF) << 16) | (
+                (data[localOffset + 2] & 0xFF) << 8) | (data[localOffset + 3] & 0xFF));
+        localOffset += 4;
+        if (comparator.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0,
+            CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, data, localOffset, length)) {
+          vector.putNull(i);
+        } else {
+          vector.putByteArray(i, localOffset, length, data);
+        }
+        localOffset += length;
       }
-      localOffset += length;
+    } else {
+      for (int i = 0; i < numberOfRows; i++) {
+        int length =
+            (((data[localOffset] & 0xFF) << 24) | ((data[localOffset + 1] & 0xFF) << 16) | (
+                (data[localOffset + 2] & 0xFF) << 8) | (data[localOffset + 3] & 0xFF));
+        localOffset += 4;
+        if (comparator.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0,
+            CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, data, localOffset, length)) {
+          vector.putNull(i);
+        } else {
+          vector.putArray(i, localOffset, length);
+        }
+        localOffset += length;
+      }
+      vector.putAllByteArray(data, 0, data.length);
     }
   }
 }
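
The change above replaces per-row putByteArray copies on the non-inverted-index path with a two-step pattern: record only offset/length per row (putArray), then hand the whole backing buffer to the vector once (putAllByteArray). A minimal sketch of that pattern, with a hypothetical sink interface mirroring the two CarbonColumnVector calls used in the diff:

    // Hypothetical sink mirroring the two CarbonColumnVector calls used above.
    interface ByteArraySink {
      void putArray(int rowId, int offset, int length);          // record offsets only
      void putAllByteArray(byte[] data, int offset, int length); // share the buffer once
    }

    final class BatchByteArrayFillSketch {
      // Page layout assumed: <4-byte big-endian length><value bytes>, repeated.
      static void fill(byte[] data, int numberOfRows, ByteArraySink vector) {
        int localOffset = 0;
        for (int i = 0; i < numberOfRows; i++) {
          int length = ((data[localOffset] & 0xFF) << 24)
              | ((data[localOffset + 1] & 0xFF) << 16)
              | ((data[localOffset + 2] & 0xFF) << 8)
              | (data[localOffset + 3] & 0xFF);
          localOffset += 4;
          vector.putArray(i, localOffset, length);    // no per-row byte copy
          localOffset += length;
        }
        vector.putAllByteArray(data, 0, data.length); // one shared hand-off
      }
    }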

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
index 01db383..3c00fd8 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
@@ -51,7 +51,6 @@ public abstract class SafeVariableLengthDimensionDataChunkStore
   public SafeVariableLengthDimensionDataChunkStore(boolean isInvertedIndex, int numberOfRows) {
     super(isInvertedIndex);
     this.numberOfRows = numberOfRows;
-    this.dataOffsets = new int[numberOfRows];
   }
 
   /**
@@ -66,6 +65,7 @@ public abstract class SafeVariableLengthDimensionDataChunkStore
       byte[] data) {
     // first put the data, inverted index and reverse inverted index to memory
     super.putArray(invertedIndex, invertedIndexReverse, data);
+    this.dataOffsets = new int[numberOfRows];
     // As data is of variable length and data format is
     // <length in short><data><length in short><data>
     // we need to store offset of each data so data can be accessed directly

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/datastore/compression/AbstractCompressor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/AbstractCompressor.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/AbstractCompressor.java
new file mode 100644
index 0000000..5123cc6
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/AbstractCompressor.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.compression;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.DoubleBuffer;
+import java.nio.FloatBuffer;
+import java.nio.IntBuffer;
+import java.nio.LongBuffer;
+import java.nio.ShortBuffer;
+
+import org.apache.carbondata.core.util.ByteUtil;
+
+public abstract class AbstractCompressor implements Compressor {
+
+  @Override
+  public byte[] compressShort(short[] unCompInput) {
+    ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_SHORT);
+    unCompBuffer.order(ByteOrder.LITTLE_ENDIAN).asShortBuffer().put(unCompInput);
+    return compressByte(unCompBuffer.array());
+  }
+
+  @Override
+  public short[] unCompressShort(byte[] compInput, int offset, int length) {
+    byte[] unCompArray = unCompressByte(compInput, offset, length);
+    ShortBuffer unCompBuffer =
+        ByteBuffer.wrap(unCompArray).order(ByteOrder.LITTLE_ENDIAN).asShortBuffer();
+    short[] shorts = new short[unCompArray.length / ByteUtil.SIZEOF_SHORT];
+    unCompBuffer.get(shorts);
+    return shorts;
+  }
+
+  @Override
+  public byte[] compressInt(int[] unCompInput) {
+    ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_INT);
+    unCompBuffer.order(ByteOrder.LITTLE_ENDIAN).asIntBuffer().put(unCompInput);
+    return compressByte(unCompBuffer.array());
+  }
+
+  @Override
+  public int[] unCompressInt(byte[] compInput, int offset, int length) {
+    byte[] unCompArray = unCompressByte(compInput, offset, length);
+    IntBuffer unCompBuffer =
+        ByteBuffer.wrap(unCompArray).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer();
+    int[] ints = new int[unCompArray.length / ByteUtil.SIZEOF_INT];
+    unCompBuffer.get(ints);
+    return ints;
+  }
+
+  @Override
+  public byte[] compressLong(long[] unCompInput) {
+    ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_LONG);
+    unCompBuffer.order(ByteOrder.LITTLE_ENDIAN).asLongBuffer().put(unCompInput);
+    return compressByte(unCompBuffer.array());
+  }
+
+  @Override
+  public long[] unCompressLong(byte[] compInput, int offset, int length) {
+    byte[] unCompArray = unCompressByte(compInput, offset, length);
+    LongBuffer unCompBuffer =
+        ByteBuffer.wrap(unCompArray).order(ByteOrder.LITTLE_ENDIAN).asLongBuffer();
+    long[] longs = new long[unCompArray.length / ByteUtil.SIZEOF_LONG];
+    unCompBuffer.get(longs);
+    return longs;
+  }
+
+  @Override
+  public byte[] compressFloat(float[] unCompInput) {
+    ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_FLOAT);
+    unCompBuffer.order(ByteOrder.LITTLE_ENDIAN).asFloatBuffer().put(unCompInput);
+    return compressByte(unCompBuffer.array());
+  }
+
+  @Override
+  public float[] unCompressFloat(byte[] compInput, int offset, int length) {
+    byte[] unCompArray = unCompressByte(compInput, offset, length);
+    FloatBuffer unCompBuffer =
+        ByteBuffer.wrap(unCompArray).order(ByteOrder.LITTLE_ENDIAN).asFloatBuffer();
+    float[] floats = new float[unCompArray.length / ByteUtil.SIZEOF_FLOAT];
+    unCompBuffer.get(floats);
+    return floats;
+  }
+
+  @Override
+  public byte[] compressDouble(double[] unCompInput) {
+    ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_DOUBLE);
+    unCompBuffer.order(ByteOrder.LITTLE_ENDIAN).asDoubleBuffer().put(unCompInput);
+    return compressByte(unCompBuffer.array());
+  }
+
+  @Override
+  public double[] unCompressDouble(byte[] compInput, int offset, int length) {
+    byte[] unCompArray = unCompressByte(compInput, offset, length);
+    DoubleBuffer unCompBuffer =
+        ByteBuffer.wrap(unCompArray).order(ByteOrder.LITTLE_ENDIAN).asDoubleBuffer();
+    double[] doubles = new double[unCompArray.length / ByteUtil.SIZEOF_DOUBLE];
+    unCompBuffer.get(doubles);
+    return doubles;
+  }
+
+  @Override
+  public long rawCompress(long inputAddress, int inputSize, long outputAddress) throws IOException {
+    throw new RuntimeException("Not implemented rawCompress for " + this.getName());
+  }
+
+
+}
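
A standalone sketch of the little-endian round trip that AbstractCompressor performs around compressByte/unCompressByte; the compression step itself is elided, and the class and method names here are illustrative:

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import java.util.Arrays;

    public final class LittleEndianRoundTrip {
      // Mirrors compressInt: serialize the ints little-endian before compression.
      static byte[] toBytes(int[] input) {
        ByteBuffer buffer = ByteBuffer.allocate(input.length * Integer.BYTES);
        buffer.order(ByteOrder.LITTLE_ENDIAN).asIntBuffer().put(input);
        return buffer.array(); // would be passed to compressByte(...)
      }

      // Mirrors unCompressInt: 'raw' stands for the unCompressByte(...) output.
      static int[] fromBytes(byte[] raw) {
        int[] out = new int[raw.length / Integer.BYTES];
        ByteBuffer.wrap(raw).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer().get(out);
        return out;
      }

      public static void main(String[] args) {
        int[] values = {1, -2, 300000};
        System.out.println(Arrays.equals(values, fromBytes(toBytes(values)))); // true
      }
    }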

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
index c86011c..6f4a9c6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
@@ -26,7 +26,7 @@ import org.apache.log4j.Logger;
 import org.xerial.snappy.Snappy;
 import org.xerial.snappy.SnappyNative;
 
-public class SnappyCompressor implements Compressor {
+public class SnappyCompressor extends AbstractCompressor {
 
   private static final Logger LOGGER =
       LogServiceFactory.getLogService(SnappyCompressor.class.getName());
@@ -90,7 +90,7 @@ public class SnappyCompressor implements Compressor {
     try {
       uncompressedLength = Snappy.uncompressedLength(compInput, offset, length);
       data = new byte[uncompressedLength];
-      Snappy.uncompress(compInput, offset, length, data, 0);
+      snappyNative.rawUncompress(compInput, offset, length, data, 0);
     } catch (IOException e) {
       LOGGER.error(e.getMessage(), e);
       throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/datastore/compression/ZstdCompressor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/ZstdCompressor.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/ZstdCompressor.java
index 914c3e7..3e6a11b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/ZstdCompressor.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/ZstdCompressor.java
@@ -18,18 +18,10 @@
 package org.apache.carbondata.core.datastore.compression;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.DoubleBuffer;
-import java.nio.FloatBuffer;
-import java.nio.IntBuffer;
-import java.nio.LongBuffer;
-import java.nio.ShortBuffer;
-
-import org.apache.carbondata.core.util.ByteUtil;
 
 import com.github.luben.zstd.Zstd;
 
-public class ZstdCompressor implements Compressor {
+public class ZstdCompressor extends AbstractCompressor {
   private static final int COMPRESS_LEVEL = 3;
 
   public ZstdCompressor() {
@@ -65,91 +57,6 @@ public class ZstdCompressor implements Compressor {
   }
 
   @Override
-  public byte[] compressShort(short[] unCompInput) {
-    ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_SHORT);
-    unCompBuffer.asShortBuffer().put(unCompInput);
-    return compressByte(unCompBuffer.array());
-  }
-
-  @Override
-  public short[] unCompressShort(byte[] compInput, int offset, int length) {
-    byte[] unCompArray = unCompressByte(compInput, offset, length);
-    ShortBuffer unCompBuffer = ByteBuffer.wrap(unCompArray).asShortBuffer();
-    short[] shorts = new short[unCompArray.length / ByteUtil.SIZEOF_SHORT];
-    unCompBuffer.get(shorts);
-    return shorts;
-  }
-
-  @Override
-  public byte[] compressInt(int[] unCompInput) {
-    ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_INT);
-    unCompBuffer.asIntBuffer().put(unCompInput);
-    return compressByte(unCompBuffer.array());
-  }
-
-  @Override
-  public int[] unCompressInt(byte[] compInput, int offset, int length) {
-    byte[] unCompArray = unCompressByte(compInput, offset, length);
-    IntBuffer unCompBuffer = ByteBuffer.wrap(unCompArray).asIntBuffer();
-    int[] ints = new int[unCompArray.length / ByteUtil.SIZEOF_INT];
-    unCompBuffer.get(ints);
-    return ints;
-  }
-
-  @Override
-  public byte[] compressLong(long[] unCompInput) {
-    ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_LONG);
-    unCompBuffer.asLongBuffer().put(unCompInput);
-    return compressByte(unCompBuffer.array());
-  }
-
-  @Override
-  public long[] unCompressLong(byte[] compInput, int offset, int length) {
-    byte[] unCompArray = unCompressByte(compInput, offset, length);
-    LongBuffer unCompBuffer = ByteBuffer.wrap(unCompArray).asLongBuffer();
-    long[] longs = new long[unCompArray.length / ByteUtil.SIZEOF_LONG];
-    unCompBuffer.get(longs);
-    return longs;
-  }
-
-  @Override
-  public byte[] compressFloat(float[] unCompInput) {
-    ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_FLOAT);
-    unCompBuffer.asFloatBuffer().put(unCompInput);
-    return compressByte(unCompBuffer.array());
-  }
-
-  @Override
-  public float[] unCompressFloat(byte[] compInput, int offset, int length) {
-    byte[] unCompArray = unCompressByte(compInput, offset, length);
-    FloatBuffer unCompBuffer = ByteBuffer.wrap(unCompArray).asFloatBuffer();
-    float[] floats = new float[unCompArray.length / ByteUtil.SIZEOF_FLOAT];
-    unCompBuffer.get(floats);
-    return floats;
-  }
-
-  @Override
-  public byte[] compressDouble(double[] unCompInput) {
-    ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_DOUBLE);
-    unCompBuffer.asDoubleBuffer().put(unCompInput);
-    return compressByte(unCompBuffer.array());
-  }
-
-  @Override
-  public double[] unCompressDouble(byte[] compInput, int offset, int length) {
-    byte[] unCompArray = unCompressByte(compInput, offset, length);
-    DoubleBuffer unCompBuffer = ByteBuffer.wrap(unCompArray).asDoubleBuffer();
-    double[] doubles = new double[unCompArray.length / ByteUtil.SIZEOF_DOUBLE];
-    unCompBuffer.get(doubles);
-    return doubles;
-  }
-
-  @Override
-  public long rawCompress(long inputAddress, int inputSize, long outputAddress) throws IOException {
-    throw new RuntimeException("Not implemented rawCompress for zstd yet");
-  }
-
-  @Override
   public long rawUncompress(byte[] input, byte[] output) throws IOException {
     return Zstd.decompress(output, input);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPageValueConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPageValueConverter.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPageValueConverter.java
index 82ccd22..5bc46e9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPageValueConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPageValueConverter.java
@@ -17,6 +17,9 @@
 
 package org.apache.carbondata.core.datastore.page;
 
+import java.util.BitSet;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 
 // Transformation type that can be applied to ColumnPage
@@ -37,5 +40,6 @@ public interface ColumnPageValueConverter {
   double decodeDouble(long value);
   double decodeDouble(float value);
   double decodeDouble(double value);
-  void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo);
+  void decodeAndFillVector(byte[] pageData, ColumnVectorInfo vectorInfo, BitSet nullBits,
+      DataType pageDataType, int pageSize);
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
index a760b64..81bb1b5 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
@@ -124,7 +124,7 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
   /**
    * Create a new column page for decimal page
    */
-  static ColumnPage newDecimalColumnPage(ColumnPageEncoderMeta meta,
+  public static ColumnPage newDecimalColumnPage(ColumnPageEncoderMeta meta,
       byte[] lvEncodedBytes) throws MemoryException {
     TableSpec.ColumnSpec columnSpec = meta.getColumnSpec();
     DecimalConverterFactory.DecimalConverter decimalConverter =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java
index d82a873..6f36c08 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java
@@ -35,7 +35,7 @@ public interface ColumnPageDecoder {
    *  Apply decoding algorithm on input byte array and fill the vector here.
    */
   void decodeAndFillVector(byte[] input, int offset, int length, ColumnVectorInfo vectorInfo,
-      BitSet nullBits, boolean isLVEncoded) throws MemoryException, IOException;
+      BitSet nullBits, boolean isLVEncoded, int pageSize) throws MemoryException, IOException;
 
   ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
       throws MemoryException, IOException;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
index f91ede5..735847e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
@@ -27,7 +27,6 @@ import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
-import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
 import org.apache.carbondata.core.datastore.page.LazyColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
@@ -133,14 +132,13 @@ public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec {
 
       @Override
       public void decodeAndFillVector(byte[] input, int offset, int length,
-          ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded)
+          ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded, int pageSize)
           throws MemoryException, IOException {
-        ColumnPage page = ColumnPage.decompress(meta, input, offset, length, isLVEncoded);
-        page.setNullBits(nullBits);
-        if (page instanceof DecimalColumnPage) {
-          vectorInfo.decimalConverter = ((DecimalColumnPage) page).getDecimalConverter();
-        }
-        converter.decodeAndFillVector(page, vectorInfo);
+        Compressor compressor =
+            CompressorFactory.getInstance().getCompressor(meta.getCompressorName());
+        byte[] unCompressData = compressor.unCompressByte(input, offset, length);
+        converter.decodeAndFillVector(unCompressData, vectorInfo, nullBits, meta.getStoreDataType(),
+            pageSize);
       }
 
       @Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
@@ -244,69 +242,67 @@ public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec {
     }
 
     @Override
-    public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo) {
+    public void decodeAndFillVector(byte[] pageData, ColumnVectorInfo vectorInfo, BitSet nullBits,
+        DataType pageDataType, int pageSize) {
       CarbonColumnVector vector = vectorInfo.vector;
-      BitSet nullBits = columnPage.getNullBits();
-      DataType pageDataType = columnPage.getDataType();
-      int pageSize = columnPage.getPageSize();
       BitSet deletedRows = vectorInfo.deletedRows;
       DataType vectorDataType = vector.getType();
       vector = ColumnarVectorWrapperDirectFactory
           .getDirectVectorWrapperFactory(vector, null, nullBits, deletedRows, true, false);
+      int rowId = 0;
       if (vectorDataType == DataTypes.FLOAT) {
         float floatFactor = factor.floatValue();
         if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
-          byte[] byteData = columnPage.getBytePage();
           for (int i = 0; i < pageSize; i++) {
-            vector.putFloat(i, (max - byteData[i]) / floatFactor);
+            vector.putFloat(i, (max - pageData[i]) / floatFactor);
           }
         } else if (pageDataType == DataTypes.SHORT) {
-          short[] shortData = columnPage.getShortPage();
-          for (int i = 0; i < pageSize; i++) {
-            vector.putFloat(i, (max - shortData[i]) / floatFactor);
+          int size = pageSize * DataTypes.SHORT.getSizeInBytes();
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector
+                .putFloat(rowId++, (max - ByteUtil.toShortLittleEndian(pageData, i)) / floatFactor);
           }
 
         } else if (pageDataType == DataTypes.SHORT_INT) {
-          byte[] shortIntPage = columnPage.getShortIntPage();
-          for (int i = 0; i < pageSize; i++) {
-            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
-            vector.putFloat(i, (max - shortInt) / floatFactor);
+          int size = pageSize * DataTypes.SHORT_INT.getSizeInBytes();
+          for (int i = 0; i < size; i += DataTypes.SHORT_INT.getSizeInBytes()) {
+            int shortInt = ByteUtil.valueOf3Bytes(pageData, i);
+            vector.putFloat(rowId++, (max - shortInt) / floatFactor);
           }
         } else if (pageDataType == DataTypes.INT) {
-          int[] intData = columnPage.getIntPage();
-          for (int i = 0; i < pageSize; i++) {
-            vector.putFloat(i, (max - intData[i]) / floatFactor);
+          int size = pageSize * DataTypes.INT.getSizeInBytes();
+          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+            vector.putFloat(rowId++, (max - ByteUtil.toIntLittleEndian(pageData, i)) / floatFactor);
           }
         } else {
           throw new RuntimeException("internal error: " + this.toString());
         }
       } else {
         if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
-          byte[] byteData = columnPage.getBytePage();
           for (int i = 0; i < pageSize; i++) {
-            vector.putDouble(i, (max - byteData[i]) / factor);
+            vector.putDouble(rowId++, (max - pageData[i]) / factor);
           }
         } else if (pageDataType == DataTypes.SHORT) {
-          short[] shortData = columnPage.getShortPage();
-          for (int i = 0; i < pageSize; i++) {
-            vector.putDouble(i, (max - shortData[i]) / factor);
+          int size = pageSize * DataTypes.SHORT.getSizeInBytes();
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector.putDouble(rowId++, (max - ByteUtil.toShortLittleEndian(pageData, i)) / factor);
           }
 
         } else if (pageDataType == DataTypes.SHORT_INT) {
-          byte[] shortIntPage = columnPage.getShortIntPage();
-          for (int i = 0; i < pageSize; i++) {
-            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
-            vector.putDouble(i, (max - shortInt) / factor);
+          int size = pageSize * DataTypes.SHORT_INT.getSizeInBytes();
+          for (int i = 0; i < size; i += DataTypes.SHORT_INT.getSizeInBytes()) {
+            int shortInt = ByteUtil.valueOf3Bytes(pageData, i);
+            vector.putDouble(rowId++, (max - shortInt) / factor);
           }
         } else if (pageDataType == DataTypes.INT) {
-          int[] intData = columnPage.getIntPage();
-          for (int i = 0; i < pageSize; i++) {
-            vector.putDouble(i, (max - intData[i]) / factor);
+          int size = pageSize * DataTypes.INT.getSizeInBytes();
+          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+            vector.putDouble(rowId++, (max - ByteUtil.toIntLittleEndian(pageData, i)) / factor);
           }
         } else if (pageDataType == DataTypes.LONG) {
-          long[] longData = columnPage.getLongPage();
-          for (int i = 0; i < pageSize; i++) {
-            vector.putDouble(i, (max - longData[i]) / factor);
+          int size = pageSize * DataTypes.LONG.getSizeInBytes();
+          for (int i = 0; i < size; i += DataTypes.LONG.getSizeInBytes()) {
+            vector.putDouble(rowId++, (max - ByteUtil.toLongLittleEndian(pageData, i)) / factor);
           }
         } else {
           throw new RuntimeException("Unsupported datatype : " + pageDataType);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
index 12d108b..578945b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
@@ -24,11 +24,11 @@ import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
-import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
 import org.apache.carbondata.core.datastore.page.LazyColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
@@ -144,17 +144,20 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
 
       @Override
       public void decodeAndFillVector(byte[] input, int offset, int length,
-          ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded)
+          ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded, int pageSize)
           throws MemoryException, IOException {
-        ColumnPage page = null;
+        Compressor compressor =
+            CompressorFactory.getInstance().getCompressor(meta.getCompressorName());
+        byte[] unCompressData = compressor.unCompressByte(input, offset, length);
         if (DataTypes.isDecimal(meta.getSchemaDataType())) {
-          page = ColumnPage.decompressDecimalPage(meta, input, offset, length);
-          vectorInfo.decimalConverter = ((DecimalColumnPage) page).getDecimalConverter();
-        } else {
-          page = ColumnPage.decompress(meta, input, offset, length, isLVEncoded);
+          TableSpec.ColumnSpec columnSpec = meta.getColumnSpec();
+          DecimalConverterFactory.DecimalConverter decimalConverter =
+              DecimalConverterFactory.INSTANCE
+                  .getDecimalConverter(columnSpec.getPrecision(), columnSpec.getScale());
+          vectorInfo.decimalConverter = decimalConverter;
         }
-        page.setNullBits(nullBits);
-        converter.decodeAndFillVector(page, vectorInfo);
+        converter.decodeAndFillVector(unCompressData, vectorInfo, nullBits, meta.getStoreDataType(),
+            pageSize);
       }
 
       @Override
@@ -300,17 +303,15 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
     }
 
     @Override
-    public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo) {
+    public void decodeAndFillVector(byte[] pageData, ColumnVectorInfo vectorInfo, BitSet nullBits,
+        DataType pageDataType, int pageSize) {
       CarbonColumnVector vector = vectorInfo.vector;
-      BitSet nullBits = columnPage.getNullBits();
       DataType vectorDataType = vector.getType();
-      DataType pageDataType = columnPage.getDataType();
-      int pageSize = columnPage.getPageSize();
       BitSet deletedRows = vectorInfo.deletedRows;
       vector = ColumnarVectorWrapperDirectFactory
           .getDirectVectorWrapperFactory(vector, vectorInfo.invertedIndex, nullBits, deletedRows,
               true, false);
-      fillVector(columnPage, vector, vectorDataType, pageDataType, pageSize, vectorInfo);
+      fillVector(pageData, vector, vectorDataType, pageDataType, pageSize, vectorInfo);
       if (deletedRows == null || deletedRows.isEmpty()) {
         for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
           vector.putNull(i);
@@ -321,165 +322,180 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
       }
     }
 
-    private void fillVector(ColumnPage columnPage, CarbonColumnVector vector,
+    private void fillVector(byte[] pageData, CarbonColumnVector vector,
         DataType vectorDataType, DataType pageDataType, int pageSize, ColumnVectorInfo vectorInfo) {
       int newScale = 0;
       if (vectorInfo.measure != null) {
         newScale = vectorInfo.measure.getMeasure().getScale();
       }
+      int rowId = 0;
       if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
-        byte[] byteData = columnPage.getBytePage();
         if (vectorDataType == DataTypes.SHORT) {
           for (int i = 0; i < pageSize; i++) {
-            vector.putShort(i, (short) (max - byteData[i]));
+            vector.putShort(i, (short) (max - pageData[i]));
           }
         } else if (vectorDataType == DataTypes.INT) {
           for (int i = 0; i < pageSize; i++) {
-            vector.putInt(i, (int) (max - byteData[i]));
+            vector.putInt(i, (int) (max - pageData[i]));
           }
         } else if (vectorDataType == DataTypes.LONG) {
           for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, (max - byteData[i]));
+            vector.putLong(i, (max - pageData[i]));
           }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
           for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, (max - (long) byteData[i]) * 1000);
+            vector.putLong(i, (max - (long) pageData[i]) * 1000);
           }
         } else if (vectorDataType == DataTypes.BOOLEAN) {
           for (int i = 0; i < pageSize; i++) {
-            vector.putByte(i, (byte) (max - byteData[i]));
+            vector.putByte(i, (byte) (max - pageData[i]));
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
           int precision = vectorInfo.measure.getMeasure().getPrecision();
           for (int i = 0; i < pageSize; i++) {
-            BigDecimal decimal = decimalConverter.getDecimal(max - byteData[i]);
+            BigDecimal decimal = decimalConverter.getDecimal(max - pageData[i]);
             if (decimal.scale() < newScale) {
               decimal = decimal.setScale(newScale);
             }
             vector.putDecimal(i, decimal, precision);
           }
+        } else if (vectorDataType == DataTypes.FLOAT) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putFloat(i, (int) (max - pageData[i]));
+          }
         } else {
           for (int i = 0; i < pageSize; i++) {
-            vector.putDouble(i, (max - byteData[i]));
+            vector.putDouble(i, (max - pageData[i]));
           }
         }
       } else if (pageDataType == DataTypes.SHORT) {
-        short[] shortData = columnPage.getShortPage();
+        int size = pageSize * DataTypes.SHORT.getSizeInBytes();
         if (vectorDataType == DataTypes.SHORT) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putShort(i, (short) (max - shortData[i]));
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector.putShort(rowId++, (short) (max - ByteUtil.toShortLittleEndian(pageData, i)));
           }
         } else if (vectorDataType == DataTypes.INT) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putInt(i, (int) (max - shortData[i]));
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector.putInt(rowId++, (int) (max - ByteUtil.toShortLittleEndian(pageData, i)));
           }
         } else if (vectorDataType == DataTypes.LONG) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, (max - shortData[i]));
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector.putLong(rowId++, (max - ByteUtil.toShortLittleEndian(pageData, i)));
           }
-        }  else if (vectorDataType == DataTypes.TIMESTAMP) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, (max - (long) shortData[i]) * 1000);
+        } else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector
+                .putLong(rowId++, (max - (long) ByteUtil.toShortLittleEndian(pageData, i)) * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
           int precision = vectorInfo.measure.getMeasure().getPrecision();
-          for (int i = 0; i < pageSize; i++) {
-            BigDecimal decimal = decimalConverter.getDecimal(max - shortData[i]);
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            BigDecimal decimal =
+                decimalConverter.getDecimal(max - ByteUtil.toShortLittleEndian(pageData, i));
             if (decimal.scale() < newScale) {
               decimal = decimal.setScale(newScale);
             }
-            vector.putDecimal(i, decimal, precision);
+            vector.putDecimal(rowId++, decimal, precision);
+          }
+        } else if (vectorDataType == DataTypes.FLOAT) {
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector.putFloat(rowId++, (int) (max - ByteUtil.toShortLittleEndian(pageData, i)));
           }
         } else {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putDouble(i, (max - shortData[i]));
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector.putDouble(rowId++, (max - ByteUtil.toShortLittleEndian(pageData, i)));
           }
         }
-
       } else if (pageDataType == DataTypes.SHORT_INT) {
-        byte[] shortIntPage = columnPage.getShortIntPage();
+        int size = pageSize * DataTypes.SHORT_INT.getSizeInBytes();
         if (vectorDataType == DataTypes.INT) {
-          for (int i = 0; i < pageSize; i++) {
-            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
-            vector.putInt(i, (int) (max - shortInt));
+          for (int i = 0; i < size; i += DataTypes.SHORT_INT.getSizeInBytes()) {
+            int shortInt = ByteUtil.valueOf3Bytes(pageData, i);
+            vector.putInt(rowId++, (int) (max - shortInt));
           }
         } else if (vectorDataType == DataTypes.LONG) {
-          for (int i = 0; i < pageSize; i++) {
-            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
-            vector.putLong(i, (max - shortInt));
+          for (int i = 0; i < size; i += DataTypes.SHORT_INT.getSizeInBytes()) {
+            int shortInt = ByteUtil.valueOf3Bytes(pageData, i);
+            vector.putLong(rowId++, (max - shortInt));
           }
-        }  else if (vectorDataType == DataTypes.TIMESTAMP) {
-          for (int i = 0; i < pageSize; i++) {
-            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
-            vector.putLong(i, (max - (long) shortInt) * 1000);
+        } else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < size; i += DataTypes.SHORT_INT.getSizeInBytes()) {
+            vector.putLong(rowId++, (max - (long) ByteUtil.valueOf3Bytes(pageData, i)) * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
           int precision = vectorInfo.measure.getMeasure().getPrecision();
           for (int i = 0; i < pageSize; i++) {
-            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            int shortInt = ByteUtil.valueOf3Bytes(pageData, i * 3);
             BigDecimal decimal = decimalConverter.getDecimal(max - shortInt);
             if (decimal.scale() < newScale) {
               decimal = decimal.setScale(newScale);
             }
             vector.putDecimal(i, decimal, precision);
           }
+        } else if (vectorDataType == DataTypes.FLOAT) {
+          for (int i = 0; i < size; i += DataTypes.SHORT_INT.getSizeInBytes()) {
+            int shortInt = ByteUtil.valueOf3Bytes(pageData, i);
+            vector.putFloat(rowId++, (int) (max - shortInt));
+          }
         } else {
-          for (int i = 0; i < pageSize; i++) {
-            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
-            vector.putDouble(i, (max - shortInt));
+          for (int i = 0; i < size; i += DataTypes.SHORT_INT.getSizeInBytes()) {
+            int shortInt = ByteUtil.valueOf3Bytes(pageData, i);
+            vector.putDouble(rowId++, (max - shortInt));
           }
         }
       } else if (pageDataType == DataTypes.INT) {
-        int[] intData = columnPage.getIntPage();
+        int size = pageSize * DataTypes.INT.getSizeInBytes();
         if (vectorDataType == DataTypes.INT) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putInt(i, (int) (max - intData[i]));
+          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+            vector.putInt(rowId++, (int) (max - ByteUtil.toIntLittleEndian(pageData, i)));
           }
         } else if (vectorDataType == DataTypes.LONG) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, (max - intData[i]));
+          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+            vector.putLong(rowId++, (max - ByteUtil.toIntLittleEndian(pageData, i)));
           }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, (max - (long) intData[i]) * 1000);
+          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+            vector.putLong(rowId++, (max - (long) ByteUtil.toIntLittleEndian(pageData, i)) * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
           int precision = vectorInfo.measure.getMeasure().getPrecision();
-          for (int i = 0; i < pageSize; i++) {
-            BigDecimal decimal = decimalConverter.getDecimal(max - intData[i]);
+          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+            BigDecimal decimal =
+                decimalConverter.getDecimal(max - ByteUtil.toIntLittleEndian(pageData, i));
             if (decimal.scale() < newScale) {
               decimal = decimal.setScale(newScale);
             }
-            vector.putDecimal(i, decimal, precision);
+            vector.putDecimal(rowId++, decimal, precision);
           }
         } else {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putDouble(i, (max - intData[i]));
+          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+            vector.putDouble(rowId++, (max - ByteUtil.toIntLittleEndian(pageData, i)));
           }
         }
       } else if (pageDataType == DataTypes.LONG) {
-        long[] longData = columnPage.getLongPage();
+        int size = pageSize * DataTypes.LONG.getSizeInBytes();
         if (vectorDataType == DataTypes.LONG) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, (max - longData[i]));
+          for (int i = 0; i < size; i += DataTypes.LONG.getSizeInBytes()) {
+            vector.putLong(rowId++, (max - ByteUtil.toLongLittleEndian(pageData, i)));
           }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, (max - longData[i]) * 1000);
+          for (int i = 0; i < size; i += DataTypes.LONG.getSizeInBytes()) {
+            vector.putLong(rowId++, (max - ByteUtil.toLongLittleEndian(pageData, i)) * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
           int precision = vectorInfo.measure.getMeasure().getPrecision();
-          for (int i = 0; i < pageSize; i++) {
-            BigDecimal decimal = decimalConverter.getDecimal(max - longData[i]);
+          for (int i = 0; i < size; i += DataTypes.LONG.getSizeInBytes()) {
+            BigDecimal decimal =
+                decimalConverter.getDecimal(max - ByteUtil.toLongLittleEndian(pageData, i));
             if (decimal.scale() < newScale) {
               decimal = decimal.setScale(newScale);
             }
-            vector.putDecimal(i, decimal, precision);
+            vector.putDecimal(rowId++, decimal, precision);
           }
         }
       } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
index b04c9df..c66c065 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
@@ -27,7 +27,6 @@ import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
-import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
 import org.apache.carbondata.core.datastore.page.LazyColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
@@ -121,14 +120,13 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec {
 
       @Override
       public void decodeAndFillVector(byte[] input, int offset, int length,
-          ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded)
+          ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded, int pageSize)
           throws MemoryException, IOException {
-        ColumnPage page = ColumnPage.decompress(meta, input, offset, length, isLVEncoded);
-        page.setNullBits(nullBits);
-        if (page instanceof DecimalColumnPage) {
-          vectorInfo.decimalConverter = ((DecimalColumnPage) page).getDecimalConverter();
-        }
-        converter.decodeAndFillVector(page, vectorInfo);
+        Compressor compressor =
+            CompressorFactory.getInstance().getCompressor(meta.getCompressorName());
+        byte[] unCompressData = compressor.unCompressByte(input, offset, length);
+        converter.decodeAndFillVector(unCompressData, vectorInfo, nullBits, meta.getStoreDataType(),
+            pageSize);
       }
 
       @Override
@@ -235,68 +233,63 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec {
     }
 
     @Override
-    public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo) {
+    public void decodeAndFillVector(byte[] pageData, ColumnVectorInfo vectorInfo, BitSet nullBits,
+        DataType pageDataType, int pageSize) {
       CarbonColumnVector vector = vectorInfo.vector;
-      BitSet nullBits = columnPage.getNullBits();
-      DataType pageDataType = columnPage.getDataType();
-      int pageSize = columnPage.getPageSize();
       BitSet deletedRows = vectorInfo.deletedRows;
       DataType vectorDataType = vector.getType();
       vector = ColumnarVectorWrapperDirectFactory
           .getDirectVectorWrapperFactory(vector, null, nullBits, deletedRows, true, false);
+      int rowId = 0;
       if (vectorDataType == DataTypes.FLOAT) {
         if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
-          byte[] byteData = columnPage.getBytePage();
           for (int i = 0; i < pageSize; i++) {
-            vector.putFloat(i, (byteData[i] / floatFactor));
+            vector.putFloat(i, (pageData[i] / floatFactor));
           }
         } else if (pageDataType == DataTypes.SHORT) {
-          short[] shortData = columnPage.getShortPage();
-          for (int i = 0; i < pageSize; i++) {
-            vector.putFloat(i, (shortData[i] / floatFactor));
+          int size = pageSize * DataTypes.SHORT.getSizeInBytes();
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector.putFloat(rowId++, (ByteUtil.toShortLittleEndian(pageData, i) / floatFactor));
           }
 
         } else if (pageDataType == DataTypes.SHORT_INT) {
-          byte[] shortIntPage = columnPage.getShortIntPage();
-          for (int i = 0; i < pageSize; i++) {
-            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
-            vector.putFloat(i, (shortInt / floatFactor));
+          int size = pageSize * DataTypes.SHORT_INT.getSizeInBytes();
+          for (int i = 0; i < size; i += DataTypes.SHORT_INT.getSizeInBytes()) {
+            vector.putFloat(rowId++, (ByteUtil.valueOf3Bytes(pageData, i) / floatFactor));
           }
         } else if (pageDataType == DataTypes.INT) {
-          int[] intData = columnPage.getIntPage();
-          for (int i = 0; i < pageSize; i++) {
-            vector.putFloat(i, (intData[i] / floatFactor));
+          int size = pageSize * DataTypes.INT.getSizeInBytes();
+          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+            vector.putFloat(rowId++, (ByteUtil.toIntLittleEndian(pageData, i) / floatFactor));
           }
         } else {
           throw new RuntimeException("internal error: " + this.toString());
         }
       } else {
         if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
-          byte[] byteData = columnPage.getBytePage();
           for (int i = 0; i < pageSize; i++) {
-            vector.putDouble(i, (byteData[i] / factor));
+            vector.putDouble(i, (pageData[i] / factor));
           }
         } else if (pageDataType == DataTypes.SHORT) {
-          short[] shortData = columnPage.getShortPage();
-          for (int i = 0; i < pageSize; i++) {
-            vector.putDouble(i, (shortData[i] / factor));
+          int size = pageSize * DataTypes.SHORT.getSizeInBytes();
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector.putDouble(rowId++, (ByteUtil.toShortLittleEndian(pageData, i) / factor));
           }
-
         } else if (pageDataType == DataTypes.SHORT_INT) {
-          byte[] shortIntPage = columnPage.getShortIntPage();
-          for (int i = 0; i < pageSize; i++) {
-            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
-            vector.putDouble(i, (shortInt / factor));
+          int size = pageSize * DataTypes.SHORT_INT.getSizeInBytes();
+          for (int i = 0; i < size; i += DataTypes.SHORT_INT.getSizeInBytes()) {
+            vector.putDouble(rowId++, (ByteUtil.valueOf3Bytes(pageData, i) / factor));
           }
+
         } else if (pageDataType == DataTypes.INT) {
-          int[] intData = columnPage.getIntPage();
-          for (int i = 0; i < pageSize; i++) {
-            vector.putDouble(i, (intData[i] / factor));
+          int size = pageSize * DataTypes.INT.getSizeInBytes();
+          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+            vector.putDouble(rowId++, (ByteUtil.toIntLittleEndian(pageData, i) / factor));
           }
         } else if (pageDataType == DataTypes.LONG) {
-          long[] longData = columnPage.getLongPage();
-          for (int i = 0; i < pageSize; i++) {
-            vector.putDouble(i, (longData[i] / factor));
+          int size = pageSize * DataTypes.LONG.getSizeInBytes();
+          for (int i = 0; i < size; i += DataTypes.LONG.getSizeInBytes()) {
+            vector.putDouble(rowId++, (ByteUtil.toLongLittleEndian(pageData, i) / factor));
           }
         } else {
           throw new RuntimeException("Unsupported datatype : " + pageDataType);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
index d77a949..d9db437 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
@@ -23,11 +23,11 @@ import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
-import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
 import org.apache.carbondata.core.datastore.page.LazyColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
@@ -121,17 +121,20 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
 
       @Override
       public void decodeAndFillVector(byte[] input, int offset, int length,
-          ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded)
+          ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded, int pageSize)
           throws MemoryException, IOException {
-        ColumnPage page = null;
+        Compressor compressor =
+            CompressorFactory.getInstance().getCompressor(meta.getCompressorName());
+        byte[] unCompressData = compressor.unCompressByte(input, offset, length);
         if (DataTypes.isDecimal(meta.getSchemaDataType())) {
-          page = ColumnPage.decompressDecimalPage(meta, input, offset, length);
-          vectorInfo.decimalConverter = ((DecimalColumnPage) page).getDecimalConverter();
-        } else {
-          page = ColumnPage.decompress(meta, input, offset, length, isLVEncoded);
+          TableSpec.ColumnSpec columnSpec = meta.getColumnSpec();
+          DecimalConverterFactory.DecimalConverter decimalConverter =
+              DecimalConverterFactory.INSTANCE
+                  .getDecimalConverter(columnSpec.getPrecision(), columnSpec.getScale());
+          vectorInfo.decimalConverter = decimalConverter;
         }
-        page.setNullBits(nullBits);
-        converter.decodeAndFillVector(page, vectorInfo);
+        converter.decodeAndFillVector(unCompressData, vectorInfo, nullBits, meta.getStoreDataType(),
+            pageSize);
       }
 
       @Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
@@ -273,17 +276,15 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
     }
 
     @Override
-    public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo) {
+    public void decodeAndFillVector(byte[] pageData, ColumnVectorInfo vectorInfo, BitSet nullBits,
+        DataType pageDataType, int pageSize) {
       CarbonColumnVector vector = vectorInfo.vector;
-      BitSet nullBits = columnPage.getNullBits();
       DataType vectorDataType = vector.getType();
-      DataType pageDataType = columnPage.getDataType();
-      int pageSize = columnPage.getPageSize();
       BitSet deletedRows = vectorInfo.deletedRows;
       vector = ColumnarVectorWrapperDirectFactory
           .getDirectVectorWrapperFactory(vector, vectorInfo.invertedIndex, nullBits, deletedRows,
               true, false);
-      fillVector(columnPage, vector, vectorDataType, pageDataType, pageSize, vectorInfo);
+      fillVector(pageData, vector, vectorDataType, pageDataType, pageSize, vectorInfo, nullBits);
       if (deletedRows == null || deletedRows.isEmpty()) {
         for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
           vector.putNull(i);
@@ -295,123 +296,143 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
 
     }
 
-    private void fillVector(ColumnPage columnPage, CarbonColumnVector vector,
-        DataType vectorDataType, DataType pageDataType, int pageSize, ColumnVectorInfo vectorInfo) {
+    private void fillVector(byte[] pageData, CarbonColumnVector vector, DataType vectorDataType,
+        DataType pageDataType, int pageSize, ColumnVectorInfo vectorInfo, BitSet nullBits) {
+      int rowId = 0;
       if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
-        byte[] byteData = columnPage.getBytePage();
         if (vectorDataType == DataTypes.SHORT) {
           for (int i = 0; i < pageSize; i++) {
-            vector.putShort(i, (short) byteData[i]);
+            vector.putShort(i, (short) pageData[i]);
           }
         } else if (vectorDataType == DataTypes.INT) {
           for (int i = 0; i < pageSize; i++) {
-            vector.putInt(i, (int) byteData[i]);
+            vector.putInt(i, (int) pageData[i]);
           }
         } else if (vectorDataType == DataTypes.LONG) {
           for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, byteData[i]);
+            vector.putLong(i, pageData[i]);
           }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
           for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, (long) byteData[i] * 1000);
+            vector.putLong(i, (long) pageData[i] * 1000);
           }
         } else if (vectorDataType == DataTypes.BOOLEAN) {
-          vector.putBytes(0, pageSize, byteData, 0);
+          vector.putBytes(0, pageSize, pageData, 0);
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
-          decimalConverter.fillVector(byteData, pageSize, vectorInfo, columnPage.getNullBits());
+          decimalConverter.fillVector(pageData, pageSize, vectorInfo, nullBits, pageDataType);
+        } else if (vectorDataType == DataTypes.FLOAT) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putFloat(i, pageData[i]);
+          }
         } else {
           for (int i = 0; i < pageSize; i++) {
-            vector.putDouble(i, byteData[i]);
+            vector.putDouble(i, pageData[i]);
           }
         }
       } else if (pageDataType == DataTypes.SHORT) {
-        short[] shortData = columnPage.getShortPage();
+        int size = pageSize * DataTypes.SHORT.getSizeInBytes();
         if (vectorDataType == DataTypes.SHORT) {
-          vector.putShorts(0, pageSize, shortData, 0);
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector.putShort(rowId++, (ByteUtil.toShortLittleEndian(pageData, i)));
+          }
         } else if (vectorDataType == DataTypes.INT) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putInt(i, (int) shortData[i]);
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector.putInt(rowId++, (ByteUtil.toShortLittleEndian(pageData, i)));
           }
         } else if (vectorDataType == DataTypes.LONG) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, shortData[i]);
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector.putLong(rowId++, (ByteUtil.toShortLittleEndian(pageData, i)));
           }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, (long) shortData[i] * 1000);
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector.putLong(rowId++, ((long) ByteUtil.toShortLittleEndian(pageData, i)) * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
-          decimalConverter.fillVector(shortData, pageSize, vectorInfo, columnPage.getNullBits());
+          decimalConverter.fillVector(pageData, pageSize, vectorInfo, nullBits, pageDataType);
+        } else if (vectorDataType == DataTypes.FLOAT) {
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector.putFloat(rowId++, (ByteUtil.toShortLittleEndian(pageData, i)));
+          }
         } else {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putDouble(i, shortData[i]);
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector.putDouble(rowId++, ByteUtil.toShortLittleEndian(pageData, i));
           }
         }
 
       } else if (pageDataType == DataTypes.SHORT_INT) {
-        byte[] shortIntPage = columnPage.getShortIntPage();
         if (vectorDataType == DataTypes.INT) {
           for (int i = 0; i < pageSize; i++) {
-            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            int shortInt = ByteUtil.valueOf3Bytes(pageData, i * 3);
             vector.putInt(i, shortInt);
           }
         } else if (vectorDataType == DataTypes.LONG) {
           for (int i = 0; i < pageSize; i++) {
-            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            int shortInt = ByteUtil.valueOf3Bytes(pageData, i * 3);
             vector.putLong(i, shortInt);
           }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
           for (int i = 0; i < pageSize; i++) {
-            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            int shortInt = ByteUtil.valueOf3Bytes(pageData, i * 3);
             vector.putLong(i, (long) shortInt * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
-          int[] shortIntData = ByteUtil.toIntArrayFrom3Bytes(shortIntPage, pageSize);
-          decimalConverter.fillVector(shortIntData, pageSize, vectorInfo, columnPage.getNullBits());
+          decimalConverter
+              .fillVector(pageData, pageSize, vectorInfo, nullBits, DataTypes.SHORT_INT);
+        } else if (vectorDataType == DataTypes.FLOAT) {
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(pageData, i * 3);
+            vector.putFloat(i, shortInt);
+          }
         } else {
           for (int i = 0; i < pageSize; i++) {
-            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            int shortInt = ByteUtil.valueOf3Bytes(pageData, i * 3);
             vector.putDouble(i, shortInt);
           }
         }
       } else if (pageDataType == DataTypes.INT) {
-        int[] intData = columnPage.getIntPage();
+        int size = pageSize * DataTypes.INT.getSizeInBytes();
         if (vectorDataType == DataTypes.INT) {
-          vector.putInts(0, pageSize, intData, 0);
+          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+            vector.putInt(rowId++, ByteUtil.toIntLittleEndian(pageData, i));
+          }
         } else if (vectorDataType == DataTypes.LONG) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, intData[i]);
+          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+            vector.putLong(rowId++, ByteUtil.toIntLittleEndian(pageData, i));
           }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, (long) intData[i] * 1000);
+          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+            vector.putLong(rowId++, (long) ByteUtil.toIntLittleEndian(pageData, i) * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
-          decimalConverter.fillVector(intData, pageSize, vectorInfo, columnPage.getNullBits());
+          decimalConverter.fillVector(pageData, pageSize, vectorInfo, nullBits, pageDataType);
         } else {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putDouble(i, intData[i]);
+          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+            vector.putDouble(rowId++, ByteUtil.toIntLittleEndian(pageData, i));
           }
         }
       } else if (pageDataType == DataTypes.LONG) {
-        long[] longData = columnPage.getLongPage();
+        int size = pageSize * DataTypes.LONG.getSizeInBytes();
         if (vectorDataType == DataTypes.LONG) {
-          vector.putLongs(0, pageSize, longData, 0);
+          for (int i = 0; i < size; i += DataTypes.LONG.getSizeInBytes()) {
+            vector.putLong(rowId++, ByteUtil.toLongLittleEndian(pageData, i));
+          }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, longData[i] * 1000);
+          for (int i = 0; i < size; i += DataTypes.LONG.getSizeInBytes()) {
+            vector.putLong(rowId++, ByteUtil.toLongLittleEndian(pageData, i) * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
-          decimalConverter.fillVector(longData, pageSize, vectorInfo, columnPage.getNullBits());
+          decimalConverter.fillVector(pageData, pageSize, vectorInfo, nullBits, pageDataType);
         }
       } else {
-        double[] doubleData = columnPage.getDoublePage();
-        vector.putDoubles(0, pageSize, doubleData, 0);
+        int size = pageSize * DataTypes.DOUBLE.getSizeInBytes();
+        for (int i = 0; i < size; i += DataTypes.DOUBLE.getSizeInBytes()) {
+          vector.putDouble(rowId++, ByteUtil.toDoubleLittleEndian(pageData, i));
+        }
       }
     }
   };


[2/3] carbondata git commit: [CARBONDATA-3112] Optimise decompressing while filling the vector during conversion of primitive typess

Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
index fd94344..7b7c0b6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
@@ -23,12 +23,13 @@ import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
-import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
 import org.apache.carbondata.core.datastore.page.LazyColumnPage;
+import org.apache.carbondata.core.datastore.page.VarLengthColumnPageBase;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
@@ -105,17 +106,32 @@ public class DirectCompressCodec implements ColumnPageCodec {
 
       @Override
       public void decodeAndFillVector(byte[] input, int offset, int length,
-          ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded)
+          ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded, int pageSize)
           throws MemoryException, IOException {
-        ColumnPage decodedPage;
+        Compressor compressor =
+            CompressorFactory.getInstance().getCompressor(meta.getCompressorName());
+        byte[] unCompressData = compressor.unCompressByte(input, offset, length);
         if (DataTypes.isDecimal(dataType)) {
-          decodedPage = ColumnPage.decompressDecimalPage(meta, input, offset, length);
-          vectorInfo.decimalConverter = ((DecimalColumnPage) decodedPage).getDecimalConverter();
+          TableSpec.ColumnSpec columnSpec = meta.getColumnSpec();
+          DecimalConverterFactory.DecimalConverter decimalConverter =
+              DecimalConverterFactory.INSTANCE
+                  .getDecimalConverter(columnSpec.getPrecision(), columnSpec.getScale());
+          vectorInfo.decimalConverter = decimalConverter;
+          if (DataTypes.isDecimal(meta.getStoreDataType())) {
+            ColumnPage decimalColumnPage =
+                VarLengthColumnPageBase.newDecimalColumnPage(meta, unCompressData);
+            decimalConverter.fillVector(decimalColumnPage.getByteArrayPage(), pageSize, vectorInfo,
+                nullBits, meta.getStoreDataType());
+          } else {
+            converter
+                .decodeAndFillVector(unCompressData, vectorInfo, nullBits, meta.getStoreDataType(),
+                    pageSize);
+          }
         } else {
-          decodedPage = ColumnPage.decompress(meta, input, offset, length, isLVEncoded);
+          converter
+              .decodeAndFillVector(unCompressData, vectorInfo, nullBits, meta.getStoreDataType(),
+                  pageSize);
         }
-        decodedPage.setNullBits(nullBits);
-        converter.decodeAndFillVector(decodedPage, vectorInfo);
       }
 
       @Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
@@ -203,17 +219,15 @@ public class DirectCompressCodec implements ColumnPageCodec {
     }
 
     @Override
-    public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo) {
+    public void decodeAndFillVector(byte[] pageData, ColumnVectorInfo vectorInfo, BitSet nullBits,
+        DataType pageDataType, int pageSize) {
       CarbonColumnVector vector = vectorInfo.vector;
-      BitSet nullBits = columnPage.getNullBits();
       DataType vectorDataType = vector.getType();
-      DataType pageDataType = columnPage.getDataType();
-      int pageSize = columnPage.getPageSize();
       BitSet deletedRows = vectorInfo.deletedRows;
       vector = ColumnarVectorWrapperDirectFactory
           .getDirectVectorWrapperFactory(vector, vectorInfo.invertedIndex, nullBits, deletedRows,
               true, false);
-      fillVector(columnPage, vector, vectorDataType, pageDataType, pageSize, vectorInfo);
+      fillVector(pageData, vector, vectorDataType, pageDataType, pageSize, vectorInfo, nullBits);
       if (deletedRows == null || deletedRows.isEmpty()) {
         for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
           vector.putNull(i);
@@ -224,130 +238,134 @@ public class DirectCompressCodec implements ColumnPageCodec {
       }
     }
 
-    private void fillVector(ColumnPage columnPage, CarbonColumnVector vector,
-        DataType vectorDataType, DataType pageDataType, int pageSize, ColumnVectorInfo vectorInfo) {
+    private void fillVector(byte[] pageData, CarbonColumnVector vector, DataType vectorDataType,
+        DataType pageDataType, int pageSize, ColumnVectorInfo vectorInfo, BitSet nullBits) {
+      int rowId = 0;
       if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
-        byte[] byteData = columnPage.getBytePage();
         if (vectorDataType == DataTypes.SHORT) {
           for (int i = 0; i < pageSize; i++) {
-            vector.putShort(i, (short) byteData[i]);
+            vector.putShort(i, (short) pageData[i]);
           }
         } else if (vectorDataType == DataTypes.INT) {
           for (int i = 0; i < pageSize; i++) {
-            vector.putInt(i, (int) byteData[i]);
+            vector.putInt(i, (int) pageData[i]);
           }
         } else if (vectorDataType == DataTypes.LONG) {
           for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, byteData[i]);
+            vector.putLong(i, pageData[i]);
           }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
           for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, (long) byteData[i] * 1000);
+            vector.putLong(i, (long) pageData[i] * 1000);
           }
         } else if (vectorDataType == DataTypes.BOOLEAN || vectorDataType == DataTypes.BYTE) {
-          vector.putBytes(0, pageSize, byteData, 0);
+          vector.putBytes(0, pageSize, pageData, 0);
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
-          decimalConverter.fillVector(byteData, pageSize, vectorInfo, columnPage.getNullBits());
+          decimalConverter.fillVector(pageData, pageSize, vectorInfo, nullBits, pageDataType);
         } else {
           for (int i = 0; i < pageSize; i++) {
-            vector.putDouble(i, byteData[i]);
+            vector.putDouble(i, pageData[i]);
           }
         }
       } else if (pageDataType == DataTypes.SHORT) {
-        short[] shortData = columnPage.getShortPage();
+        int size = pageSize * DataTypes.SHORT.getSizeInBytes();
         if (vectorDataType == DataTypes.SHORT) {
-          vector.putShorts(0, pageSize, shortData, 0);
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector.putShort(rowId++, (ByteUtil.toShortLittleEndian(pageData, i)));
+          }
         } else if (vectorDataType == DataTypes.INT) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putInt(i, (int) shortData[i]);
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector.putInt(rowId++, ByteUtil.toShortLittleEndian(pageData, i));
           }
         } else if (vectorDataType == DataTypes.LONG) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, shortData[i]);
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector.putLong(rowId++, ByteUtil.toShortLittleEndian(pageData, i));
           }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, (long) shortData[i] * 1000);
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector.putLong(rowId++, (long) ByteUtil.toShortLittleEndian(pageData, i) * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
-          decimalConverter.fillVector(shortData, pageSize, vectorInfo, columnPage.getNullBits());
+          decimalConverter.fillVector(pageData, pageSize, vectorInfo, nullBits, pageDataType);
         } else {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putDouble(i, shortData[i]);
+          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+            vector.putDouble(rowId++, ByteUtil.toShortLittleEndian(pageData, i));
           }
         }
 
       } else if (pageDataType == DataTypes.SHORT_INT) {
-        byte[] shortIntPage = columnPage.getShortIntPage();
         if (vectorDataType == DataTypes.INT) {
           for (int i = 0; i < pageSize; i++) {
-            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            int shortInt = ByteUtil.valueOf3Bytes(pageData, i * 3);
             vector.putInt(i, shortInt);
           }
         } else if (vectorDataType == DataTypes.LONG) {
           for (int i = 0; i < pageSize; i++) {
-            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            int shortInt = ByteUtil.valueOf3Bytes(pageData, i * 3);
             vector.putLong(i, shortInt);
           }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
           for (int i = 0; i < pageSize; i++) {
-            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            int shortInt = ByteUtil.valueOf3Bytes(pageData, i * 3);
             vector.putLong(i, (long) shortInt * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
-          int[] shortIntData = ByteUtil.toIntArrayFrom3Bytes(shortIntPage, pageSize);
-          decimalConverter.fillVector(shortIntData, pageSize, vectorInfo, columnPage.getNullBits());
+          decimalConverter.fillVector(pageData, pageSize, vectorInfo, nullBits, pageDataType);
         } else {
           for (int i = 0; i < pageSize; i++) {
-            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            int shortInt = ByteUtil.valueOf3Bytes(pageData, i * 3);
             vector.putDouble(i, shortInt);
           }
         }
       } else if (pageDataType == DataTypes.INT) {
-        int[] intData = columnPage.getIntPage();
+        int size = pageSize * DataTypes.INT.getSizeInBytes();
         if (vectorDataType == DataTypes.INT) {
-          vector.putInts(0, pageSize, intData, 0);
+          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+            vector.putInt(rowId++, ByteUtil.toIntLittleEndian(pageData, i));
+          }
         } else if (vectorDataType == DataTypes.LONG) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, intData[i]);
+          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+            vector.putLong(rowId++, ByteUtil.toIntLittleEndian(pageData, i));
           }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, (long) intData[i] * 1000);
+          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+            vector.putLong(rowId++, (long) ByteUtil.toIntLittleEndian(pageData, i) * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
-          decimalConverter.fillVector(intData, pageSize, vectorInfo, columnPage.getNullBits());
+          decimalConverter.fillVector(pageData, pageSize, vectorInfo, nullBits, pageDataType);
         } else {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putDouble(i, intData[i]);
+          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+            vector.putDouble(rowId++, ByteUtil.toIntLittleEndian(pageData, i));
           }
         }
-      }  else if (pageDataType == DataTypes.LONG) {
-        long[] longData = columnPage.getLongPage();
+      } else if (pageDataType == DataTypes.LONG) {
+        int size = pageSize * DataTypes.LONG.getSizeInBytes();
         if (vectorDataType == DataTypes.LONG) {
-          vector.putLongs(0, pageSize, longData, 0);
+          for (int i = 0; i < size; i += DataTypes.LONG.getSizeInBytes()) {
+            vector.putLong(rowId++, ByteUtil.toLongLittleEndian(pageData, i));
+          }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, longData[i] * 1000);
+          for (int i = 0; i < size; i += DataTypes.LONG.getSizeInBytes()) {
+            vector.putLong(rowId++, ByteUtil.toLongLittleEndian(pageData, i) * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
-          decimalConverter.fillVector(longData, pageSize, vectorInfo, columnPage.getNullBits());
+          decimalConverter.fillVector(pageData, pageSize, vectorInfo, nullBits, pageDataType);
         }
-      } else if (DataTypes.isDecimal(pageDataType)) {
-        DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
-        decimalConverter.fillVector(columnPage.getByteArrayPage(), pageSize, vectorInfo,
-            columnPage.getNullBits());
       } else if (vectorDataType == DataTypes.FLOAT) {
-        float[] floatPage = columnPage.getFloatPage();
-        vector.putFloats(0, pageSize, floatPage, 0);
+        int size = pageSize * DataTypes.FLOAT.getSizeInBytes();
+        for (int i = 0; i < size; i += DataTypes.FLOAT.getSizeInBytes()) {
+          vector.putFloat(rowId++, ByteUtil.toFloatLittleEndian(pageData, i));
+        }
       } else {
-        double[] doubleData = columnPage.getDoublePage();
-        vector.putDoubles(0, pageSize, doubleData, 0);
+        int size = pageSize * DataTypes.DOUBLE.getSizeInBytes();
+        for (int i = 0; i < size; i += DataTypes.DOUBLE.getSizeInBytes()) {
+          vector.putDouble(rowId++, ByteUtil.toDoubleLittleEndian(pageData, i));
+        }
       }
     }
   };

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
index c9b47db..2c940bb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
@@ -318,7 +318,7 @@ public class RLECodec implements ColumnPageCodec {
 
     @Override
     public void decodeAndFillVector(byte[] input, int offset, int length,
-        ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded)
+        ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded, int pageSize)
         throws MemoryException, IOException {
       throw new UnsupportedOperationException("Not supposed to be called here");
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
index e604057..babf144 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
@@ -30,6 +30,7 @@ import static org.apache.carbondata.core.datastore.page.encoding.bool.BooleanCon
 
 /** statics for primitive column page */
 public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, SimpleStatsResult {
+  private static final String ZERO_STRING = "0";
   private DataType dataType;
   private byte minByte, maxByte;
   private short minShort, maxShort;
@@ -243,6 +244,12 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
       int integerPlaces = strValue.indexOf('.');
       if (-1 != integerPlaces) {
         decimalPlaces = strValue.length() - integerPlaces - 1;
+        // If decimal places are one and it is just zero then treat the decimal count a zero.
+        if (decimalPlaces == 1) {
+          if (strValue.substring(integerPlaces + 1, strValue.length()).equals(ZERO_STRING)) {
+            decimalPlaces = 0;
+          }
+        }
       }
     } catch (NumberFormatException e) {
       if (!Double.isInfinite(value)) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/StatisticsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/StatisticsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/StatisticsCollector.java
deleted file mode 100644
index f8b336c..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/StatisticsCollector.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.statistics;
-
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-
-/**
- * Calculate the statistics for a column page and blocklet
- */
-public interface StatisticsCollector {
-
-  /**
-   * name will be stored in Header
-   */
-  String getName();
-
-  void startPage(int pageID);
-
-  void endPage(int pageID);
-
-  void startBlocklet(int blockletID);
-
-  void endBlocklet(int blockletID);
-
-  void startBlock(int blocklID);
-
-  void endBlock(int blockID);
-
-  /**
-   * Update the stats for the input batch
-   */
-  void update(ColumnPage batch);
-
-  /**
-   * Ouput will be written to DataChunk2 (page header)
-   */
-  byte[] getPageStatistisc();
-
-  /**
-   * Output will be written to DataChunk3 (blocklet header)
-   */
-  byte[] getBlockletStatistics();
-
-  /**
-   * Output will be written to Footer
-   */
-  byte[] getBlockStatistics();
-}
-
-
-

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
index 5231cb9..9793c38 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
@@ -23,6 +23,7 @@ import java.util.BitSet;
 
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
 /**
@@ -75,7 +76,8 @@ public final class DecimalConverterFactory {
 
     BigDecimal getDecimal(Object valueToBeConverted);
 
-    void fillVector(Object valuesToBeConverted, int size, ColumnVectorInfo info, BitSet nullBitset);
+    void fillVector(Object valuesToBeConverted, int size, ColumnVectorInfo info, BitSet nullBitset,
+        DataType pageType);
 
     int getSize();
 
@@ -101,14 +103,18 @@ public final class DecimalConverterFactory {
     }
 
     @Override public void fillVector(Object valuesToBeConverted, int size, ColumnVectorInfo info,
-        BitSet nullBitset) {
+        BitSet nullBitset, DataType pageType) {
       // TODO we need to find way to directly set to vector with out conversion. This way is very
       // inefficient.
       CarbonColumnVector vector = info.vector;
       int precision = info.measure.getMeasure().getPrecision();
       int newMeasureScale = info.measure.getMeasure().getScale();
-      if (valuesToBeConverted instanceof byte[]) {
-        byte[] data = (byte[]) valuesToBeConverted;
+      if (!(valuesToBeConverted instanceof byte[])) {
+        throw new UnsupportedOperationException("This object type " + valuesToBeConverted.getClass()
+            + " is not supported in this method");
+      }
+      byte[] data = (byte[]) valuesToBeConverted;
+      if (pageType == DataTypes.BYTE) {
         for (int i = 0; i < size; i++) {
           if (nullBitset.get(i)) {
             vector.putNull(i);
@@ -120,39 +126,56 @@ public final class DecimalConverterFactory {
             vector.putDecimal(i, value, precision);
           }
         }
-      } else if (valuesToBeConverted instanceof short[]) {
-        short[] data = (short[]) valuesToBeConverted;
+      } else if (pageType == DataTypes.SHORT) {
         for (int i = 0; i < size; i++) {
           if (nullBitset.get(i)) {
             vector.putNull(i);
           } else {
-            BigDecimal value = BigDecimal.valueOf(data[i], scale);
+            BigDecimal value = BigDecimal
+                .valueOf(ByteUtil.toShortLittleEndian(data, i * DataTypes.SHORT.getSizeInBytes()),
+                    scale);
             if (value.scale() < newMeasureScale) {
               value = value.setScale(newMeasureScale);
             }
             vector.putDecimal(i, value, precision);
           }
         }
-      } else if (valuesToBeConverted instanceof int[]) {
-        int[] data = (int[]) valuesToBeConverted;
+      } else if (pageType == DataTypes.SHORT_INT) {
         for (int i = 0; i < size; i++) {
           if (nullBitset.get(i)) {
             vector.putNull(i);
           } else {
-            BigDecimal value = BigDecimal.valueOf(data[i], scale);
+            BigDecimal value = BigDecimal
+                .valueOf(ByteUtil.valueOf3Bytes(data, i * DataTypes.SHORT_INT.getSizeInBytes()),
+                    scale);
             if (value.scale() < newMeasureScale) {
               value = value.setScale(newMeasureScale);
             }
             vector.putDecimal(i, value, precision);
           }
         }
-      } else if (valuesToBeConverted instanceof long[]) {
-        long[] data = (long[]) valuesToBeConverted;
+      } else if (pageType == DataTypes.INT) {
         for (int i = 0; i < size; i++) {
           if (nullBitset.get(i)) {
             vector.putNull(i);
           } else {
-            BigDecimal value = BigDecimal.valueOf(data[i], scale);
+            BigDecimal value = BigDecimal
+                .valueOf(ByteUtil.toIntLittleEndian(data, i * DataTypes.INT.getSizeInBytes()),
+                    scale);
+            if (value.scale() < newMeasureScale) {
+              value = value.setScale(newMeasureScale);
+            }
+            vector.putDecimal(i, value, precision);
+          }
+        }
+      } else if (pageType == DataTypes.LONG) {
+        for (int i = 0; i < size; i++) {
+          if (nullBitset.get(i)) {
+            vector.putNull(i);
+          } else {
+            BigDecimal value = BigDecimal
+                .valueOf(ByteUtil.toLongLittleEndian(data, i * DataTypes.LONG.getSizeInBytes()),
+                    scale);
             if (value.scale() < newMeasureScale) {
               value = value.setScale(newMeasureScale);
             }
@@ -239,7 +262,7 @@ public final class DecimalConverterFactory {
     }
 
     @Override public void fillVector(Object valuesToBeConverted, int size, ColumnVectorInfo info,
-        BitSet nullBitset) {
+        BitSet nullBitset, DataType pageType) {
       CarbonColumnVector vector = info.vector;
       int precision = info.measure.getMeasure().getPrecision();
       int newMeasureScale = info.measure.getMeasure().getScale();
@@ -285,7 +308,7 @@ public final class DecimalConverterFactory {
     }
 
     @Override public void fillVector(Object valuesToBeConverted, int size, ColumnVectorInfo info,
-        BitSet nullBitset) {
+        BitSet nullBitset, DataType pageType) {
       CarbonColumnVector vector = info.vector;
       int precision = info.measure.getMeasure().getPrecision();
       int newMeasureScale = info.measure.getMeasure().getScale();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
index 6b8455f..25a2eb1 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
@@ -62,6 +62,10 @@ public interface CarbonColumnVector {
 
   void putByteArray(int rowId, int offset, int length, byte[] value);
 
+  void putArray(int rowId, int offset, int length);
+
+  void putAllByteArray(byte[] data, int offset, int length);
+
   void putByte(int rowId, byte value);
 
   void putBytes(int rowId, int count, byte[] value);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonDictionary.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonDictionary.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonDictionary.java
index 882a365..84d52a6 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonDictionary.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonDictionary.java
@@ -25,4 +25,6 @@ public interface CarbonDictionary  {
   byte[] getDictionaryValue(int index);
 
   byte[][] getAllDictionaryValues();
+
+  byte[] getAllDictionaryValuesInSingleArray();
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
index f89ad9d..98536f6 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
@@ -53,6 +53,12 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
 
   private DataType blockDataType;
 
+  private int[] lengths;
+
+  private int[] offsets;
+
+  private int batchSize;
+
   /**
    * True if there is at least one NULL byte set. This is an optimization for the writer, to skip
    * having to clear NULL bits.
@@ -64,6 +70,7 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
   private CarbonColumnVector dictionaryVector;
 
   public CarbonColumnVectorImpl(int batchSize, DataType dataType) {
+    this.batchSize = batchSize;
     nullBytes = new BitSet(batchSize);
     this.dataType = dataType;
     if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
@@ -223,8 +230,13 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
       if (null != carbonDictionary) {
         int dictKey = (Integer) dictionaryVector.getData(rowId);
         return carbonDictionary.getDictionaryValue(dictKey);
+      } else if (byteArr != null) {
+        byte[] bytes = new byte[lengths[rowId]];
+        System.arraycopy(byteArr, offsets[rowId], bytes, 0, bytes.length);
+        return bytes;
+      } else {
+        return bytes[rowId];
       }
-      return bytes[rowId];
     } else {
       return data[rowId];
     }
@@ -357,4 +369,25 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
   @Override public void setLazyPage(LazyPageLoader lazyPage) {
     lazyPage.loadPage();
   }
+
+  @Override public void putArray(int rowId, int offset, int length) {
+    if (offsets == null) {
+      offsets = new int[batchSize];
+      lengths = new int[batchSize];
+    }
+    offsets[rowId] = offset;
+    lengths[rowId] = length;
+  }
+
+  @Override public void putAllByteArray(byte[] data, int offset, int length) {
+    byteArr = data;
+  }
+
+  public int[] getLengths() {
+    return lengths;
+  }
+
+  public int[] getOffsets() {
+    return offsets;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonDictionaryImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonDictionaryImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonDictionaryImpl.java
index 20e6171..962a772 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonDictionaryImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonDictionaryImpl.java
@@ -22,6 +22,12 @@ public class CarbonDictionaryImpl implements CarbonDictionary {
 
   private byte[][] dictionary;
 
+  private byte[] singleArrayDictValues;
+
+  private int[] dictLens;
+
+  private int[] dictOffsets;
+
   private int actualSize;
 
   public CarbonDictionaryImpl(byte[][] dictionary, int actualSize) {
@@ -44,4 +50,35 @@ public class CarbonDictionaryImpl implements CarbonDictionary {
   @Override public byte[][] getAllDictionaryValues() {
     return dictionary;
   }
+
+  @Override public byte[] getAllDictionaryValuesInSingleArray() {
+    if (singleArrayDictValues == null) {
+      dictLens = new int[dictionary.length];
+      dictOffsets = new int[dictionary.length];
+      int size = 0;
+      for (int i = 0; i < dictionary.length; i++) {
+        if (dictionary[i] != null) {
+          dictOffsets[i] = size;
+          size += dictionary[i].length;
+          dictLens[i] = dictionary[i].length;
+        }
+      }
+      singleArrayDictValues = new byte[size];
+      for (int i = 0; i < dictionary.length; i++) {
+        if (dictionary[i] != null) {
+          System.arraycopy(dictionary[i], 0, singleArrayDictValues, dictOffsets[i], dictLens[i]);
+        }
+      }
+      dictionary = null;
+    }
+    return singleArrayDictValues;
+  }
+
+  public int[] getDictLens() {
+    return dictLens;
+  }
+
+  public int[] getDictOffsets() {
+    return dictOffsets;
+  }
 }

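getAllDictionaryValuesInSingleArray() flattens the jagged byte[][] dictionary into one contiguous array on first use and then drops the jagged form, so a lookup becomes an offset/length slice into a single buffer. A small self-contained sketch of the same two-pass flatten and a lookup against it (the demo class is illustrative, not the CarbonData implementation):

    import java.nio.charset.StandardCharsets;

    // Sketch of flattening a jagged dictionary into one array, mirroring the
    // logic of getAllDictionaryValuesInSingleArray above.
    public class FlatDictDemo {
      public static void main(String[] args) {
        byte[][] dictionary = {
            "low".getBytes(StandardCharsets.UTF_8),
            null, // holes are allowed; they keep offset 0 and length 0
            "high".getBytes(StandardCharsets.UTF_8)
        };
        int[] offsets = new int[dictionary.length];
        int[] lengths = new int[dictionary.length];
        int size = 0;
        // Pass 1: compute each entry's slot and the total size.
        for (int i = 0; i < dictionary.length; i++) {
          if (dictionary[i] != null) {
            offsets[i] = size;
            lengths[i] = dictionary[i].length;
            size += lengths[i];
          }
        }
        // Pass 2: copy every entry into the single backing array.
        byte[] flat = new byte[size];
        for (int i = 0; i < dictionary.length; i++) {
          if (dictionary[i] != null) {
            System.arraycopy(dictionary[i], 0, flat, offsets[i], lengths[i]);
          }
        }
        // A consumer resolves dictionary key 2 without touching byte[][]:
        String value = new String(flat, offsets[2], lengths[2], StandardCharsets.UTF_8);
        System.out.println(value); // prints: high
      }
    }
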
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/AbstractCarbonColumnarVector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/AbstractCarbonColumnarVector.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/AbstractCarbonColumnarVector.java
index 7a1f317..4c783b4 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/AbstractCarbonColumnarVector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/AbstractCarbonColumnarVector.java
@@ -27,6 +27,12 @@ import org.apache.carbondata.core.scan.scanner.LazyPageLoader;
 public abstract class AbstractCarbonColumnarVector
     implements CarbonColumnVector, ConvertableVector {
 
+  protected CarbonColumnVector columnVector;
+
+  public AbstractCarbonColumnarVector(CarbonColumnVector columnVector) {
+    this.columnVector = columnVector;
+  }
+
   @Override
   public void putShorts(int rowId, int count, short value) {
     throw new UnsupportedOperationException("Not allowed from here " + getClass().getName());
@@ -136,4 +142,8 @@ public abstract class AbstractCarbonColumnarVector
   public void setLazyPage(LazyPageLoader lazyPage) {
     throw new UnsupportedOperationException("Not allowed from here " + getClass().getName());
   }
+
+  @Override public void putAllByteArray(byte[] data, int offset, int length) {
+    columnVector.putAllByteArray(data, offset, length);
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDelta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDelta.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDelta.java
index ccde63e..67eb6ec 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDelta.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDelta.java
@@ -34,13 +34,11 @@ class ColumnarVectorWrapperDirectWithDeleteDelta extends AbstractCarbonColumnarV
 
   private int counter;
 
-  private CarbonColumnVector columnVector;
-
   public ColumnarVectorWrapperDirectWithDeleteDelta(CarbonColumnVector vectorWrapper,
       BitSet deletedRows, BitSet nullBits) {
+    super(vectorWrapper);
     this.deletedRows = deletedRows;
     this.nullBits = nullBits;
-    this.columnVector = vectorWrapper;
   }
 
   @Override
@@ -213,4 +211,10 @@ class ColumnarVectorWrapperDirectWithDeleteDelta extends AbstractCarbonColumnarV
       }
     }
   }
+
+  @Override public void putArray(int rowId, int offset, int length) {
+    if (!deletedRows.get(rowId)) {
+      columnVector.putArray(counter++, offset, length);
+    }
+  }
 }

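The new putArray override keeps the same running-counter pattern the wrapper's other put methods use: deleted rows are skipped and the surviving rows are written densely into the target vector. A tiny sketch of that compacting fill, with stand-in types:

    import java.util.Arrays;
    import java.util.BitSet;

    // Sketch of the compacting fill used by the delete-delta wrapper:
    // deleted rows are skipped and survivors are written back to back.
    public class DeleteDeltaFillDemo {
      public static void main(String[] args) {
        int[] page = {10, 20, 30, 40};
        BitSet deletedRows = new BitSet();
        deletedRows.set(1); // row 1 was removed by a delete delta

        int[] target = new int[page.length];
        int counter = 0; // next free slot in the target vector
        for (int rowId = 0; rowId < page.length; rowId++) {
          if (!deletedRows.get(rowId)) {
            target[counter++] = page[rowId];
          }
        }
        // target holds [10, 30, 40, 0] with counter == 3 valid rows
        System.out.println(Arrays.toString(target) + ", rows=" + counter);
      }
    }
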
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex.java
index 46b2041..2dd70bc 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex.java
@@ -30,7 +30,7 @@ import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl
  * Column vector for column pages which have a delete delta and an inverted index, so it uses the
  * delete delta bitset to filter out rows and the inverted index before filling the actual vector
  */
-class ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex
+public class ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex
     extends ColumnarVectorWrapperDirectWithInvertedIndex {
 
   private BitSet deletedRows;
@@ -78,6 +78,10 @@ class ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex
     }
   }
 
+  @Override public void putAllByteArray(byte[] data, int offset, int length) {
+    carbonColumnVector.putAllByteArray(data, offset, length);
+  }
+
   @Override
   public void convert() {
     if (columnVector instanceof CarbonColumnVectorImpl) {
@@ -163,13 +167,27 @@ class ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex
           }
         }
       } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY) {
-        byte[][] dataArray = (byte[][]) localVector.getDataArray();
-        for (int i = 0; i < length; i++) {
-          if (!deletedRows.get(i)) {
-            if (nullBits.get(i)) {
-              carbonColumnVector.putNull(counter++);
-            } else {
-              carbonColumnVector.putByteArray(counter++, dataArray[i]);
+        int[] offsets = localVector.getOffsets();
+        int[] lengths = localVector.getLengths();
+        if (offsets != null && lengths != null) {
+          for (int i = 0; i < length; i++) {
+            if (!deletedRows.get(i)) {
+              if (nullBits.get(i)) {
+                carbonColumnVector.putNull(counter++);
+              } else {
+                carbonColumnVector.putArray(counter++, offsets[i], lengths[i]);
+              }
+            }
+          }
+        } else {
+          byte[][] dataArray = (byte[][]) localVector.getDataArray();
+          for (int i = 0; i < length; i++) {
+            if (!deletedRows.get(i)) {
+              if (nullBits.get(i)) {
+                carbonColumnVector.putNull(counter++);
+              } else {
+                carbonColumnVector.putByteArray(counter++, dataArray[i]);
+              }
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithInvertedIndex.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithInvertedIndex.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithInvertedIndex.java
index f190d7d..aa816ef 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithInvertedIndex.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithInvertedIndex.java
@@ -26,18 +26,17 @@ import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
  * Column vector for column pages which have an inverted index, so it applies the inverted index
  * before filling the actual vector
  */
-class ColumnarVectorWrapperDirectWithInvertedIndex extends AbstractCarbonColumnarVector {
+public class ColumnarVectorWrapperDirectWithInvertedIndex extends AbstractCarbonColumnarVector {
 
   protected int[] invertedIndex;
 
-  protected CarbonColumnVector columnVector;
 
   protected boolean isnullBitsExists;
 
   public ColumnarVectorWrapperDirectWithInvertedIndex(CarbonColumnVector columnVector,
       int[] invertedIndex, boolean isnullBitsExists) {
+    super(columnVector);
     this.invertedIndex = invertedIndex;
-    this.columnVector = columnVector;
     this.isnullBitsExists = isnullBitsExists;
   }
 
@@ -147,4 +146,8 @@ class ColumnarVectorWrapperDirectWithInvertedIndex extends AbstractCarbonColumna
   public DataType getBlockDataType() {
     return columnVector.getBlockDataType();
   }
+
+  @Override public void putArray(int rowId, int offset, int length) {
+    columnVector.putArray(invertedIndex[rowId], offset, length);
+  }
 }

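Here putArray scatters through the inverted index: the page stores values in sorted order for better compression, and invertedIndex[rowId] gives each value's position in the original row order. A short illustrative sketch of the scatter (not the CarbonData classes):

    import java.util.Arrays;

    // Sketch of the inverted-index scatter in putArray above: page position
    // rowId is written to invertedIndex[rowId] in the target vector.
    public class InvertedIndexFillDemo {
      public static void main(String[] args) {
        String[] sortedPage = {"apple", "pear", "zebra"}; // compressed order
        int[] invertedIndex = {2, 0, 1}; // sorted position -> original row

        String[] target = new String[sortedPage.length];
        for (int rowId = 0; rowId < sortedPage.length; rowId++) {
          target[invertedIndex[rowId]] = sortedPage[rowId];
        }
        // target is back in row order: [pear, zebra, apple]
        System.out.println(Arrays.toString(target));
      }
    }
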
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
index 8e6e15b..fd81610 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
@@ -734,11 +734,27 @@ public final class ByteUtil {
     return Float.intBitsToFloat(toXorInt(value, offset, length));
   }
 
-  public static int[] toIntArrayFrom3Bytes(byte[] data, int size) {
-    int[] ints = new int[size];
-    for (int i = 0; i < ints.length; i++) {
-      ints[i] = ByteUtil.valueOf3Bytes(data, i * 3);
-    }
-    return ints;
+  public static int toIntLittleEndian(byte[] bytes, int offset) {
+    return (((int) bytes[offset + 3] & 0xff) << 24) + (((int) bytes[offset + 2] & 0xff) << 16) + (
+        ((int) bytes[offset + 1] & 0xff) << 8) + ((int) bytes[offset] & 0xff);
+  }
+
+  public static short toShortLittleEndian(byte[] bytes, int offset) {
+    return (short) ((((int) bytes[offset + 1] & 0xff) << 8) + ((int) bytes[offset] & 0xff));
+  }
+
+  public static double toDoubleLittleEndian(byte[] bytes, int offset) {
+    return Double.longBitsToDouble(toLongLittleEndian(bytes, offset));
+  }
+
+  public static float toFloatLittleEndian(byte[] bytes, int offset) {
+    return Float.intBitsToFloat(toIntLittleEndian(bytes, offset));
+  }
+
+  public static long toLongLittleEndian(byte[] bytes, int offset) {
+    return ((((long) bytes[offset + 7]) << 56) | (((long) bytes[offset + 6] & 0xff) << 48) | (
+        ((long) bytes[offset + 5] & 0xff) << 40) | (((long) bytes[offset + 4] & 0xff) << 32) | (
+        ((long) bytes[offset + 3] & 0xff) << 24) | (((long) bytes[offset + 2] & 0xff) << 16) | (
+        ((long) bytes[offset + 1] & 0xff) << 8) | (((long) bytes[offset] & 0xff)));
   }
 }

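The replacement helpers decode primitives directly from a byte[] at an offset in little-endian order, avoiding a buffer allocation per value in the hot fill path. A quick self-contained check, assuming standard java.nio semantics, that the hand-rolled reader agrees with ByteBuffer over the same bytes:

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    // Illustrative check that a manual little-endian int reader matches
    // java.nio; the manual form avoids wrapping a ByteBuffer per value.
    public class LittleEndianDemo {
      static int toIntLittleEndian(byte[] b, int off) {
        return ((b[off + 3] & 0xff) << 24) + ((b[off + 2] & 0xff) << 16)
            + ((b[off + 1] & 0xff) << 8) + (b[off] & 0xff);
      }

      public static void main(String[] args) {
        byte[] bytes = new byte[8];
        ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).putInt(0, 123456789);

        int manual = toIntLittleEndian(bytes, 0);
        int nio = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getInt(0);
        System.out.println(manual + " == " + nio); // 123456789 == 123456789
      }
    }
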
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java
index 765643a..a80751f 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java
@@ -305,4 +305,13 @@ public class CarbonColumnVectorWrapper implements CarbonColumnVector {
     lazyPage.loadPage();
   }
 
+  @Override public void putArray(int rowId, int offset, int length) {
+    if (!filteredRows[rowId]) {
+      columnVector.putArray(counter++, offset, length);
+    }
+  }
+
+  @Override public void putAllByteArray(byte[] data, int offset, int length) {
+    columnVector.putAllByteArray(data, offset, length);
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/integration/spark-common-test/src/test/resources/IUD/negativevalue.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/IUD/negativevalue.csv b/integration/spark-common-test/src/test/resources/IUD/negativevalue.csv
new file mode 100644
index 0000000..71bf4e5
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/IUD/negativevalue.csv
@@ -0,0 +1,7 @@
+-30000,aaa,-300
+0,ddd,0
+-20000,bbb,-200
+70000,ggg,700
+10000,eee,100,
+-10000,ccc,-100,
+null,null,null
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index 50fdd0c..744a310 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -772,7 +772,22 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("""drop table if exists iud.dest33_part""")
   }
 
-  override def afterAll {
+  test("check data after update with row.filter pushdown as false") {
+    sql("""drop table if exists iud.dest33_flat""")
+    sql(
+      """create table iud.dest33_part (c1 int,c2 string, c3 short) STORED BY 'carbondata'"""
+        .stripMargin)
+    sql(
+      s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/negativevalue.csv' INTO table iud
+         |.dest33_part options('header'='false')""".stripMargin)
+    sql(
+      """update iud.dest33_part d set (c1) = (5) where d.c1 = 0""".stripMargin).show()
+    checkAnswer(sql("select c3 from iud.dest33_part"), Seq(Row(-300), Row(0), Row(-200), Row(700)
+      , Row(100), Row(-100), Row(null)))
+    sql("""drop table if exists iud.dest33_part""")
+  }
+
+  override def afterAll {
     sql("use default")
     sql("drop database  if exists iud cascade")
     CarbonProperties.getInstance()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
index 22188c4..7520ef6 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
@@ -346,4 +346,14 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
   @Override public void setLazyPage(LazyPageLoader lazyPage) {
     lazyPage.loadPage();
   }
+
+  @Override public void putArray(int rowId, int offset, int length) {
+    if (!filteredRows[rowId]) {
+      sparkColumnVectorProxy.putArray(rowId, offset, length);
+    }
+  }
+
+  @Override public void putAllByteArray(byte[] data, int offset, int length) {
+    sparkColumnVectorProxy.putAllByteArray(data, offset, length);
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapperDirect.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapperDirect.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapperDirect.java
index 7c5902e..c50d060 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapperDirect.java
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapperDirect.java
@@ -231,4 +231,12 @@ class ColumnarVectorWrapperDirect implements CarbonColumnVector {
   @Override public void setLazyPage(LazyPageLoader lazyPage) {
     sparkColumnVectorProxy.setLazyPage(lazyPage);
   }
+
+  @Override public void putArray(int rowId, int offset, int length) {
+    sparkColumnVectorProxy.putArray(rowId, offset, length);
+  }
+
+  @Override public void putAllByteArray(byte[] data, int offset, int length) {
+    sparkColumnVectorProxy.putAllByteArray(data, offset, length);
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index 1bde17d..8082511 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -36,6 +36,8 @@ import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.scan.executor.QueryExecutor;
 import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
 import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.conditional.NotEqualsExpression;
 import org.apache.carbondata.core.scan.model.ProjectionDimension;
 import org.apache.carbondata.core.scan.model.ProjectionMeasure;
 import org.apache.carbondata.core.scan.model.QueryModel;
@@ -282,11 +284,13 @@ public class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
         schema = schema.add(field);
       }
     }
+    boolean useLazyLoad = false;
     short batchSize = DEFAULT_BATCH_SIZE;
     if (queryModel.isDirectVectorFill()) {
       batchSize = CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT;
+      useLazyLoad = isUseLazyLoad();
     }
-    vectorProxy = new CarbonVectorProxy(DEFAULT_MEMORY_MODE, schema, batchSize);
+    vectorProxy = new CarbonVectorProxy(DEFAULT_MEMORY_MODE, schema, batchSize, useLazyLoad);
 
     if (partitionColumns != null) {
       int partitionIdx = fields.length;
@@ -318,6 +322,31 @@ public class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
     carbonColumnarBatch = new CarbonColumnarBatch(vectors, vectorProxy.numRows(), filteredRows);
   }
 
+  /**
+   * Decides whether to use lazy page loading in the vector or not.
+   * @return true if lazy loading should be enabled
+   */
+  private boolean isUseLazyLoad() {
+    boolean useLazyLoad = false;
+    if (queryModel.getFilterExpressionResolverTree() != null) {
+      Expression expression =
+          queryModel.getFilterExpressionResolverTree().getFilterExpression();
+      useLazyLoad = true;
+      // In case of join queries only a not-null filter would be pushed down, so check for it and
+      // disable lazy load in that case.
+      if (expression instanceof NotEqualsExpression) {
+        try {
+          if (((NotEqualsExpression) expression).getRight().evaluate(null).isNull()) {
+            useLazyLoad = false;
+          }
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+    return useLazyLoad;
+  }
+
   private void initBatch() {
     initBatch(DEFAULT_MEMORY_MODE, new StructType(), InternalRow.empty());
   }

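isUseLazyLoad() enables lazy loading only when a genuinely selective filter exists: with no filter every page is read anyway, and the lone not-null filter that join queries push down matches nearly every row, so eager loading is cheaper. A distilled sketch of that decision; the Filter types below are stand-ins for Carbon's Expression/NotEqualsExpression check, not real Carbon classes:

    // Distilled form of the lazy-load decision (illustrative types only).
    public class LazyLoadDecisionDemo {
      interface Filter { }
      static final class NotNullFilter implements Filter { } // "col != null"
      static final class EqualsFilter implements Filter { }  // selective

      static boolean useLazyLoad(Filter filter) {
        if (filter == null) {
          return false; // full scan: every page is read anyway
        }
        if (filter instanceof NotNullFilter) {
          return false; // join-style not-null filter: nearly all rows survive
        }
        return true;    // selective filter: defer page decompression
      }

      public static void main(String[] args) {
        System.out.println(useLazyLoad(null));                // false
        System.out.println(useLazyLoad(new NotNullFilter())); // false
        System.out.println(useLazyLoad(new EqualsFilter()));  // true
      }
    }
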
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
index 8c2f200..8cb2ca4 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
@@ -117,9 +117,13 @@ class SparkCarbonFileFormat extends FileFormat
   /**
    * Add our own protocol to control the commit.
    */
-  SparkSession.getActiveSession.get.sessionState.conf.setConfString(
-    "spark.sql.sources.commitProtocolClass",
-    "org.apache.spark.sql.carbondata.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol")
+  SparkSession.getActiveSession match {
+    case Some(session) => session.sessionState.conf.setConfString(
+      "spark.sql.sources.commitProtocolClass",
+      "org.apache.spark.sql.carbondata.execution." +
+      "datasources.CarbonSQLHadoopMapReduceCommitProtocol")
+    case _ =>
+  }
 
   /**
    * Prepares a write job and returns an [[OutputWriterFactory]].  Client side job preparation is

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
index 90e2cc5..edb1020 100644
--- a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
+++ b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
@@ -31,7 +31,6 @@ import org.apache.spark.sql.types.CalendarIntervalType;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.types.DecimalType;
-import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
@@ -47,25 +46,6 @@ public class CarbonVectorProxy {
   private ColumnarBatch columnarBatch;
   private ColumnVectorProxy[] columnVectorProxies;
 
-  /**
-   * Adapter class which handles the columnar vector reading of the carbondata
-   * based on the spark ColumnVector and ColumnarBatch API. This proxy class
-   * handles the complexity of spark 2.3 version related api changes since
-   * spark ColumnVector and ColumnarBatch interfaces are still evolving.
-   *
-   * @param memMode       which represent the type onheap or offheap vector.
-   * @param rowNum        rows number for vector reading
-   * @param structFileds, metadata related to current schema of table.
-   */
-  public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFileds) {
-    columnarBatch = ColumnarBatch.allocate(new StructType(structFileds), memMode, rowNum);
-    columnVectorProxies = new ColumnVectorProxy[columnarBatch.numCols()];
-    for (int i = 0; i < columnVectorProxies.length; i++) {
-      columnVectorProxies[i] = new ColumnVectorProxy(columnarBatch.column(i), rowNum, memMode);
-    }
-    updateColumnVectors();
-
-  }
 
   private void updateColumnVectors() {
     try {
@@ -77,11 +57,28 @@ public class CarbonVectorProxy {
     }
   }
 
-  public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) {
+  /**
+   * Adapter class which handles the columnar vector reading of CarbonData
+   * based on the Spark ColumnVector and ColumnarBatch API. This proxy class
+   * handles the complexity of Spark version-related API changes since the
+   * ColumnVector and ColumnarBatch interfaces are still evolving.
+   *
+   * @param memMode       whether the vector is on-heap or off-heap.
+   * @param outputSchema  metadata related to the current schema of the table.
+   * @param rowNum        number of rows for vector reading.
+   * @param useLazyLoad   whether to use lazy page loading while getting the data.
+   */
+  public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum,
+      boolean useLazyLoad) {
     columnarBatch = ColumnarBatch.allocate(outputSchema, memMode, rowNum);
     columnVectorProxies = new ColumnVectorProxy[columnarBatch.numCols()];
     for (int i = 0; i < columnVectorProxies.length; i++) {
-      columnVectorProxies[i] = new ColumnVectorProxy(columnarBatch.column(i), rowNum, memMode);
+      if (useLazyLoad) {
+        columnVectorProxies[i] =
+            new ColumnVectorProxyWithLazyLoad(columnarBatch.column(i), rowNum, memMode);
+      } else {
+        columnVectorProxies[i] = new ColumnVectorProxy(columnarBatch.column(i), rowNum, memMode);
+      }
     }
     updateColumnVectors();
   }
@@ -158,10 +155,6 @@ public class CarbonVectorProxy {
 
     private ColumnVector vector;
 
-    private LazyPageLoader pageLoad;
-
-    private boolean isLoaded;
-
     public ColumnVectorProxy(ColumnVector columnVector, int capacity, MemoryMode mode) {
       super(capacity, columnVector.dataType(), mode);
       try {
@@ -170,16 +163,14 @@ public class CarbonVectorProxy {
         childColumns.setAccessible(true);
         Object o = childColumns.get(columnVector);
         childColumns.set(this, o);
-        Field childColumns1 =
+        Field resultArray =
             columnVector.getClass().getSuperclass().getDeclaredField("resultArray");
-        childColumns1.setAccessible(true);
-        Object o1 = childColumns1.get(columnVector);
-        childColumns1.set(this, o1);
-
+        resultArray.setAccessible(true);
+        Object o1 = resultArray.get(columnVector);
+        resultArray.set(this, o1);
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
-
       vector = columnVector;
     }
 
@@ -342,42 +333,34 @@ public class CarbonVectorProxy {
     }
 
     @Override public boolean isNullAt(int i) {
-      checkPageLoaded();
       return vector.isNullAt(i);
     }
 
     @Override public boolean getBoolean(int i) {
-      checkPageLoaded();
       return vector.getBoolean(i);
     }
 
     @Override public byte getByte(int i) {
-      checkPageLoaded();
       return vector.getByte(i);
     }
 
     @Override public short getShort(int i) {
-      checkPageLoaded();
       return vector.getShort(i);
     }
 
     @Override public int getInt(int i) {
-      checkPageLoaded();
       return vector.getInt(i);
     }
 
     @Override public long getLong(int i) {
-      checkPageLoaded();
       return vector.getLong(i);
     }
 
     @Override public float getFloat(int i) {
-      checkPageLoaded();
       return vector.getFloat(i);
     }
 
     @Override public double getDouble(int i) {
-      checkPageLoaded();
       return vector.getDouble(i);
     }
 
@@ -433,12 +416,10 @@ public class CarbonVectorProxy {
     }
 
     @Override public int getArrayLength(int rowId) {
-      checkPageLoaded();
       return vector.getArrayLength(rowId);
     }
 
     @Override public int getArrayOffset(int rowId) {
-      checkPageLoaded();
       return vector.getArrayOffset(rowId);
     }
 
@@ -450,10 +431,98 @@ public class CarbonVectorProxy {
       return vector.putByteArray(rowId, value, offset, count);
     }
 
+    /**
+     * Keeps the binary data of all rows in a single backing array.
+     * Should be used along with {@link #putArray(int, int, int)} to record each row's
+     * offset and length.
+     */
+    public void putAllByteArray(byte[] data, int offset, int length) {
+      vector.arrayData().appendBytes(length, data, offset);
+    }
+
     @Override public void close() {
       vector.close();
     }
 
+    public void reset() {
+      if (isConstant) {
+        return;
+      }
+      vector.reset();
+    }
+
+    public void setLazyPage(LazyPageLoader lazyPage) {
+      lazyPage.loadPage();
+    }
+
+    public ColumnVector getVector() {
+      return vector;
+    }
+  }
+
+  public static class ColumnVectorProxyWithLazyLoad extends ColumnVectorProxy {
+
+    private ColumnVector vector;
+
+    private LazyPageLoader pageLoad;
+
+    private boolean isLoaded;
+
+    public ColumnVectorProxyWithLazyLoad(ColumnVector columnVector, int capacity, MemoryMode mode) {
+      super(columnVector, capacity, mode);
+      vector = columnVector;
+    }
+
+    @Override public boolean isNullAt(int i) {
+      checkPageLoaded();
+      return vector.isNullAt(i);
+    }
+
+    @Override public boolean getBoolean(int i) {
+      checkPageLoaded();
+      return vector.getBoolean(i);
+    }
+
+    @Override public byte getByte(int i) {
+      checkPageLoaded();
+      return vector.getByte(i);
+    }
+
+    @Override public short getShort(int i) {
+      checkPageLoaded();
+      return vector.getShort(i);
+    }
+
+    @Override public int getInt(int i) {
+      checkPageLoaded();
+      return vector.getInt(i);
+    }
+
+    @Override public long getLong(int i) {
+      checkPageLoaded();
+      return vector.getLong(i);
+    }
+
+    @Override public float getFloat(int i) {
+      checkPageLoaded();
+      return vector.getFloat(i);
+    }
+
+    @Override public double getDouble(int i) {
+      checkPageLoaded();
+      return vector.getDouble(i);
+    }
+
+    @Override public int getArrayLength(int rowId) {
+      checkPageLoaded();
+      return vector.getArrayLength(rowId);
+    }
+
+    @Override public int getArrayOffset(int rowId) {
+      checkPageLoaded();
+      return vector.getArrayOffset(rowId);
+    }
+
     private void checkPageLoaded() {
       if (!isLoaded) {
         if (pageLoad != null) {
@@ -475,8 +544,5 @@ public class CarbonVectorProxy {
       this.pageLoad = lazyPage;
     }
 
-    public ColumnVector getVector() {
-      return vector;
-    }
   }
 }

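The new ColumnVectorProxyWithLazyLoad subclass moves the load-on-first-access bookkeeping out of the common proxy: setLazyPage() only records the loader, and the first getter call decompresses the page. A compact sketch of the pattern with stand-in types for LazyPageLoader and the vector:

    // Sketch of load-on-first-access: setLazyPage() records the loader,
    // and the page is decompressed once, on the first read.
    public class LazyVectorDemo {
      interface PageLoader { void loadPage(); }

      static class LazyVector {
        private PageLoader pageLoad;
        private boolean isLoaded;
        private final int[] values = {42};

        void setLazyPage(PageLoader loader) {
          this.pageLoad = loader; // no work yet; rows may be filtered first
        }

        int getInt(int rowId) {
          checkPageLoaded();      // decompress exactly once, on demand
          return values[rowId];
        }

        private void checkPageLoaded() {
          if (!isLoaded) {
            if (pageLoad != null) {
              pageLoad.loadPage();
            }
            isLoaded = true;
          }
        }
      }

      public static void main(String[] args) {
        LazyVector v = new LazyVector();
        v.setLazyPage(() -> System.out.println("page decompressed"));
        System.out.println(v.getInt(0)); // triggers the load, then reads
        System.out.println(v.getInt(0)); // already loaded, no second load
      }
    }
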
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java
index c8c4e2c..76a97ee 100644
--- a/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java
+++ b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java
@@ -43,33 +43,29 @@ public class CarbonVectorProxy {
     private ColumnarBatch columnarBatch;
     private ColumnVectorProxy[] columnVectorProxies;
 
-    /**
-     * Adapter class which handles the columnar vector reading of the carbondata
-     * based on the spark ColumnVector and ColumnarBatch API. This proxy class
-     * handles the complexity of spark 2.3 version related api changes since
-     * spark ColumnVector and ColumnarBatch interfaces are still evolving.
-     *
-     * @param memMode       which represent the type onheap or offheap vector.
-     * @param rowNum        rows number for vector reading
-     * @param structFileds, metadata related to current schema of table.
-     */
-    public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFileds) {
-        WritableColumnVector[] columnVectors =
-            ColumnVectorFactory.getColumnVector(memMode, new StructType(structFileds), rowNum);
-        columnVectorProxies = new ColumnVectorProxy[columnVectors.length];
-        for (int i = 0; i < columnVectorProxies.length; i++) {
-            columnVectorProxies[i] = new ColumnVectorProxy(columnVectors[i]);
-        }
-        columnarBatch = new ColumnarBatch(columnVectorProxies);
-        columnarBatch.setNumRows(rowNum);
-    }
 
-    public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) {
+  /**
+   * Adapter class which handles the columnar vector reading of CarbonData
+   * based on the Spark ColumnVector and ColumnarBatch API. This proxy class
+   * handles the complexity of Spark 2.3 version-related API changes since the
+   * Spark ColumnVector and ColumnarBatch interfaces are still evolving.
+   *
+   * @param memMode       whether the vector is on-heap or off-heap.
+   * @param outputSchema  metadata related to the current schema of the table.
+   * @param rowNum        number of rows for vector reading.
+   * @param useLazyLoad   whether to use lazy page loading while getting the data.
+   */
+    public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum,
+        boolean useLazyLoad) {
         WritableColumnVector[] columnVectors = ColumnVectorFactory
                 .getColumnVector(memMode, outputSchema, rowNum);
         columnVectorProxies = new ColumnVectorProxy[columnVectors.length];
         for (int i = 0; i < columnVectorProxies.length; i++) {
+          if (useLazyLoad) {
+            columnVectorProxies[i] = new ColumnVectorProxyWithLazyLoad(columnVectors[i]);
+          } else {
             columnVectorProxies[i] = new ColumnVectorProxy(columnVectors[i]);
+          }
         }
         columnarBatch = new ColumnarBatch(columnVectorProxies);
         columnarBatch.setNumRows(rowNum);
@@ -148,10 +144,6 @@ public class CarbonVectorProxy {
 
         private WritableColumnVector vector;
 
-        private LazyPageLoader pageLoad;
-
-        private boolean isLoaded;
-
         public ColumnVectorProxy(ColumnVector columnVector) {
             super(columnVector.dataType());
             vector = (WritableColumnVector) columnVector;
@@ -321,42 +313,34 @@ public class CarbonVectorProxy {
         }
 
         @Override public boolean isNullAt(int i) {
-            checkPageLoaded();
             return vector.isNullAt(i);
         }
 
         @Override public boolean getBoolean(int i) {
-            checkPageLoaded();
             return vector.getBoolean(i);
         }
 
         @Override public byte getByte(int i) {
-            checkPageLoaded();
             return vector.getByte(i);
         }
 
         @Override public short getShort(int i) {
-            checkPageLoaded();
             return vector.getShort(i);
         }
 
         @Override public int getInt(int i) {
-            checkPageLoaded();
             return vector.getInt(i);
         }
 
         @Override public long getLong(int i) {
-            checkPageLoaded();
             return vector.getLong(i);
         }
 
         @Override public float getFloat(int i) {
-            checkPageLoaded();
             return vector.getFloat(i);
         }
 
         @Override public double getDouble(int i) {
-            checkPageLoaded();
             return vector.getDouble(i);
         }
 
@@ -365,66 +349,174 @@ public class CarbonVectorProxy {
         }
 
         @Override public boolean hasNull() {
-            checkPageLoaded();
             return vector.hasNull();
         }
 
         @Override public int numNulls() {
-            checkPageLoaded();
             return vector.numNulls();
         }
 
         @Override public ColumnarArray getArray(int i) {
-            checkPageLoaded();
             return vector.getArray(i);
         }
 
         @Override public ColumnarMap getMap(int i) {
-            checkPageLoaded();
             return vector.getMap(i);
         }
 
         @Override public Decimal getDecimal(int i, int i1, int i2) {
-            checkPageLoaded();
             return vector.getDecimal(i, i1, i2);
         }
 
         @Override public UTF8String getUTF8String(int i) {
-            checkPageLoaded();
             return vector.getUTF8String(i);
         }
 
         @Override public byte[] getBinary(int i) {
-            checkPageLoaded();
             return vector.getBinary(i);
         }
 
         @Override protected ColumnVector getChild(int i) {
-            checkPageLoaded();
             return vector.getChild(i);
         }
 
-        private void checkPageLoaded() {
-          if (!isLoaded) {
-              if (pageLoad != null) {
-                  pageLoad.loadPage();
-              }
-              isLoaded = true;
-          }
-        }
-
         public void reset() {
-            isLoaded = false;
-            pageLoad = null;
             vector.reset();
         }
 
         public void setLazyPage(LazyPageLoader lazyPage) {
-            this.pageLoad = lazyPage;
+            lazyPage.loadPage();
         }
 
-        public WritableColumnVector getVector() {
+      /**
+       * Keeps the binary data of all rows in a single backing array.
+       * Should be used along with {@link #putArray(int, int, int)} to record each row's
+       * offset and length.
+       */
+      public void putAllByteArray(byte[] data, int offset, int length) {
+        vector.arrayData().appendBytes(length, data, offset);
+      }
+
+      public void putArray(int rowId, int offset, int length) {
+        vector.putArray(rowId, offset, length);
+      }
+
+      public WritableColumnVector getVector() {
             return vector;
         }
     }
+
+  public static class ColumnVectorProxyWithLazyLoad extends ColumnVectorProxy {
+
+    private WritableColumnVector vector;
+
+    private LazyPageLoader pageLoad;
+
+    private boolean isLoaded;
+
+    public ColumnVectorProxyWithLazyLoad(ColumnVector columnVector) {
+      super(columnVector);
+      vector = (WritableColumnVector) columnVector;
+    }
+
+    @Override public boolean isNullAt(int i) {
+      checkPageLoaded();
+      return vector.isNullAt(i);
+    }
+
+    @Override public boolean getBoolean(int i) {
+      checkPageLoaded();
+      return vector.getBoolean(i);
+    }
+
+    @Override public byte getByte(int i) {
+      checkPageLoaded();
+      return vector.getByte(i);
+    }
+
+    @Override public short getShort(int i) {
+      checkPageLoaded();
+      return vector.getShort(i);
+    }
+
+    @Override public int getInt(int i) {
+      checkPageLoaded();
+      return vector.getInt(i);
+    }
+
+    @Override public long getLong(int i) {
+      checkPageLoaded();
+      return vector.getLong(i);
+    }
+
+    @Override public float getFloat(int i) {
+      checkPageLoaded();
+      return vector.getFloat(i);
+    }
+
+    @Override public double getDouble(int i) {
+      checkPageLoaded();
+      return vector.getDouble(i);
+    }
+
+    @Override public boolean hasNull() {
+      checkPageLoaded();
+      return vector.hasNull();
+    }
+
+    @Override public int numNulls() {
+      checkPageLoaded();
+      return vector.numNulls();
+    }
+
+    @Override public ColumnarArray getArray(int i) {
+      checkPageLoaded();
+      return vector.getArray(i);
+    }
+
+    @Override public ColumnarMap getMap(int i) {
+      checkPageLoaded();
+      return vector.getMap(i);
+    }
+
+    @Override public Decimal getDecimal(int i, int i1, int i2) {
+      checkPageLoaded();
+      return vector.getDecimal(i, i1, i2);
+    }
+
+    @Override public UTF8String getUTF8String(int i) {
+      checkPageLoaded();
+      return vector.getUTF8String(i);
+    }
+
+    @Override public byte[] getBinary(int i) {
+      checkPageLoaded();
+      return vector.getBinary(i);
+    }
+
+    @Override protected ColumnVector getChild(int i) {
+      checkPageLoaded();
+      return vector.getChild(i);
+    }
+
+    public void reset() {
+      isLoaded = false;
+      pageLoad = null;
+      vector.reset();
+    }
+
+    private void checkPageLoaded() {
+      if (!isLoaded) {
+        if (pageLoad != null) {
+          pageLoad.loadPage();
+        }
+        isLoaded = true;
+      }
+    }
+
+    public void setLazyPage(LazyPageLoader lazyPage) {
+      this.pageLoad = lazyPage;
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java b/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
index b66e446..50d6c46 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
@@ -438,14 +438,15 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
         // if filter is null and output projection is empty, use the row number of blocklet header
         if (skipScanData) {
             int rowNums = header.getBlocklet_info().getNum_rows();
-            vectorProxy= new CarbonVectorProxy(MemoryMode.OFF_HEAP,outputSchema,rowNums);
+            vectorProxy = new CarbonVectorProxy(MemoryMode.OFF_HEAP, outputSchema, rowNums, false);
             vectorProxy.setNumRows(rowNums);
             input.skipBlockletData(true);
             return rowNums > 0;
         }
 
         input.readBlockletData(header);
-        vectorProxy= new CarbonVectorProxy(MemoryMode.OFF_HEAP,outputSchema,input.getRowNums());
+        vectorProxy =
+          new CarbonVectorProxy(MemoryMode.OFF_HEAP, outputSchema, input.getRowNums(), false);
         int rowNum = 0;
         if (null == filter) {
             while (input.hasNext()) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bed51ba7/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index c9434a1..95ab29d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -37,7 +37,7 @@ import org.apache.spark.util.SerializableConfiguration
 import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
 import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.{CarbonMetadata, ColumnIdentifier}
+import org.apache.carbondata.core.metadata.ColumnIdentifier
 import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -145,79 +145,141 @@ case class CarbonDictionaryDecoder(
         ExpressionCanonicalizer.execute(BindReferences.bindReference(exp, child.output))
       }
       ctx.currentVars = input
+      val dictTuple = exprs.map(e => new DictTuple(null, false))
+      val decodeDictionary = ctx.freshName("deDict")
+      ctx.addNewFunction(decodeDictionary,
+        s"""
+           |private org.apache.spark.sql.DictTuple $decodeDictionary(
+           |  org.apache.spark.sql.ForwardDictionaryWrapper dict, int surg)
+           |    throws java.io.IOException {
+           |  boolean isNull = false;
+           |  byte[] valueIntern = dict.getDictionaryValueForKeyInBytes(surg);
+           |  if (valueIntern == null ||
+           |    java.util.Arrays.equals(org.apache.carbondata.core.constants
+           |  .CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, valueIntern)) {
+           |    isNull = true;
+           |    valueIntern = org.apache.carbondata.core.constants
+           |    .CarbonCommonConstants.ZERO_BYTE_ARRAY;
+           |  }
+           |  return new org.apache.spark.sql.DictTuple(valueIntern, isNull);
+           |}""".stripMargin)
+
+      val decodeDecimal = ctx.freshName("deDictDec")
+      ctx.addNewFunction(decodeDecimal,
+        s"""
+           |private org.apache.spark.sql.DictTuple $decodeDecimal(
+           |  org.apache.spark.sql.ForwardDictionaryWrapper dict, int surg)
+           | throws java.io.IOException {
+           |  org.apache.spark.sql.DictTuple tuple = $decodeDictionary(dict, surg);
+           |  tuple.setValue(org.apache.spark.sql.types.Decimal.apply(new java.math.BigDecimal(
+           |  new String((byte[])tuple.getValue(),
+           |  org.apache.carbondata.core.constants.CarbonCommonConstants.DEFAULT_CHARSET_CLASS))));
+           |  return tuple;
+           |}""".stripMargin)
+
+      val decodeInt = ctx.freshName("deDictInt")
+      ctx.addNewFunction(decodeInt,
+        s"""
+           |private org.apache.spark.sql.DictTuple $decodeInt(
+           |  org.apache.spark.sql.ForwardDictionaryWrapper dict, int surg)
+           | throws java.io.IOException {
+           |  org.apache.spark.sql.DictTuple tuple = $decodeDictionary(dict, surg);
+           |  tuple.setValue(Integer.parseInt(new String((byte[])tuple.getValue(),
+           |    org.apache.carbondata.core.constants.CarbonCommonConstants.DEFAULT_CHARSET_CLASS)));
+           |  return tuple;
+           |}""".stripMargin)
+      val decodeShort = ctx.freshName("deDictShort")
+      ctx.addNewFunction(decodeShort,
+        s"""
+           |private org.apache.spark.sql.DictTuple $decodeShort(
+           |  org.apache.spark.sql.ForwardDictionaryWrapper dict, int surg)
+           | throws java.io.IOException {
+           |  org.apache.spark.sql.DictTuple tuple = $decodeDictionary(dict, surg);
+           |  tuple.setValue(Short.parseShort(new String((byte[])tuple.getValue(),
+           |    org.apache.carbondata.core.constants.CarbonCommonConstants.DEFAULT_CHARSET_CLASS)));
+           |  return tuple;
+           |}""".stripMargin)
+      val decodeDouble = ctx.freshName("deDictDoub")
+      ctx.addNewFunction(decodeDouble,
+        s"""
+           |private org.apache.spark.sql.DictTuple $decodeDouble(
+           |  org.apache.spark.sql.ForwardDictionaryWrapper dict, int surg)
+           | throws java.io.IOException {
+           |  org.apache.spark.sql.DictTuple tuple = $decodeDictionary(dict, surg);
+           |  tuple.setValue(Double.parseDouble(new String((byte[])tuple.getValue(),
+           |    org.apache.carbondata.core.constants.CarbonCommonConstants.DEFAULT_CHARSET_CLASS)));
+           |  return tuple;
+           |}""".stripMargin)
+      val decodeLong = ctx.freshName("deDictLong")
+      ctx.addNewFunction(decodeLong,
+        s"""
+           |private org.apache.spark.sql.DictTuple $decodeLong(
+           |  org.apache.spark.sql.ForwardDictionaryWrapper dict, int surg)
+           | throws java.io.IOException {
+           |  org.apache.spark.sql.DictTuple tuple = $decodeDictionary(dict, surg);
+           |  tuple.setValue(Long.parseLong(new String((byte[])tuple.getValue(),
+           |    org.apache.carbondata.core.constants.CarbonCommonConstants.DEFAULT_CHARSET_CLASS)));
+           |  return tuple;
+           |}""".stripMargin)
+      val decodeStr = ctx.freshName("deDictStr")
+      ctx.addNewFunction(decodeStr,
+        s"""
+           |private org.apache.spark.sql.DictTuple $decodeStr(
+           |  org.apache.spark.sql.ForwardDictionaryWrapper dict, int surg)
+           | throws java.io.IOException {
+           |  org.apache.spark.sql.DictTuple tuple = $decodeDictionary(dict, surg);
+           |  tuple.setValue(UTF8String.fromBytes((byte[])tuple.getValue()));
+           |  return tuple;
+           |}""".stripMargin)
+
+
       val resultVars = exprs.zipWithIndex.map { case (expr, index) =>
         if (dicts(index) != null) {
           val ev = expr.genCode(ctx)
-          val value = ctx.freshName("value")
-          val valueIntern = ctx.freshName("valueIntern")
-          val isNull = ctx.freshName("isNull")
-          val dictsRef = ctx.addReferenceObj("dictsRef", dicts(index))
+          val dictRef = ctx.addReferenceObj("df", dicts(index))
+          val value = ctx.freshName("v")
           var code =
             s"""
                |${ev.code}
              """.stripMargin
-          code +=
+          if (CarbonDataTypes.isDecimal(getDictionaryColumnIds(index)._3.getDataType)) {
+            code +=
             s"""
-             |boolean $isNull = false;
-             |byte[] $valueIntern = $dictsRef.getDictionaryValueForKeyInBytes(${ ev.value });
-             |if ($valueIntern == null ||
-             |  java.util.Arrays.equals(org.apache.carbondata.core.constants
-             |.CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, $valueIntern)) {
-             |  $isNull = true;
-             |  $valueIntern = org.apache.carbondata.core.constants
-             |  .CarbonCommonConstants.ZERO_BYTE_ARRAY;
-             |}
-             """.stripMargin
-
-            val caseCode =
-              if (CarbonDataTypes.isDecimal(getDictionaryColumnIds(index)._3.getDataType)) {
-                s"""
-                   |org.apache.spark.sql.types.Decimal $value =
-                   |Decimal.apply(new java.math.BigDecimal(
-                   |new String($valueIntern, org.apache.carbondata.core.constants
-                   |.CarbonCommonConstants.DEFAULT_CHARSET_CLASS)));
+               |org.apache.spark.sql.DictTuple $value = $decodeDecimal($dictRef, ${ev.value});
                  """.stripMargin
-              } else {
-                getDictionaryColumnIds(index)._3.getDataType match {
-                  case CarbonDataTypes.INT =>
-                    s"""
-                       |int $value = Integer.parseInt(new String($valueIntern,
-                       |org.apache.carbondata.core.constants.CarbonCommonConstants
-                       |.DEFAULT_CHARSET_CLASS));
+            ExprCode(code, s"$value.getIsNull()",
+              s"(org.apache.spark.sql.types.Decimal)$value.getValue()")
+          } else {
+            getDictionaryColumnIds(index)._3.getDataType match {
+              case CarbonDataTypes.INT => code +=
+                s"""
+                   |org.apache.spark.sql.DictTuple $value = $decodeInt($dictRef, ${ ev.value });
                  """.stripMargin
-                  case CarbonDataTypes.SHORT =>
-                    s"""
-                       |short $value =
-                       |Short.parseShort(new String($valueIntern,
-                       |org.apache.carbondata.core.constants.CarbonCommonConstants
-                       |.DEFAULT_CHARSET_CLASS));
+                ExprCode(code, s"$value.getIsNull()", s"(Integer)$value.getValue()")
+              case CarbonDataTypes.SHORT => code +=
+                s"""
+                   |org.apache.spark.sql.DictTuple $value = $decodeShort($dictRef, ${ ev.value });
                  """.stripMargin
-                  case CarbonDataTypes.DOUBLE =>
-                    s"""
-                       |double $value =
-                       |Double.parseDouble(new String($valueIntern,
-                       |org.apache.carbondata.core.constants.CarbonCommonConstants
-                       |.DEFAULT_CHARSET_CLASS));
+                ExprCode(code, s"$value.getIsNull()", s"(Short)$value.getValue()")
+              case CarbonDataTypes.DOUBLE => code +=
+                 s"""
+                    |org.apache.spark.sql.DictTuple $value = $decodeDouble($dictRef, ${ ev.value });
                  """.stripMargin
-                  case CarbonDataTypes.LONG =>
-                    s"""
-                       |long $value =
-                       |Long.parseLong(new String($valueIntern,
-                       |org.apache.carbondata.core.constants.CarbonCommonConstants
-                       |.DEFAULT_CHARSET_CLASS));
+                ExprCode(code, s"$value.getIsNull()", s"(Double)$value.getValue()")
+              case CarbonDataTypes.LONG => code +=
+                 s"""
+                    |org.apache.spark.sql.DictTuple $value = $decodeLong($dictRef, ${ ev.value });
                  """.stripMargin
-                  case _ =>
-                    s"""
-                       | UTF8String $value = UTF8String.fromBytes($valueIntern);
+                ExprCode(code, s"$value.getIsNull()", s"(Long)$value.getValue()")
+              case _ => code +=
+                s"""
+                   |org.apache.spark.sql.DictTuple $value = $decodeStr($dictRef, ${ev.value});
                  """.stripMargin
-                }
-              }
-          code +=
-            s"""
-               |$caseCode
-             """.stripMargin
+                ExprCode(code, s"$value.getIsNull()", s"(UTF8String)$value.getValue()")
+
+            }
+          }
 
-          ExprCode(code, isNull, value)
         } else {
           expr.genCode(ctx)
         }
@@ -588,6 +650,21 @@ class ForwardDictionaryWrapper(
   }
 }
 
+class DictTuple(var value: AnyRef, var isNull: Boolean) extends Serializable {
+
+  def getValue: AnyRef = value
+
+  def getIsNull: Boolean = isNull
+
+  def setValue(value: AnyRef): Unit = {
+    this.value = value
+  }
+
+  def setIsNull(isNull: Boolean): Unit = {
+    this.isNull = isNull
+  }
+}
+
 /**
  * It is Dictionary Loader class to load all dictionaries at a time instead of one by one.
  */