Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/05/19 15:31:47 UTC

[carbondata] 02/02: [CARBONDATA-3365] Integrate apache arrow vector filling to carbon SDK

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit f5cc9b748830c0251ee70a86aa62d8533762bb87
Author: ajantha-bhat <aj...@gmail.com>
AuthorDate: Tue Feb 26 20:03:41 2019 +0800

    [CARBONDATA-3365] Integrate apache arrow vector filling to carbon SDK
    
    By integrating Arrow vector filling into Carbon, content read from
    carbondata files can be consumed for analytics in any programming
    language: an Arrow vector filled through the Carbon Java SDK can be
    read from Python, C, C++ and the many other languages supported by
    Arrow.
    This also widens the scope of carbondata use-cases, allowing carbondata
    to serve various applications, since Arrow is already integrated with
    many query engines.
    
    This closes #3193
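    
    For reference, a minimal usage sketch of the API added by this change,
    adapted from the new CarbonReaderTest below; the store path and table
    name are placeholders:
    
        import java.util.List;
    
        import org.apache.arrow.vector.FieldVector;
        import org.apache.arrow.vector.VectorSchemaRoot;
    
        import org.apache.carbondata.sdk.file.ArrowCarbonReader;
        import org.apache.carbondata.sdk.file.CarbonReader;
        import org.apache.carbondata.sdk.file.CarbonSchemaReader;
        import org.apache.carbondata.sdk.file.Schema;
        import org.apache.carbondata.sdk.file.arrow.ArrowConverter;
    
        public class ArrowReadExample {
          public static void main(String[] args) throws Exception {
            String path = "./carbondata";  // placeholder store path written by the SDK
            Schema carbonSchema = CarbonSchemaReader.readSchema(path);
    
            // Build a reader that fills Arrow vectors instead of returning rows.
            // Keep readers at blocklet level so the serialized batch stays below INT_MAX.
            ArrowCarbonReader<Object[]> reader =
                CarbonReader.builder(path, "_temp").withRowRecordReader().buildArrowReader();
            byte[] batch = reader.readArrowBatch(carbonSchema);
            reader.close();
    
            // Deserialize the batch back into an Arrow VectorSchemaRoot.
            ArrowConverter converter = new ArrowConverter(carbonSchema, 0);
            VectorSchemaRoot root = converter.byteArrayToVector(batch);
            List<FieldVector> vectors = root.getFieldVectors();
            System.out.println("rows: " + root.getRowCount() + ", columns: " + vectors.size());
          }
        }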
---
 .../carbondata/examples/CarbonSessionExample.scala | 180 ++++++++++-----------
 .../hadoop/api/CarbonFileInputFormat.java          |  20 +--
 .../carbondata/hadoop/api/CarbonInputFormat.java   |   3 -
 store/sdk/pom.xml                                  |  31 +++-
 .../carbondata/sdk/file/ArrowCarbonReader.java     | 106 ++++++++++++
 .../apache/carbondata/sdk/file/CarbonReader.java   |  10 --
 .../carbondata/sdk/file/CarbonReaderBuilder.java   |  67 +++-----
 .../carbondata/sdk/file/CarbonSchemaReader.java    |  16 ++
 .../carbondata/sdk/file/arrow/ArrowConverter.java  |  80 +++++++--
 .../sdk/file/arrow/ArrowFieldWriter.java           |  45 +++++-
 .../carbondata/sdk/file/arrow/ArrowUtils.java      |  29 ++--
 .../carbondata/sdk/file/arrow/ArrowWriter.java     |   6 +
 .../file/arrow/ExtendedByteArrayOutputStream.java  |  39 +++++
 .../carbondata/sdk/file/CarbonReaderTest.java      | 135 ++++++++++++++++
 14 files changed, 563 insertions(+), 204 deletions(-)

diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
index 3aa761e..b6921f2 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
@@ -37,7 +37,7 @@ object CarbonSessionExample {
       s"$rootPath/examples/spark2/src/main/resources/log4j.properties")
 
     CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "false")
+      .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true")
     val spark = ExampleUtils.createCarbonSession("CarbonSessionExample")
     spark.sparkContext.setLogLevel("INFO")
     exampleBody(spark)
@@ -49,96 +49,92 @@ object CarbonSessionExample {
     val rootPath = new File(this.getClass.getResource("/").getPath
                             + "../../../..").getCanonicalPath
 
-//    spark.sql("DROP TABLE IF EXISTS source")
-//
-//    // Create table
-//    spark.sql(
-//      s"""
-//         | CREATE TABLE source(
-//         | shortField SHORT,
-//         | intField INT,
-//         | bigintField LONG,
-//         | doubleField DOUBLE,
-//         | stringField STRING,
-//         | timestampField TIMESTAMP,
-//         | decimalField DECIMAL(18,2),
-//         | dateField DATE,
-//         | charField CHAR(5),
-//         | floatField FLOAT
-//         | )
-//         | STORED AS carbondata
-//       """.stripMargin)
-//
-//    val path = s"$rootPath/examples/spark2/src/main/resources/data.csv"
-//
-//    // scalastyle:off
-//    spark.sql(
-//      s"""
-//         | LOAD DATA LOCAL INPATH '$path'
-//         | INTO TABLE source
-//         | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#')
-//       """.stripMargin)
-//    // scalastyle:on
-//
-//    spark.sql(
-//      s"""
-//         | SELECT charField, stringField, intField
-//         | FROM source
-//         | WHERE stringfield = 'spark' AND decimalField > 40
-//      """.stripMargin).show()
-//
-//    spark.sql(
-//      s"""
-//         | SELECT *
-//         | FROM source WHERE length(stringField) = 5
-//       """.stripMargin).show()
-//
-//    spark.sql(
-//      s"""
-//         | SELECT *
-//         | FROM source WHERE date_format(dateField, "yyyy-MM-dd") = "2015-07-23"
-//       """.stripMargin).show()
-//
-//    spark.sql("SELECT count(stringField) FROM source").show()
-//
-//    spark.sql(
-//      s"""
-//         | SELECT sum(intField), stringField
-//         | FROM source
-//         | GROUP BY stringField
-//       """.stripMargin).show()
-//
-//    spark.sql(
-//      s"""
-//         | SELECT t1.*, t2.*
-//         | FROM source t1, source t2
-//         | WHERE t1.stringField = t2.stringField
-//      """.stripMargin).show()
-//
-//    spark.sql(
-//      s"""
-//         | WITH t1 AS (
-//         | SELECT * FROM source
-//         | UNION ALL
-//         | SELECT * FROM source
-//         | )
-//         | SELECT t1.*, t2.*
-//         | FROM t1, source t2
-//         | WHERE t1.stringField = t2.stringField
-//      """.stripMargin).show()
-//
-//    spark.sql(
-//      s"""
-//         | SELECT *
-//         | FROM source
-//         | WHERE stringField = 'spark' and floatField > 2.8
-//       """.stripMargin).show()
-//
-//    // Drop table
-//    spark.sql("DROP TABLE IF EXISTS source")
-//    spark.sql("create table p using parquet options('path' = '/home/root1/samplecsvfiles/parquetdata')")
-//    spark.sql("select * from p").show()
-//    spark.sql("create table c using carbon as select * from p")
-//    spark.sql("desc formatted c").show(truncate = false)
+    spark.sql("DROP TABLE IF EXISTS source")
+
+    // Create table
+    spark.sql(
+      s"""
+         | CREATE TABLE source(
+         | shortField SHORT,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT
+         | )
+         | STORED AS carbondata
+       """.stripMargin)
+
+    val path = s"$rootPath/examples/spark2/src/main/resources/data.csv"
+
+    // scalastyle:off
+    spark.sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$path'
+         | INTO TABLE source
+         | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#')
+       """.stripMargin)
+    // scalastyle:on
+
+    spark.sql(
+      s"""
+         | SELECT charField, stringField, intField
+         | FROM source
+         | WHERE stringfield = 'spark' AND decimalField > 40
+      """.stripMargin).show()
+
+    spark.sql(
+      s"""
+         | SELECT *
+         | FROM source WHERE length(stringField) = 5
+       """.stripMargin).show()
+
+    spark.sql(
+      s"""
+         | SELECT *
+         | FROM source WHERE date_format(dateField, "yyyy-MM-dd") = "2015-07-23"
+       """.stripMargin).show()
+
+    spark.sql("SELECT count(stringField) FROM source").show()
+
+    spark.sql(
+      s"""
+         | SELECT sum(intField), stringField
+         | FROM source
+         | GROUP BY stringField
+       """.stripMargin).show()
+
+    spark.sql(
+      s"""
+         | SELECT t1.*, t2.*
+         | FROM source t1, source t2
+         | WHERE t1.stringField = t2.stringField
+      """.stripMargin).show()
+
+    spark.sql(
+      s"""
+         | WITH t1 AS (
+         | SELECT * FROM source
+         | UNION ALL
+         | SELECT * FROM source
+         | )
+         | SELECT t1.*, t2.*
+         | FROM t1, source t2
+         | WHERE t1.stringField = t2.stringField
+      """.stripMargin).show()
+
+    spark.sql(
+      s"""
+         | SELECT *
+         | FROM source
+         | WHERE stringField = 'spark' and floatField > 2.8
+       """.stripMargin).show()
+
+    // Drop table
+    spark.sql("DROP TABLE IF EXISTS source")
   }
 }
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index 48774f7..d81b02c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -162,13 +162,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
         // do block filtering and get split
         splits = getSplits(job, filter, externalTableSegments, null, partitionInfo, null);
       } else {
-        List<CarbonFile> carbonFiles = null;
-        if (null != this.fileLists) {
-          carbonFiles = getAllCarbonDataFiles(this.fileLists);
-        } else {
-          carbonFiles = getAllCarbonDataFiles(carbonTable.getTablePath());
-        }
-        for (CarbonFile carbonFile : carbonFiles) {
+        for (CarbonFile carbonFile : getAllCarbonDataFiles(carbonTable.getTablePath())) {
           // Segment id is set to null because SDK does not write carbondata files with respect
           // to segments. So no specific name is present for this load.
           CarbonInputSplit split =
@@ -214,18 +208,6 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
     return carbonFiles;
   }
 
-  private List<CarbonFile> getAllCarbonDataFiles(List fileLists) {
-    List<CarbonFile> carbonFiles = new LinkedList<>();
-    try {
-      for (int i = 0; i < fileLists.size(); i++) {
-        carbonFiles.add(FileFactory.getCarbonFile(fileLists.get(i).toString()));
-      }
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-    return carbonFiles;
-  }
-
   /**
    * {@inheritDoc}
    * Configurations FileInputFormat.INPUT_DIR, CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 0bcfea6..90532fb 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -806,7 +806,4 @@ m filterExpression
     getQuerySegmentToAccess(conf, carbonTable.getDatabaseName(), tableName);
   }
 
-  public void setFileLists(List fileLists) {
-    this.fileLists = fileLists;
-  }
 }
diff --git a/store/sdk/pom.xml b/store/sdk/pom.xml
index 7023736..a1d594d 100644
--- a/store/sdk/pom.xml
+++ b/store/sdk/pom.xml
@@ -54,11 +54,31 @@
       <groupId>org.apache.arrow</groupId>
       <artifactId>arrow-memory</artifactId>
       <version>0.12.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-buffer</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.arrow</groupId>
       <artifactId>arrow-vector</artifactId>
       <version>0.12.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-buffer</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.arrow</groupId>
@@ -69,6 +89,12 @@
       <groupId>org.apache.arrow</groupId>
       <artifactId>arrow-flight</artifactId>
       <version>0.12.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-buffer</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.arrow</groupId>
@@ -90,11 +116,6 @@
       <artifactId>jackson-databind</artifactId>
       <version>${dep.jackson.version}</version>
     </dependency>
-    <dependency>
-      <groupId>net.sf.py4j</groupId>
-      <artifactId>py4j</artifactId>
-      <version>0.10.8.1</version>
-    </dependency>
   </dependencies>
 
   <build>
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/ArrowCarbonReader.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/ArrowCarbonReader.java
new file mode 100644
index 0000000..a53ad6b
--- /dev/null
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/ArrowCarbonReader.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.sdk.file.arrow.ArrowConverter;
+
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.hadoop.mapreduce.RecordReader;
+
+/**
+ * Reader for CarbonData file which fills the arrow vector
+ */
+@InterfaceAudience.User
+@InterfaceStability.Evolving
+public class ArrowCarbonReader<T> extends CarbonReader<T> {
+
+  /**
+   * Call {@link #builder(String)} to construct an instance
+   */
+  ArrowCarbonReader(List<RecordReader<Void, T>> readers) {
+    super(readers);
+  }
+
+  /**
+   * Carbon reader will fill the arrow vector after reading the carbondata files.
+   * This arrow byte[] can be used to create arrow table and used for in memory analytics
+   * Note: create a reader at blocklet level, so that arrow byte[] will not exceed INT_MAX
+   *
+   * @param carbonSchema
+   * @return
+   * @throws Exception
+   */
+  public byte[] readArrowBatch(Schema carbonSchema) throws Exception {
+    ArrowConverter arrowConverter = new ArrowConverter(carbonSchema, 0);
+    while (hasNext()) {
+      arrowConverter.addToArrowBuffer(readNextBatchRow());
+    }
+    return arrowConverter.toSerializeArray();
+  }
+
+  /**
+   * Carbon reader will fill the arrow vector after reading the carbondata files.
+   * This arrow byte[] can be used to create arrow table and used for in memory analytics
+   * Note: create a reader at blocklet level, so that arrow byte[] will not exceed INT_MAX
+   *
+   * @param carbonSchema
+   * @return
+   * @throws Exception
+   */
+  public VectorSchemaRoot readArrowVectors(Schema carbonSchema) throws Exception {
+    ArrowConverter arrowConverter = new ArrowConverter(carbonSchema, 0);
+    while (hasNext()) {
+      arrowConverter.addToArrowBuffer(readNextBatchRow());
+    }
+    return arrowConverter.getArrowVectors();
+  }
+
+  /**
+   * Carbon reader will fill the arrow vector after reading carbondata files.
+   * Here unsafe memory address will be returned instead of byte[],
+   * so that this address can be sent across java to python or c modules and
+   * can directly read the content from this unsafe memory
+   * Note:Create a carbon reader at blocklet level using CarbonReader.buildWithSplits(split) method,
+   * so that arrow byte[] will not exceed INT_MAX.
+   *
+   * @param carbonSchema
+   * @return
+   * @throws Exception
+   */
+  public long readArrowBatchAddress(Schema carbonSchema) throws Exception {
+    ArrowConverter arrowConverter = new ArrowConverter(carbonSchema, 0);
+    while (hasNext()) {
+      arrowConverter.addToArrowBuffer(readNextBatchRow());
+    }
+    return arrowConverter.copySerializeArrayToOffHeap();
+  }
+
+  /**
+   * Free the unsafe memory allocated, if the unsafe arrow batch is used.
+   *
+   * @param address
+   */
+  public void freeArrowBatchMemory(long address) {
+    CarbonUnsafe.getUnsafe().freeMemory(address);
+  }
+}
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
index d7f08d4..e5c0680 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
@@ -28,7 +28,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.hadoop.CarbonRecordReader;
 import org.apache.carbondata.hadoop.util.CarbonVectorizedRecordReader;
-import org.apache.carbondata.sdk.file.arrow.ArrowConverter;
 
 import org.apache.hadoop.mapreduce.RecordReader;
 
@@ -95,15 +94,6 @@ public class CarbonReader<T> {
     return currentReader.getCurrentValue();
   }
 
-  public byte[] readArrowBatch(Schema carbonSchema) throws Exception {
-    ArrowConverter arrowConverter = new ArrowConverter(carbonSchema, 10000);
-    while (hasNext()) {
-      arrowConverter.addToArrowBuffer(readNextBatchRow());
-    }
-    final byte[] bytes = arrowConverter.toSerializeArray();
-    arrowConverter.close();
-    return bytes;
-  }
   /**
    * Read and return next batch row objects
    */
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index f1855ee..ad9a383 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -35,7 +35,6 @@ import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonSessionInfo;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
-import org.apache.carbondata.hadoop.CarbonInputSplit;
 import org.apache.carbondata.hadoop.api.CarbonFileInputFormat;
 import org.apache.carbondata.hadoop.util.CarbonVectorizedRecordReader;
 
@@ -58,6 +57,7 @@ public class CarbonReaderBuilder {
   private String tableName;
   private Configuration hadoopConf;
   private boolean useVectorReader = true;
+  private boolean useArrowReader;
 
   /**
    * Construct a CarbonReaderBuilder with table path and table name
@@ -147,54 +147,19 @@ public class CarbonReaderBuilder {
     return this;
   }
 
-  public String[] getSplits() throws IOException {
-    if (hadoopConf == null) {
-      hadoopConf = FileFactory.getConfiguration();
-    }
-    CarbonTable table;
-    // now always infer schema. TODO:Refactor in next version.
-    table = CarbonTable.buildTable(tablePath, tableName, hadoopConf, false);
-    final CarbonFileInputFormat format = new CarbonFileInputFormat();
-    final Job job = new Job(hadoopConf);
-    format.setTableInfo(job.getConfiguration(), table.getTableInfo());
-    format.setTablePath(job.getConfiguration(), table.getTablePath());
-    format.setTableName(job.getConfiguration(), table.getTableName());
-    format.setDatabaseName(job.getConfiguration(), table.getDatabaseName());
-    if (filterExpression != null) {
-      format.setFilterPredicates(job.getConfiguration(), filterExpression);
-    }
-
-    if (projectionColumns != null) {
-      // set the user projection
-      int len = projectionColumns.length;
-      //      TODO : Handle projection of complex child columns
-      for (int i = 0; i < len; i++) {
-        if (projectionColumns[i].contains(".")) {
-          throw new UnsupportedOperationException(
-              "Complex child columns projection NOT supported through CarbonReader");
-        }
-      }
-      format.setColumnProjection(job.getConfiguration(), projectionColumns);
-    }
-
-    List<String> files = new ArrayList<>();
-    try {
-
-      if (filterExpression == null) {
-        job.getConfiguration().set("filter_blocks", "false");
-      }
-      List<InputSplit> splits =
-          format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()));
-      for (InputSplit split : splits) {
-        files.add(((CarbonInputSplit) split).getPath().toUri().getPath());
-      }
-    } catch (Exception ex) {
-      // Clear the datamap cache as it can get added in getSplits() method
-      DataMapStoreManager.getInstance().clearDataMaps(table.getAbsoluteTableIdentifier());
-      throw ex;
-    }
-    return files.toArray(new String[files.size()]);
+  /**
+   * build Arrow carbon reader
+   *
+   * @param <T>
+   * @return ArrowCarbonReader
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public <T> ArrowCarbonReader<T> buildArrowReader() throws IOException, InterruptedException {
+    useArrowReader = true;
+    return (ArrowCarbonReader<T>) this.build();
   }
+
   /**
    * Build CarbonReader
    *
@@ -268,7 +233,11 @@ public class CarbonReaderBuilder {
           throw e;
         }
       }
-      return new CarbonReader<>(readers);
+      if (useArrowReader) {
+        return new ArrowCarbonReader<>(readers);
+      } else {
+        return new CarbonReader<>(readers);
+      }
     } catch (Exception ex) {
       // Clear the datamap cache as it can get added in getSplits() method
       DataMapStoreManager.getInstance()
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
index cde609b..88cbc33 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.reader.CarbonIndexFileReader;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.FileFooter3;
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.sdk.file.arrow.ArrowConverter;
 
 import static org.apache.carbondata.core.util.CarbonUtil.thriftColumnSchemaToWrapperColumnSchema;
 import static org.apache.carbondata.core.util.path.CarbonTablePath.CARBON_DATA_EXT;
@@ -116,6 +117,21 @@ public class CarbonSchemaReader {
   }
 
   /**
+   * Converting carbon schema to arrow schema in byte[],
+   * byte[] can be converted back to an arrow schema by other arrow interface modules like pyspark.
+   *
+   * @param path
+   * @return
+   * @throws IOException
+   */
+  public static byte[] getArrowSchemaAsBytes(String path) throws IOException {
+    Schema schema = CarbonSchemaReader.readSchema(path).asOriginOrder();
+    ArrowConverter arrowConverter = new ArrowConverter(schema, 0);
+    final byte[] bytes = arrowConverter.toSerializeArray();
+    return bytes;
+  }
+
+  /**
    * read schema from path,
    * path can be folder path, carbonindex file path, and carbondata file path
    * and will not check all files schema
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowConverter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowConverter.java
index f4a8ba8..54735fb 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowConverter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowConverter.java
@@ -17,7 +17,6 @@
 
 package org.apache.carbondata.sdk.file.arrow;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.channels.Channels;
 import java.util.TimeZone;
@@ -25,8 +24,12 @@ import java.util.TimeZone;
 import org.apache.carbondata.sdk.file.Schema;
 
 import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorLoader;
 import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.ipc.ArrowFileReader;
 import org.apache.arrow.vector.ipc.ArrowFileWriter;
+import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;
 
 public class ArrowConverter {
 
@@ -34,19 +37,25 @@ public class ArrowConverter {
   private VectorSchemaRoot root;
   private ArrowWriter arrowWriter;
   private org.apache.arrow.vector.types.pojo.Schema arrowSchema;
-  private ByteArrayOutputStream out;
+  private ExtendedByteArrayOutputStream out;
   private ArrowFileWriter writer;
 
-  public ArrowConverter(Schema schema, int initalSize) {
+  public ArrowConverter(Schema schema, int initialSize) {
     this.arrowSchema = ArrowUtils.toArrowSchema(schema, TimeZone.getDefault().getID());
     this.allocator =
-        ArrowUtils.rootAllocator.newChildAllocator("toArrowBuffer", initalSize, Long.MAX_VALUE);
+        ArrowUtils.rootAllocator.newChildAllocator("toArrowBuffer", initialSize, Long.MAX_VALUE);
     this.root = VectorSchemaRoot.create(arrowSchema, allocator);
     this.arrowWriter = ArrowWriter.create(root);
-    this.out = new ByteArrayOutputStream();
+    // currently blocklet level read and set initial value to 32 MB.
+    this.out = new ExtendedByteArrayOutputStream(32 * 1024 * 1024);
     this.writer = new ArrowFileWriter(root, null, Channels.newChannel(out));
   }
 
+  /**
+   * write batch of row objects to Arrow vectors
+   *
+   * @param data
+   */
   public void addToArrowBuffer(Object[] data) {
     int i = 0;
     while (i < data.length) {
@@ -55,6 +64,12 @@ public class ArrowConverter {
     }
   }
 
+  /**
+   * To serialize arrow vectors to byte[]
+   *
+   * @return
+   * @throws IOException
+   */
   public byte[] toSerializeArray() throws IOException {
     arrowWriter.finish();
     writer.writeBatch();
@@ -65,9 +80,56 @@ public class ArrowConverter {
     return out.toByteArray();
   }
 
-  public void close() {
-    //    this.root.close();
-    //    this.arrowWriter.finish();
-    //    this.allocator.close();
+  /**
+   * To copy arrow vectors to unsafe memory
+   *
+   * @return
+   * @throws IOException
+   */
+  public long copySerializeArrayToOffHeap() throws IOException {
+    arrowWriter.finish();
+    writer.writeBatch();
+    this.writer.close();
+    arrowWriter.reset();
+    writer.close();
+    this.root.close();
+    return out.copyToAddress();
+  }
+
+  /**
+   * Utility API to convert back the arrow byte[] to arrow VectorSchemaRoot.
+   *
+   * @param batchBytes
+   * @return
+   * @throws IOException
+   */
+  public VectorSchemaRoot byteArrayToVector(byte[] batchBytes) throws IOException {
+    ByteArrayReadableSeekableByteChannel in = new ByteArrayReadableSeekableByteChannel(batchBytes);
+    ArrowFileReader reader = new ArrowFileReader(in, allocator);
+    try {
+      VectorSchemaRoot root = reader.getVectorSchemaRoot();
+      VectorUnloader unloader = new VectorUnloader(root);
+      reader.loadNextBatch();
+      VectorSchemaRoot arrowRoot = VectorSchemaRoot.create(arrowSchema, allocator);
+      VectorLoader vectorLoader = new VectorLoader(arrowRoot);
+      vectorLoader.load(unloader.getRecordBatch());
+      return arrowRoot;
+    } catch (IOException e) {
+      reader.close();
+      throw e;
+    }
+  }
+
+  /**
+   * To get the arrow vectors directly after filling from carbondata
+   *
+   * @return
+   */
+  public VectorSchemaRoot getArrowVectors() throws IOException {
+    arrowWriter.finish();
+    writer.writeBatch();
+    this.writer.close();
+    writer.close();
+    return root;
   }
 }
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowFieldWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowFieldWriter.java
index 38c878f..daeae67 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowFieldWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowFieldWriter.java
@@ -18,14 +18,19 @@ package org.apache.carbondata.sdk.file.arrow;
 
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 
 import org.apache.arrow.vector.BigIntVector;
 import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
 import org.apache.arrow.vector.DecimalVector;
 import org.apache.arrow.vector.Float4Vector;
 import org.apache.arrow.vector.Float8Vector;
 import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeStampMicroTZVector;
 import org.apache.arrow.vector.TinyIntVector;
 import org.apache.arrow.vector.ValueVector;
 import org.apache.arrow.vector.VarBinaryVector;
@@ -186,6 +191,40 @@ class DoubleWriter extends ArrowFieldWriter {
   }
 }
 
+class DateWriter extends ArrowFieldWriter {
+  private DateDayVector dateDayVector;
+
+  public DateWriter(DateDayVector dateDayVector) {
+    super(dateDayVector);
+    this.dateDayVector = dateDayVector;
+  }
+
+  @Override public void setNull() {
+    this.dateDayVector.setNull(count);
+  }
+
+  @Override public void setValue(Object data, int ordinal) {
+    this.dateDayVector.setSafe(count, (int)data);
+  }
+}
+
+class TimeStampWriter extends ArrowFieldWriter {
+  private TimeStampMicroTZVector timeStampMicroTZVector;
+
+  public TimeStampWriter(TimeStampMicroTZVector timeStampMicroTZVector) {
+    super(timeStampMicroTZVector);
+    this.timeStampMicroTZVector = timeStampMicroTZVector;
+  }
+
+  @Override public void setNull() {
+    this.timeStampMicroTZVector.setNull(count);
+  }
+
+  @Override public void setValue(Object data, int ordinal) {
+    this.timeStampMicroTZVector.setSafe(count, (long)data);
+  }
+}
+
 class StringWriter extends ArrowFieldWriter {
   private VarCharVector varCharVector;
 
@@ -199,9 +238,9 @@ class StringWriter extends ArrowFieldWriter {
   }
 
   @Override public void setValue(Object data, int ordinal) {
-    //TODO check if it works with JAVA String object intead of utf8 String
-    byte[] bytes = ((String) data).getBytes();
-    ByteBuffer byteBuffer = ByteBuffer.wrap(((String) data).getBytes());
+    byte[] bytes =
+        (String.valueOf(data)).getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
     this.varCharVector.setSafe(count, byteBuffer, byteBuffer.position(), bytes.length);
   }
 }
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowUtils.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowUtils.java
index 1374204..13d0c5c 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowUtils.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowUtils.java
@@ -37,29 +37,29 @@ import org.apache.arrow.vector.types.TimeUnit;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.FieldType;
 
-//TODO check with ravi
 public class ArrowUtils {
 
-  public static RootAllocator rootAllocator = new RootAllocator(Long.MAX_VALUE);
+  public static final RootAllocator rootAllocator = new RootAllocator(Long.MAX_VALUE);
 
   public static ArrowType toArrowType(DataType carbonDataType, String timeZoneId) {
-    if (carbonDataType == DataTypes.STRING) {
+    if (carbonDataType == DataTypes.STRING || carbonDataType == DataTypes.VARCHAR) {
       return ArrowType.Utf8.INSTANCE;
     } else if (carbonDataType == DataTypes.BYTE) {
-      return new ArrowType.Int(8, true);
+      return new ArrowType.Int(DataTypes.BYTE.getSizeInBytes() * 8, true);
     } else if (carbonDataType == DataTypes.SHORT) {
-      return new ArrowType.Int(8 * 2, true);
+      return new ArrowType.Int(DataTypes.SHORT.getSizeInBytes() * 8, true);
     } else if (carbonDataType == DataTypes.INT) {
-      return new ArrowType.Int(8 * 4, true);
+      return new ArrowType.Int(DataTypes.INT.getSizeInBytes() * 8, true);
     } else if (carbonDataType == DataTypes.LONG) {
-      return new ArrowType.Int(8 * 8, true);
+      return new ArrowType.Int(DataTypes.LONG.getSizeInBytes() * 8, true);
     } else if (carbonDataType == DataTypes.FLOAT) {
       return new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE);
     } else if (carbonDataType == DataTypes.DOUBLE) {
       return new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE);
     } else if (carbonDataType == DataTypes.BOOLEAN) {
       return ArrowType.Bool.INSTANCE;
-    } else if (DataTypes.isDecimal(carbonDataType)) {
+    } else if (carbonDataType instanceof DecimalType) {
+      // instance of check is for findbugs, instead of datatypes check
       DecimalType decimal = (DecimalType) carbonDataType;
       return new ArrowType.Decimal(decimal.getPrecision(), decimal.getScale());
     } else if (carbonDataType == DataTypes.TIMESTAMP) {
@@ -75,15 +75,16 @@ public class ArrowUtils {
 
   public static org.apache.arrow.vector.types.pojo.Field toArrowField(String name,
       DataType dataType, String timeZoneId) {
-    if (DataTypes.isArrayType(dataType)) {
+    if (dataType instanceof ArrayType) {
+      // instance of check is for findbugs, instead of datatypes check
       FieldType fieldType = new FieldType(true, ArrowType.List.INSTANCE, null);
       List<org.apache.arrow.vector.types.pojo.Field> structFields = new ArrayList<>();
-      structFields
-          .add(toArrowField("element", ((ArrayType) dataType).getElementType(), timeZoneId));
+      DataType elementType = ((ArrayType) dataType).getElementType();
+      structFields.add(toArrowField("element", elementType, timeZoneId));
       return new org.apache.arrow.vector.types.pojo.Field(name, fieldType, structFields);
-      // TODO check with RAVI
-    } else if (DataTypes.isStructType(dataType)) {
-      final StructType dataType1 = (StructType) dataType;
+    } else if (dataType instanceof StructType) {
+      // instance of check is for findbugs, instead of datatypes check
+      StructType dataType1 = (StructType) dataType;
       FieldType fieldType = new FieldType(true, ArrowType.Struct.INSTANCE, null);
       List<StructField> fields = dataType1.getFields();
       List<org.apache.arrow.vector.types.pojo.Field> structFields = new ArrayList<>();
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowWriter.java
index f0645dd..9fdee97 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowWriter.java
@@ -23,12 +23,14 @@ import org.apache.carbondata.sdk.file.Schema;
 
 import org.apache.arrow.vector.BigIntVector;
 import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
 import org.apache.arrow.vector.DecimalVector;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.Float4Vector;
 import org.apache.arrow.vector.Float8Vector;
 import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeStampMicroTZVector;
 import org.apache.arrow.vector.TinyIntVector;
 import org.apache.arrow.vector.ValueVector;
 import org.apache.arrow.vector.VarBinaryVector;
@@ -130,6 +132,10 @@ public class ArrowWriter {
           arrowFieldWriters.toArray(new ArrowFieldWriter[arrowFieldWriters.size()]));
     } else if (valueVector instanceof VarBinaryVector) {
       return new BinaryWriter((VarBinaryVector) valueVector);
+    } else if (valueVector instanceof DateDayVector) {
+      return new DateWriter((DateDayVector) valueVector);
+    } else if (valueVector instanceof TimeStampMicroTZVector) {
+      return new TimeStampWriter((TimeStampMicroTZVector) valueVector);
     } else {
       throw new UnsupportedOperationException("Invalid data type");
     }
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ExtendedByteArrayOutputStream.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ExtendedByteArrayOutputStream.java
new file mode 100644
index 0000000..393cd86
--- /dev/null
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ExtendedByteArrayOutputStream.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.sdk.file.arrow;
+
+import java.io.ByteArrayOutputStream;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.memory.CarbonUnsafe;
+
+public class ExtendedByteArrayOutputStream extends ByteArrayOutputStream {
+
+  public ExtendedByteArrayOutputStream(int initialSize) {
+    super(initialSize);
+  }
+
+  public long copyToAddress() {
+    final long address = CarbonUnsafe.getUnsafe()
+        .allocateMemory(CarbonCommonConstants.INT_SIZE_IN_BYTE + count);
+    CarbonUnsafe.getUnsafe().putInt(address, count);
+    CarbonUnsafe.getUnsafe()
+        .copyMemory(buf, CarbonUnsafe.BYTE_ARRAY_OFFSET, null,
+            address + CarbonCommonConstants.INT_SIZE_IN_BYTE, count);
+    return address;
+  }
+}
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
index 6a3578c..1607b8f 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
@@ -22,6 +22,10 @@ import java.sql.Date;
 import java.sql.Timestamp;
 import java.util.*;
 
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.avro.generic.GenericData;
 import org.apache.log4j.Logger;
 
@@ -30,6 +34,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.memory.CarbonUnsafe;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
@@ -38,6 +43,7 @@ import org.apache.carbondata.core.scan.expression.conditional.*;
 import org.apache.carbondata.core.scan.expression.logical.AndExpression;
 import org.apache.carbondata.core.scan.expression.logical.OrExpression;
 import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.sdk.file.arrow.ArrowConverter;
 
 import junit.framework.TestCase;
 import org.apache.commons.io.FileUtils;
@@ -2418,4 +2424,133 @@ public class CarbonReaderTest extends TestCase {
       }
     }
   }
+
+  @Test
+  public void testArrowReader() {
+    String path = "./carbondata";
+    try {
+      FileUtils.deleteDirectory(new File(path));
+
+      Field[] fields = new Field[13];
+      fields[0] = new Field("stringField", DataTypes.STRING);
+      fields[1] = new Field("shortField", DataTypes.SHORT);
+      fields[2] = new Field("intField", DataTypes.INT);
+      fields[3] = new Field("longField", DataTypes.LONG);
+      fields[4] = new Field("doubleField", DataTypes.DOUBLE);
+      fields[5] = new Field("boolField", DataTypes.BOOLEAN);
+      fields[6] = new Field("dateField", DataTypes.DATE);
+      fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
+      fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
+      fields[9] = new Field("varcharField", DataTypes.VARCHAR);
+      fields[10] = new Field("arrayField", DataTypes.createArrayType(DataTypes.STRING));
+      fields[11] = new Field("floatField", DataTypes.FLOAT);
+      fields[12] = new Field("binaryField", DataTypes.BINARY);
+      Map<String, String> map = new HashMap<>();
+      map.put("complex_delimiter_level_1", "#");
+      CarbonWriter writer = CarbonWriter.builder()
+          .outputPath(path)
+          .withLoadOptions(map)
+          .withCsvInput(new Schema(fields))
+          .writtenBy("CarbonReaderTest")
+          .build();
+      byte[] value = "Binary".getBytes();
+      for (int i = 0; i < 10; i++) {
+        Object[] row2 = new Object[]{
+            "robot" + (i % 10),
+            i % 10000,
+            i,
+            (Long.MAX_VALUE - i),
+            ((double) i / 2),
+            (true),
+            "2019-03-02",
+            "2019-02-12 03:03:34",
+            12.345,
+            "varchar",
+            "Hello#World#From#Carbon",
+            1.23,
+            value
+        };
+        writer.write(row2);
+      }
+      writer.close();
+      // Read data
+      ArrowCarbonReader reader =
+          CarbonReader.builder(path, "_temp").withRowRecordReader().buildArrowReader();
+      Schema carbonSchema = CarbonSchemaReader.readSchema(path);
+      byte[] data = reader.readArrowBatch(carbonSchema);
+      ArrowConverter arrowConverter = new ArrowConverter(carbonSchema,0);
+      VectorSchemaRoot vectorSchemaRoot = arrowConverter.byteArrayToVector(data);
+      // check for 10 rows
+      assertEquals(vectorSchemaRoot.getRowCount(), 10);
+      List<FieldVector> fieldVectors = vectorSchemaRoot.getFieldVectors();
+      // validate short column
+      for (int i = 0; i < vectorSchemaRoot.getRowCount(); i++) {
+        assertEquals(((SmallIntVector)fieldVectors.get(6)).get(i), i);
+      }
+      // validate float column
+      for (int i = 0; i < vectorSchemaRoot.getRowCount(); i++) {
+        assertEquals(((Float4Vector)fieldVectors.get(12)).get(i), (float) 1.23);
+      }
+      reader.close();
+
+      // Read data with address (unsafe memory)
+      ArrowCarbonReader reader1 =
+          CarbonReader.builder(path, "_temp").withRowRecordReader().buildArrowReader();
+      long address = reader1.readArrowBatchAddress(carbonSchema);
+      int length = CarbonUnsafe.getUnsafe().getInt(address);
+      byte[] data1 = new byte[length];
+      CarbonUnsafe.getUnsafe().copyMemory(null, address + 4 , data1, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
+      ArrowConverter arrowConverter1 = new ArrowConverter(carbonSchema,0);
+      VectorSchemaRoot vectorSchemaRoot1 = arrowConverter1.byteArrayToVector(data1);
+      // check for 10 rows
+      assertEquals(vectorSchemaRoot1.getRowCount(), 10);
+      List<FieldVector> fieldVectors1 = vectorSchemaRoot1.getFieldVectors();
+      // validate short column
+      for (int i = 0; i < vectorSchemaRoot1.getRowCount(); i++) {
+        assertEquals(((SmallIntVector)fieldVectors1.get(6)).get(i), i);
+      }
+      // validate float column
+      for (int i = 0; i < vectorSchemaRoot1.getRowCount(); i++) {
+        assertEquals(((Float4Vector)fieldVectors1.get(12)).get(i), (float) 1.23);
+      }
+      // free the unsafe memory
+      reader1.freeArrowBatchMemory(address);
+      reader1.close();
+
+
+      // Read as arrow vector
+      ArrowCarbonReader reader2 =
+          CarbonReader.builder(path, "_temp").withRowRecordReader().buildArrowReader();
+      VectorSchemaRoot vectorSchemaRoot2 = reader2.readArrowVectors(carbonSchema);
+      // check for 10 rows
+      assertEquals(vectorSchemaRoot2.getRowCount(), 10);
+      List<FieldVector> fieldVectors2 = vectorSchemaRoot2.getFieldVectors();
+      // validate short column
+      for (int i = 0; i < vectorSchemaRoot2.getRowCount(); i++) {
+        assertEquals(((SmallIntVector)fieldVectors2.get(6)).get(i), i);
+      }
+      // validate float column
+      for (int i = 0; i < vectorSchemaRoot2.getRowCount(); i++) {
+        assertEquals(((Float4Vector)fieldVectors2.get(12)).get(i), (float) 1.23);
+      }
+      reader2.close();
+
+      // Read arrowSchema
+      byte[] schema = CarbonSchemaReader.getArrowSchemaAsBytes(path);
+      ArrowConverter arrowConverter3 = new ArrowConverter(carbonSchema, 0);
+      VectorSchemaRoot vectorSchemaRoot3 = arrowConverter3.byteArrayToVector(schema);
+      assertEquals(vectorSchemaRoot3.getSchema().getFields().size(), 13);
+    } catch (Throwable e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    } finally {
+      try {
+        FileUtils.deleteDirectory(new File(path));
+      } catch (IOException e) {
+        e.printStackTrace();
+        Assert.fail(e.getMessage());
+      }
+    }
+  }
+
 }
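
For reference, a condensed sketch of the off-heap flow exercised by the new
test above (readArrowBatchAddress / freeArrowBatchMemory); the store path is
a placeholder:

    import org.apache.arrow.vector.VectorSchemaRoot;

    import org.apache.carbondata.core.memory.CarbonUnsafe;
    import org.apache.carbondata.sdk.file.ArrowCarbonReader;
    import org.apache.carbondata.sdk.file.CarbonReader;
    import org.apache.carbondata.sdk.file.CarbonSchemaReader;
    import org.apache.carbondata.sdk.file.Schema;
    import org.apache.carbondata.sdk.file.arrow.ArrowConverter;

    public class ArrowOffHeapExample {
      public static void main(String[] args) throws Exception {
        String path = "./carbondata";  // placeholder store path written by the SDK
        Schema carbonSchema = CarbonSchemaReader.readSchema(path);
        ArrowCarbonReader<Object[]> reader =
            CarbonReader.builder(path, "_temp").withRowRecordReader().buildArrowReader();

        // The batch is copied to unsafe memory: a 4-byte length followed by the
        // serialized Arrow bytes. The address can be handed to Python/C/C++ modules.
        long address = reader.readArrowBatchAddress(carbonSchema);
        int length = CarbonUnsafe.getUnsafe().getInt(address);
        byte[] data = new byte[length];
        CarbonUnsafe.getUnsafe().copyMemory(null, address + 4, data,
            CarbonUnsafe.BYTE_ARRAY_OFFSET, length);

        // Rebuild Arrow vectors from the copied bytes, then release the unsafe memory.
        VectorSchemaRoot root = new ArrowConverter(carbonSchema, 0).byteArrayToVector(data);
        System.out.println("rows: " + root.getRowCount());
        reader.freeArrowBatchMemory(address);
        reader.close();
      }
    }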