Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/11/05 06:14:34 UTC

carbondata git commit: [CARBONDATA-3057] Implement VectorizedReader for SDK Reader

Repository: carbondata
Updated Branches:
  refs/heads/master 5a0bc6e71 -> 63a28a951


[CARBONDATA-3057] Implement VectorizedReader for SDK Reader

1. Added carbondata file listing for getting splits, to avoid loading the block/blocklet
datamap when no filter expression is provided by the user.

2. Implemented a vectorized reader and exposed a property to switch between the record reader
and the vector reader; see the usage sketch below.
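
A minimal usage sketch of the new switch (hedged: the builder calls are the ones added in this
commit; the store path and the table name "_temp" are illustrative):

```java
import java.io.IOException;

import org.apache.carbondata.sdk.file.CarbonReader;

public class ReaderSketch {
  public static void main(String[] args) throws IOException, InterruptedException {
    // Default: the vectorized reader is used when the projection has no complex
    // columns. With no filter, splits come from plain file listing and the
    // block/blocklet datamap is not loaded.
    CarbonReader reader = CarbonReader.builder("/tmp/store", "_temp").build();
    while (reader.hasNext()) {
      Object[] row = (Object[]) reader.readNextRow();
      // process row...
    }
    reader.close();

    // Opt out and fall back to the row record reader:
    CarbonReader rowReader = CarbonReader.builder("/tmp/store", "_temp")
        .withRowRecordReader()
        .build();
    rowReader.close();
  }
}
```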

This closes #2869


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/63a28a95
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/63a28a95
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/63a28a95

Branch: refs/heads/master
Commit: 63a28a951ed552680da1a5047f5937fb90a8d76d
Parents: 5a0bc6e
Author: kunal642 <ku...@gmail.com>
Authored: Fri Oct 26 11:43:22 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Mon Nov 5 11:49:24 2018 +0530

----------------------------------------------------------------------
 ...feVariableLengthDimensionDataChunkStore.java |   5 +-
 .../filesystem/AbstractDFSCarbonFile.java       |  26 +++
 .../core/datastore/filesystem/CarbonFile.java   |   8 +
 .../datastore/filesystem/LocalCarbonFile.java   |  23 ++
 .../encoding/compress/DirectCompressCodec.java  |   7 +-
 .../core/metadata/datatype/DecimalType.java     |   2 +-
 .../core/metadata/datatype/StructType.java      |   2 +-
 .../vector/impl/CarbonColumnVectorImpl.java     |  18 +-
 docs/sdk-guide.md                               |   8 +
 .../carbondata/hadoop/CarbonRecordReader.java   |  15 ++
 .../hadoop/api/CarbonFileInputFormat.java       |  55 ++++-
 .../util/CarbonVectorizedRecordReader.java      | 211 +++++++++++++++++++
 .../sdk/file/CarbonReaderBuilder.java           |  36 +++-
 .../sdk/file/CSVCarbonWriterTest.java           |   4 +-
 .../carbondata/sdk/file/CarbonReaderTest.java   | 140 ++++++++++--
 15 files changed, 519 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/63a28a95/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
index 2873eed..01db383 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
@@ -163,13 +163,14 @@ public abstract class SafeVariableLengthDimensionDataChunkStore
     }
     DataType dt = vector.getType();
 
-    if ((!(dt == DataTypes.STRING) && length == 0) || ByteUtil.UnsafeComparer.INSTANCE
+    if (((!(dt == DataTypes.STRING) && !(dt == DataTypes.VARCHAR)) && length == 0)
+        || ByteUtil.UnsafeComparer.INSTANCE
         .equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0,
             CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, data, currentDataOffset,
             length)) {
       vector.putNull(vectorRow);
     } else {
-      if (dt == DataTypes.STRING) {
+      if (dt == DataTypes.STRING || dt == DataTypes.VARCHAR) {
         vector.putByteArray(vectorRow, currentDataOffset, length, data);
       } else if (dt == DataTypes.BOOLEAN) {
         vector.putBoolean(vectorRow, ByteUtil.toBoolean(data[currentDataOffset]));
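
The reworked condition, restated as a hedged standalone helper (identifiers are the ones visible
in the hunk): for STRING and VARCHAR a zero length is a legitimate empty value, so only the
member default-value marker maps to null; for every other variable-length type a zero length
itself signals null.

```java
static boolean isNullValue(DataType dt, byte[] data, int offset, int length) {
  boolean stringLike = (dt == DataTypes.STRING) || (dt == DataTypes.VARCHAR);
  if (!stringLike && length == 0) {
    return true; // zero length means null for non-string types
  }
  // otherwise null only if the bytes equal the member default-value marker
  return ByteUtil.UnsafeComparer.INSTANCE.equals(
      CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0,
      CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length,
      data, offset, length);
}
```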

http://git-wip-us.apache.org/repos/asf/carbondata/blob/63a28a95/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index 24efb70..d56caac 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -524,6 +524,27 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
     return getFiles(listStatus);
   }
 
+  /**
+   * Lists files recursively and applies the given
+   * file filter to the result.
+   */
+  @Override
+  public List<CarbonFile> listFiles(boolean recursive, CarbonFileFilter fileFilter)
+      throws IOException {
+    List<CarbonFile> carbonFiles = new ArrayList<>();
+    if (null != fileStatus && fileStatus.isDirectory()) {
+      RemoteIterator<LocatedFileStatus> listStatus = fs.listFiles(fileStatus.getPath(), recursive);
+      while (listStatus.hasNext()) {
+        LocatedFileStatus locatedFileStatus = listStatus.next();
+        CarbonFile carbonFile = FileFactory.getCarbonFile(locatedFileStatus.getPath().toString());
+        if (fileFilter.accept(carbonFile)) {
+          carbonFiles.add(carbonFile);
+        }
+      }
+    }
+    return carbonFiles;
+  }
+
   @Override
   public CarbonFile[] locationAwareListFiles(PathFilter pathFilter) throws IOException {
     if (null != fileStatus && fileStatus.isDirectory()) {
@@ -584,4 +605,9 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
     FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
     return fs.getDefaultReplication(path);
   }
+
+  @Override
+  public long getLength() {
+    return fileStatus.getLen();
+  }
 }
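
The new overload can be exercised as below; this is exactly how CarbonFileInputFormat later in
this commit collects the carbondata files:

```java
List<CarbonFile> dataFiles = FileFactory.getCarbonFile("/tmp/store")
    .listFiles(true /* recursive */, new CarbonFileFilter() {
      @Override public boolean accept(CarbonFile file) {
        return file.getName().contains(CarbonTablePath.CARBON_DATA_EXT);
      }
    });
```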

http://git-wip-us.apache.org/repos/asf/carbondata/blob/63a28a95/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
index 8044817..ce50259 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
@@ -38,6 +38,8 @@ public interface CarbonFile {
 
   List<CarbonFile> listFiles(Boolean recurssive) throws IOException;
 
+  List<CarbonFile> listFiles(boolean recursive, CarbonFileFilter fileFilter) throws IOException;
+
   /**
    * It returns list of files with location details.
    * @return
@@ -179,4 +181,10 @@ public interface CarbonFile {
    * @throws IOException if error occurs
    */
   short getDefaultReplication(String filePath) throws IOException;
+
+  /**
+   * Get the length of this file, in bytes.
+   * @return the length of this file, in bytes.
+   */
+  long getLength();
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/63a28a95/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
index 620a19c..98d61f8 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
@@ -183,6 +183,25 @@ public class LocalCarbonFile implements CarbonFile {
     return carbonFiles;
   }
 
+  @Override public List<CarbonFile> listFiles(boolean recursive, CarbonFileFilter fileFilter)
+      throws IOException {
+    if (!file.isDirectory()) {
+      return new ArrayList<CarbonFile>();
+    }
+    Collection<File> fileCollection = FileUtils.listFiles(file, null, recursive);
+    if (fileCollection == null) {
+      return new ArrayList<CarbonFile>();
+    }
+    List<CarbonFile> carbonFiles = new ArrayList<CarbonFile>();
+    for (File file : fileCollection) {
+      CarbonFile carbonFile = new LocalCarbonFile(file);
+      if (fileFilter.accept(carbonFile)) {
+        carbonFiles.add(carbonFile);
+      }
+    }
+    return carbonFiles;
+  }
+
   @Override public boolean createNewFile() {
     try {
       return file.createNewFile();
@@ -479,4 +498,8 @@ public class LocalCarbonFile implements CarbonFile {
   public short getDefaultReplication(String filePath) throws IOException {
     return 1;
   }
+
+  @Override public long getLength() {
+    return file.length();
+  }
 }
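
Note: FileUtils.listFiles(file, null, recursive) is the commons-io call; a null extensions array
means "no extension filter", so every regular file under the directory is returned and the
CarbonFileFilter does all the filtering. commons-io does not return null from this call, so the
null check above appears to be purely defensive.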

http://git-wip-us.apache.org/repos/asf/carbondata/blob/63a28a95/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
index 1825850..fd94344 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
@@ -329,9 +329,6 @@ public class DirectCompressCodec implements ColumnPageCodec {
       }  else if (pageDataType == DataTypes.LONG) {
         long[] longData = columnPage.getLongPage();
         if (vectorDataType == DataTypes.LONG) {
-          for (int i = 0; i < pageSize; i++) {
-            vector.putLong(i, longData[i]);
-          }
           vector.putLongs(0, pageSize, longData, 0);
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
           for (int i = 0; i < pageSize; i++) {
@@ -347,9 +344,7 @@ public class DirectCompressCodec implements ColumnPageCodec {
             columnPage.getNullBits());
       } else if (vectorDataType == DataTypes.FLOAT) {
         float[] floatPage = columnPage.getFloatPage();
-        for (int i = 0; i < pageSize; i++) {
-          vector.putFloats(0, pageSize, floatPage, 0);
-        }
+        vector.putFloats(0, pageSize, floatPage, 0);
       } else {
         double[] doubleData = columnPage.getDoublePage();
         vector.putDoubles(0, pageSize, doubleData, 0);
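
Both removals above are real fixes rather than cleanups: the LONG branch filled the vector row
by row and then bulk-copied the same data again with putLongs, and the FLOAT branch called the
bulk putFloats(0, pageSize, ...) inside a per-row loop, copying the whole page pageSize times.
After this change each page is copied exactly once.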

http://git-wip-us.apache.org/repos/asf/carbondata/blob/63a28a95/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalType.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalType.java
index a7f7a4e..b4bc20c 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalType.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalType.java
@@ -23,7 +23,7 @@ public class DecimalType extends DataType {
   private int scale;
 
   // create a decimal type object with specified precision and scale
-  DecimalType(int precision, int scale) {
+  public DecimalType(int precision, int scale) {
     super(DataTypes.DECIMAL_TYPE_ID, 8, "DECIMAL", -1);
     this.precision = precision;
     this.scale = scale;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/63a28a95/core/src/main/java/org/apache/carbondata/core/metadata/datatype/StructType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/StructType.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/StructType.java
index e8559b2..4725fb0 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/StructType.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/StructType.java
@@ -23,7 +23,7 @@ public class StructType extends DataType {
 
   private List<StructField> fields;
 
-  StructType(List<StructField> fields) {
+  public StructType(List<StructField> fields) {
     super(DataTypes.STRUCT_TYPE_ID, 10, "STRUCT", -1);
     this.fields = fields;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/63a28a95/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
index a11682b..f89ad9d 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
@@ -70,7 +70,7 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
       byteArr = new byte[batchSize];
     } else if (dataType == DataTypes.SHORT) {
       shorts = new short[batchSize];
-    } else if (dataType == DataTypes.INT) {
+    } else if (dataType == DataTypes.INT || dataType == DataTypes.DATE) {
       ints = new int[batchSize];
     } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
       longs = new long[batchSize];
@@ -80,7 +80,8 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
       doubles = new double[batchSize];
     } else if (dataType instanceof DecimalType) {
       decimals = new BigDecimal[batchSize];
-    } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY) {
+    } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY
+        || dataType == DataTypes.VARCHAR) {
       dictionaryVector = new CarbonColumnVectorImpl(batchSize, DataTypes.INT);
       bytes = new byte[batchSize][];
     } else {
@@ -207,7 +208,7 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
       return  byteArr[rowId];
     } else if (dataType == DataTypes.SHORT) {
       return shorts[rowId];
-    } else if (dataType == DataTypes.INT) {
+    } else if (dataType == DataTypes.INT || dataType == DataTypes.DATE) {
       return ints[rowId];
     } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
       return longs[rowId];
@@ -217,7 +218,8 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
       return doubles[rowId];
     } else if (dataType instanceof DecimalType) {
       return decimals[rowId];
-    } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY) {
+    } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY
+        || dataType == DataTypes.VARCHAR) {
       if (null != carbonDictionary) {
         int dictKey = (Integer) dictionaryVector.getData(rowId);
         return carbonDictionary.getDictionaryValue(dictKey);
@@ -243,7 +245,8 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
       return doubles;
     } else if (dataType instanceof DecimalType) {
       return decimals;
-    } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY) {
+    } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY || dataType ==
+        DataTypes.VARCHAR) {
       if (null != carbonDictionary) {
         return ints;
       }
@@ -259,7 +262,7 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
       Arrays.fill(byteArr, (byte) 0);
     } else if (dataType == DataTypes.SHORT) {
       Arrays.fill(shorts, (short) 0);
-    } else if (dataType == DataTypes.INT) {
+    } else if (dataType == DataTypes.INT || dataType == DataTypes.DATE) {
       Arrays.fill(ints, 0);
     } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
       Arrays.fill(longs, 0);
@@ -269,7 +272,8 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
       Arrays.fill(doubles, 0);
     } else if (dataType instanceof DecimalType) {
       Arrays.fill(decimals, null);
-    } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY) {
+    } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY
+        || dataType == DataTypes.VARCHAR) {
       Arrays.fill(bytes, null);
       this.dictionaryVector.reset();
     } else {
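
The mapping added here: DATE shares the int store (values are days since the epoch) and VARCHAR
shares the byte-array store with STRING. The epoch-day encoding is what the reader test later in
this commit asserts; a quick cross-check with standard java.time (not a CarbonData API):

```java
// "2019-03-02" surfaces from the vector as the int 17957:
long epochDays = java.time.LocalDate.parse("2019-03-02").toEpochDay(); // 17957
```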

http://git-wip-us.apache.org/repos/asf/carbondata/blob/63a28a95/docs/sdk-guide.md
----------------------------------------------------------------------
diff --git a/docs/sdk-guide.md b/docs/sdk-guide.md
index c286d81..8988dc3 100644
--- a/docs/sdk-guide.md
+++ b/docs/sdk-guide.md
@@ -456,6 +456,14 @@ public CarbonWriterBuilder writtenBy(String appName) {
 public CarbonWriter build() throws IOException, InvalidLoadOptionException;
 ```
 
+```
+/**
+ * Configures the reader to use the row record reader
+ * instead of the vectorized reader.
+ */
+public CarbonReaderBuilder withRowRecordReader()
+```
+
 ### Class org.apache.carbondata.sdk.file.CarbonWriter
 ```
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/63a28a95/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
index d447320..3ff5093 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
@@ -17,6 +17,7 @@
 package org.apache.carbondata.hadoop;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -24,7 +25,9 @@ import java.util.Map;
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.scan.executor.QueryExecutor;
 import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
 import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
@@ -78,6 +81,18 @@ public class CarbonRecordReader<T> extends AbstractRecordReader<T> {
     List<CarbonInputSplit> splitList;
     if (inputSplit instanceof CarbonInputSplit) {
       splitList = new ArrayList<>(1);
+      String splitPath = ((CarbonInputSplit) inputSplit).getPath().toString();
+      // BlockFooterOffset will be 0 in case of the vectorized reader, because it is set
+      // lazily so that multiple threads can each read a small set of files to compute the
+      // footer offset, instead of the main thread setting it for all the files.
+      if (((CarbonInputSplit) inputSplit).getDetailInfo().getBlockFooterOffset() == 0L) {
+        FileReader reader = FileFactory.getFileHolder(FileFactory.getFileType(splitPath),
+            context.getConfiguration());
+        ByteBuffer buffer = reader
+            .readByteBuffer(FileFactory.getUpdatedFilePath(splitPath), inputSplit.getLength() - 8,
+                8);
+        ((CarbonInputSplit) inputSplit).getDetailInfo().setBlockFooterOffset(buffer.getLong());
+      }
       splitList.add((CarbonInputSplit) inputSplit);
     } else if (inputSplit instanceof CarbonMultiBlockSplit) {
       // contains multiple blocks, this is an optimization for concurrent query.
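
The lazy lookup relies on the carbondata file layout in which the last 8 bytes hold the footer
offset as a long (big-endian, as the ByteBuffer.getLong above implies). A standalone sketch of
the same read, with RandomAccessFile standing in for the FileReader abstraction and a
hypothetical file path:

```java
import java.io.IOException;
import java.io.RandomAccessFile;

public class FooterOffsetSketch {
  static long readFooterOffset(String path) throws IOException {
    try (RandomAccessFile raf = new RandomAccessFile(path, "r")) {
      raf.seek(raf.length() - 8);  // the footer offset lives in the trailing long
      return raf.readLong();       // readLong is big-endian, like ByteBuffer.getLong
    }
  }
}
```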

http://git-wip-us.apache.org/repos/asf/carbondata/blob/63a28a95/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index fcfb346..8b43190 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -21,14 +21,20 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
 import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.schema.PartitionInfo;
 import org.apache.carbondata.core.metadata.schema.SchemaReader;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -36,11 +42,13 @@ import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.statusmanager.FileFormat;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 
@@ -59,6 +67,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
   // a cache for carbon table, it will be used in task side
   private CarbonTable carbonTable;
 
+
   public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
     CarbonTable carbonTableTemp;
     if (carbonTable == null) {
@@ -145,9 +154,35 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
           externalTableSegments.add(seg);
         }
       }
-      // do block filtering and get split
-      List<InputSplit> splits =
-          getSplits(job, filter, externalTableSegments, null, partitionInfo, null);
+      List<InputSplit> splits = new ArrayList<>();
+      boolean useBlockDataMap = job.getConfiguration().getBoolean("filter_blocks", true);
+      // useBlockDataMap is false in the SDK case when the user has not provided any
+      // filter; in that case we don't want to load the block/blocklet datamap. It is
+      // true in all other scenarios.
+      if (useBlockDataMap) {
+        // do block filtering and get split
+        splits = getSplits(job, filter, externalTableSegments, null, partitionInfo, null);
+      } else {
+        for (CarbonFile carbonFile : getAllCarbonDataFiles(carbonTable.getTablePath())) {
+          // Segment id is set to null because the SDK does not write carbondata files
+          // with respect to segments, so no specific segment name exists for this load.
+          CarbonInputSplit split =
+              new CarbonInputSplit("null", new Path(carbonFile.getAbsolutePath()), 0,
+                  carbonFile.getLength(), carbonFile.getLocations(), FileFormat.COLUMNAR_V3);
+          split.setVersion(ColumnarFormatVersion.V3);
+          BlockletDetailInfo info = new BlockletDetailInfo();
+          split.setDetailInfo(info);
+          info.setBlockSize(carbonFile.getLength());
+          info.setVersionNumber(split.getVersion().number());
+          info.setUseMinMaxForPruning(false);
+          splits.add(split);
+        }
+        Collections.sort(splits, new Comparator<InputSplit>() {
+          @Override public int compare(InputSplit o1, InputSplit o2) {
+            return ((CarbonInputSplit) o1).getPath().compareTo(((CarbonInputSplit) o2).getPath());
+          }
+        });
+      }
       if (getColumnProjection(job.getConfiguration()) == null) {
         // If the user projection is empty, use default all columns as projections.
         // All column name will be filled inside getSplits, so can update only here.
@@ -159,6 +194,20 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
     return null;
   }
 
+  private List<CarbonFile> getAllCarbonDataFiles(String tablePath) {
+    List<CarbonFile> carbonFiles;
+    try {
+      carbonFiles = FileFactory.getCarbonFile(tablePath).listFiles(true, new CarbonFileFilter() {
+        @Override public boolean accept(CarbonFile file) {
+          return file.getName().contains(CarbonTablePath.CARBON_DATA_EXT);
+        }
+      });
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return carbonFiles;
+  }
+
   /**
    * {@inheritDoc}
    * Configurations FileInputFormat.INPUT_DIR, CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS
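
In this listing-based path each .carbondata file becomes one split carrying a minimal
BlockletDetailInfo (no footer offset yet; the readers elsewhere in this commit fill it in
lazily), and the splits are sorted by path, presumably so that split order stays deterministic
regardless of the file system's listing order.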

http://git-wip-us.apache.org/repos/asf/carbondata/blob/63a28a95/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
new file mode 100644
index 0000000..9d3d7d6
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.util;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
+import org.apache.carbondata.core.datastore.FileReader;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.DecimalType;
+import org.apache.carbondata.core.metadata.datatype.StructField;
+import org.apache.carbondata.core.scan.executor.QueryExecutor;
+import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
+import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.core.scan.model.ProjectionDimension;
+import org.apache.carbondata.core.scan.model.ProjectionMeasure;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
+import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.hadoop.AbstractRecordReader;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Logger;
+
+/**
+ * A specialized RecordReader that reads CarbonColumnarBatches using the carbondata
+ * column APIs, filling data directly into column vectors.
+ */
+public class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonVectorizedRecordReader.class.getName());
+
+  private CarbonColumnarBatch carbonColumnarBatch;
+
+  private QueryExecutor queryExecutor;
+
+  private int batchIdx = 0;
+
+  private int numBatched = 0;
+
+  private AbstractDetailQueryResultIterator iterator;
+
+  private QueryModel queryModel;
+
+  public CarbonVectorizedRecordReader(QueryModel queryModel) {
+    this.queryModel = queryModel;
+  }
+
+  @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+      throws IOException, InterruptedException {
+    List<CarbonInputSplit> splitList;
+    if (inputSplit instanceof CarbonInputSplit) {
+      // Read the footer offset and set it on the split's detail info.
+      String splitPath = ((CarbonInputSplit) inputSplit).getPath().toString();
+      if (((CarbonInputSplit) inputSplit).getDetailInfo().getBlockFooterOffset() == 0L) {
+        FileReader reader = FileFactory.getFileHolder(FileFactory.getFileType(splitPath),
+            taskAttemptContext.getConfiguration());
+        ByteBuffer buffer = reader
+            .readByteBuffer(FileFactory.getUpdatedFilePath(splitPath),
+                ((CarbonInputSplit) inputSplit).getDetailInfo().getBlockSize() - 8,
+                8);
+        ((CarbonInputSplit) inputSplit).getDetailInfo().setBlockFooterOffset(buffer.getLong());
+      }
+      splitList = new ArrayList<>(1);
+      splitList.add((CarbonInputSplit) inputSplit);
+    } else {
+      throw new RuntimeException("unsupported input split type: " + inputSplit);
+    }
+    List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList);
+    queryModel.setTableBlockInfos(tableBlockInfoList);
+    queryModel.setVectorReader(true);
+    try {
+      queryExecutor =
+          QueryExecutorFactory.getQueryExecutor(queryModel, taskAttemptContext.getConfiguration());
+      iterator = (AbstractDetailQueryResultIterator) queryExecutor.execute(queryModel);
+      initBatch();
+    } catch (QueryExecutionException e) {
+      LOGGER.error(e);
+      throw new InterruptedException(e.getMessage());
+    } catch (Exception e) {
+      LOGGER.error(e);
+      throw e;
+    }
+  }
+
+  @Override public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (batchIdx >= numBatched) {
+      if (!nextBatch()) return false;
+    }
+    ++batchIdx;
+    return true;
+  }
+
+
+  private boolean nextBatch() {
+    carbonColumnarBatch.reset();
+    if (iterator.hasNext()) {
+      iterator.processNextBatch(carbonColumnarBatch);
+      numBatched = carbonColumnarBatch.getActualSize();
+      batchIdx = 0;
+      return true;
+    }
+    return false;
+  }
+
+  private void initBatch() {
+    if (carbonColumnarBatch == null) {
+      List<ProjectionDimension> queryDimension = queryModel.getProjectionDimensions();
+      List<ProjectionMeasure> queryMeasures = queryModel.getProjectionMeasures();
+      StructField[] fields = new StructField[queryDimension.size() + queryMeasures.size()];
+      for (ProjectionDimension dim : queryDimension) {
+        fields[dim.getOrdinal()] =
+            new StructField(dim.getColumnName(), dim.getDimension().getDataType());
+      }
+      for (ProjectionMeasure msr : queryMeasures) {
+        DataType dataType = msr.getMeasure().getDataType();
+        if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.SHORT
+            || dataType == DataTypes.INT || dataType == DataTypes.LONG
+            || dataType == DataTypes.FLOAT || dataType == DataTypes.BYTE) {
+          fields[msr.getOrdinal()] =
+              new StructField(msr.getColumnName(), msr.getMeasure().getDataType());
+        } else if (DataTypes.isDecimal(dataType)) {
+          fields[msr.getOrdinal()] = new StructField(msr.getColumnName(),
+              new DecimalType(msr.getMeasure().getPrecision(), msr.getMeasure().getScale()));
+        } else {
+          fields[msr.getOrdinal()] = new StructField(msr.getColumnName(), DataTypes.DOUBLE);
+        }
+      }
+      CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length];
+      for (int i = 0; i < fields.length; i++) {
+        vectors[i] = new CarbonColumnVectorImpl(
+            CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT,
+            fields[i].getDataType());
+      }
+      carbonColumnarBatch = new CarbonColumnarBatch(vectors,
+          CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT,
+          new boolean[] {});
+    }
+  }
+
+  @Override
+  public Object getCurrentValue() throws IOException, InterruptedException {
+    rowCount += 1;
+    Object[] row = new Object[carbonColumnarBatch.columnVectors.length];
+    for (int i = 0; i < carbonColumnarBatch.columnVectors.length; i++) {
+      if (carbonColumnarBatch.columnVectors[i].getType() == DataTypes.STRING
+          || carbonColumnarBatch.columnVectors[i].getType() == DataTypes.VARCHAR) {
+        byte[] data = (byte[]) carbonColumnarBatch.columnVectors[i].getData(batchIdx - 1);
+        row[i] = ByteUtil.toString(data, 0, data.length);
+      } else if (carbonColumnarBatch.columnVectors[i].getType() == DataTypes.BOOLEAN) {
+        byte data = (byte) carbonColumnarBatch.columnVectors[i].getData(batchIdx - 1);
+        row[i] = ByteUtil.toBoolean(data);
+      } else {
+        row[i] = carbonColumnarBatch.columnVectors[i].getData(batchIdx - 1);
+      }
+    }
+    return row;
+  }
+
+  @Override public Void getCurrentKey() throws IOException, InterruptedException {
+    throw new UnsupportedOperationException("Operation not allowed on CarbonVectorizedReader");
+  }
+
+  @Override public float getProgress() throws IOException, InterruptedException {
+    // TODO : Implement it based on total number of rows it is going to retrieve.
+    return 0;
+  }
+
+  @Override public void close() throws IOException {
+    logStatistics(rowCount, queryModel.getStatisticsRecorder());
+    if (carbonColumnarBatch != null) {
+      carbonColumnarBatch = null;
+    }
+    if (iterator != null) {
+      iterator.close();
+    }
+    try {
+      queryExecutor.finish();
+    } catch (QueryExecutionException e) {
+      throw new IOException(e);
+    }
+  }
+}
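
A hedged sketch of how this reader is driven (the SDK's CarbonReaderBuilder change below does
exactly this; split, context and queryModel are assumed to come from CarbonFileInputFormat):

```java
CarbonVectorizedRecordReader reader = new CarbonVectorizedRecordReader(queryModel);
reader.initialize(split, context);  // also resolves the trailing footer offset lazily
while (reader.nextKeyValue()) {     // advances within and across column batches
  Object[] row = (Object[]) reader.getCurrentValue();
  // process row...
}
reader.close();                     // logs statistics and finishes the query executor
```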

http://git-wip-us.apache.org/repos/asf/carbondata/blob/63a28a95/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index cc3fd72..b04f0c5 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -28,10 +28,13 @@ import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.model.ProjectionDimension;
+import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.util.CarbonSessionInfo;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
 import org.apache.carbondata.hadoop.api.CarbonFileInputFormat;
+import org.apache.carbondata.hadoop.util.CarbonVectorizedRecordReader;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -51,6 +54,7 @@ public class CarbonReaderBuilder {
   private Expression filterExpression;
   private String tableName;
   private Configuration hadoopConf;
+  private boolean useVectorReader = true;
 
   /**
    * Construct a CarbonReaderBuilder with table path and table name
@@ -119,6 +123,15 @@ public class CarbonReaderBuilder {
   }
 
   /**
+   * Configures the reader to use the row record reader
+   * instead of the vectorized reader.
+   */
+  public CarbonReaderBuilder withRowRecordReader() {
+    this.useVectorReader = false;
+    return this;
+  }
+
+  /**
    * Build CarbonReader
    *
    * @param <T>
@@ -158,14 +171,31 @@ public class CarbonReaderBuilder {
     }
 
     try {
-      final List<InputSplit> splits =
-          format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()));
 
+      if (filterExpression == null) {
+        job.getConfiguration().set("filter_blocks", "false");
+      }
+      List<InputSplit> splits =
+          format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()));
       List<RecordReader<Void, T>> readers = new ArrayList<>(splits.size());
       for (InputSplit split : splits) {
         TaskAttemptContextImpl attempt =
             new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
-        RecordReader reader = format.createRecordReader(split, attempt);
+        RecordReader reader;
+        QueryModel queryModel = format.createQueryModel(split, attempt);
+        boolean hasComplex = false;
+        for (ProjectionDimension projectionDimension : queryModel.getProjectionDimensions()) {
+          if (projectionDimension.getDimension().isComplex()) {
+            hasComplex = true;
+            break;
+          }
+        }
+        if (useVectorReader && !hasComplex) {
+          queryModel.setDirectVectorFill(filterExpression == null);
+          reader = new CarbonVectorizedRecordReader(queryModel);
+        } else {
+          reader = format.createRecordReader(split, attempt);
+        }
         try {
           reader.initialize(split, attempt);
           readers.add(reader);
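
Note the two guards above: the vectorized reader is skipped whenever the projection contains a
complex-type dimension, and direct vector fill is enabled only when there is no filter
expression, presumably because filtered reads still need row-level processing.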

http://git-wip-us.apache.org/repos/asf/carbondata/blob/63a28a95/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
index ed697d7..26078ed 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
@@ -454,7 +454,8 @@ public class CSVCarbonWriterTest {
       writer.close();
       CarbonReader carbonReader =
           new CarbonReaderBuilder(path, "table1").build();
-      for (int i = 0; i < 15; i++) {
+      int i = 0;
+      while (carbonReader.hasNext()) {
         Object[] actualRow = (Object[]) carbonReader.readNextRow();
         String[] expectedRow = new String[] { "robot" + (i % 10), "" + i, i + "." + i };
         for (int j = 0; j < 3; j++) {
@@ -462,6 +463,7 @@ public class CSVCarbonWriterTest {
         }
         assert(actualRow[1] instanceof Byte);
         assert(actualRow[2] instanceof Float);
+        i++;
       }
       carbonReader.close();
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/63a28a95/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
index 74428f0..9fb34d4 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
@@ -28,7 +28,9 @@ import org.apache.log4j.Logger;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.common.logging.impl.Audit;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.LiteralExpression;
@@ -63,7 +65,8 @@ public class CarbonReaderTest extends TestCase {
   public void testWriteAndReadFiles() throws IOException, InterruptedException {
     String path = "./testWriteFiles";
     FileUtils.deleteDirectory(new File(path));
-
+    DataMapStoreManager.getInstance()
+        .clearDataMaps(AbsoluteTableIdentifier.from(path));
     Field[] fields = new Field[2];
     fields[0] = new Field("name", DataTypes.STRING);
     fields[1] = new Field("age", DataTypes.INT);
@@ -116,7 +119,8 @@ public class CarbonReaderTest extends TestCase {
   public void testReadWithFilterOfNonTransactionalSimple() throws IOException, InterruptedException {
     String path = "./testWriteFiles";
     FileUtils.deleteDirectory(new File(path));
-
+    DataMapStoreManager.getInstance()
+        .clearDataMaps(AbsoluteTableIdentifier.from(path));
     String path1 = path + "/0testdir";
     String path2 = path + "/testdir";
 
@@ -162,7 +166,8 @@ public class CarbonReaderTest extends TestCase {
   public void testReadWithFilterOfNonTransactional2() throws IOException, InterruptedException {
     String path = "./testWriteFiles";
     FileUtils.deleteDirectory(new File(path));
-
+    DataMapStoreManager.getInstance()
+        .clearDataMaps(AbsoluteTableIdentifier.from(path));
     Field[] fields = new Field[2];
     fields[0] = new Field("name", DataTypes.STRING);
     fields[1] = new Field("age", DataTypes.INT);
@@ -198,7 +203,8 @@ public class CarbonReaderTest extends TestCase {
   public void testReadWithFilterOfNonTransactionalAnd() throws IOException, InterruptedException {
     String path = "./testWriteFiles";
     FileUtils.deleteDirectory(new File(path));
-
+    DataMapStoreManager.getInstance()
+        .clearDataMaps(AbsoluteTableIdentifier.from(path));
     Field[] fields = new Field[3];
     fields[0] = new Field("name", DataTypes.STRING);
     fields[1] = new Field("age", DataTypes.INT);
@@ -240,7 +246,8 @@ public class CarbonReaderTest extends TestCase {
   public void testReadWithFilterOfNonTransactionalOr() throws IOException, InterruptedException {
     String path = "./testWriteFiles";
     FileUtils.deleteDirectory(new File(path));
-
+    DataMapStoreManager.getInstance()
+        .clearDataMaps(AbsoluteTableIdentifier.from(path));
     Field[] fields = new Field[3];
     fields[0] = new Field("name", DataTypes.STRING);
     fields[1] = new Field("age", DataTypes.INT);
@@ -282,7 +289,8 @@ public class CarbonReaderTest extends TestCase {
   public void testReadWithFilterOfNonTransactionalGreaterThan() throws IOException, InterruptedException {
     String path = "./testWriteFiles";
     FileUtils.deleteDirectory(new File(path));
-
+    DataMapStoreManager.getInstance()
+        .clearDataMaps(AbsoluteTableIdentifier.from(path));
     Field[] fields = new Field[3];
     fields[0] = new Field("name", DataTypes.STRING);
     fields[1] = new Field("age", DataTypes.INT);
@@ -324,7 +332,8 @@ public class CarbonReaderTest extends TestCase {
   public void testReadWithFilterOfNonTransactionalLessThan() throws IOException, InterruptedException {
     String path = "./testWriteFiles";
     FileUtils.deleteDirectory(new File(path));
-
+    DataMapStoreManager.getInstance()
+        .clearDataMaps(AbsoluteTableIdentifier.from(path));
     Field[] fields = new Field[3];
     fields[0] = new Field("name", DataTypes.STRING);
     fields[1] = new Field("age", DataTypes.INT);
@@ -366,7 +375,8 @@ public class CarbonReaderTest extends TestCase {
   public void testReadWithFilterOfNonTransactionalNotEqual() throws IOException, InterruptedException {
     String path = "./testWriteFiles";
     FileUtils.deleteDirectory(new File(path));
-
+    DataMapStoreManager.getInstance()
+        .clearDataMaps(AbsoluteTableIdentifier.from(path));
     Field[] fields = new Field[3];
     fields[0] = new Field("name", DataTypes.STRING);
     fields[1] = new Field("age", DataTypes.INT);
@@ -415,7 +425,8 @@ public class CarbonReaderTest extends TestCase {
     fields[2] = new Field("doubleField", DataTypes.DOUBLE);
 
     TestUtil.writeFilesAndVerify(200, new Schema(fields), path);
-
+    DataMapStoreManager.getInstance()
+        .clearDataMaps(AbsoluteTableIdentifier.from(path));
     ColumnExpression columnExpression = new ColumnExpression("doubleField", DataTypes.DOUBLE);
     LessThanExpression lessThanExpression = new LessThanExpression(columnExpression,
         new LiteralExpression("13.5", DataTypes.DOUBLE));
@@ -450,7 +461,8 @@ public class CarbonReaderTest extends TestCase {
   public void testReadWithFilterOfNonTransactionalNotIn() throws IOException, InterruptedException {
     String path = "./testWriteFiles";
     FileUtils.deleteDirectory(new File(path));
-
+    DataMapStoreManager.getInstance()
+        .clearDataMaps(AbsoluteTableIdentifier.from(path));
     Field[] fields = new Field[3];
     fields[0] = new Field("name", DataTypes.STRING);
     fields[1] = new Field("age", DataTypes.INT);
@@ -494,7 +506,10 @@ public class CarbonReaderTest extends TestCase {
     String path2 = "./testWriteFiles2";
     FileUtils.deleteDirectory(new File(path1));
     FileUtils.deleteDirectory(new File(path2));
-
+    DataMapStoreManager.getInstance()
+        .clearDataMaps(AbsoluteTableIdentifier.from(path1));
+    DataMapStoreManager.getInstance()
+        .clearDataMaps(AbsoluteTableIdentifier.from(path2));
     Field[] fields = new Field[] { new Field("c1", "string"),
          new Field("c2", "int") };
     Schema schema = new Schema(fields);
@@ -551,7 +566,8 @@ public class CarbonReaderTest extends TestCase {
   public void testReadColumnTwice() throws IOException, InterruptedException {
     String path = "./testWriteFiles";
     FileUtils.deleteDirectory(new File(path));
-
+    DataMapStoreManager.getInstance()
+        .clearDataMaps(AbsoluteTableIdentifier.from(path));
     Field[] fields = new Field[2];
     fields[0] = new Field("name", DataTypes.STRING);
     fields[1] = new Field("age", DataTypes.INT);
@@ -595,7 +611,8 @@ public class CarbonReaderTest extends TestCase {
   public void readFilesParallel() throws IOException, InterruptedException {
     String path = "./testWriteFiles";
     FileUtils.deleteDirectory(new File(path));
-
+    DataMapStoreManager.getInstance()
+        .clearDataMaps(AbsoluteTableIdentifier.from(path));
     Field[] fields = new Field[2];
     fields[0] = new Field("name", DataTypes.STRING);
     fields[1] = new Field("age", DataTypes.INT);
@@ -630,7 +647,8 @@ public class CarbonReaderTest extends TestCase {
   public void testReadAfterClose() throws IOException, InterruptedException {
     String path = "./testWriteFiles";
     FileUtils.deleteDirectory(new File(path));
-
+    DataMapStoreManager.getInstance()
+        .clearDataMaps(AbsoluteTableIdentifier.from(path));
     Field[] fields = new Field[2];
     fields[0] = new Field("name", DataTypes.STRING);
     fields[1] = new Field("age", DataTypes.INT);
@@ -670,7 +688,8 @@ public class CarbonReaderTest extends TestCase {
   public void testWriteAndReadFilesWithoutTableName() throws IOException, InterruptedException {
     String path = "./testWriteFiles";
     FileUtils.deleteDirectory(new File(path));
-
+    DataMapStoreManager.getInstance()
+        .clearDataMaps(AbsoluteTableIdentifier.from(path));
     Field[] fields = new Field[2];
     fields[0] = new Field("name", DataTypes.STRING);
     fields[1] = new Field("age", DataTypes.INT);
@@ -708,7 +727,8 @@ public class CarbonReaderTest extends TestCase {
   public void testWriteAndReadFilesWithoutTableName2() throws IOException, InterruptedException {
     String path = "./testWriteFiles";
     FileUtils.deleteDirectory(new File(path));
-
+    DataMapStoreManager.getInstance()
+        .clearDataMaps(AbsoluteTableIdentifier.from(path));
     Field[] fields = new Field[2];
     fields[0] = new Field("name", DataTypes.STRING);
     fields[1] = new Field("age", DataTypes.INT);
@@ -902,7 +922,7 @@ public class CarbonReaderTest extends TestCase {
     Assert.assertTrue(dataFiles.length > 0);
 
     CarbonReader reader = CarbonReader.builder(path, "_temp")
-        
+
         .projection(new String[]{
             "stringField"
             , "shortField"
@@ -1701,6 +1721,7 @@ public class CarbonReaderTest extends TestCase {
       // Read data
       CarbonReader reader = CarbonReader
           .builder(path, "_temp")
+          .withRowRecordReader()
           .build();
 
       int i = 0;
@@ -1724,6 +1745,91 @@ public class CarbonReaderTest extends TestCase {
         assertEquals(RowUtil.getFloat(data, 11), (float) 1.23);
         i++;
       }
+      assert (i == 10);
+      reader.close();
+    } catch (Throwable e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    } finally {
+      try {
+        FileUtils.deleteDirectory(new File(path));
+      } catch (IOException e) {
+        e.printStackTrace();
+        Assert.fail(e.getMessage());
+      }
+    }
+  }
+
+  @Test
+  public void testVectorReader() {
+    String path = "./testWriteFiles";
+    try {
+      FileUtils.deleteDirectory(new File(path));
+
+      Field[] fields = new Field[12];
+      fields[0] = new Field("stringField", DataTypes.STRING);
+      fields[1] = new Field("shortField", DataTypes.SHORT);
+      fields[2] = new Field("intField", DataTypes.INT);
+      fields[3] = new Field("longField", DataTypes.LONG);
+      fields[4] = new Field("doubleField", DataTypes.DOUBLE);
+      fields[5] = new Field("boolField", DataTypes.BOOLEAN);
+      fields[6] = new Field("dateField", DataTypes.DATE);
+      fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
+      fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
+      fields[9] = new Field("varcharField", DataTypes.VARCHAR);
+      fields[10] = new Field("byteField", DataTypes.BYTE);
+      fields[11] = new Field("floatField", DataTypes.FLOAT);
+      Map<String, String> map = new HashMap<>();
+      map.put("complex_delimiter_level_1", "#");
+      CarbonWriter writer = CarbonWriter.builder()
+          .outputPath(path)
+          .withLoadOptions(map)
+          .withCsvInput(new Schema(fields))
+          .writtenBy("CarbonReaderTest")
+          .build();
+
+      for (int i = 0; i < 10; i++) {
+        String[] row2 = new String[]{
+            "robot" + (i % 10),
+            String.valueOf(i % 10000),
+            String.valueOf(i),
+            String.valueOf(Long.MAX_VALUE - i),
+            String.valueOf((double) i / 2),
+            String.valueOf(true),
+            "2019-03-02",
+            "2019-02-12 03:03:34",
+            "12.345",
+            "varchar",
+            String.valueOf(i),
+            "1.23"
+        };
+        writer.write(row2);
+      }
+      writer.close();
+
+      // Read data
+      CarbonReader reader = CarbonReader
+          .builder(path, "_temp")
+          .build();
+
+      int i = 0;
+      while (reader.hasNext()) {
+        Object[] data = (Object[]) reader.readNextRow();
+
+        assert (RowUtil.getString(data, 0).equals("robot" + i));
+        assertEquals(RowUtil.getShort(data, 4), i);
+        assertEquals(RowUtil.getInt(data, 5), i);
+        assert (RowUtil.getLong(data, 6) == Long.MAX_VALUE - i);
+        assertEquals(RowUtil.getDouble(data, 7), ((double) i) / 2);
+        assert (RowUtil.getBoolean(data, 8));
+        assertEquals(RowUtil.getInt(data, 1), 17957);
+        assert (RowUtil.getDecimal(data, 9).equals("12.35"));
+        assert (RowUtil.getString(data, 3).equals("varchar"));
+        assertEquals(RowUtil.getByte(data, 10), new Byte(String.valueOf(i)));
+        assertEquals(RowUtil.getFloat(data, 11), new Float("1.23"));
+        i++;
+      }
+      assert (i == 10);
       reader.close();
     } catch (Throwable e) {
       e.printStackTrace();