You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/11/21 17:59:35 UTC

[08/50] [abbrv] carbondata git commit: [CARBONDATA-300] Support read batch row in CSDK

 [CARBONDATA-300] Support read batch row in CSDK

            1. support read batch row in SDK
            2. support read batch row in CSDK
            3. improve CSDK read performance

This closes #2816


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e8cd72d5
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e8cd72d5
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e8cd72d5

Branch: refs/heads/branch-1.5
Commit: e8cd72d5b9773473bd94913a810162e275f50b6e
Parents: 6c9b395
Author: xubo245 <xu...@huawei.com>
Authored: Tue Oct 30 10:55:33 2018 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Wed Nov 21 22:39:53 2018 +0530

----------------------------------------------------------------------
 README.md                                       |   2 +-
 .../carbondata/core/scan/result/RowBatch.java   |  13 ++
 .../scan/result/iterator/ChunkRowIterator.java  |  11 +
 docs/csdk-guide.md                              |  37 +++-
 docs/ddl-of-carbondata.md                       |   2 +-
 docs/quick-start-guide.md                       |   2 +-
 .../carbondata/hadoop/CarbonRecordReader.java   |  14 ++
 store/CSDK/CMakeLists.txt                       |  15 ++
 store/CSDK/src/CarbonReader.cpp                 |  43 +++-
 store/CSDK/src/CarbonReader.h                   |  25 +++
 store/CSDK/src/CarbonRow.cpp                    |  16 ++
 store/CSDK/src/CarbonRow.h                      |   7 +-
 store/CSDK/test/main.cpp                        | 217 ++++++++++++++++++-
 .../carbondata/sdk/file/CarbonReader.java       |  51 ++++-
 .../sdk/file/CarbonReaderBuilder.java           |  15 ++
 .../org/apache/carbondata/sdk/file/RowUtil.java |   6 +-
 .../sdk/file/CSVCarbonWriterTest.java           |   4 +-
 .../carbondata/sdk/file/CarbonReaderTest.java   | 183 +++++++++++++++-
 18 files changed, 629 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e8cd72d5/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index d472e13..8e5a4b0 100644
--- a/README.md
+++ b/README.md
@@ -61,7 +61,7 @@ CarbonData is built using Apache Maven, to [build CarbonData](https://github.com
  * [CarbonData Pre-aggregate DataMap](https://github.com/apache/carbondata/blob/master/docs/preaggregate-datamap-guide.md) 
  * [CarbonData Timeseries DataMap](https://github.com/apache/carbondata/blob/master/docs/timeseries-datamap-guide.md) 
 * [SDK Guide](https://github.com/apache/carbondata/blob/master/docs/sdk-guide.md) 
-* [CSDK Guide](https://github.com/apache/carbondata/blob/master/docs/csdk-guide.md)
+* [C++ SDK Guide](https://github.com/apache/carbondata/blob/master/docs/csdk-guide.md)
 * [Performance Tuning](https://github.com/apache/carbondata/blob/master/docs/performance-tuning.md) 
 * [S3 Storage](https://github.com/apache/carbondata/blob/master/docs/s3-guide.md) 
 * [Carbon as Spark's Datasource](https://github.com/apache/carbondata/blob/master/docs/carbon-as-spark-datasource-guide.md) 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e8cd72d5/core/src/main/java/org/apache/carbondata/core/scan/result/RowBatch.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/RowBatch.java b/core/src/main/java/org/apache/carbondata/core/scan/result/RowBatch.java
index c129161..0f11df7 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/RowBatch.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/RowBatch.java
@@ -100,4 +100,17 @@ public class RowBatch extends CarbonIterator<Object[]> {
     counter++;
     return row;
   }
+
+  /**
+   * read next batch
+   *
+   * @return rows
+   */
+  public List<Object[]> nextBatch() {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+    counter = counter + rows.size();
+    return rows;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e8cd72d5/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ChunkRowIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ChunkRowIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ChunkRowIterator.java
index 0866395..3ce69ed 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ChunkRowIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ChunkRowIterator.java
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.core.scan.result.iterator;
 
+import java.util.List;
+
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.core.scan.result.RowBatch;
 
@@ -74,4 +76,13 @@ public class ChunkRowIterator extends CarbonIterator<Object[]> {
     return currentChunk.next();
   }
 
+  /**
+   * read next batch
+   *
+   * @return list of batch result
+   */
+  public List<Object[]> nextBatch() {
+    return currentChunk.nextBatch();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e8cd72d5/docs/csdk-guide.md
----------------------------------------------------------------------
diff --git a/docs/csdk-guide.md b/docs/csdk-guide.md
index cd5851e..b83b06d 100644
--- a/docs/csdk-guide.md
+++ b/docs/csdk-guide.md
@@ -15,20 +15,20 @@
     limitations under the License.
 -->
 
-# CSDK Guide
+# C++ SDK Guide
 
-CarbonData CSDK provides C++ interface to write and read carbon file. 
-CSDK use JNI to invoke java SDK in C++ code.
+CarbonData C++ SDK provides a C++ interface to write and read carbon files. 
+The C++ SDK uses JNI to invoke the Java SDK from C++ code.
 
 
-# CSDK Reader
-This CSDK reader reads CarbonData file and carbonindex file at a given path.
+# C++ SDK Reader
+This C++ SDK reader reads CarbonData file and carbonindex file at a given path.
 External client can make use of this reader to read CarbonData files in C++ 
 code and without CarbonSession.
 
 
 In the carbon jars package, there exist a carbondata-sdk.jar, 
-including SDK reader for CSDK.
+including SDK reader for C++ SDK.
 ## Quick example
 
 Please find example code at  [main.cpp](https://github.com/apache/carbondata/blob/master/store/CSDK/test/main.cpp) of CSDK module  
@@ -38,6 +38,8 @@ carbon reader and read data.There are some example code of read data from local
 and read data from S3 at main.cpp of CSDK module.  Finally, users need to 
 release the memory and destroy JVM.
 
+The C++ SDK supports reading rows in batches. Users can set the batch size by calling withBatch(int batch) before build, and read a batch of rows by calling readNextBatchRow().
+
 ## API List
 ### CarbonReader
 ```
@@ -73,6 +75,14 @@ release the memory and destroy JVM.
      **/
     jobject withHadoopConf(int argc, char *argv[]);
 
+   /**
+     * set batch size
+     *
+     * @param batch batch size
+     * @return nothing; the batch size is applied to the internal CarbonReaderBuilder object
+     */
+    void withBatch(int batch);
+
     /**
      * build carbonReader object for reading data
      * it support read data from load disk
@@ -95,6 +105,13 @@ release the memory and destroy JVM.
      jobject readNextRow();
 
     /**
+     * read Next Batch Row
+     *
+     * @return rows
+     */
+    jobjectArray readNextBatchRow();
+
+    /**
      * close the carbon reader
      *
      * @return  boolean value
@@ -103,13 +120,13 @@ release the memory and destroy JVM.
 
 ```
 
-# CSDK Writer
-This CSDK writer writes CarbonData file and carbonindex file at a given path. 
+# C++ SDK Writer
+This C++ SDK writer writes CarbonData file and carbonindex file at a given path. 
 External client can make use of this writer to write CarbonData files in C++ 
-code and without CarbonSession. CSDK already supports S3 and local disk.
+code and without CarbonSession. C++ SDK already supports S3 and local disk.
 
 In the carbon jars package, there exist a carbondata-sdk.jar, 
-including SDK writer for CSDK. 
+including SDK writer for C++ SDK. 
 
 ## Quick example
 Please find example code at  [main.cpp](https://github.com/apache/carbondata/blob/master/store/CSDK/test/main.cpp) of CSDK module  

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e8cd72d5/docs/ddl-of-carbondata.md
----------------------------------------------------------------------
diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index 1173f38..6ec4929 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -565,7 +565,7 @@ CarbonData DDL statements are documented here,which includes:
   ```
 
   Here writer path will have carbondata and index files.
-  This can be SDK output or CSDK output. Refer [SDK Guide](./sdk-guide.md) and [CSDK Guide](./csdk-guide.md). 
+  This can be SDK output or C++ SDK output. Refer [SDK Guide](./sdk-guide.md) and [C++ SDK Guide](./csdk-guide.md). 
 
   **Note:**
   1. Dropping of the external table should not delete the files present in the location.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e8cd72d5/docs/quick-start-guide.md
----------------------------------------------------------------------
diff --git a/docs/quick-start-guide.md b/docs/quick-start-guide.md
index 977235e..a14b1cd 100644
--- a/docs/quick-start-guide.md
+++ b/docs/quick-start-guide.md
@@ -294,7 +294,7 @@ hdfs://<host_name>:port/user/hive/warehouse/carbon.store
 ## Installing and Configuring CarbonData on Presto
 
 **NOTE:** **CarbonData tables cannot be created nor loaded from Presto. User need to create CarbonData Table and load data into it
-either with [Spark](#installing-and-configuring-carbondata-to-run-locally-with-spark-shell) or [SDK](./sdk-guide.md) or [CSDK](./csdk-guide.md).
+either with [Spark](#installing-and-configuring-carbondata-to-run-locally-with-spark-shell) or [SDK](./sdk-guide.md) or [C++ SDK](./csdk-guide.md).
 Once the table is created,it can be queried from Presto.**
 
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e8cd72d5/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
index 3ff5093..eca3396 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
@@ -131,6 +131,20 @@ public class CarbonRecordReader<T> extends AbstractRecordReader<T> {
     return readSupport.readRow(carbonIterator.next());
   }
 
+  /**
+   * get batch result
+   *
+   * @return rows
+   */
+  public List<Object[]> getBatchValue() {
+    if (null != inputMetricsStats) {
+      inputMetricsStats.incrementRecordRead(1L);
+    }
+    List<Object[]> objects = ((ChunkRowIterator) carbonIterator).nextBatch();
+    rowCount += objects.size();
+    return objects;
+  }
+
   @Override public float getProgress() throws IOException, InterruptedException {
     // TODO : Implement it based on total number of rows it is going to retrieve.
     return 0;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e8cd72d5/store/CSDK/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/store/CSDK/CMakeLists.txt b/store/CSDK/CMakeLists.txt
index 2d31c75..137e6fd 100644
--- a/store/CSDK/CMakeLists.txt
+++ b/store/CSDK/CMakeLists.txt
@@ -1,3 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 cmake_minimum_required(VERSION 2.8)
 project(CJDK)
 set(CMAKE_BUILD_TYPE Debug)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e8cd72d5/store/CSDK/src/CarbonReader.cpp
----------------------------------------------------------------------
diff --git a/store/CSDK/src/CarbonReader.cpp b/store/CSDK/src/CarbonReader.cpp
index 57fcda6..8375b73 100644
--- a/store/CSDK/src/CarbonReader.cpp
+++ b/store/CSDK/src/CarbonReader.cpp
@@ -15,10 +15,10 @@
  * limitations under the License.
  */
 
-#include "CarbonReader.h"
 #include <jni.h>
-#include <mach/mach_types.h>
 #include <stdexcept>
+#include <sys/time.h>
+#include "CarbonReader.h"
 
 void CarbonReader::builder(JNIEnv *env, char *path, char *tableName) {
     if (env == NULL) {
@@ -126,6 +126,33 @@ void CarbonReader::withHadoopConf(char *key, char *value) {
     carbonReaderBuilderObject = jniEnv->CallObjectMethodA(carbonReaderBuilderObject, id, args);
 }
 
+void CarbonReader::withBatch(int batch) {
+    checkBuilder();
+    if (batch < 1) {
+        throw std::runtime_error("batch parameter can't be negative and 0.");
+    }
+    jclass carbonReaderBuilderClass = jniEnv->GetObjectClass(carbonReaderBuilderObject);
+    jmethodID buildID = jniEnv->GetMethodID(carbonReaderBuilderClass, "withBatch",
+        "(I)Lorg/apache/carbondata/sdk/file/CarbonReaderBuilder;");
+    if (buildID == NULL) {
+        throw std::runtime_error("Can't find the method in java: withBatch.");
+    }
+    jvalue args[1];
+    args[0].i = batch;
+    carbonReaderBuilderObject = jniEnv->CallObjectMethodA(carbonReaderBuilderObject, buildID, args);
+}
+
+void CarbonReader::withRowRecordReader() {
+    checkBuilder();
+    jclass carbonReaderBuilderClass = jniEnv->GetObjectClass(carbonReaderBuilderObject);
+    jmethodID buildID = jniEnv->GetMethodID(carbonReaderBuilderClass, "withRowRecordReader",
+        "()Lorg/apache/carbondata/sdk/file/CarbonReaderBuilder;");
+    if (buildID == NULL) {
+        throw std::runtime_error("Can't find the method in java: withRowRecordReader.");
+    }
+    carbonReaderBuilderObject = jniEnv->CallObjectMethod(carbonReaderBuilderObject, buildID);
+}
+
 jobject CarbonReader::build() {
     checkBuilder();
     jclass carbonReaderBuilderClass = jniEnv->GetObjectClass(carbonReaderBuilderObject);
@@ -180,6 +207,18 @@ jobject CarbonReader::readNextRow() {
     return result;
 }
 
+jobjectArray CarbonReader::readNextBatchRow() {
+    if (readNextBatchRowID == NULL) {
+        jclass carbonReader = jniEnv->GetObjectClass(carbonReaderObject);
+        readNextBatchRowID = jniEnv->GetMethodID(carbonReader, "readNextBatchRow",
+            "()[Ljava/lang/Object;");
+        if (readNextBatchRowID == NULL) {
+            throw std::runtime_error("Can't find the method in java: readNextBatchRow");
+        }
+    }
+    return (jobjectArray) jniEnv->CallObjectMethod(carbonReaderObject, readNextBatchRowID);
+}
+
 void CarbonReader::close() {
     checkReader();
     jclass carbonReader = jniEnv->GetObjectClass(carbonReaderObject);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e8cd72d5/store/CSDK/src/CarbonReader.h
----------------------------------------------------------------------
diff --git a/store/CSDK/src/CarbonReader.h b/store/CSDK/src/CarbonReader.h
index 254062b..20c90bf 100644
--- a/store/CSDK/src/CarbonReader.h
+++ b/store/CSDK/src/CarbonReader.h
@@ -30,6 +30,11 @@ private:
     jmethodID readNextRowID = NULL;
 
     /**
+     * readNextBatchRow jmethodID
+     */
+    jmethodID readNextBatchRowID = NULL;
+
+    /**
      * carbonReaderBuilder object for building carbonReader
      * it can configure some operation
      */
@@ -98,6 +103,19 @@ public:
     void withHadoopConf(char *key, char *value);
 
     /**
+     * set batch size
+     *
+     * @param batch batch size
+     * @return nothing; the batch size is applied to the internal CarbonReaderBuilder object
+     */
+    void withBatch(int batch);
+
+    /**
+     * Configure Row Record Reader for reading.
+     */
+    void withRowRecordReader();
+
+    /**
      * build carbonReader object for reading data
      * it support read data from load disk
      *
@@ -119,6 +137,13 @@ public:
     jobject readNextRow();
 
     /**
+     * read Next Batch Row
+     *
+     * @return rows
+     */
+    jobjectArray readNextBatchRow();
+
+    /**
      * close the carbon reader
      *
      * @return  boolean value

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e8cd72d5/store/CSDK/src/CarbonRow.cpp
----------------------------------------------------------------------
diff --git a/store/CSDK/src/CarbonRow.cpp b/store/CSDK/src/CarbonRow.cpp
index 1c18ae6..f7066ec 100644
--- a/store/CSDK/src/CarbonRow.cpp
+++ b/store/CSDK/src/CarbonRow.cpp
@@ -102,7 +102,14 @@ void CarbonRow::checkOrdinal(int ordinal) {
     }
 }
 
+void CarbonRow::checkCarbonRow() {
+    if (carbonRow == NULL) {
+        throw std::runtime_error("carbonRow is NULL! Please set carbonRow first..");
+    }
+}
+
 short CarbonRow::getShort(int ordinal) {
+    checkCarbonRow();
     checkOrdinal(ordinal);
     jvalue args[2];
     args[0].l = carbonRow;
@@ -111,6 +118,7 @@ short CarbonRow::getShort(int ordinal) {
 }
 
 int CarbonRow::getInt(int ordinal) {
+    checkCarbonRow();
     checkOrdinal(ordinal);
     jvalue args[2];
     args[0].l = carbonRow;
@@ -119,6 +127,7 @@ int CarbonRow::getInt(int ordinal) {
 }
 
 long CarbonRow::getLong(int ordinal) {
+    checkCarbonRow();
     checkOrdinal(ordinal);
     jvalue args[2];
     args[0].l = carbonRow;
@@ -127,6 +136,7 @@ long CarbonRow::getLong(int ordinal) {
 }
 
 double CarbonRow::getDouble(int ordinal) {
+    checkCarbonRow();
     checkOrdinal(ordinal);
     jvalue args[2];
     args[0].l = carbonRow;
@@ -136,6 +146,7 @@ double CarbonRow::getDouble(int ordinal) {
 
 
 float CarbonRow::getFloat(int ordinal) {
+    checkCarbonRow();
     checkOrdinal(ordinal);
     jvalue args[2];
     args[0].l = carbonRow;
@@ -144,6 +155,7 @@ float CarbonRow::getFloat(int ordinal) {
 }
 
 jboolean CarbonRow::getBoolean(int ordinal) {
+    checkCarbonRow();
     checkOrdinal(ordinal);
     jvalue args[2];
     args[0].l = carbonRow;
@@ -152,6 +164,7 @@ jboolean CarbonRow::getBoolean(int ordinal) {
 }
 
 char *CarbonRow::getString(int ordinal) {
+    checkCarbonRow();
     checkOrdinal(ordinal);
     jvalue args[2];
     args[0].l = carbonRow;
@@ -163,6 +176,7 @@ char *CarbonRow::getString(int ordinal) {
 }
 
 char *CarbonRow::getDecimal(int ordinal) {
+    checkCarbonRow();
     checkOrdinal(ordinal);
     jvalue args[2];
     args[0].l = carbonRow;
@@ -174,6 +188,7 @@ char *CarbonRow::getDecimal(int ordinal) {
 }
 
 char *CarbonRow::getVarchar(int ordinal) {
+    checkCarbonRow();
     checkOrdinal(ordinal);
     jvalue args[2];
     args[0].l = carbonRow;
@@ -185,6 +200,7 @@ char *CarbonRow::getVarchar(int ordinal) {
 }
 
 jobjectArray CarbonRow::getArray(int ordinal) {
+    checkCarbonRow();
     checkOrdinal(ordinal);
     jvalue args[2];
     args[0].l = carbonRow;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e8cd72d5/store/CSDK/src/CarbonRow.h
----------------------------------------------------------------------
diff --git a/store/CSDK/src/CarbonRow.h b/store/CSDK/src/CarbonRow.h
index 2395fa6..3dae5d3 100644
--- a/store/CSDK/src/CarbonRow.h
+++ b/store/CSDK/src/CarbonRow.h
@@ -38,7 +38,7 @@ private:
     /**
      * carbon row data
      */
-    jobject carbonRow;
+    jobject carbonRow = NULL;
 
     /**
      * check ordinal, ordinal can't be negative
@@ -47,6 +47,11 @@ private:
      */
     void checkOrdinal(int ordinal);
 
+    /**
+     * check carbonRow, carbonRow can't be NULL
+     */
+    void checkCarbonRow();
+
 public:
 
     /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e8cd72d5/store/CSDK/test/main.cpp
----------------------------------------------------------------------
diff --git a/store/CSDK/test/main.cpp b/store/CSDK/test/main.cpp
index a3ee97c..5d875e5 100644
--- a/store/CSDK/test/main.cpp
+++ b/store/CSDK/test/main.cpp
@@ -15,10 +15,12 @@
  * limitations under the License.
  */
 
-#include <stdio.h>
+#include <iostream>
 #include <jni.h>
+#include <stdio.h>
 #include <stdlib.h>
-#include <iostream>
+#include <string.h>
+#include <sys/time.h>
 #include <unistd.h>
 #include <sys/time.h>
 #include "../src/CarbonReader.h"
@@ -40,13 +42,15 @@ JavaVM *jvm;
 JNIEnv *initJVM() {
     JNIEnv *env;
     JavaVMInitArgs vm_args;
-    int parNum = 3;
+    int parNum = 2;
     int res;
     JavaVMOption options[parNum];
 
-    options[0].optionString = "-Djava.compiler=NONE";
-    options[1].optionString = "-Djava.class.path=../../sdk/target/carbondata-sdk.jar";
-    options[2].optionString = "-verbose:jni";
+    options[0].optionString = "-Djava.class.path=../../sdk/target/carbondata-sdk.jar";
+    options[1].optionString = "-verbose:jni";                // For debug and check the jni information
+    //    options[2].optionString = "-Xmx12000m";            // change the jvm max memory size
+    //    options[3].optionString = "-Xms5000m";             // change the jvm min memory size
+    //    options[4].optionString = "-Djava.compiler=NONE";  // disable JIT
     vm_args.version = JNI_VERSION_1_8;
     vm_args.nOptions = parNum;
     vm_args.options = options;
@@ -186,12 +190,12 @@ bool readSchemaInDataFile(JNIEnv *env, char *dataFilePath) {
  * @param env  jni env
  * @return
  */
-bool readFromLocalWithoutProjection(JNIEnv *env) {
+bool readFromLocalWithoutProjection(JNIEnv *env, char *path) {
     printf("\nRead data from local without projection:\n");
 
     CarbonReader carbonReaderClass;
     try {
-        carbonReaderClass.builder(env, "../../../../resources/carbondata");
+        carbonReaderClass.builder(env, path);
     } catch (runtime_error e) {
         printf("\nget exception fro builder and throw\n");
         throw e;
@@ -205,16 +209,189 @@ bool readFromLocalWithoutProjection(JNIEnv *env) {
 }
 
 /**
+ * test read data by readNextRow method
+ *
+ * @param env  jni env
+ */
+void testReadNextRow(JNIEnv *env, char *path, int printNum, char **argv, int argc, bool useVectorReader) {
+    printf("\nTest next Row Performance, useVectorReader is ");
+    printBoolean(useVectorReader);
+    printf("\n");
+
+    struct timeval start, build, startRead, endBatchRead, endRead;
+    gettimeofday(&start, NULL);
+
+    try {
+        CarbonReader carbonReaderClass;
+
+        carbonReaderClass.builder(env, path);
+        if (argc > 1) {
+            carbonReaderClass.withHadoopConf("fs.s3a.access.key", argv[1]);
+            carbonReaderClass.withHadoopConf("fs.s3a.secret.key", argv[2]);
+            carbonReaderClass.withHadoopConf("fs.s3a.endpoint", argv[3]);
+        }
+        if (!useVectorReader) {
+            carbonReaderClass.withRowRecordReader();
+        }
+        carbonReaderClass.build();
+
+        gettimeofday(&build, NULL);
+        int time = 1000000 * (build.tv_sec - start.tv_sec) + build.tv_usec - start.tv_usec;
+        double buildTime = time / 1000000.0;
+        printf("\n\nbuild time is: %lf s\n\n", time / 1000000.0);
+
+        CarbonRow carbonRow(env);
+        int i = 0;
+
+        gettimeofday(&startRead, NULL);
+        jobject row;
+        while (carbonReaderClass.hasNext()) {
+
+            row = carbonReaderClass.readNextRow();
+
+            i++;
+            if (i > 1 && i % printNum == 0) {
+                gettimeofday(&endBatchRead, NULL);
+
+                time = 1000000 * (endBatchRead.tv_sec - startRead.tv_sec) + endBatchRead.tv_usec - startRead.tv_usec;
+                printf("%d: time is %lf s, speed is %lf records/s  ", i, time / 1000000.0,
+                       printNum / (time / 1000000.0));
+
+                carbonRow.setCarbonRow(row);
+                printf("%s\t", carbonRow.getString(0));
+                printf("%s\t", carbonRow.getString(1));
+                printf("%s\t", carbonRow.getString(2));
+                printf("%s\t", carbonRow.getString(3));
+                printf("%ld\t", carbonRow.getLong(4));
+                printf("%ld\t", carbonRow.getLong(5));
+                printf("\n");
+
+                gettimeofday(&startRead, NULL);
+            }
+            env->DeleteLocalRef(row);
+        }
+
+        gettimeofday(&endRead, NULL);
+
+        time = 1000000 * (endRead.tv_sec - build.tv_sec) + endRead.tv_usec - build.tv_usec;
+        printf("total line is: %d,\t build time is: %lf s,\tread time is %lf s, average speed is %lf records/s  ",
+               i, buildTime, time / 1000000.0, i / (time / 1000000.0));
+        carbonReaderClass.close();
+    } catch (jthrowable) {
+        env->ExceptionDescribe();
+    }
+}
+
+/**
+ * test read data by readNextBatchRow method
+ *
+ * @param env  jni env
+ */
+void testReadNextBatchRow(JNIEnv *env, char *path, int batchSize, int printNum, char **argv, int argc,
+                          bool useVectorReader) {
+    printf("\n\nTest next Batch Row Performance:\n");
+    printBoolean(useVectorReader);
+    printf("\n");
+
+    struct timeval start, build, read;
+    gettimeofday(&start, NULL);
+
+    CarbonReader carbonReaderClass;
+
+    carbonReaderClass.builder(env, path);
+    if (argc > 1) {
+        carbonReaderClass.withHadoopConf("fs.s3a.access.key", argv[1]);
+        carbonReaderClass.withHadoopConf("fs.s3a.secret.key", argv[2]);
+        carbonReaderClass.withHadoopConf("fs.s3a.endpoint", argv[3]);
+    }
+    if (!useVectorReader) {
+        carbonReaderClass.withRowRecordReader();
+    }
+    carbonReaderClass.withBatch(batchSize);
+    try {
+        carbonReaderClass.build();
+    } catch (jthrowable e) {
+        env->ExceptionDescribe();
+    }
+
+    gettimeofday(&build, NULL);
+    int time = 1000000 * (build.tv_sec - start.tv_sec) + build.tv_usec - start.tv_usec;
+    double buildTime = time / 1000000.0;
+    printf("\n\nbuild time is: %lf s\n\n", time / 1000000.0);
+
+    CarbonRow carbonRow(env);
+    int i = 0;
+    struct timeval startHasNext, startReadNextBatchRow, endReadNextBatchRow, endRead;
+    gettimeofday(&startHasNext, NULL);
+
+    while (carbonReaderClass.hasNext()) {
+
+        gettimeofday(&startReadNextBatchRow, NULL);
+        jobjectArray batch = carbonReaderClass.readNextBatchRow();
+        if (env->ExceptionCheck()) {
+            env->ExceptionDescribe();
+        }
+        gettimeofday(&endReadNextBatchRow, NULL);
+
+        jsize length = env->GetArrayLength(batch);
+        if (i + length > printNum - 1) {
+            for (int j = 0; j < length; j++) {
+                i++;
+                jobject row = env->GetObjectArrayElement(batch, j);
+                carbonRow.setCarbonRow(row);
+                carbonRow.getString(0);
+                carbonRow.getString(1);
+                carbonRow.getString(2);
+                carbonRow.getString(3);
+                carbonRow.getLong(4);
+                carbonRow.getLong(5);
+                if (i > 1 && i % printNum == 0) {
+                    gettimeofday(&read, NULL);
+
+                    double hasNextTime = 1000000 * (startReadNextBatchRow.tv_sec - startHasNext.tv_sec) +
+                                         startReadNextBatchRow.tv_usec - startHasNext.tv_usec;
+
+                    double readNextBatchTime = 1000000 * (endReadNextBatchRow.tv_sec - startReadNextBatchRow.tv_sec) +
+                                               endReadNextBatchRow.tv_usec - startReadNextBatchRow.tv_usec;
+
+                    time = 1000000 * (read.tv_sec - startHasNext.tv_sec) + read.tv_usec - startHasNext.tv_usec;
+                    printf("%d: time is %lf s, speed is %lf records/s, hasNext time is %lf s,readNextBatchRow time is %lf s ",
+                           i, time / 1000000.0, printNum / (time / 1000000.0), hasNextTime / 1000000.0,
+                           readNextBatchTime / 1000000.0);
+                    gettimeofday(&startHasNext, NULL);
+                    printf("%s\t", carbonRow.getString(0));
+                    printf("%s\t", carbonRow.getString(1));
+                    printf("%s\t", carbonRow.getString(2));
+                    printf("%s\t", carbonRow.getString(3));
+                    printf("%ld\t", carbonRow.getLong(4));
+                    printf("%ld\t", carbonRow.getLong(5));
+                    printf("\n");
+                }
+                env->DeleteLocalRef(row);
+            }
+        } else {
+            i = i + length;
+        }
+        env->DeleteLocalRef(batch);
+    }
+    gettimeofday(&endRead, NULL);
+    time = 1000000 * (endRead.tv_sec - build.tv_sec) + endRead.tv_usec - build.tv_usec;
+    printf("total line is: %d,\t build time is: %lf s,\tread time is %lf s, average speed is %lf records/s  ",
+           i, buildTime, time / 1000000.0, i / (time / 1000000.0));
+    carbonReaderClass.close();
+}
+
+/**
  * test read data from local disk
  *
  * @param env  jni env
  * @return
  */
-bool readFromLocal(JNIEnv *env) {
+bool readFromLocalWithProjection(JNIEnv *env, char *path) {
     printf("\nRead data from local:\n");
 
     CarbonReader reader;
-    reader.builder(env, "../../../../resources/carbondata", "test");
+    reader.builder(env, path, "test");
 
     char *argv[12];
     argv[0] = "stringField";
@@ -271,6 +448,7 @@ bool tryCatchException(JNIEnv *env) {
         carbonReaderClass.build();
     } catch (jthrowable e) {
         env->ExceptionDescribe();
+        env->ExceptionClear();
     }
     printf("\nfinished handle exception\n");
 }
@@ -455,20 +633,35 @@ int main(int argc, char *argv[]) {
     char *S3WritePath = "s3a://sdk/WriterOutput/carbondata2";
     char *S3ReadPath = "s3a://sdk/WriterOutput/carbondata";
 
+    char *smallFilePath = "../../../../resources/carbondata";
+    char *path = "../../../../../../../Downloads/carbon-data-big/dir2";
+    char *S3Path = "s3a://sdk/ges/i400bs128";
+
     if (argc > 3) {
         // TODO: need support read schema from S3 in the future
         testWriteData(env, S3WritePath, 4, argv);
         readFromS3(env, S3ReadPath, argv);
+        testReadNextRow(env, S3Path, 100000, argv, 4, false);
+        testReadNextRow(env, S3Path, 100000, argv, 4, true);
+        testReadNextBatchRow(env, S3Path, 100000, 100000, argv, 4, false);
+        testReadNextBatchRow(env, S3Path, 100000, 100000, argv, 4, true);
     } else {
         tryCatchException(env);
         char *indexFilePath = argv[1];
         char *dataFilePath = argv[2];
         testCarbonProperties(env);
-        readFromLocalWithoutProjection(env);
         testWriteData(env, "./data", 1, argv);
-        readFromLocal(env);
         readSchemaInIndexFile(env, indexFilePath);
         readSchemaInDataFile(env, dataFilePath);
+        testWriteData(env, "./data", 1, argv);
+        readFromLocalWithoutProjection(env, smallFilePath);
+        readFromLocalWithProjection(env, smallFilePath);
+        int batch = 32000;
+        int printNum = 32000;
+        testReadNextRow(env, path, printNum, argv, 0, true);
+        testReadNextRow(env, path, printNum, argv, 0, false);
+        testReadNextBatchRow(env, path, batch, printNum, argv, 0, true);
+        testReadNextBatchRow(env, path, batch, printNum, argv, 0, false);
     }
     (jvm)->DestroyJavaVM();
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e8cd72d5/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
index 1a55a2e..ebe0651 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
@@ -24,15 +24,19 @@ import java.util.UUID;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonTaskInfo;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
+import org.apache.carbondata.hadoop.CarbonRecordReader;
+import org.apache.carbondata.hadoop.util.CarbonVectorizedRecordReader;
 
 import org.apache.hadoop.mapreduce.RecordReader;
 
 
 /**
- * Reader for carbondata file
+ * Reader for CarbonData file
  */
 @InterfaceAudience.User
 @InterfaceStability.Evolving
@@ -47,6 +51,11 @@ public class CarbonReader<T> {
   private boolean initialise;
 
   /**
+   * Buffer holding the rows of the batch currently being read by readNextBatchRow
+   */
+  private Object[] batchRows;
+
+  /**
    * Call {@link #builder(String)} to construct an instance
    */
   CarbonReader(List<RecordReader<Void, T>> readers) {
@@ -92,6 +101,43 @@ public class CarbonReader<T> {
   }
 
   /**
+   * Read and return the next batch of row objects from the current reader.
+   * NOTE(review): the vectorized path reads the current value before it calls
+   * {@link #hasNext()}, so callers presumably must call hasNext() first -- confirm.
+   * @return the next batch; null when the row reader has no batch value; shorter
+   *         than the configured batch size when hasNext() reports no more rows
+   * @throws Exception if the current reader type cannot read rows in batches
+   */
+  public Object[] readNextBatchRow() throws Exception {
+    validateReader();
+    if (currentReader instanceof CarbonRecordReader) {
+      List<Object> batchValue = ((CarbonRecordReader) currentReader).getBatchValue();
+      return batchValue == null ? null : batchValue.toArray();
+    }
+    if (!(currentReader instanceof CarbonVectorizedRecordReader)) {
+      throw new UnsupportedOperationException(
+          "Didn't support read next batch row by this reader.");
+    }
+    int batch = Integer.parseInt(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE,
+            String.valueOf(CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_DEFAULT)));
+    batchRows = new Object[batch];
+    int count = 0;
+    while (count < batch) {
+      batchRows[count++] = currentReader.getCurrentValue();
+      // hasNext() is deliberately not called once the last slot is filled;
+      // presumably the caller's next hasNext() performs that advance.
+      if (count < batch && !hasNext()) {
+        // Ran out of rows before the batch filled up: return only the rows read.
+        Object[] lessBatch = new Object[count];
+        System.arraycopy(batchRows, 0, lessBatch, 0, count);
+        return lessBatch;
+      }
+    }
+    return batchRows;
+  }
+
+  /**
    * Return a new {@link CarbonReaderBuilder} instance
    *
    * @param tablePath table store path
@@ -173,6 +219,9 @@ public class CarbonReader<T> {
    */
   public void close() throws IOException {
     validateReader();
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE,
+            String.valueOf(CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_DEFAULT));
     this.currentReader.close();
     this.initialise = false;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e8cd72d5/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index b04f0c5..2b76d7b 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -24,12 +24,14 @@ import java.util.Objects;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.model.ProjectionDimension;
 import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonSessionInfo;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
@@ -107,6 +109,19 @@ public class CarbonReaderBuilder {
   }
 
   /**
+   * set read batch size before build
+   *
+   * @param batch batch size
+   * @return updated CarbonReaderBuilder
+   */
+  public CarbonReaderBuilder withBatch(int batch) {
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE,
+            String.valueOf(batch));
+    return this;
+  }
+
+  /**
    * configure hadoop configuration with key value
    *
    * @param key   key word

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e8cd72d5/store/sdk/src/main/java/org/apache/carbondata/sdk/file/RowUtil.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/RowUtil.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/RowUtil.java
index 82b3904..b7a594f 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/RowUtil.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/RowUtil.java
@@ -119,8 +119,8 @@ public class RowUtil implements Serializable {
 
   /**
    * get varchar data type data by ordinal
-   * This is for CSDK
-   * JNI don't support varchar, so carbon convert varchar to string
+   * This is for C++ SDK
+   * JNI doesn't support varchar, so carbon converts varchar to string
    *
    * @param data carbon row data
    * @param ordinal the data index of Row
@@ -132,7 +132,7 @@ public class RowUtil implements Serializable {
 
   /**
    * get decimal data type data by ordinal
-   * This is for CSDK
+   * This is for C++ SDK
    * JNI don't support Decimal, so carbon convert decimal to string
    *
    * @param data carbon row data

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e8cd72d5/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
index 26078ed..d957ff6 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
@@ -63,7 +63,9 @@ public class CSVCarbonWriterTest {
       assert (false);
     }
     CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION, path);
+        .addProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION, path)
+        .addProperty(CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE,
+            String.valueOf(CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_DEFAULT));
     assert (TestUtil.cleanMdtFile());
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e8cd72d5/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
index 9fb34d4..d79a1ad 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
@@ -1486,7 +1486,9 @@ public class CarbonReaderTest extends TestCase {
       CarbonWriter writer = CarbonWriter.builder()
           .outputPath(path)
           .withLoadOptions(map)
-          .withCsvInput(new Schema(fields)).build();
+          .withCsvInput(new Schema(fields))
+          .writtenBy("CarbonReaderTest")
+          .build();
 
       for (int i = 0; i < 10; i++) {
         String[] row2 = new String[]{
@@ -1844,4 +1846,183 @@ public class CarbonReaderTest extends TestCase {
     }
   }
 
+  @Test
+  public void testReadNextBatchRow() {
+    String path = "./carbondata";
+    try {
+      FileUtils.deleteDirectory(new File(path));
+
+      Field[] fields = new Field[12];
+      fields[0] = new Field("stringField", DataTypes.STRING);
+      fields[1] = new Field("shortField", DataTypes.SHORT);
+      fields[2] = new Field("intField", DataTypes.INT);
+      fields[3] = new Field("longField", DataTypes.LONG);
+      fields[4] = new Field("doubleField", DataTypes.DOUBLE);
+      fields[5] = new Field("boolField", DataTypes.BOOLEAN);
+      fields[6] = new Field("dateField", DataTypes.DATE);
+      fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
+      fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
+      fields[9] = new Field("varcharField", DataTypes.VARCHAR);
+      fields[10] = new Field("arrayField", DataTypes.createArrayType(DataTypes.STRING));
+      fields[11] = new Field("floatField", DataTypes.FLOAT);
+      Map<String, String> map = new HashMap<>();
+      map.put("complex_delimiter_level_1", "#");
+      CarbonWriter writer = CarbonWriter.builder()
+          .outputPath(path)
+          .withLoadOptions(map)
+          .withCsvInput(new Schema(fields))
+          .writtenBy("CarbonReaderTest")
+          .build();
+
+      for (int i = 0; i < 10; i++) {
+        String[] row2 = new String[]{
+            "robot" + (i % 10),
+            String.valueOf(i % 10000),
+            String.valueOf(i),
+            String.valueOf(Long.MAX_VALUE - i),
+            String.valueOf((double) i / 2),
+            String.valueOf(true),
+            "2019-03-02",
+            "2019-02-12 03:03:34",
+            "12.345",
+            "varchar",
+            "Hello#World#From#Carbon",
+            "1.23"
+        };
+        writer.write(row2);
+      }
+      writer.close();
+
+      // Read data
+      int batchSize =4;
+      CarbonReader reader = CarbonReader
+          .builder(path, "_temp")
+          .withBatch(4)
+          .build();
+
+      int i = 0;
+      while (reader.hasNext()) {
+        Object[] batch = reader.readNextBatchRow();
+        Assert.assertTrue(batch.length <= batchSize);
+
+        for (int j = 0; j < batch.length; j++) {
+
+          Object[] data = (Object[]) batch[j];
+          assert (RowUtil.getString(data, 0).equals("robot" + i));
+          assertEquals(RowUtil.getInt(data, 1), 17957);
+          assert (RowUtil.getVarchar(data, 3).equals("varchar"));
+          Object[] arr = RowUtil.getArray(data, 4);
+          assert (arr[0].equals("Hello"));
+          assert (arr[1].equals("World"));
+          assert (arr[2].equals("From"));
+          assert (arr[3].equals("Carbon"));
+          assertEquals(RowUtil.getShort(data, 5), i);
+          assertEquals(RowUtil.getInt(data, 6), i);
+          assertEquals(RowUtil.getLong(data, 7), Long.MAX_VALUE - i);
+          assertEquals(RowUtil.getDouble(data, 8), ((double) i) / 2);
+          assert (RowUtil.getBoolean(data, 9));
+          assert (RowUtil.getDecimal(data, 10).equals("12.35"));
+          assertEquals(RowUtil.getFloat(data, 11), (float) 1.23);
+          i++;
+        }
+        System.out.println("batch is " + i);
+      }
+      reader.close();
+    } catch (Throwable e) {
+      e.printStackTrace();
+    } finally {
+      try {
+        FileUtils.deleteDirectory(new File(path));
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+  @Test
+  public void testReadNextBatchRowWithVectorReader() {
+    String path = "./carbondata";
+    try {
+      FileUtils.deleteDirectory(new File(path));
+
+      Field[] fields = new Field[11];
+      fields[0] = new Field("stringField", DataTypes.STRING);
+      fields[1] = new Field("shortField", DataTypes.SHORT);
+      fields[2] = new Field("intField", DataTypes.INT);
+      fields[3] = new Field("longField", DataTypes.LONG);
+      fields[4] = new Field("doubleField", DataTypes.DOUBLE);
+      fields[5] = new Field("boolField", DataTypes.BOOLEAN);
+      fields[6] = new Field("dateField", DataTypes.DATE);
+      fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
+      fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
+      fields[9] = new Field("varcharField", DataTypes.VARCHAR);
+      // Vector don't support complex data type
+      // fields[10] = new Field("arrayField", DataTypes.createArrayType(DataTypes.STRING));
+      fields[10] = new Field("floatField", DataTypes.FLOAT);
+      Map<String, String> map = new HashMap<>();
+      map.put("complex_delimiter_level_1", "#");
+      CarbonWriter writer = CarbonWriter.builder()
+          .outputPath(path)
+          .withLoadOptions(map)
+          .withCsvInput(new Schema(fields))
+          .writtenBy("CarbonReaderTest")
+          .build();
+
+      for (int i = 0; i < 10; i++) {
+        String[] row2 = new String[]{
+            "robot" + (i % 10),
+            String.valueOf(i % 10000),
+            String.valueOf(i),
+            String.valueOf(Long.MAX_VALUE - i),
+            String.valueOf((double) i / 2),
+            String.valueOf(true),
+            "2019-03-02",
+            "2019-02-12 03:03:34",
+            "12.345",
+            "varchar",
+            "1.23"
+        };
+        writer.write(row2);
+      }
+      writer.close();
+
+      // Read data
+      int batchSize =4;
+      CarbonReader reader = CarbonReader
+          .builder(path, "_temp")
+          .withBatch(4)
+          .build();
+
+      int i = 0;
+      while (reader.hasNext()) {
+        Object[] batch = reader.readNextBatchRow();
+        Assert.assertTrue(batch.length <= batchSize);
+
+        for (int j = 0; j < batch.length; j++) {
+
+          Object[] data = (Object[]) batch[j];
+          assert (RowUtil.getString(data, 0).equals("robot" + i));
+          assertEquals(RowUtil.getInt(data, 1), 17957);
+          assert (RowUtil.getVarchar(data, 3).equals("varchar"));
+          assertEquals(RowUtil.getShort(data, 4), i);
+          assertEquals(RowUtil.getInt(data, 5), i);
+          assertEquals(RowUtil.getLong(data, 6), Long.MAX_VALUE - i);
+          assertEquals(RowUtil.getDouble(data, 7), ((double) i) / 2);
+          assert (RowUtil.getDecimal(data, 9).equals("12.35"));
+          assertEquals(RowUtil.getFloat(data, 10), (float) 1.23);
+          i++;
+        }
+        System.out.println("batch is " + i);
+      }
+      reader.close();
+    } catch (Throwable e) {
+      e.printStackTrace();
+    } finally {
+      try {
+        FileUtils.deleteDirectory(new File(path));
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
 }