Posted to commits@carbondata.apache.org by ku...@apache.org on 2019/06/12 15:39:59 UTC

[carbondata] branch master updated: [CARBONDATA-3413] Fix io.netty out of direct memory exception in arrow integration

This is an automated email from the ASF dual-hosted git repository.

kumarvishal09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new c1c50cf  [CARBONDATA-3413] Fix io.netty out of direct memory exception in arrow integration
c1c50cf is described below

commit c1c50cf4b8ec31490889a8dbb69dac4c8b59c293
Author: ajantha-bhat <aj...@gmail.com>
AuthorDate: Tue Jun 4 19:00:22 2019 +0530

    [CARBONDATA-3413] Fix io.netty out of direct memory exception in arrow integration
    
    Problem: io.netty throws an out-of-direct-memory exception in the Arrow
    integration.
    
    Cause: in ArrowConverter, the allocator is never closed.
    
    Solution: close the allocator in ArrowConverter once the batch has been
    serialized, and fix the related problems in the test utility API.
    
    This closes #3256
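
In essence, each ArrowConverter holds an Arrow BufferAllocator backed by
netty direct memory; if it is never closed, repeated conversions exhaust
the direct-memory pool and io.netty throws. A minimal sketch of the
corrected allocator lifecycle (class and allocator names here are
illustrative, not taken from the patch):

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;

public final class AllocatorLifecycleSketch {
  public static void main(String[] args) {
    // One long-lived root allocator for the process.
    try (BufferAllocator root = new RootAllocator(Long.MAX_VALUE)) {
      // One short-lived child allocator per conversion; the direct memory
      // it holds is only returned to netty when it is closed.
      try (BufferAllocator child =
          root.newChildAllocator("arrow-convert", 0, Long.MAX_VALUE)) {
        // ... allocate vectors / serialize a batch against `child` ...
      } // child.close() runs here; skipping it is the leak fixed below.
    }
  }
}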
---
 .../carbondata/sdk/file/ArrowCarbonReader.java     |  1 +
 .../carbondata/sdk/file/arrow/ArrowConverter.java  | 43 +++++++++--------
 .../carbondata/sdk/file/CarbonReaderTest.java      | 54 +++++++++++++++++-----
 3 files changed, 67 insertions(+), 31 deletions(-)

diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/ArrowCarbonReader.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/ArrowCarbonReader.java
index a53ad6b..f30e557 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/ArrowCarbonReader.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/ArrowCarbonReader.java
@@ -62,6 +62,7 @@ public class ArrowCarbonReader<T> extends CarbonReader<T> {
    * Carbon reader will fill the arrow vector after reading the carbondata files.
    * This arrow byte[] can be used to create arrow table and used for in memory analytics
    * Note: create a reader at blocklet level, so that arrow byte[] will not exceed INT_MAX
+   * Users need to close the VectorSchemaRoot after use by calling VectorSchemaRoot.close()
    *
    * @param carbonSchema
    * @return
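
For context, a hedged usage sketch of the contract documented above: the
builder and read calls mirror the test changes further below, while the
path handling, class name, and raw reader type are illustrative
assumptions.

import org.apache.carbondata.sdk.file.ArrowCarbonReader;
import org.apache.carbondata.sdk.file.CarbonReader;
import org.apache.carbondata.sdk.file.CarbonSchemaReader;
import org.apache.carbondata.sdk.file.Schema;

public final class ReadArrowBatchSketch {
  public static void main(String[] args) throws Exception {
    String path = args[0]; // location of the carbondata files (assumed)
    Schema carbonSchema = CarbonSchemaReader.readSchema(path);
    ArrowCarbonReader reader =
        CarbonReader.builder(path, "_temp").withRowRecordReader().buildArrowReader();
    try {
      // One blocklet-level batch serialized as arrow bytes (< INT_MAX).
      byte[] batchBytes = reader.readArrowBatch(carbonSchema);
      // Turn batchBytes into a VectorSchemaRoot (see the converter sketch
      // after the ArrowConverter diff) and close that root after use, as
      // the javadoc above now requires.
    } finally {
      reader.close();
    }
  }
}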
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowConverter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowConverter.java
index 54735fb..c0e4e27 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowConverter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowConverter.java
@@ -24,11 +24,11 @@ import java.util.TimeZone;
 import org.apache.carbondata.sdk.file.Schema;
 
 import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.VectorLoader;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.VectorUnloader;
 import org.apache.arrow.vector.ipc.ArrowFileReader;
 import org.apache.arrow.vector.ipc.ArrowFileWriter;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
 import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;
 
 public class ArrowConverter {
@@ -73,11 +73,13 @@ public class ArrowConverter {
   public byte[] toSerializeArray() throws IOException {
     arrowWriter.finish();
     writer.writeBatch();
-    this.writer.close();
-    arrowWriter.reset();
     writer.close();
-    this.root.close();
-    return out.toByteArray();
+    arrowWriter.reset();
+    root.close();
+    byte[] bytes = out.toByteArray();
+    allocator.close();
+    out.close();
+    return bytes;
   }
 
   /**
@@ -89,34 +91,36 @@ public class ArrowConverter {
   public long copySerializeArrayToOffHeap() throws IOException {
     arrowWriter.finish();
     writer.writeBatch();
-    this.writer.close();
-    arrowWriter.reset();
     writer.close();
-    this.root.close();
-    return out.copyToAddress();
+    arrowWriter.reset();
+    root.close();
+    long address = out.copyToAddress();
+    allocator.close();
+    out.close();
+    return address;
   }
 
   /**
-   * Utility API to convert back the arrow byte[] to arrow VectorSchemaRoot.
+   * Utility API to convert back the arrow byte[] to arrow ArrowRecordBatch.
+   * Users need to close the ArrowRecordBatch after use by calling ArrowRecordBatch.close()
    *
    * @param batchBytes
-   * @return
+   * @param bufferAllocator
+   * @return ArrowRecordBatch
    * @throws IOException
    */
-  public VectorSchemaRoot byteArrayToVector(byte[] batchBytes) throws IOException {
+  public static ArrowRecordBatch byteArrayToArrowBatch(byte[] batchBytes,
+      BufferAllocator bufferAllocator)
+      throws IOException {
     ByteArrayReadableSeekableByteChannel in = new ByteArrayReadableSeekableByteChannel(batchBytes);
-    ArrowFileReader reader = new ArrowFileReader(in, allocator);
+    ArrowFileReader reader = new ArrowFileReader(in, bufferAllocator);
     try {
       VectorSchemaRoot root = reader.getVectorSchemaRoot();
       VectorUnloader unloader = new VectorUnloader(root);
       reader.loadNextBatch();
-      VectorSchemaRoot arrowRoot = VectorSchemaRoot.create(arrowSchema, allocator);
-      VectorLoader vectorLoader = new VectorLoader(arrowRoot);
-      vectorLoader.load(unloader.getRecordBatch());
-      return arrowRoot;
-    } catch (IOException e) {
+      return unloader.getRecordBatch();
+    } finally {
       reader.close();
-      throw e;
     }
   }
 
@@ -128,7 +132,6 @@ public class ArrowConverter {
   public VectorSchemaRoot getArrowVectors() throws IOException {
     arrowWriter.finish();
     writer.writeBatch();
-    this.writer.close();
     writer.close();
     return root;
   }
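
Putting the reworked converter API together: a minimal consumer sketch
distilled from the updated test below, showing the new ownership rules:
the caller supplies the allocator and must close the record batch, the
root, and the allocator when done. The class and method names are
placeholders; the calls themselves come from the test changes.

import java.io.IOException;
import java.util.TimeZone;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;

import org.apache.carbondata.sdk.file.Schema;
import org.apache.carbondata.sdk.file.arrow.ArrowConverter;
import org.apache.carbondata.sdk.file.arrow.ArrowUtils;

public final class ArrowBatchConsumerSketch {
  // batchBytes would come from ArrowCarbonReader.readArrowBatch(schema).
  static void consume(byte[] batchBytes, Schema carbonSchema) throws IOException {
    BufferAllocator allocator =
        ArrowUtils.rootAllocator.newChildAllocator("consumer", 0, Long.MAX_VALUE);
    ArrowRecordBatch batch = ArrowConverter.byteArrayToArrowBatch(batchBytes, allocator);
    VectorSchemaRoot root = VectorSchemaRoot.create(
        ArrowUtils.toArrowSchema(carbonSchema, TimeZone.getDefault().getID()), allocator);
    new VectorLoader(root).load(batch);
    // ... read vectors via root.getFieldVectors() here ...
    batch.close();     // per the new javadoc: close the record batch
    root.close();      // then the vectors
    allocator.close(); // and finally the child allocator
  }
}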
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
index 24a735e..1aa0f40 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
@@ -22,10 +22,13 @@ import java.sql.Date;
 import java.sql.Timestamp;
 import java.util.*;
 
+import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.Float4Vector;
 import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.VectorLoader;
 import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
 import org.apache.avro.generic.GenericData;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.log4j.Logger;
@@ -45,6 +48,7 @@ import org.apache.carbondata.core.scan.expression.logical.AndExpression;
 import org.apache.carbondata.core.scan.expression.logical.OrExpression;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.sdk.file.arrow.ArrowConverter;
+import org.apache.carbondata.sdk.file.arrow.ArrowUtils;
 
 import junit.framework.TestCase;
 import org.apache.commons.io.FileUtils;
@@ -2479,8 +2483,14 @@ public class CarbonReaderTest extends TestCase {
           CarbonReader.builder(path, "_temp").withRowRecordReader().buildArrowReader();
       Schema carbonSchema = CarbonSchemaReader.readSchema(path);
       byte[] data = reader.readArrowBatch(carbonSchema);
-      ArrowConverter arrowConverter = new ArrowConverter(carbonSchema,0);
-      VectorSchemaRoot vectorSchemaRoot = arrowConverter.byteArrayToVector(data);
+      BufferAllocator bufferAllocator = ArrowUtils.rootAllocator.newChildAllocator("toArrowBuffer", 0, Long.MAX_VALUE);
+      ArrowRecordBatch arrowRecordBatch =
+          ArrowConverter.byteArrayToArrowBatch(data, bufferAllocator);
+      VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot
+          .create(ArrowUtils.toArrowSchema(carbonSchema, TimeZone.getDefault().getID()),
+              bufferAllocator);
+      VectorLoader vectorLoader = new VectorLoader(vectorSchemaRoot);
+      vectorLoader.load(arrowRecordBatch);
       // check for 10 rows
       assertEquals(vectorSchemaRoot.getRowCount(), 10);
       List<FieldVector> fieldVectors = vectorSchemaRoot.getFieldVectors();
@@ -2492,6 +2502,9 @@ public class CarbonReaderTest extends TestCase {
       for (int i = 0; i < vectorSchemaRoot.getRowCount(); i++) {
         assertEquals(((Float4Vector)fieldVectors.get(12)).get(i), (float) 1.23);
       }
+      arrowRecordBatch.close();
+      vectorSchemaRoot.close();
+      bufferAllocator.close();
       reader.close();
 
       // Read data with address (unsafe memory)
@@ -2501,19 +2514,28 @@ public class CarbonReaderTest extends TestCase {
       int length = CarbonUnsafe.getUnsafe().getInt(address);
       byte[] data1 = new byte[length];
       CarbonUnsafe.getUnsafe().copyMemory(null, address + 4 , data1, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
-      ArrowConverter arrowConverter1 = new ArrowConverter(carbonSchema,0);
-      VectorSchemaRoot vectorSchemaRoot1 = arrowConverter1.byteArrayToVector(data1);
+      bufferAllocator = ArrowUtils.rootAllocator.newChildAllocator("toArrowBuffer", 0, Long.MAX_VALUE);
+      arrowRecordBatch =
+          ArrowConverter.byteArrayToArrowBatch(data1, bufferAllocator);
+      vectorSchemaRoot = VectorSchemaRoot
+          .create(ArrowUtils.toArrowSchema(carbonSchema, TimeZone.getDefault().getID()),
+              bufferAllocator);
+      vectorLoader = new VectorLoader(vectorSchemaRoot);
+      vectorLoader.load(arrowRecordBatch);
       // check for 10 rows
-      assertEquals(vectorSchemaRoot1.getRowCount(), 10);
-      List<FieldVector> fieldVectors1 = vectorSchemaRoot1.getFieldVectors();
+      assertEquals(vectorSchemaRoot.getRowCount(), 10);
+      List<FieldVector> fieldVectors1 = vectorSchemaRoot.getFieldVectors();
       // validate short column
-      for (int i = 0; i < vectorSchemaRoot1.getRowCount(); i++) {
+      for (int i = 0; i < vectorSchemaRoot.getRowCount(); i++) {
         assertEquals(((SmallIntVector)fieldVectors1.get(6)).get(i), i);
       }
       // validate float column
-      for (int i = 0; i < vectorSchemaRoot1.getRowCount(); i++) {
+      for (int i = 0; i < vectorSchemaRoot.getRowCount(); i++) {
         assertEquals(((Float4Vector)fieldVectors1.get(12)).get(i), (float) 1.23);
       }
+      arrowRecordBatch.close();
+      vectorSchemaRoot.close();
+      bufferAllocator.close();
       // free the unsafe memory
       reader1.freeArrowBatchMemory(address);
       reader1.close();
@@ -2534,13 +2556,23 @@ public class CarbonReaderTest extends TestCase {
       for (int i = 0; i < vectorSchemaRoot2.getRowCount(); i++) {
         assertEquals(((Float4Vector)fieldVectors2.get(12)).get(i), (float) 1.23);
       }
+      vectorSchemaRoot.close();
       reader2.close();
 
       // Read arrowSchema
       byte[] schema = CarbonSchemaReader.getArrowSchemaAsBytes(path);
-      ArrowConverter arrowConverter3 = new ArrowConverter(carbonSchema, 0);
-      VectorSchemaRoot vectorSchemaRoot3 = arrowConverter3.byteArrayToVector(schema);
-      assertEquals(vectorSchemaRoot3.getSchema().getFields().size(), 13);
+      bufferAllocator = ArrowUtils.rootAllocator.newChildAllocator("toArrowBuffer", 0, Long.MAX_VALUE);
+      arrowRecordBatch =
+          ArrowConverter.byteArrayToArrowBatch(schema, bufferAllocator);
+      vectorSchemaRoot = VectorSchemaRoot
+          .create(ArrowUtils.toArrowSchema(carbonSchema, TimeZone.getDefault().getID()),
+              bufferAllocator);
+      vectorLoader = new VectorLoader(vectorSchemaRoot);
+      vectorLoader.load(arrowRecordBatch);
+      assertEquals(vectorSchemaRoot.getSchema().getFields().size(), 13);
+      arrowRecordBatch.close();
+      vectorSchemaRoot.close();
+      bufferAllocator.close();
     } catch (Throwable e) {
       e.printStackTrace();
       Assert.fail(e.getMessage());