You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ak...@apache.org on 2019/06/21 11:27:13 UTC

[carbondata] branch master updated: [CARBONDATA-3365] SDK Arrow integration document update

This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 93339ac  [CARBONDATA-3365] SDK Arrow integration document update
93339ac is described below

commit 93339ac998a918488499dc9bbeafec4977564072
Author: ajantha-bhat <aj...@gmail.com>
AuthorDate: Tue Jun 18 18:31:05 2019 +0530

    [CARBONDATA-3365] SDK Arrow integration document update
    
    SDK Arrow integration document update
    
    This closes #3293
---
 docs/sdk-guide.md                                  |  80 ++++++++
 .../carbondata/sdk/file/ArrowCarbonReader.java     |  14 +-
 .../carbondata/sdk/file/arrow/ArrowConverter.java  |   6 +-
 .../carbondata/sdk/file/ArrowCarbonReaderTest.java | 219 +++++++++++++++++++++
 .../carbondata/sdk/file/CarbonReaderTest.java      | 167 ----------------
 5 files changed, 309 insertions(+), 177 deletions(-)

diff --git a/docs/sdk-guide.md b/docs/sdk-guide.md
index 53b066b..b8e9f51 100644
--- a/docs/sdk-guide.md
+++ b/docs/sdk-guide.md
@@ -614,6 +614,10 @@ reader.close();
 
 Find example code at [CarbonReaderExample](https://github.com/apache/carbondata/blob/master/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java) in the CarbonData repo.
 
+SDK reader also supports reading carbondata files and filling them into Apache Arrow vectors.
+Find example code at [ArrowCarbonReaderTest](https://github.com/apache/carbondata/blob/master/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ArrowCarbonReaderTest.java) in the CarbonData repo.
+
+
 ## API List
 
 ### Class org.apache.carbondata.sdk.file.CarbonReader
@@ -687,6 +691,82 @@ public Object[] readNextBatchRow();
 public void close();
 ```
 
+### Class org.apache.carbondata.sdk.file.ArrowCarbonReader
+```
+/**
+ * Carbon reader will fill the arrow vector after reading the carbondata files.
+ * This arrow byte[] can be used to create arrow table and used for in memory analytics
+ * Note: create a reader at blocklet level, so that arrow byte[] will not exceed INT_MAX
+ *
+ * @param carbonSchema org.apache.carbondata.sdk.file.Schema
+ * @return Serialized byte array
+ * @throws Exception
+ */
+public byte[] readArrowBatch(Schema carbonSchema) throws Exception;
+```
+
+```
+/**
+ * Carbon reader will fill the arrow vector after reading the carbondata files.
+ * This arrow byte[] can be used to create arrow table and used for in memory analytics
+ * Note: create a reader at blocklet level, so that arrow byte[] will not exceed INT_MAX
+ * User need to close the VectorSchemaRoot after usage by calling VectorSchemaRoot.close()
+ *
+ * @param carbonSchema org.apache.carbondata.sdk.file.Schema 
+ * @return Arrow VectorSchemaRoot
+ * @throws Exception
+ */
+public VectorSchemaRoot readArrowVectors(Schema carbonSchema) throws Exception;
+```
+
+```
+/**
+ * Carbon reader will fill the arrow vector after reading carbondata files.
+ * Here unsafe memory address will be returned instead of byte[],
+ * so that this address can be sent across java to python or c modules and
+ * can directly read the content from this unsafe memory
+ * Note:Create a carbon reader at blocklet level using CarbonReader.buildWithSplits(split) method,
+ * so that arrow byte[] will not exceed INT_MAX.
+ *
+ * @param carbonSchema org.apache.carbondata.sdk.file.Schema
+ * @return address of the unsafe memory where arrow buffer is stored
+ * @throws Exception
+ */
+public long readArrowBatchAddress(Schema carbonSchema) throws Exception;
+```
+
+```
+/**
+ * Free the unsafe memory allocated, if unsafe arrow batch is used.
+ *
+ * @param address address of the unsafe memory where arrow buffer is stored
+ */
+public void freeArrowBatchMemory(long address)
+```
+
+### Class org.apache.carbondata.sdk.file.arrow.ArrowConverter
+```
+/**
+ * To get the arrow vectors directly after filling from carbondata
+ *
+ * @return Arrow VectorSchemaRoot, which contains an array of arrow vectors.
+ */
+public VectorSchemaRoot getArrowVectors() throws IOException;
+```
+
+```
+/**
+ * Utility API to convert back the arrow byte[] to arrow ArrowRecordBatch.
+ * User need to close the ArrowRecordBatch after usage by calling ArrowRecordBatch.close()
+ *
+ * @param batchBytes input byte array
+ * @param bufferAllocator arrow buffer allocator
+ * @return ArrowRecordBatch
+ * @throws IOException
+ */
+public static ArrowRecordBatch byteArrayToArrowBatch(byte[] batchBytes, BufferAllocator bufferAllocator) throws IOException;
+```
+
 ### Class org.apache.carbondata.sdk.file.CarbonReaderBuilder
 ```
 /**
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/ArrowCarbonReader.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/ArrowCarbonReader.java
index f30e557..69fe3ee 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/ArrowCarbonReader.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/ArrowCarbonReader.java
@@ -46,8 +46,8 @@ public class ArrowCarbonReader<T> extends CarbonReader<T> {
    * This arrow byte[] can be used to create arrow table and used for in memory analytics
    * Note: create a reader at blocklet level, so that arrow byte[] will not exceed INT_MAX
    *
-   * @param carbonSchema
-   * @return
+   * @param carbonSchema org.apache.carbondata.sdk.file.Schema
+   * @return Serialized byte array
    * @throws Exception
    */
   public byte[] readArrowBatch(Schema carbonSchema) throws Exception {
@@ -64,8 +64,8 @@ public class ArrowCarbonReader<T> extends CarbonReader<T> {
    * Note: create a reader at blocklet level, so that arrow byte[] will not exceed INT_MAX
    * User need to close the VectorSchemaRoot after usage by calling VectorSchemaRoot.close()
    *
-   * @param carbonSchema
-   * @return
+   * @param carbonSchema org.apache.carbondata.sdk.file.Schema
+   * @return Arrow VectorSchemaRoot
    * @throws Exception
    */
   public VectorSchemaRoot readArrowVectors(Schema carbonSchema) throws Exception {
@@ -84,8 +84,8 @@ public class ArrowCarbonReader<T> extends CarbonReader<T> {
    * Note:Create a carbon reader at blocklet level using CarbonReader.buildWithSplits(split) method,
    * so that arrow byte[] will not exceed INT_MAX.
    *
-   * @param carbonSchema
-   * @return
+   * @param carbonSchema org.apache.carbondata.sdk.file.Schema
+   * @return address of the unsafe memory where arrow buffer is stored
    * @throws Exception
    */
   public long readArrowBatchAddress(Schema carbonSchema) throws Exception {
@@ -99,7 +99,7 @@ public class ArrowCarbonReader<T> extends CarbonReader<T> {
   /**
    * free the unsafe memory allocated , if unsafe arrow batch is used.
    *
-   * @param address
+   * @param address address of the unsafe memory where arrow buffer is stored
    */
   public void freeArrowBatchMemory(long address) {
     CarbonUnsafe.getUnsafe().freeMemory(address);
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowConverter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowConverter.java
index 0c08eab..268d06b 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowConverter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowConverter.java
@@ -105,8 +105,8 @@ public class ArrowConverter {
    * Utility API to convert back the arrow byte[] to arrow ArrowRecordBatch.
    * User need to close the ArrowRecordBatch after usage by calling ArrowRecordBatch.close()
    *
-   * @param batchBytes
-   * @param bufferAllocator
+   * @param batchBytes input byte array
+   * @param bufferAllocator arrow buffer allocator
    * @return ArrowRecordBatch
    * @throws IOException
    */
@@ -128,7 +128,7 @@ public class ArrowConverter {
   /**
    * To get the arrow vectors directly after filling from carbondata
    *
-   * @return
+   * @return Arrow VectorSchemaRoot, which contains an array of arrow vectors.
    */
   public VectorSchemaRoot getArrowVectors() throws IOException {
     arrowWriter.finish();
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ArrowCarbonReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ArrowCarbonReaderTest.java
new file mode 100644
index 0000000..69ef53b
--- /dev/null
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ArrowCarbonReaderTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.sdk.file.arrow.ArrowConverter;
+import org.apache.carbondata.sdk.file.arrow.ArrowUtils;
+
+import junit.framework.TestCase;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ArrowCarbonReaderTest extends TestCase {
+
+  @Before
+  public void cleanFile() {
+    assert (TestUtil.cleanMdtFile());
+  }
+
+  @After
+  public void verifyDMFile() {
+    assert (!TestUtil.verifyMdtFile());
+    String path = "./testWriteFiles";
+    try {
+      FileUtils.deleteDirectory(new File(path));
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Test
+  public void testArrowReader() {
+    String path = "./carbondata";
+    try {
+      FileUtils.deleteDirectory(new File(path));
+
+      Field[] fields = new Field[13];
+      fields[0] = new Field("stringField", DataTypes.STRING);
+      fields[1] = new Field("shortField", DataTypes.SHORT);
+      fields[2] = new Field("intField", DataTypes.INT);
+      fields[3] = new Field("longField", DataTypes.LONG);
+      fields[4] = new Field("doubleField", DataTypes.DOUBLE);
+      fields[5] = new Field("boolField", DataTypes.BOOLEAN);
+      fields[6] = new Field("dateField", DataTypes.DATE);
+      fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
+      fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
+      fields[9] = new Field("varcharField", DataTypes.VARCHAR);
+      fields[10] = new Field("arrayField", DataTypes.createArrayType(DataTypes.STRING));
+      fields[11] = new Field("floatField", DataTypes.FLOAT);
+      fields[12] = new Field("binaryField", DataTypes.BINARY);
+      Map<String, String> map = new HashMap<>();
+      map.put("complex_delimiter_level_1", "#");
+      CarbonWriter writer = CarbonWriter.builder()
+          .outputPath(path)
+          .withLoadOptions(map)
+          .withCsvInput(new Schema(fields))
+          .writtenBy("CarbonReaderTest")
+          .build();
+      byte[] value = "Binary".getBytes();
+      for (int i = 0; i < 10; i++) {
+        Object[] row2 = new Object[]{
+            "robot" + (i % 10),
+            i % 10000,
+            i,
+            (Long.MAX_VALUE - i),
+            ((double) i / 2),
+            (true),
+            "2019-03-02",
+            "2019-02-12 03:03:34",
+            12.345,
+            "varchar",
+            "Hello#World#From#Carbon",
+            1.23,
+            value
+        };
+        writer.write(row2);
+      }
+      writer.close();
+      // Read data
+      ArrowCarbonReader reader =
+          CarbonReader.builder(path, "_temp").withRowRecordReader().buildArrowReader();
+      Schema carbonSchema = CarbonSchemaReader.readSchema(path);
+      byte[] data = reader.readArrowBatch(carbonSchema);
+      BufferAllocator bufferAllocator = ArrowUtils.rootAllocator.newChildAllocator("toArrowBuffer", 0, Long.MAX_VALUE);
+      ArrowRecordBatch arrowRecordBatch =
+          ArrowConverter.byteArrayToArrowBatch(data, bufferAllocator);
+      VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot
+          .create(ArrowUtils.toArrowSchema(carbonSchema, TimeZone.getDefault().getID()),
+              bufferAllocator);
+      VectorLoader vectorLoader = new VectorLoader(vectorSchemaRoot);
+      vectorLoader.load(arrowRecordBatch);
+      // check for 10 rows
+      assertEquals(vectorSchemaRoot.getRowCount(), 10);
+      List<FieldVector> fieldVectors = vectorSchemaRoot.getFieldVectors();
+      // validate short column
+      for (int i = 0; i < vectorSchemaRoot.getRowCount(); i++) {
+        assertEquals(((SmallIntVector)fieldVectors.get(6)).get(i), i);
+      }
+      // validate float column
+      for (int i = 0; i < vectorSchemaRoot.getRowCount(); i++) {
+        assertEquals(((Float4Vector)fieldVectors.get(12)).get(i), (float) 1.23);
+      }
+      arrowRecordBatch.close();
+      vectorSchemaRoot.close();
+      bufferAllocator.close();
+      reader.close();
+
+      // Read data with address (unsafe memory)
+      ArrowCarbonReader reader1 =
+          CarbonReader.builder(path, "_temp").withRowRecordReader().buildArrowReader();
+      long address = reader1.readArrowBatchAddress(carbonSchema);
+      int length = CarbonUnsafe.getUnsafe().getInt(address);
+      byte[] data1 = new byte[length];
+      CarbonUnsafe.getUnsafe().copyMemory(null, address + 4 , data1, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
+      bufferAllocator = ArrowUtils.rootAllocator.newChildAllocator("toArrowBuffer", 0, Long.MAX_VALUE);
+      arrowRecordBatch =
+          ArrowConverter.byteArrayToArrowBatch(data1, bufferAllocator);
+      vectorSchemaRoot = VectorSchemaRoot
+          .create(ArrowUtils.toArrowSchema(carbonSchema, TimeZone.getDefault().getID()),
+              bufferAllocator);
+      vectorLoader = new VectorLoader(vectorSchemaRoot);
+      vectorLoader.load(arrowRecordBatch);
+      // check for 10 rows
+      assertEquals(vectorSchemaRoot.getRowCount(), 10);
+      List<FieldVector> fieldVectors1 = vectorSchemaRoot.getFieldVectors();
+      // validate short column
+      for (int i = 0; i < vectorSchemaRoot.getRowCount(); i++) {
+        assertEquals(((SmallIntVector)fieldVectors1.get(6)).get(i), i);
+      }
+      // validate float column
+      for (int i = 0; i < vectorSchemaRoot.getRowCount(); i++) {
+        assertEquals(((Float4Vector)fieldVectors1.get(12)).get(i), (float) 1.23);
+      }
+      arrowRecordBatch.close();
+      vectorSchemaRoot.close();
+      bufferAllocator.close();
+      // free the unsafe memory
+      reader1.freeArrowBatchMemory(address);
+      reader1.close();
+
+
+      // Read as arrow vector
+      ArrowCarbonReader reader2 =
+          CarbonReader.builder(path, "_temp").withRowRecordReader().buildArrowReader();
+      VectorSchemaRoot vectorSchemaRoot2 = reader2.readArrowVectors(carbonSchema);
+      // check for 10 rows
+      assertEquals(vectorSchemaRoot2.getRowCount(), 10);
+      List<FieldVector> fieldVectors2 = vectorSchemaRoot2.getFieldVectors();
+      // validate short column
+      for (int i = 0; i < vectorSchemaRoot2.getRowCount(); i++) {
+        assertEquals(((SmallIntVector)fieldVectors2.get(6)).get(i), i);
+      }
+      // validate float column
+      for (int i = 0; i < vectorSchemaRoot2.getRowCount(); i++) {
+        assertEquals(((Float4Vector)fieldVectors2.get(12)).get(i), (float) 1.23);
+      }
+      vectorSchemaRoot.close();
+      reader2.close();
+
+      // Read arrowSchema
+      byte[] schema = CarbonSchemaReader.getArrowSchemaAsBytes(path);
+      bufferAllocator = ArrowUtils.rootAllocator.newChildAllocator("toArrowBuffer", 0, Long.MAX_VALUE);
+      arrowRecordBatch =
+          ArrowConverter.byteArrayToArrowBatch(schema, bufferAllocator);
+      vectorSchemaRoot = VectorSchemaRoot
+          .create(ArrowUtils.toArrowSchema(carbonSchema, TimeZone.getDefault().getID()),
+              bufferAllocator);
+      vectorLoader = new VectorLoader(vectorSchemaRoot);
+      vectorLoader.load(arrowRecordBatch);
+      assertEquals(vectorSchemaRoot.getSchema().getFields().size(), 13);
+      arrowRecordBatch.close();
+      vectorSchemaRoot.close();
+      bufferAllocator.close();
+    } catch (Throwable e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    } finally {
+      try {
+        FileUtils.deleteDirectory(new File(path));
+      } catch (IOException e) {
+        e.printStackTrace();
+        Assert.fail(e.getMessage());
+      }
+    }
+  }
+}
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
index 1aa0f40..9aee152 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
@@ -22,13 +22,6 @@ import java.sql.Date;
 import java.sql.Timestamp;
 import java.util.*;
 
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.FieldVector;
-import org.apache.arrow.vector.Float4Vector;
-import org.apache.arrow.vector.SmallIntVector;
-import org.apache.arrow.vector.VectorLoader;
-import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
 import org.apache.avro.generic.GenericData;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.log4j.Logger;
@@ -38,7 +31,6 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.memory.CarbonUnsafe;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
@@ -47,8 +39,6 @@ import org.apache.carbondata.core.scan.expression.conditional.*;
 import org.apache.carbondata.core.scan.expression.logical.AndExpression;
 import org.apache.carbondata.core.scan.expression.logical.OrExpression;
 import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.sdk.file.arrow.ArrowConverter;
-import org.apache.carbondata.sdk.file.arrow.ArrowUtils;
 
 import junit.framework.TestCase;
 import org.apache.commons.io.FileUtils;
@@ -2431,163 +2421,6 @@ public class CarbonReaderTest extends TestCase {
   }
 
   @Test
-  public void testArrowReader() {
-    String path = "./carbondata";
-    try {
-      FileUtils.deleteDirectory(new File(path));
-
-      Field[] fields = new Field[13];
-      fields[0] = new Field("stringField", DataTypes.STRING);
-      fields[1] = new Field("shortField", DataTypes.SHORT);
-      fields[2] = new Field("intField", DataTypes.INT);
-      fields[3] = new Field("longField", DataTypes.LONG);
-      fields[4] = new Field("doubleField", DataTypes.DOUBLE);
-      fields[5] = new Field("boolField", DataTypes.BOOLEAN);
-      fields[6] = new Field("dateField", DataTypes.DATE);
-      fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
-      fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
-      fields[9] = new Field("varcharField", DataTypes.VARCHAR);
-      fields[10] = new Field("arrayField", DataTypes.createArrayType(DataTypes.STRING));
-      fields[11] = new Field("floatField", DataTypes.FLOAT);
-      fields[12] = new Field("binaryField", DataTypes.BINARY);
-      Map<String, String> map = new HashMap<>();
-      map.put("complex_delimiter_level_1", "#");
-      CarbonWriter writer = CarbonWriter.builder()
-          .outputPath(path)
-          .withLoadOptions(map)
-          .withCsvInput(new Schema(fields))
-          .writtenBy("CarbonReaderTest")
-          .build();
-      byte[] value = "Binary".getBytes();
-      for (int i = 0; i < 10; i++) {
-        Object[] row2 = new Object[]{
-            "robot" + (i % 10),
-            i % 10000,
-            i,
-            (Long.MAX_VALUE - i),
-            ((double) i / 2),
-            (true),
-            "2019-03-02",
-            "2019-02-12 03:03:34",
-            12.345,
-            "varchar",
-            "Hello#World#From#Carbon",
-            1.23,
-            value
-        };
-        writer.write(row2);
-      }
-      writer.close();
-      // Read data
-      ArrowCarbonReader reader =
-          CarbonReader.builder(path, "_temp").withRowRecordReader().buildArrowReader();
-      Schema carbonSchema = CarbonSchemaReader.readSchema(path);
-      byte[] data = reader.readArrowBatch(carbonSchema);
-      BufferAllocator bufferAllocator = ArrowUtils.rootAllocator.newChildAllocator("toArrowBuffer", 0, Long.MAX_VALUE);
-      ArrowRecordBatch arrowRecordBatch =
-          ArrowConverter.byteArrayToArrowBatch(data, bufferAllocator);
-      VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot
-          .create(ArrowUtils.toArrowSchema(carbonSchema, TimeZone.getDefault().getID()),
-              bufferAllocator);
-      VectorLoader vectorLoader = new VectorLoader(vectorSchemaRoot);
-      vectorLoader.load(arrowRecordBatch);
-      // check for 10 rows
-      assertEquals(vectorSchemaRoot.getRowCount(), 10);
-      List<FieldVector> fieldVectors = vectorSchemaRoot.getFieldVectors();
-      // validate short column
-      for (int i = 0; i < vectorSchemaRoot.getRowCount(); i++) {
-        assertEquals(((SmallIntVector)fieldVectors.get(6)).get(i), i);
-      }
-      // validate float column
-      for (int i = 0; i < vectorSchemaRoot.getRowCount(); i++) {
-        assertEquals(((Float4Vector)fieldVectors.get(12)).get(i), (float) 1.23);
-      }
-      arrowRecordBatch.close();
-      vectorSchemaRoot.close();
-      bufferAllocator.close();
-      reader.close();
-
-      // Read data with address (unsafe memory)
-      ArrowCarbonReader reader1 =
-          CarbonReader.builder(path, "_temp").withRowRecordReader().buildArrowReader();
-      long address = reader1.readArrowBatchAddress(carbonSchema);
-      int length = CarbonUnsafe.getUnsafe().getInt(address);
-      byte[] data1 = new byte[length];
-      CarbonUnsafe.getUnsafe().copyMemory(null, address + 4 , data1, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
-      bufferAllocator = ArrowUtils.rootAllocator.newChildAllocator("toArrowBuffer", 0, Long.MAX_VALUE);
-      arrowRecordBatch =
-          ArrowConverter.byteArrayToArrowBatch(data1, bufferAllocator);
-      vectorSchemaRoot = VectorSchemaRoot
-          .create(ArrowUtils.toArrowSchema(carbonSchema, TimeZone.getDefault().getID()),
-              bufferAllocator);
-      vectorLoader = new VectorLoader(vectorSchemaRoot);
-      vectorLoader.load(arrowRecordBatch);
-      // check for 10 rows
-      assertEquals(vectorSchemaRoot.getRowCount(), 10);
-      List<FieldVector> fieldVectors1 = vectorSchemaRoot.getFieldVectors();
-      // validate short column
-      for (int i = 0; i < vectorSchemaRoot.getRowCount(); i++) {
-        assertEquals(((SmallIntVector)fieldVectors1.get(6)).get(i), i);
-      }
-      // validate float column
-      for (int i = 0; i < vectorSchemaRoot.getRowCount(); i++) {
-        assertEquals(((Float4Vector)fieldVectors1.get(12)).get(i), (float) 1.23);
-      }
-      arrowRecordBatch.close();
-      vectorSchemaRoot.close();
-      bufferAllocator.close();
-      // free the unsafe memory
-      reader1.freeArrowBatchMemory(address);
-      reader1.close();
-
-
-      // Read as arrow vector
-      ArrowCarbonReader reader2 =
-          CarbonReader.builder(path, "_temp").withRowRecordReader().buildArrowReader();
-      VectorSchemaRoot vectorSchemaRoot2 = reader2.readArrowVectors(carbonSchema);
-      // check for 10 rows
-      assertEquals(vectorSchemaRoot2.getRowCount(), 10);
-      List<FieldVector> fieldVectors2 = vectorSchemaRoot2.getFieldVectors();
-      // validate short column
-      for (int i = 0; i < vectorSchemaRoot2.getRowCount(); i++) {
-        assertEquals(((SmallIntVector)fieldVectors2.get(6)).get(i), i);
-      }
-      // validate float column
-      for (int i = 0; i < vectorSchemaRoot2.getRowCount(); i++) {
-        assertEquals(((Float4Vector)fieldVectors2.get(12)).get(i), (float) 1.23);
-      }
-      vectorSchemaRoot.close();
-      reader2.close();
-
-      // Read arrowSchema
-      byte[] schema = CarbonSchemaReader.getArrowSchemaAsBytes(path);
-      bufferAllocator = ArrowUtils.rootAllocator.newChildAllocator("toArrowBuffer", 0, Long.MAX_VALUE);
-      arrowRecordBatch =
-          ArrowConverter.byteArrayToArrowBatch(schema, bufferAllocator);
-      vectorSchemaRoot = VectorSchemaRoot
-          .create(ArrowUtils.toArrowSchema(carbonSchema, TimeZone.getDefault().getID()),
-              bufferAllocator);
-      vectorLoader = new VectorLoader(vectorSchemaRoot);
-      vectorLoader.load(arrowRecordBatch);
-      assertEquals(vectorSchemaRoot.getSchema().getFields().size(), 13);
-      arrowRecordBatch.close();
-      vectorSchemaRoot.close();
-      bufferAllocator.close();
-    } catch (Throwable e) {
-      e.printStackTrace();
-      Assert.fail(e.getMessage());
-    } finally {
-      try {
-        FileUtils.deleteDirectory(new File(path));
-      } catch (IOException e) {
-        e.printStackTrace();
-        Assert.fail(e.getMessage());
-      }
-    }
-  }
-
-
-  @Test
   public void testReadBlocklet() throws IOException, InterruptedException {
     String path = "./testWriteFiles/" + System.nanoTime();
     FileUtils.deleteDirectory(new File(path));