You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2017/12/14 03:39:41 UTC

[1/2] carbondata git commit: [CARBONDATA-1779] GenericVectorizedReader

Repository: carbondata
Updated Branches:
  refs/heads/master 27b36e4e8 -> e5e74fc90


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e5e74fc9/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
index 0c71adc..7328899 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.AbstractQueue;
 import java.util.Arrays;
@@ -34,6 +35,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
 import org.apache.carbondata.processing.loading.sort.unsafe.holder.SortTempChunkHolder;
 import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeSortTempFileChunkHolder;
@@ -330,7 +332,7 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
           rowData.putDouble(size, (Double) value);
           size += 8;
         } else if (DataTypes.isDecimal(dataType)) {
-          byte[] bigDecimalInBytes = (byte[]) value;
+          byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(((BigDecimal) value));
           rowData.putShort(size, (short) bigDecimalInBytes.length);
           size += 2;
           for (int i = 0; i < bigDecimalInBytes.length; i++) {


[2/2] carbondata git commit: [CARBONDATA-1779] GenericVectorizedReader

Posted by ch...@apache.org.
[CARBONDATA-1779] GenericVectorizedReader

This PR removes the Spark Dependency from Presto Integration Module for using the CarbonVectorizedRecordreader, This PR consolidate CarbonVectorizedRecordReader into one,to make it shared for all integration modules.In the earlier version of Presto Integration we were using ColumnarBatch of Spark, which is not a good practice, here we provided our own implementation of the ColumnVector and the VectorBatch to eliminate the Spark all together. This generic ColumnVector can now be used for all the integration module wherever we want to have a VectorizedReader to speed up the processing. There are some core module classes changed to ensure that we are using Java data types instead of Spark datatypes, Decimal being one of them.

This closes #1581


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e5e74fc9
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e5e74fc9
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e5e74fc9

Branch: refs/heads/master
Commit: e5e74fc90855be7738b16573d6803411146baddf
Parents: 27b36e4
Author: Bhavya <bh...@knoldus.com>
Authored: Tue Nov 14 15:35:44 2017 +0530
Committer: chenliang613 <ch...@huawei.com>
Committed: Thu Dec 14 11:39:17 2017 +0800

----------------------------------------------------------------------
 ...feVariableLengthDimensionDataChunkStore.java |  24 +-
 ...afeVariableLengthDimesionDataChunkStore.java |  23 +-
 .../RestructureBasedVectorResultCollector.java  |   2 +-
 .../scan/result/vector/CarbonColumnVector.java  |   9 +-
 .../vector/MeasureDataVectorProcessor.java      |   8 +-
 .../vector/impl/CarbonColumnVectorImpl.java     | 247 +++++++++++++++++++
 integration/presto/pom.xml                      |  24 +-
 .../presto/CarbonColumnVectorWrapper.java       | 209 ++++++++++++++++
 .../carbondata/presto/CarbonTypeUtil.java       |  34 ---
 .../carbondata/presto/CarbonVectorBatch.java    | 131 ++++++++++
 .../presto/CarbonVectorizedRecordReader.java    |  48 ++--
 .../carbondata/presto/CarbondataConnector.java  |   5 +-
 .../presto/CarbondataConnectorFactory.java      |   5 +-
 .../carbondata/presto/CarbondataMetadata.java   |  10 +-
 .../carbondata/presto/CarbondataPageSource.java |   7 +-
 .../presto/ColumnarVectorWrapper.java           | 209 ----------------
 .../carbondata/presto/PrestoFilterUtil.java     |  19 +-
 .../presto/impl/CarbonTableReader.java          |  66 +++--
 .../presto/readers/AbstractStreamReader.java    |   6 +-
 .../readers/DecimalSliceStreamReader.java       |   8 +-
 .../presto/readers/DoubleStreamReader.java      |   4 +-
 .../presto/readers/IntegerStreamReader.java     |   5 +-
 .../presto/readers/LongStreamReader.java        |   4 +-
 .../presto/readers/ObjectStreamReader.java      |   2 +-
 .../presto/readers/ShortStreamReader.java       |   6 +-
 .../presto/readers/SliceStreamReader.java       |   4 +-
 .../carbondata/presto/readers/StreamReader.java |   5 +-
 .../presto/readers/StreamReaders.java           |   3 -
 .../presto/readers/TimestampStreamReader.java   |   4 +-
 .../vectorreader/ColumnarVectorWrapper.java     |  20 +-
 .../merger/UnsafeIntermediateFileMerger.java    |   4 +-
 31 files changed, 755 insertions(+), 400 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e5e74fc9/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
index 7ce3a1d..f0d18dc 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
@@ -20,16 +20,11 @@ package org.apache.carbondata.core.datastore.chunk.store.impl.safe;
 import java.nio.ByteBuffer;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.util.ByteUtil;
 
-import org.apache.spark.sql.types.BooleanType;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.IntegerType;
-import org.apache.spark.sql.types.LongType;
-import org.apache.spark.sql.types.ShortType;
-import org.apache.spark.sql.types.StringType;
-import org.apache.spark.sql.types.TimestampType;
 
 /**
  * Below class is responsible to store variable length dimension data chunk in
@@ -142,23 +137,24 @@ public class SafeVariableLengthDimensionDataChunkStore extends SafeAbsractDimens
       length = (short) (this.data.length - currentDataOffset);
     }
     DataType dt = vector.getType();
-    if ((!(dt instanceof StringType) && length == 0) || ByteUtil.UnsafeComparer.INSTANCE
+
+    if ((!(dt == DataTypes.STRING) && length == 0) || ByteUtil.UnsafeComparer.INSTANCE
         .equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0,
             CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, data, currentDataOffset,
             length)) {
       vector.putNull(vectorRow);
     } else {
-      if (dt instanceof StringType) {
+      if (dt == DataTypes.STRING) {
         vector.putBytes(vectorRow, currentDataOffset, length, data);
-      } else if (dt instanceof BooleanType) {
+      } else if (dt == DataTypes.BOOLEAN) {
         vector.putBoolean(vectorRow, ByteUtil.toBoolean(data[currentDataOffset]));
-      } else if (dt instanceof ShortType) {
+      } else if (dt == DataTypes.SHORT) {
         vector.putShort(vectorRow, ByteUtil.toShort(data, currentDataOffset, length));
-      } else if (dt instanceof IntegerType) {
+      } else if (dt == DataTypes.INT) {
         vector.putInt(vectorRow, ByteUtil.toInt(data, currentDataOffset, length));
-      } else if (dt instanceof LongType) {
+      } else if (dt == DataTypes.LONG) {
         vector.putLong(vectorRow, ByteUtil.toLong(data, currentDataOffset, length));
-      } else if (dt instanceof TimestampType) {
+      } else if (dt  == DataTypes.TIMESTAMP) {
         vector.putLong(vectorRow, ByteUtil.toLong(data, currentDataOffset, length) * 1000L);
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e5e74fc9/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java
index c242752..d6af052 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java
@@ -21,16 +21,11 @@ import java.nio.ByteBuffer;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.util.ByteUtil;
 
-import org.apache.spark.sql.types.BooleanType;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.IntegerType;
-import org.apache.spark.sql.types.LongType;
-import org.apache.spark.sql.types.ShortType;
-import org.apache.spark.sql.types.StringType;
-import org.apache.spark.sql.types.TimestampType;
 
 /**
  * Below class is responsible to store variable length dimension data chunk in
@@ -169,21 +164,21 @@ public class UnsafeVariableLengthDimesionDataChunkStore
   @Override public void fillRow(int rowId, CarbonColumnVector vector, int vectorRow) {
     byte[] value = getRow(rowId);
     DataType dt = vector.getType();
-    if ((!(dt instanceof StringType) && value.length == 0) || ByteUtil.UnsafeComparer.INSTANCE
+    if ((!(dt == DataTypes.STRING) && value.length == 0) || ByteUtil.UnsafeComparer.INSTANCE
         .equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, value)) {
       vector.putNull(vectorRow);
     } else {
-      if (dt instanceof StringType) {
+      if (dt == DataTypes.STRING) {
         vector.putBytes(vectorRow, 0, value.length, value);
-      } else if (dt instanceof BooleanType) {
+      } else if (dt == DataTypes.BOOLEAN) {
         vector.putBoolean(vectorRow, ByteUtil.toBoolean(value[0]));
-      } else if (dt instanceof ShortType) {
+      } else if (dt == DataTypes.SHORT) {
         vector.putShort(vectorRow, ByteUtil.toShort(value, 0, value.length));
-      } else if (dt instanceof IntegerType) {
+      } else if (dt == DataTypes.INT) {
         vector.putInt(vectorRow, ByteUtil.toInt(value, 0, value.length));
-      } else if (dt instanceof LongType) {
+      } else if (dt == DataTypes.LONG) {
         vector.putLong(vectorRow, ByteUtil.toLong(value, 0, value.length));
-      } else if (dt instanceof TimestampType) {
+      } else if (dt == DataTypes.TIMESTAMP) {
         vector.putLong(vectorRow, ByteUtil.toLong(value, 0, value.length) * 1000L);
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e5e74fc9/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
index d9b7b23..1e29e98 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
@@ -238,7 +238,7 @@ public class RestructureBasedVectorResultCollector extends DictionaryBasedVector
                 (long) defaultValue);
           } else if (DataTypes.isDecimal(dataType)) {
             vector.putDecimals(columnVectorInfo.vectorOffset, columnVectorInfo.size,
-                (Decimal) defaultValue, measure.getPrecision());
+                ((Decimal) defaultValue).toJavaBigDecimal(), measure.getPrecision());
           } else {
             vector.putDoubles(columnVectorInfo.vectorOffset, columnVectorInfo.size,
                 (double) defaultValue);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e5e74fc9/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
index a3eb48b..40a52e3 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
@@ -17,8 +17,9 @@
 
 package org.apache.carbondata.core.scan.result.vector;
 
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.Decimal;
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
 
 public interface CarbonColumnVector {
 
@@ -38,9 +39,9 @@ public interface CarbonColumnVector {
 
   void putLongs(int rowId, int count, long value);
 
-  void putDecimal(int rowId, Decimal value, int precision);
+  void putDecimal(int rowId, BigDecimal value, int precision);
 
-  void putDecimals(int rowId, int count, Decimal value, int precision);
+  void putDecimals(int rowId, int count, BigDecimal value, int precision);
 
   void putDouble(int rowId, double value);
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e5e74fc9/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
index 5916fd3..db4c982 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
@@ -23,8 +23,6 @@ import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 
-import org.apache.spark.sql.types.Decimal;
-
 public class MeasureDataVectorProcessor {
 
   public interface MeasureVectorFiller {
@@ -274,8 +272,7 @@ public class MeasureDataVectorProcessor {
           if (decimal.scale() < newMeasureScale) {
             decimal = decimal.setScale(newMeasureScale);
           }
-          Decimal toDecimal = org.apache.spark.sql.types.Decimal.apply(decimal);
-          vector.putDecimal(vectorOffset, toDecimal, precision);
+          vector.putDecimal(vectorOffset, decimal, precision);
         }
         vectorOffset++;
       }
@@ -299,8 +296,7 @@ public class MeasureDataVectorProcessor {
           if (info.measure.getMeasure().getScale() > decimal.scale()) {
             decimal = decimal.setScale(info.measure.getMeasure().getScale());
           }
-          Decimal toDecimal = Decimal.apply(decimal);
-          vector.putDecimal(vectorOffset, toDecimal, precision);
+          vector.putDecimal(vectorOffset, decimal, precision);
         }
         vectorOffset++;
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e5e74fc9/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
new file mode 100644
index 0000000..5f8233c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.result.vector.impl;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.BitSet;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.DecimalType;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+
+
+
+public class CarbonColumnVectorImpl implements CarbonColumnVector {
+
+  private Object[] data;
+
+  private int[] ints;
+
+  private long[] longs;
+
+  private BigDecimal[] decimals;
+
+  private byte[] byteArr;
+
+  private byte[][] bytes;
+
+  private float[] floats;
+
+  private double[] doubles;
+
+  private short[] shorts;
+
+  private BitSet nullBytes;
+
+  private DataType dataType;
+
+  /**
+   * True if there is at least one NULL byte set. This is an optimization for the writer, to skip
+   * having to clear NULL bits.
+   */
+  protected boolean anyNullsSet;
+
+
+  public CarbonColumnVectorImpl(int batchSize, DataType dataType) {
+    nullBytes = new BitSet(batchSize);
+    this.dataType = dataType;
+    if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
+      byteArr = new byte[batchSize];
+    } else if (dataType == DataTypes.SHORT) {
+      shorts = new short[batchSize];
+    } else if (dataType == DataTypes.INT) {
+      ints = new int[batchSize];
+    } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
+      longs = new long[batchSize];
+    } else if (dataType == DataTypes.FLOAT) {
+      floats = new float[batchSize];
+    } else if (dataType == DataTypes.DOUBLE) {
+      doubles = new double[batchSize];
+    } else if (dataType instanceof DecimalType) {
+      decimals = new BigDecimal[batchSize];
+    } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY) {
+      bytes = new byte[batchSize][];
+    } else {
+      data = new Object[batchSize];
+    }
+
+  }
+
+  @Override public void putBoolean(int rowId, boolean value) {
+    byteArr[rowId] =  (byte)((value) ? 1 : 0);
+  }
+
+  @Override public void putFloat(int rowId, float value) {
+    floats[rowId] = value;
+  }
+
+  @Override public void putShort(int rowId, short value) {
+    shorts[rowId] = value;
+  }
+
+  @Override public void putShorts(int rowId, int count, short value) {
+    for (int i = 0; i < count; ++i) {
+      shorts[i + rowId] = value;
+    }
+  }
+
+  @Override public void putInt(int rowId, int value) {
+    ints[rowId] = value;
+  }
+
+  @Override public void putInts(int rowId, int count, int value) {
+    for (int i = 0; i < count; ++i) {
+      ints[i + rowId] = value;
+    }
+  }
+
+  @Override public void putLong(int rowId, long value) {
+    longs[rowId] = value;
+  }
+
+  @Override public void putLongs(int rowId, int count, long value) {
+    for (int i = 0; i < count; ++i) {
+      longs[i + rowId] = value;
+    }
+  }
+
+  @Override public void putDecimal(int rowId, BigDecimal  value, int precision) {
+    decimals[rowId] = value;
+  }
+
+  @Override public void putDecimals(int rowId, int count, BigDecimal value, int precision) {
+    for (int i = 0; i < count; ++i) {
+      decimals[i + rowId] = value;
+    }
+  }
+
+  @Override public void putDouble(int rowId, double value) {
+    doubles[rowId] = value;
+  }
+
+  @Override public void putDoubles(int rowId, int count, double value) {
+    for (int i = 0; i < count; ++i) {
+      doubles[i + rowId] = value;
+    }
+  }
+
+  @Override public void putBytes(int rowId, byte[] value) {
+    bytes[rowId] = value;
+  }
+
+  @Override public void putBytes(int rowId, int count, byte[] value) {
+    for (int i = 0; i < count; ++i) {
+      bytes[i + rowId] = value;
+    }
+  }
+
+  @Override public void putBytes(int rowId, int offset, int length, byte[] value) {
+    bytes[rowId] = new byte[length];
+    System.arraycopy(value, offset, bytes[rowId], 0, length);
+  }
+
+  @Override public void putNull(int rowId) {
+    nullBytes.set(rowId);
+    anyNullsSet = true;
+  }
+
+  @Override public void putNulls(int rowId, int count) {
+    for (int i = 0; i < count; ++i) {
+      nullBytes.set(rowId + i);
+    }
+    anyNullsSet = true;
+  }
+
+
+  public boolean isNullAt(int rowId) {
+    return nullBytes.get(rowId);
+  }
+
+
+  @Override public boolean isNull(int rowId) {
+    return nullBytes.get(rowId);
+  }
+
+  @Override public void putObject(int rowId, Object obj) {
+    data[rowId] = obj;
+  }
+
+  @Override public Object getData(int rowId) {
+    if (nullBytes.get(rowId)) {
+      return null;
+    }
+    if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
+      return  byteArr[rowId];
+    } else if (dataType == DataTypes.SHORT) {
+      return shorts[rowId];
+    } else if (dataType == DataTypes.INT) {
+      return ints[rowId];
+    } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
+      return longs[rowId];
+    } else if (dataType == DataTypes.FLOAT) {
+      return floats[rowId];
+    } else if (dataType == DataTypes.DOUBLE) {
+      return doubles[rowId];
+    } else if (dataType instanceof DecimalType) {
+      return decimals[rowId];
+    } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY) {
+      return  bytes[rowId];
+    } else {
+      return data[rowId];
+    }
+  }
+
+  @Override public void reset() {
+    nullBytes.clear();
+    if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
+      Arrays.fill(byteArr, (byte) 0);
+    } else if (dataType == DataTypes.SHORT) {
+      Arrays.fill(shorts, (short) 0);
+    } else if (dataType == DataTypes.INT) {
+      Arrays.fill(ints, 0);
+    } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
+      Arrays.fill(longs, 0);
+    } else if (dataType == DataTypes.FLOAT) {
+      Arrays.fill(floats, 0);
+    } else if (dataType == DataTypes.DOUBLE) {
+      Arrays.fill(doubles, 0);
+    } else if (dataType instanceof DecimalType) {
+      Arrays.fill(decimals, null);
+    } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY) {
+      Arrays.fill(bytes, null);
+    } else {
+      Arrays.fill(data, null);
+    }
+
+  }
+
+  @Override public DataType getType() {
+    return dataType;
+  }
+
+  @Override public void setFilteredRowsExist(boolean filteredRowsExist) {
+
+  }
+
+  /**
+   * Returns true if any of the nulls indicator are set for this column. This can be used
+   * as an optimization to prevent setting nulls.
+   */
+  public final boolean anyNullsSet() { return anyNullsSet; }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e5e74fc9/integration/presto/pom.xml
----------------------------------------------------------------------
diff --git a/integration/presto/pom.xml b/integration/presto/pom.xml
index 5179284..98bbe99 100644
--- a/integration/presto/pom.xml
+++ b/integration/presto/pom.xml
@@ -31,7 +31,7 @@
   <packaging>presto-plugin</packaging>
 
   <properties>
-    <presto.version>0.186</presto.version>
+    <presto.version>0.187</presto.version>
     <dev.path>${basedir}/../../dev</dev.path>
   </properties>
 
@@ -223,6 +223,10 @@
           <groupId>oro</groupId>
           <artifactId>oro</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-sql_2.11</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
@@ -431,14 +435,27 @@
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-sql_2.11</artifactId>
+      <artifactId>spark-network-common_2.11</artifactId>
+      <scope>test</scope>
       <version>2.1.0</version>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-network-common_2.11</artifactId>
+      <artifactId>spark-core_2.11</artifactId>
       <scope>test</scope>
       <version>2.1.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-client</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.antlr</groupId>
+      <artifactId>antlr4-runtime</artifactId>
+      <scope>test</scope>
+      <version>4.5.3</version>
     </dependency>
     <dependency>
       <groupId>com.google.code.findbugs</groupId>
@@ -573,7 +590,6 @@
         </configuration>
       </plugin>
       <plugin>
-
         <groupId>org.scalatest</groupId>
         <artifactId>scalatest-maven-plugin</artifactId>
         <version>1.0</version>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e5e74fc9/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java
new file mode 100644
index 0000000..e19a598
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
+
+public class CarbonColumnVectorWrapper implements CarbonColumnVector {
+
+  private CarbonColumnVectorImpl columnVector;
+
+  private boolean[] filteredRows;
+
+  private int counter;
+
+  private boolean filteredRowsExist;
+
+  public CarbonColumnVectorWrapper(CarbonColumnVectorImpl columnVector, boolean[] filteredRows) {
+    this.columnVector = columnVector;
+    this.filteredRows = filteredRows;
+  }
+
+  @Override public void putBoolean(int rowId, boolean value) {
+    if (!filteredRows[rowId]) {
+      columnVector.putBoolean(counter++, value);
+    }
+  }
+
+  @Override public void putFloat(int rowId, float value) {
+    if (!filteredRows[rowId]) {
+      columnVector.putFloat(counter++, value);
+    }
+  }
+
+  @Override public void putShort(int rowId, short value) {
+    if (!filteredRows[rowId]) {
+      columnVector.putShort(counter++, value);
+    }
+  }
+
+  @Override public void putShorts(int rowId, int count, short value) {
+    if (filteredRowsExist) {
+      for (int i = 0; i < count; i++) {
+        if (!filteredRows[rowId]) {
+          columnVector.putShort(counter++, value);
+        }
+        rowId++;
+      }
+    } else {
+      columnVector.putShorts(rowId, count, value);
+    }
+  }
+
+  @Override public void putInt(int rowId, int value) {
+    if (!filteredRows[rowId]) {
+      columnVector.putInt(counter++, value);
+    }
+  }
+
+  @Override public void putInts(int rowId, int count, int value) {
+    if (filteredRowsExist) {
+      for (int i = 0; i < count; i++) {
+        if (!filteredRows[rowId]) {
+          columnVector.putInt(counter++, value);
+        }
+        rowId++;
+      }
+    } else {
+      columnVector.putInts(rowId, count, value);
+    }
+  }
+
+  @Override public void putLong(int rowId, long value) {
+    if (!filteredRows[rowId]) {
+      columnVector.putLong(counter++, value);
+    }
+  }
+
+  @Override public void putLongs(int rowId, int count, long value) {
+    if (filteredRowsExist) {
+      for (int i = 0; i < count; i++) {
+        if (!filteredRows[rowId]) {
+          columnVector.putLong(counter++, value);
+        }
+        rowId++;
+      }
+    } else {
+      columnVector.putLongs(rowId, count, value);
+    }
+  }
+
+  @Override public void putDecimal(int rowId, BigDecimal value, int precision) {
+    if (!filteredRows[rowId]) {
+      columnVector.putDecimal(counter++, value, precision);
+    }
+  }
+
+  @Override public void putDecimals(int rowId, int count, BigDecimal value, int precision) {
+    for (int i = 0; i < count; i++) {
+      if (!filteredRows[rowId]) {
+        columnVector.putDecimal(counter++, value, precision);
+      }
+      rowId++;
+    }
+  }
+
+  @Override public void putDouble(int rowId, double value) {
+    if (!filteredRows[rowId]) {
+      columnVector.putDouble(counter++, value);
+    }
+  }
+
+  @Override public void putDoubles(int rowId, int count, double value) {
+    if (filteredRowsExist) {
+      for (int i = 0; i < count; i++) {
+        if (!filteredRows[rowId]) {
+          columnVector.putDouble(counter++, value);
+        }
+        rowId++;
+      }
+    } else {
+      columnVector.putDoubles(rowId, count, value);
+    }
+  }
+
+  @Override public void putBytes(int rowId, byte[] value) {
+    if (!filteredRows[rowId]) {
+      columnVector.putBytes(counter++, value);
+    }
+  }
+
+  @Override public void putBytes(int rowId, int count, byte[] value) {
+    for (int i = 0; i < count; i++) {
+      if (!filteredRows[rowId]) {
+        columnVector.putBytes(counter++, value);
+      }
+      rowId++;
+    }
+  }
+
+  @Override public void putBytes(int rowId, int offset, int length, byte[] value) {
+    if (!filteredRows[rowId]) {
+      columnVector.putBytes(counter++, offset, length, value);
+    }
+  }
+
+  @Override public void putNull(int rowId) {
+    if (!filteredRows[rowId]) {
+      columnVector.putNull(counter++);
+    }
+  }
+
+  @Override public void putNulls(int rowId, int count) {
+    if (filteredRowsExist) {
+      for (int i = 0; i < count; i++) {
+        if (!filteredRows[rowId]) {
+          columnVector.putNull(counter++);
+        }
+        rowId++;
+      }
+    } else {
+      columnVector.putNulls(rowId, count);
+    }
+  }
+
+  @Override public boolean isNull(int rowId) {
+    return columnVector.isNullAt(rowId);
+  }
+
+  @Override public void putObject(int rowId, Object obj) {
+    //TODO handle complex types
+  }
+
+  @Override public Object getData(int rowId) {
+    //TODO handle complex types
+    return null;
+  }
+
+  @Override public void reset() {
+    counter = 0;
+    filteredRowsExist = false;
+  }
+
+  @Override public DataType getType() {
+    return columnVector.getType();
+  }
+
+  @Override public void setFilteredRowsExist(boolean filteredRowsExist) {
+    this.filteredRowsExist = filteredRowsExist;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e5e74fc9/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonTypeUtil.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonTypeUtil.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonTypeUtil.java
deleted file mode 100644
index ddc8d9e..0000000
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonTypeUtil.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package org.apache.carbondata.presto;
-
-import org.apache.carbondata.core.metadata.datatype.DataType;
-
-import org.apache.spark.sql.types.DataTypes;
-
-public class CarbonTypeUtil {
-
-  static org.apache.spark.sql.types.DataType convertCarbonToSparkDataType(
-      DataType carbonDataType) {
-    if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.STRING) {
-      return DataTypes.StringType;
-    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT) {
-      return DataTypes.ShortType;
-    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.INT) {
-      return DataTypes.IntegerType;
-    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.LONG) {
-        return DataTypes.LongType;
-    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE) {
-        return DataTypes.DoubleType;
-    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BOOLEAN) {
-        return DataTypes.BooleanType;
-    } else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(carbonDataType)) {
-        return DataTypes.createDecimalType();
-    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.TIMESTAMP) {
-        return DataTypes.TimestampType;
-    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DATE) {
-      return DataTypes.DateType;
-    } else {
-      return null;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e5e74fc9/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java
new file mode 100644
index 0000000..b230d6a
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.presto;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.carbondata.core.metadata.datatype.StructField;
+import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
+
+public class CarbonVectorBatch {
+
+  private static final int DEFAULT_BATCH_SIZE = 1024;
+
+  private final StructField[] schema;
+  private final int capacity;
+  private int numRows;
+  private final CarbonColumnVectorImpl[] columns;
+
+  // True if the row is filtered.
+  private final boolean[] filteredRows;
+
+  // Column indices that cannot have null values.
+  private final Set<Integer> nullFilteredColumns;
+
+  // Total number of rows that have been filtered.
+  private int numRowsFiltered = 0;
+
+
+  private CarbonVectorBatch(StructField[] schema, int maxRows) {
+    this.schema = schema;
+    this.capacity = maxRows;
+    this.columns = new CarbonColumnVectorImpl[schema.length];
+    this.nullFilteredColumns = new HashSet<>();
+    this.filteredRows = new boolean[maxRows];
+
+    for (int i = 0; i < schema.length; ++i) {
+      StructField field = schema[i];
+      columns[i] = new CarbonColumnVectorImpl(maxRows, field.getDataType());
+    }
+
+  }
+
+
+  public static CarbonVectorBatch allocate(StructField[] schema) {
+    return new CarbonVectorBatch(schema, DEFAULT_BATCH_SIZE);
+  }
+
+  public static CarbonVectorBatch allocate(StructField[] schema,  int maxRows) {
+    return new CarbonVectorBatch(schema, maxRows);
+  }
+  /**
+   * Resets the batch for writing.
+   */
+  public void reset() {
+    for (int i = 0; i < numCols(); ++i) {
+      columns[i].reset();
+    }
+    if (this.numRowsFiltered > 0) {
+      Arrays.fill(filteredRows, false);
+    }
+    this.numRows = 0;
+    this.numRowsFiltered = 0;
+  }
+
+
+  /**
+   * Returns the number of columns that make up this batch.
+   */
+  public int numCols() { return columns.length; }
+
+  /**
+   * Sets the number of rows that are valid. Additionally, marks all rows as "filtered" if one or
+   * more of their attributes are part of a non-nullable column.
+   */
+  public void setNumRows(int numRows) {
+    assert(numRows <= this.capacity);
+    this.numRows = numRows;
+
+    for (int ordinal : nullFilteredColumns) {
+      for (int rowId = 0; rowId < numRows; rowId++) {
+        if (!filteredRows[rowId] && columns[ordinal].isNull(rowId)) {
+          filteredRows[rowId] = true;
+          ++numRowsFiltered;
+        }
+      }
+    }
+  }
+
+
+  /**
+   * Returns the number of rows for read, including filtered rows.
+   */
+  public int numRows() { return numRows; }
+
+  /**
+   * Returns the number of valid rows.
+   */
+  public int numValidRows() {
+    assert(numRowsFiltered <= numRows);
+    return numRows - numRowsFiltered;
+  }
+
+  /**
+   * Returns the column at `ordinal`.
+   */
+  public CarbonColumnVectorImpl column(int ordinal) { return columns[ordinal]; }
+
+  /**
+   * Returns the max capacity (in number of rows) for this batch.
+   */
+  public int capacity() { return capacity; }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e5e74fc9/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java
index 5e3e5b7..910ccf4 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java
@@ -28,6 +28,7 @@ import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionary
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.StructField;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.scan.executor.QueryExecutor;
 import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
@@ -42,15 +43,9 @@ import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.hadoop.AbstractRecordReader;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
-import org.apache.carbondata.hadoop.util.CarbonTypeUtil;
 
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.spark.memory.MemoryMode;
-import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
-import org.apache.spark.sql.types.DecimalType;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
 
 /**
  * A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the
@@ -62,7 +57,7 @@ class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
 
   private int numBatched = 0;
 
-  private ColumnarBatch columnarBatch;
+  private CarbonVectorBatch columnarBatch;
 
   private CarbonColumnarBatch carbonColumnarBatch;
 
@@ -71,11 +66,6 @@ class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
    */
   private boolean returnColumnarBatch;
 
-  /**
-   * The default config on whether columnarBatch should be offheap.
-   */
-  private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.OFF_HEAP;
-
   private QueryModel queryModel;
 
   private AbstractDetailQueryResultIterator iterator;
@@ -121,7 +111,6 @@ class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
   @Override public void close() throws IOException {
     logStatistics(rowCount, queryModel.getStatisticsRecorder());
     if (columnarBatch != null) {
-      columnarBatch.close();
       columnarBatch = null;
     }
     // clear dictionary cache
@@ -154,9 +143,9 @@ class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
     if (returnColumnarBatch) {
       rowCount += columnarBatch.numValidRows();
       return columnarBatch;
+    } else {
+      return null;
     }
-    rowCount += 1;
-    return columnarBatch.getRow(batchIdx - 1);
   }
 
   @Override public Void getCurrentKey() throws IOException, InterruptedException {
@@ -174,7 +163,7 @@ class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
    * before any calls to nextKeyValue/nextBatch.
    */
 
-  private void initBatch(MemoryMode memMode) {
+  private void initBatch() {
     List<QueryDimension> queryDimension = queryModel.getQueryDimension();
     List<QueryMeasure> queryMeasures = queryModel.getQueryMeasures();
     StructField[] fields = new StructField[queryDimension.size() + queryMeasures.size()];
@@ -184,18 +173,16 @@ class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
         DirectDictionaryGenerator generator = DirectDictionaryKeyGeneratorFactory
             .getDirectDictionaryGenerator(dim.getDimension().getDataType());
         fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
-            CarbonTypeUtil.convertCarbonToSparkDataType(generator.getReturnType()), true, null);
+           generator.getReturnType());
       } else if (!dim.getDimension().hasEncoding(Encoding.DICTIONARY)) {
         fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
-            CarbonTypeUtil.convertCarbonToSparkDataType(dim.getDimension().getDataType()), true,
-            null);
+            dim.getDimension().getDataType());
       } else if (dim.getDimension().isComplex()) {
         fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
-            CarbonTypeUtil.convertCarbonToSparkDataType(dim.getDimension().getDataType()), true,
-            null);
+           dim.getDimension().getDataType());
       } else {
         fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
-            CarbonTypeUtil.convertCarbonToSparkDataType(DataTypes.INT), true, null);
+            DataTypes.INT);
       }
     }
 
@@ -204,32 +191,27 @@ class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
       DataType dataType = msr.getMeasure().getDataType();
       if (dataType == DataTypes.SHORT || dataType == DataTypes.INT || dataType == DataTypes.LONG) {
         fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
-            CarbonTypeUtil.convertCarbonToSparkDataType(msr.getMeasure().getDataType()), true,
-            null);
+            msr.getMeasure().getDataType());
       } else if (DataTypes.isDecimal(dataType)) {
         fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
-            new DecimalType(msr.getMeasure().getPrecision(), msr.getMeasure().getScale()), true,
-            null);
+           msr.getMeasure().getDataType());
       } else {
         fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
-            CarbonTypeUtil.convertCarbonToSparkDataType(DataTypes.DOUBLE), true, null);
+            DataTypes.DOUBLE);
       }
     }
 
-    columnarBatch = ColumnarBatch.allocate(new StructType(fields), memMode);
+    columnarBatch = CarbonVectorBatch.allocate(fields);
     CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length];
     boolean[] filteredRows = new boolean[columnarBatch.capacity()];
     for (int i = 0; i < fields.length; i++) {
-      vectors[i] = new ColumnarVectorWrapper(columnarBatch.column(i), filteredRows);
+      vectors[i] = new CarbonColumnVectorWrapper(columnarBatch.column(i), filteredRows);
     }
     carbonColumnarBatch = new CarbonColumnarBatch(vectors, columnarBatch.capacity(), filteredRows);
   }
 
-  private void initBatch() {
-    initBatch(DEFAULT_MEMORY_MODE);
-  }
 
-  private ColumnarBatch resultBatch() {
+  private CarbonVectorBatch resultBatch() {
     if (columnarBatch == null) initBatch();
     return columnarBatch;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e5e74fc9/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
index 25917ac..92cd655 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
@@ -31,13 +31,13 @@ public class CarbondataConnector implements Connector {
   private static final Logger log = Logger.get(CarbondataConnector.class);
 
   private final LifeCycleManager lifeCycleManager;
-  private final CarbondataMetadata metadata;
+  private final ConnectorMetadata metadata;
   private final ConnectorSplitManager splitManager;
   private final ConnectorRecordSetProvider recordSetProvider;
   private final ClassLoader classLoader;
   private final ConnectorPageSourceProvider pageSourceProvider;
 
-  public CarbondataConnector(LifeCycleManager lifeCycleManager, CarbondataMetadata metadata,
+  public CarbondataConnector(LifeCycleManager lifeCycleManager, ConnectorMetadata metadata,
       ConnectorSplitManager splitManager, ConnectorRecordSetProvider recordSetProvider,
       ClassLoader classLoader, ConnectorPageSourceProvider pageSourceProvider) {
     this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
@@ -55,7 +55,6 @@ public class CarbondataConnector implements Connector {
   }
 
   @Override public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle) {
-    metadata.putClassLoader(classLoader);
     return metadata;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e5e74fc9/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
index d557920..579af50 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.presto;
 import com.facebook.presto.spi.ConnectorHandleResolver;
 import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
 import com.facebook.presto.spi.connector.*;
+import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorMetadata;
 import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorPageSourceProvider;
 import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorSplitManager;
 import com.google.common.base.Throwables;
@@ -67,13 +68,13 @@ public class CarbondataConnectorFactory implements ConnectorFactory {
               .initialize();
 
       LifeCycleManager lifeCycleManager = injector.getInstance(LifeCycleManager.class);
-      CarbondataMetadata metadata = injector.getInstance(CarbondataMetadata.class);
+      ConnectorMetadata metadata = injector.getInstance(CarbondataMetadata.class);
       ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class);
       ConnectorRecordSetProvider connectorRecordSet =
           injector.getInstance(ConnectorRecordSetProvider.class);
       ConnectorPageSourceProvider connectorPageSource = injector.getInstance(ConnectorPageSourceProvider.class);
 
-      return new CarbondataConnector(lifeCycleManager, metadata,
+      return new CarbondataConnector(lifeCycleManager, new ClassLoaderSafeConnectorMetadata(metadata,classLoader),
           new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader), connectorRecordSet,
           classLoader,
           new ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e5e74fc9/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
index f106a08..cee7c35 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
@@ -21,7 +21,6 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.presto.impl.CarbonTableReader;
 import com.facebook.presto.spi.*;
-import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
 import com.facebook.presto.spi.connector.ConnectorMetadata;
 import com.facebook.presto.spi.type.*;
 import com.google.common.collect.ImmutableList;
@@ -42,7 +41,6 @@ import static java.util.Objects.requireNonNull;
 public class CarbondataMetadata implements ConnectorMetadata {
   private final String connectorId;
   private CarbonTableReader carbonTableReader;
-  private ClassLoader classLoader;
 
   private Map<String, ColumnHandle> columnHandleMap;
 
@@ -51,19 +49,13 @@ public class CarbondataMetadata implements ConnectorMetadata {
     this.carbonTableReader = requireNonNull(reader, "client is null");
   }
 
-  public void putClassLoader(ClassLoader classLoader) {
-    this.classLoader = classLoader;
-  }
 
   @Override public List<String> listSchemaNames(ConnectorSession session) {
     return listSchemaNamesInternal();
   }
 
   public List<String> listSchemaNamesInternal() {
-    List<String> schemaNameList;
-    try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
-      schemaNameList = carbonTableReader.getSchemaNames();
-    }
+    List<String> schemaNameList = carbonTableReader.getSchemaNames();;
     return schemaNameList;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e5e74fc9/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
index 390565b..e8ecba3 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
@@ -38,7 +38,6 @@ import com.facebook.presto.spi.block.Block;
 import com.facebook.presto.spi.block.LazyBlock;
 import com.facebook.presto.spi.block.LazyBlockLoader;
 import com.facebook.presto.spi.type.Type;
-import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
 
 import static com.google.common.base.Preconditions.checkState;
 import static java.util.Collections.unmodifiableList;
@@ -95,15 +94,15 @@ class CarbondataPageSource implements ConnectorPageSource {
     if (nanoStart == 0) {
       nanoStart = System.nanoTime();
     }
-    ColumnarBatch columnarBatch = null;
+    CarbonVectorBatch columnarBatch = null;
     int batchSize = 0;
     try {
       batchId++;
       if(vectorReader.nextKeyValue()) {
         Object vectorBatch = vectorReader.getCurrentValue();
-        if(vectorBatch != null && vectorBatch instanceof ColumnarBatch)
+        if(vectorBatch != null && vectorBatch instanceof CarbonVectorBatch)
         {
-          columnarBatch = (ColumnarBatch) vectorBatch;
+          columnarBatch = (CarbonVectorBatch) vectorBatch;
           batchSize = columnarBatch.numRows();
           if(batchSize == 0){
             close();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e5e74fc9/integration/presto/src/main/java/org/apache/carbondata/presto/ColumnarVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/ColumnarVectorWrapper.java b/integration/presto/src/main/java/org/apache/carbondata/presto/ColumnarVectorWrapper.java
deleted file mode 100644
index bcb48ba..0000000
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/ColumnarVectorWrapper.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.presto;
-
-import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
-
-import org.apache.spark.sql.execution.vectorized.ColumnVector;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.Decimal;
-
-public class ColumnarVectorWrapper implements CarbonColumnVector {
-
-  private ColumnVector columnVector;
-
-  private boolean[] filteredRows;
-
-  private int counter;
-
-  private boolean filteredRowsExist;
-
-  public ColumnarVectorWrapper(ColumnVector columnVector, boolean[] filteredRows) {
-    this.columnVector = columnVector;
-    this.filteredRows = filteredRows;
-  }
-
-  @Override public void putBoolean(int rowId, boolean value) {
-    if (!filteredRows[rowId]) {
-      columnVector.putBoolean(counter++, value);
-    }
-  }
-
-  @Override public void putFloat(int rowId, float value) {
-    if (!filteredRows[rowId]) {
-      columnVector.putFloat(counter++, value);
-    }
-  }
-
-  @Override public void putShort(int rowId, short value) {
-    if (!filteredRows[rowId]) {
-      columnVector.putShort(counter++, value);
-    }
-  }
-
-  @Override public void putShorts(int rowId, int count, short value) {
-    if (filteredRowsExist) {
-      for (int i = 0; i < count; i++) {
-        if (!filteredRows[rowId]) {
-          columnVector.putShort(counter++, value);
-        }
-        rowId++;
-      }
-    } else {
-      columnVector.putShorts(rowId, count, value);
-    }
-  }
-
-  @Override public void putInt(int rowId, int value) {
-    if (!filteredRows[rowId]) {
-      columnVector.putInt(counter++, value);
-    }
-  }
-
-  @Override public void putInts(int rowId, int count, int value) {
-    if (filteredRowsExist) {
-      for (int i = 0; i < count; i++) {
-        if (!filteredRows[rowId]) {
-          columnVector.putInt(counter++, value);
-        }
-        rowId++;
-      }
-    } else {
-      columnVector.putInts(rowId, count, value);
-    }
-  }
-
-  @Override public void putLong(int rowId, long value) {
-    if (!filteredRows[rowId]) {
-      columnVector.putLong(counter++, value);
-    }
-  }
-
-  @Override public void putLongs(int rowId, int count, long value) {
-    if (filteredRowsExist) {
-      for (int i = 0; i < count; i++) {
-        if (!filteredRows[rowId]) {
-          columnVector.putLong(counter++, value);
-        }
-        rowId++;
-      }
-    } else {
-      columnVector.putLongs(rowId, count, value);
-    }
-  }
-
-  @Override public void putDecimal(int rowId, Decimal value, int precision) {
-    if (!filteredRows[rowId]) {
-      columnVector.putDecimal(counter++, value, precision);
-    }
-  }
-
-  @Override public void putDecimals(int rowId, int count, Decimal value, int precision) {
-    for (int i = 0; i < count; i++) {
-      if (!filteredRows[rowId]) {
-        columnVector.putDecimal(counter++, value, precision);
-      }
-      rowId++;
-    }
-  }
-
-  @Override public void putDouble(int rowId, double value) {
-    if (!filteredRows[rowId]) {
-      columnVector.putDouble(counter++, value);
-    }
-  }
-
-  @Override public void putDoubles(int rowId, int count, double value) {
-    if (filteredRowsExist) {
-      for (int i = 0; i < count; i++) {
-        if (!filteredRows[rowId]) {
-          columnVector.putDouble(counter++, value);
-        }
-        rowId++;
-      }
-    } else {
-      columnVector.putDoubles(rowId, count, value);
-    }
-  }
-
-  @Override public void putBytes(int rowId, byte[] value) {
-    if (!filteredRows[rowId]) {
-      columnVector.putByteArray(counter++, value);
-    }
-  }
-
-  @Override public void putBytes(int rowId, int count, byte[] value) {
-    for (int i = 0; i < count; i++) {
-      if (!filteredRows[rowId]) {
-        columnVector.putByteArray(counter++, value);
-      }
-      rowId++;
-    }
-  }
-
-  @Override public void putBytes(int rowId, int offset, int length, byte[] value) {
-    if (!filteredRows[rowId]) {
-      columnVector.putByteArray(counter++, value, offset, length);
-    }
-  }
-
-  @Override public void putNull(int rowId) {
-    if (!filteredRows[rowId]) {
-      columnVector.putNull(counter++);
-    }
-  }
-
-  @Override public void putNulls(int rowId, int count) {
-    if (filteredRowsExist) {
-      for (int i = 0; i < count; i++) {
-        if (!filteredRows[rowId]) {
-          columnVector.putNull(counter++);
-        }
-        rowId++;
-      }
-    } else {
-      columnVector.putNulls(rowId, count);
-    }
-  }
-
-  @Override public boolean isNull(int rowId) {
-    return columnVector.isNullAt(rowId);
-  }
-
-  @Override public void putObject(int rowId, Object obj) {
-    //TODO handle complex types
-  }
-
-  @Override public Object getData(int rowId) {
-    //TODO handle complex types
-    return null;
-  }
-
-  @Override public void reset() {
-    counter = 0;
-    filteredRowsExist = false;
-  }
-
-  @Override public DataType getType() {
-    return columnVector.dataType();
-  }
-
-  @Override public void setFilteredRowsExist(boolean filteredRowsExist) {
-    this.filteredRowsExist = filteredRowsExist;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e5e74fc9/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java
index ded4889..31d5ba6 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java
@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import com.facebook.presto.spi.type.*;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
@@ -47,6 +46,17 @@ import com.facebook.presto.spi.ColumnHandle;
 import com.facebook.presto.spi.predicate.Domain;
 import com.facebook.presto.spi.predicate.Range;
 import com.facebook.presto.spi.predicate.TupleDomain;
+import com.facebook.presto.spi.type.BigintType;
+import com.facebook.presto.spi.type.BooleanType;
+import com.facebook.presto.spi.type.DateType;
+import com.facebook.presto.spi.type.DecimalType;
+import com.facebook.presto.spi.type.Decimals;
+import com.facebook.presto.spi.type.DoubleType;
+import com.facebook.presto.spi.type.IntegerType;
+import com.facebook.presto.spi.type.SmallintType;
+import com.facebook.presto.spi.type.TimestampType;
+import com.facebook.presto.spi.type.Type;
+import com.facebook.presto.spi.type.VarcharType;
 import com.google.common.collect.ImmutableList;
 import io.airlift.slice.Slice;
 
@@ -70,10 +80,9 @@ public class PrestoFilterUtil {
     else if (colType == DateType.DATE) return DataTypes.DATE;
     else if (colType == TimestampType.TIMESTAMP) return DataTypes.TIMESTAMP;
     else if (colType.equals(DecimalType.createDecimalType(carbondataColumnHandle.getPrecision(),
-        carbondataColumnHandle.getScale())))
-      return org.apache.carbondata.core.metadata.datatype.DataTypes.createDecimalType(
-          carbondataColumnHandle.getPrecision(),
-          carbondataColumnHandle.getScale());
+        carbondataColumnHandle.getScale()))) return DataTypes
+        .createDecimalType(carbondataColumnHandle.getPrecision(),
+            carbondataColumnHandle.getScale());
     else return DataTypes.STRING;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e5e74fc9/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 7fe55fb..a79b17f 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -20,11 +20,13 @@ package org.apache.carbondata.presto.impl;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.LinkedList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -46,12 +48,14 @@ import org.apache.carbondata.hadoop.CarbonInputSplit;
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
 
 import com.facebook.presto.hadoop.$internal.com.google.gson.Gson;
+import com.facebook.presto.hadoop.$internal.io.netty.util.internal.ConcurrentSet;
 import com.facebook.presto.spi.SchemaTableName;
 import com.facebook.presto.spi.TableNotFoundException;
 import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.inject.Inject;
+import org.apache.commons.lang.time.DateUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
@@ -84,7 +88,7 @@ public class CarbonTableReader {
   /**
    * The names of the tables under the schema (this.carbonFileList).
    */
-  private List<SchemaTableName> tableList;
+  private ConcurrentSet<SchemaTableName> tableList;
   /**
    * carbonFileList represents the store path of the schema, which is configured as carbondata-store
    * in the CarbonData catalog file ($PRESTO_HOME$/etc/catalog/carbondata.properties).
@@ -95,12 +99,12 @@ public class CarbonTableReader {
    * A cache for Carbon reader, with this cache,
    * metadata of a table is only read from file system once.
    */
-  private ConcurrentHashMap<SchemaTableName, CarbonTableCacheModel> cc;
+  private AtomicReference<HashMap<SchemaTableName, CarbonTableCacheModel>> carbonCache;
 
   @Inject public CarbonTableReader(CarbonTableConfig config) {
     this.config = requireNonNull(config, "CarbonTableConfig is null");
-    this.cc = new ConcurrentHashMap<>();
-    tableList = new LinkedList<>();
+    this.carbonCache = new AtomicReference(new HashMap());
+    tableList = new ConcurrentSet<>();
   }
 
   /**
@@ -111,10 +115,10 @@ public class CarbonTableReader {
    */
   public CarbonTableCacheModel getCarbonCache(SchemaTableName table) {
 
-    if (!cc.containsKey(table) || cc.get(table) == null) {
-// if this table is not cached, try to read the metadata of the table and cache it.
+    if (!carbonCache.get().containsKey(table) || carbonCache.get().get(table) == null) {
+      // if this table is not cached, try to read the metadata of the table and cache it.
       try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(
-              FileFactory.class.getClassLoader())) {
+          FileFactory.class.getClassLoader())) {
         if (carbonFileList == null) {
           fileType = FileFactory.getFileType(config.getStorePath());
           try {
@@ -127,17 +131,19 @@ public class CarbonTableReader {
       updateSchemaTables(table);
       parseCarbonMetadata(table);
     }
-    if (cc.containsKey(table)) {
-      return cc.get(table);
+    if (carbonCache.get().containsKey(table)) {
+      return carbonCache.get().get(table);
     } else {
       return null;
     }
   }
 
   private void removeTableFromCache(SchemaTableName table) {
-    DataMapStoreManager.getInstance().clearDataMaps(cc.get(table).carbonTable.getAbsoluteTableIdentifier());
-    cc.remove(table);
+    DataMapStoreManager.getInstance()
+        .clearDataMaps(carbonCache.get().get(table).carbonTable.getAbsoluteTableIdentifier());
+    carbonCache.get().remove(table);
     tableList.remove(table);
+
   }
 
   /**
@@ -232,25 +238,36 @@ public class CarbonTableReader {
    * is called, it clears this.tableList and populate the list by reading the files.
    */
   private void updateSchemaTables(SchemaTableName schemaTableName) {
-// update logic determine later
-    boolean isKeyExists = cc.containsKey(schemaTableName);
+    // update logic determine later
+    boolean isKeyExists = carbonCache.get().containsKey(schemaTableName);
 
     if (carbonFileList == null) {
       updateSchemaList();
     }
     try {
-      if(isKeyExists && !FileFactory.isFileExist(cc.get(schemaTableName).carbonTablePath.getSchemaFilePath(),fileType)){
+      if (isKeyExists && !FileFactory
+          .isFileExist(carbonCache.get().get(schemaTableName).carbonTablePath.getSchemaFilePath(),
+              fileType)) {
         removeTableFromCache(schemaTableName);
         throw new TableNotFoundException(schemaTableName);
       }
     } catch (IOException e) {
-      e.printStackTrace();
       throw new RuntimeException();
     }
-    if(isKeyExists && FileFactory.getCarbonFile(cc.get(schemaTableName).carbonTablePath.getPath()).getLastModifiedTime() > cc.get(schemaTableName).tableInfo.getLastUpdatedTime()){
-      removeTableFromCache(schemaTableName);
+
+    if (isKeyExists) {
+      CarbonTableCacheModel ctcm = carbonCache.get().get(schemaTableName);
+      if(ctcm != null && ctcm.tableInfo != null) {
+        Long latestTime = FileFactory.getCarbonFile(ctcm.carbonTablePath.getSchemaFilePath())
+            .getLastModifiedTime();
+        Long oldTime = ctcm.tableInfo.getLastUpdatedTime();
+        if (DateUtils.truncate(new Date(latestTime), Calendar.MINUTE)
+            .after(DateUtils.truncate(new Date(oldTime), Calendar.MINUTE))) {
+          removeTableFromCache(schemaTableName);
+        }
+      }
     }
-    if(!tableList.contains(schemaTableName)) {
+    if (!tableList.contains(schemaTableName)) {
       for (CarbonFile cf : carbonFileList.listFiles()) {
         if (!cf.getName().endsWith(".mdt")) {
           for (CarbonFile table : cf.listFiles()) {
@@ -279,7 +296,7 @@ public class CarbonTableReader {
   }
 
   /**
-   * Read the metadata of the given table and cache it in this.cc (CarbonTableReader cache).
+   * Read the metadata of the given table and cache it in this.carbonCache (CarbonTableReader cache).
    *
    * @param table name of the given table.
    * @return the CarbonTable instance which contains all the needed metadata for a table.
@@ -287,7 +304,7 @@ public class CarbonTableReader {
   private CarbonTable parseCarbonMetadata(SchemaTableName table) {
     CarbonTable result = null;
     try {
-      CarbonTableCacheModel cache = cc.get(table);
+      CarbonTableCacheModel cache = carbonCache.get().get(table);
       if (cache == null) {
         cache = new CarbonTableCacheModel();
       }
@@ -311,7 +328,7 @@ public class CarbonTableReader {
       cache.carbonTablePath =
           PathFactory.getInstance().getCarbonTablePath(absoluteTableIdentifier, null);
       // cache the table
-      cc.put(table, cache);
+      carbonCache.get().put(table, cache);
 
       //Step 2: read the metadata (tableInfo) of the table.
       ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
@@ -394,7 +411,8 @@ public class CarbonTableReader {
     return result;
   }
 
-  private CarbonTableInputFormat<Object>  createInputFormat( Configuration conf, AbsoluteTableIdentifier identifier, Expression filterExpression)
+  private CarbonTableInputFormat<Object>  createInputFormat( Configuration conf,
+       AbsoluteTableIdentifier identifier, Expression filterExpression)
           throws IOException {
     CarbonTableInputFormat format = new CarbonTableInputFormat<Object>();
     CarbonTableInputFormat.setTablePath(conf,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e5e74fc9/integration/presto/src/main/java/org/apache/carbondata/presto/readers/AbstractStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/AbstractStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/AbstractStreamReader.java
index fa09e73..81a4b4f 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/AbstractStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/AbstractStreamReader.java
@@ -17,7 +17,7 @@
 
 package org.apache.carbondata.presto.readers;
 
-import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
 
 /**
  * Abstract class for Stream Readers
@@ -26,7 +26,7 @@ public abstract class AbstractStreamReader implements StreamReader {
 
   protected Object[] streamData;
 
-  protected ColumnVector columnVector;
+  protected CarbonColumnVectorImpl columnVector;
 
   protected boolean isVectorReader;
 
@@ -44,7 +44,7 @@ public abstract class AbstractStreamReader implements StreamReader {
    * Setter for Vector data
    * @param vector
    */
-  @Override public void setVector(ColumnVector vector) {
+  @Override public void setVector(CarbonColumnVectorImpl vector) {
     this.columnVector = vector;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e5e74fc9/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
index 2f84bf4..ad798d4 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
@@ -187,13 +187,13 @@ public class DecimalSliceStreamReader  extends AbstractStreamReader {
         builder.appendNull();
       } else {
         if (isShortDecimal(type)) {
-          BigDecimal decimalValue = columnVector.getDecimal(i, precision, scale).toJavaBigDecimal();
+          BigDecimal decimalValue = (BigDecimal)columnVector.getData(i);
           long rescaledDecimal = Decimals.rescale(decimalValue.unscaledValue().longValue(),
               decimalValue.scale(), scale);
           type.writeLong(builder, rescaledDecimal);
         } else {
           Slice slice =
-              getSlice(columnVector.getDecimal(i, precision, scale).toJavaBigDecimal(), type);
+              getSlice(columnVector.getData(i), type);
           type.writeSlice(builder, parseSlice((DecimalType) type, slice, 0, slice.length()));
         }
       }
@@ -203,7 +203,7 @@ public class DecimalSliceStreamReader  extends AbstractStreamReader {
   private void populateShortDecimalVector(Type type, int numberOfRows, BlockBuilder builder,
       int scale, int precision) {
     for (int i = 0; i < numberOfRows; i++) {
-      BigDecimal decimalValue = columnVector.getDecimal(i, precision, scale).toJavaBigDecimal();
+      BigDecimal decimalValue = (BigDecimal)columnVector.getData(i);
       long rescaledDecimal = Decimals.rescale(decimalValue.unscaledValue().longValue(),
           decimalValue.scale(), scale);
       type.writeLong(builder, rescaledDecimal);
@@ -213,7 +213,7 @@ public class DecimalSliceStreamReader  extends AbstractStreamReader {
   private void populateLongDecimalVector(Type type, int numberOfRows, BlockBuilder builder,
       int scale, int precision) {
     for (int i = 0; i < numberOfRows; i++) {
-      Slice slice = getSlice(columnVector.getDecimal(i, precision, scale).toJavaBigDecimal(), type);
+      Slice slice = getSlice((BigDecimal)columnVector.getData(i), type);
       type.writeSlice(builder, parseSlice((DecimalType) type, slice, 0, slice.length()));
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e5e74fc9/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
index 2b90a8d..a1910b7 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
@@ -72,14 +72,14 @@ public class DoubleStreamReader extends AbstractStreamReader {
       if (columnVector.isNullAt(i)) {
         builder.appendNull();
       } else {
-        type.writeDouble(builder, columnVector.getDouble(i));
+        type.writeDouble(builder, (Double)columnVector.getData(i));
       }
     }
   }
 
   private void populateVector(Type type, int numberOfRows, BlockBuilder builder) {
     for (int i = 0; i < numberOfRows; i++) {
-      type.writeDouble(builder, columnVector.getDouble(i));
+      type.writeDouble(builder, (Double)columnVector.getData(i));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e5e74fc9/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
index ccc0192..33fc529 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
@@ -23,7 +23,6 @@ import com.facebook.presto.spi.block.Block;
 import com.facebook.presto.spi.block.BlockBuilder;
 import com.facebook.presto.spi.block.BlockBuilderStatus;
 import com.facebook.presto.spi.type.Type;
-import org.apache.spark.sql.execution.vectorized.ColumnVector;
 
 public class IntegerStreamReader extends AbstractStreamReader {
 
@@ -67,14 +66,14 @@ public class IntegerStreamReader extends AbstractStreamReader {
       if (columnVector.isNullAt(i)) {
         builder.appendNull();
       } else {
-        type.writeLong(builder, ((Integer) columnVector.getInt(i)).longValue());
+        type.writeLong(builder, ((Integer) columnVector.getData(i)).longValue());
       }
     }
   }
 
   private void populateVector(Type type, int numberOfRows, BlockBuilder builder) {
     for (int i = 0; i < numberOfRows; i++) {
-        type.writeLong(builder,  columnVector.getInt(i));
+        type.writeLong(builder,  (Integer) columnVector.getData(i));
       }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e5e74fc9/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
index 5081b32..d7ccda0 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
@@ -63,14 +63,14 @@ public class LongStreamReader extends AbstractStreamReader {
       if (columnVector.isNullAt(i)) {
         builder.appendNull();
       } else {
-        type.writeLong(builder, columnVector.getLong(i));
+        type.writeLong(builder, (Long)columnVector.getData(i));
       }
     }
   }
 
   private void populateVector(Type type, int numberOfRows, BlockBuilder builder) {
     for (int i = 0; i < numberOfRows; i++) {
-      type.writeLong(builder, columnVector.getLong(i));
+      type.writeLong(builder, (Long)columnVector.getData(i));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e5e74fc9/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java
index c659e1d..d642e41 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java
@@ -52,7 +52,7 @@ public class ObjectStreamReader  extends AbstractStreamReader {
       builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
       if (columnVector != null) {
         for(int i = 0; i < numberOfRows ; i++ ){
-          type.writeObject(builder, columnVector.getByte(i));
+          type.writeObject(builder, columnVector.getData(i));
         }
       }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e5e74fc9/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
index 59d8e96..87ebb12 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
@@ -53,7 +53,7 @@ public class ShortStreamReader extends AbstractStreamReader {
       builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
       if (streamData != null) {
         for(int i = 0; i < numberOfRows ; i++ ){
-          type.writeLong(builder,(Short)streamData[i]);
+          type.writeLong(builder, (short) streamData[i]);
         }
       }
     }
@@ -66,14 +66,14 @@ public class ShortStreamReader extends AbstractStreamReader {
       if (columnVector.isNullAt(i)) {
         builder.appendNull();
       } else {
-        type.writeLong(builder, (columnVector.getShort(i)));
+        type.writeLong(builder, ((short) columnVector.getData(i)));
       }
     }
   }
 
   private void populateVector(Type type, int numberOfRows, BlockBuilder builder) {
     for (int i = 0; i < numberOfRows; i++) {
-       type.writeLong(builder, (columnVector.getShort(i)));
+       type.writeLong(builder, ((short) columnVector.getData(i)));
       }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e5e74fc9/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
index bb6146a..36be07e 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
@@ -73,7 +73,7 @@ public class SliceStreamReader extends AbstractStreamReader {
           int[] values = new int[numberOfRows];
           for (int i = 0; i < numberOfRows; i++) {
             if (!columnVector.isNullAt(i)) {
-              values[i] = columnVector.getInt(i);
+              values[i] = (Integer) columnVector.getData(i);
             }
           }
           Block block = new DictionaryBlock(batchSize, dictionaryBlock, values);
@@ -84,7 +84,7 @@ public class SliceStreamReader extends AbstractStreamReader {
             if (columnVector.isNullAt(i)) {
               builder.appendNull();
             } else {
-              type.writeSlice(builder, wrappedBuffer(columnVector.getArray(i).toByteArray()));
+              type.writeSlice(builder, wrappedBuffer((byte[]) columnVector.getData(i)));
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e5e74fc9/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReader.java
index a54df0d..c3cd6c0 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReader.java
@@ -19,9 +19,10 @@ package org.apache.carbondata.presto.readers;
 
 import java.io.IOException;
 
+import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
+
 import com.facebook.presto.spi.block.Block;
 import com.facebook.presto.spi.type.Type;
-import org.apache.spark.sql.execution.vectorized.ColumnVector;
 
 /**
  * Interface for StreamReader
@@ -32,7 +33,7 @@ public interface StreamReader {
 
   void setStreamData(Object[] data);
 
-  void setVector(ColumnVector vector);
+  void setVector(CarbonColumnVectorImpl vector);
 
   void setVectorReader(boolean isVectorReader);
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e5e74fc9/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java
index 86f863a..df1f8d6 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java
@@ -16,9 +16,6 @@
  */
 package org.apache.carbondata.presto.readers;
 
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.presto.CarbonDictionaryDecodeReadSupport;
-
 import com.facebook.presto.spi.block.SliceArrayBlock;
 import com.facebook.presto.spi.type.DateType;
 import com.facebook.presto.spi.type.DecimalType;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e5e74fc9/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
index 8ea3efb..01b7939 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
@@ -65,14 +65,14 @@ public class TimestampStreamReader extends AbstractStreamReader {
       if (columnVector.isNullAt(i)) {
         builder.appendNull();
       } else {
-        type.writeLong(builder, columnVector.getLong(i)/ TIMESTAMP_DIVISOR);
+        type.writeLong(builder, (Long)columnVector.getData(i)/ TIMESTAMP_DIVISOR);
       }
     }
   }
 
   private void populateVector(Type type, int numberOfRows, BlockBuilder builder) {
     for (int i = 0; i < numberOfRows; i++) {
-      type.writeLong(builder, columnVector.getLong(i)/TIMESTAMP_DIVISOR);
+      type.writeLong(builder, (Long)columnVector.getData(i)/TIMESTAMP_DIVISOR);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e5e74fc9/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
index 5ab741b..9387276 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
@@ -17,10 +17,13 @@
 
 package org.apache.carbondata.spark.vectorreader;
 
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.spark.util.CarbonScalaUtil;
 
 import org.apache.spark.sql.execution.vectorized.ColumnVector;
-import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
 
 class ColumnarVectorWrapper implements CarbonColumnVector {
@@ -33,9 +36,12 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
 
   private boolean filteredRowsExist;
 
+  private DataType dataType;
+
   public ColumnarVectorWrapper(ColumnVector columnVector, boolean[] filteredRows) {
     this.columnVector = columnVector;
     this.filteredRows = filteredRows;
+    this.dataType = CarbonScalaUtil.convertSparkToCarbonDataType(columnVector.dataType());
   }
 
   @Override public void putBoolean(int rowId, boolean value) {
@@ -107,16 +113,18 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
     }
   }
 
-  @Override public void putDecimal(int rowId, Decimal value, int precision) {
+  @Override public void putDecimal(int rowId, BigDecimal value, int precision) {
     if (!filteredRows[rowId]) {
-      columnVector.putDecimal(counter++, value, precision);
+      Decimal toDecimal = org.apache.spark.sql.types.Decimal.apply(value);
+      columnVector.putDecimal(counter++, toDecimal, precision);
     }
   }
 
-  @Override public void putDecimals(int rowId, int count, Decimal value, int precision) {
+  @Override public void putDecimals(int rowId, int count, BigDecimal value, int precision) {
     for (int i = 0; i < count; i++) {
       if (!filteredRows[rowId]) {
-        columnVector.putDecimal(counter++, value, precision);
+        Decimal toDecimal = org.apache.spark.sql.types.Decimal.apply(value);
+        columnVector.putDecimal(counter++, toDecimal, precision);
       }
       rowId++;
     }
@@ -200,7 +208,7 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
   }
 
   @Override public DataType getType() {
-    return columnVector.dataType();
+    return dataType;
   }
 
   @Override public void setFilteredRowsExist(boolean filteredRowsExist) {