You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2019/06/12 15:39:59 UTC
[carbondata] branch master updated: [CARBONDATA-3413] Fix io.netty
out of direct memory exception in arrow integration
This is an automated email from the ASF dual-hosted git repository.
kumarvishal09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new c1c50cf [CARBONDATA-3413] Fix io.netty out of direct memory exception in arrow integration
c1c50cf is described below
commit c1c50cf4b8ec31490889a8dbb69dac4c8b59c293
Author: ajantha-bhat <aj...@gmail.com>
AuthorDate: Tue Jun 4 19:00:22 2019 +0530
[CARBONDATA-3413] Fix io.netty out of direct memory exception in arrow integration
Problem: io.netty "out of direct memory" exception in the Arrow integration.
Cause: In ArrowConverter, the allocator is not closed.
Solution: Close the allocator in ArrowConverter.
Also fix the problems in the test utility API.
This closes #3256
---
.../carbondata/sdk/file/ArrowCarbonReader.java | 1 +
.../carbondata/sdk/file/arrow/ArrowConverter.java | 43 +++++++++--------
.../carbondata/sdk/file/CarbonReaderTest.java | 54 +++++++++++++++++-----
3 files changed, 67 insertions(+), 31 deletions(-)
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/ArrowCarbonReader.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/ArrowCarbonReader.java
index a53ad6b..f30e557 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/ArrowCarbonReader.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/ArrowCarbonReader.java
@@ -62,6 +62,7 @@ public class ArrowCarbonReader<T> extends CarbonReader<T> {
* Carbon reader will fill the arrow vector after reading the carbondata files.
* This arrow byte[] can be used to create arrow table and used for in memory analytics
* Note: create a reader at blocklet level, so that arrow byte[] will not exceed INT_MAX
+ * The caller must close the VectorSchemaRoot after use by calling VectorSchemaRoot.close()
*
* @param carbonSchema
* @return
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowConverter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowConverter.java
index 54735fb..c0e4e27 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowConverter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowConverter.java
@@ -24,11 +24,11 @@ import java.util.TimeZone;
import org.apache.carbondata.sdk.file.Schema;
import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;
public class ArrowConverter {
@@ -73,11 +73,13 @@ public class ArrowConverter {
public byte[] toSerializeArray() throws IOException {
arrowWriter.finish();
writer.writeBatch();
- this.writer.close();
- arrowWriter.reset();
writer.close();
- this.root.close();
- return out.toByteArray();
+ arrowWriter.reset();
+ root.close();
+ byte[] bytes = out.toByteArray();
+ allocator.close(); // NOTE(review): skipped if any earlier call throws; consider try/finally
+ out.close();
+ return bytes;
}
/**
@@ -89,34 +91,36 @@ public class ArrowConverter {
public long copySerializeArrayToOffHeap() throws IOException {
arrowWriter.finish();
writer.writeBatch();
- this.writer.close();
- arrowWriter.reset();
writer.close();
- this.root.close();
- return out.copyToAddress();
+ arrowWriter.reset();
+ root.close();
+ long address = out.copyToAddress();
+ allocator.close(); // NOTE(review): skipped if any earlier call throws; consider try/finally
+ out.close();
+ return address;
}
/**
- * Utility API to convert back the arrow byte[] to arrow VectorSchemaRoot.
+ * Utility API to convert an Arrow file-format byte[] back into an ArrowRecordBatch.
+ * The caller must close the returned ArrowRecordBatch after use by calling ArrowRecordBatch.close()
*
* @param batchBytes
- * @return
+ * @param bufferAllocator allocator handed to the ArrowFileReader that decodes batchBytes
+ * @return the record batch loaded by the reader's first loadNextBatch() call
* @throws IOException
*/
- public VectorSchemaRoot byteArrayToVector(byte[] batchBytes) throws IOException {
+ public static ArrowRecordBatch byteArrayToArrowBatch(byte[] batchBytes,
+ BufferAllocator bufferAllocator)
+ throws IOException {
ByteArrayReadableSeekableByteChannel in = new ByteArrayReadableSeekableByteChannel(batchBytes);
- ArrowFileReader reader = new ArrowFileReader(in, allocator);
+ ArrowFileReader reader = new ArrowFileReader(in, bufferAllocator);
try {
VectorSchemaRoot root = reader.getVectorSchemaRoot();
VectorUnloader unloader = new VectorUnloader(root);
reader.loadNextBatch();
- VectorSchemaRoot arrowRoot = VectorSchemaRoot.create(arrowSchema, allocator);
- VectorLoader vectorLoader = new VectorLoader(arrowRoot);
- vectorLoader.load(unloader.getRecordBatch());
- return arrowRoot;
- } catch (IOException e) {
+ return unloader.getRecordBatch();
+ } finally {
reader.close();
- throw e;
}
}
@@ -128,7 +132,6 @@ public class ArrowConverter {
public VectorSchemaRoot getArrowVectors() throws IOException {
arrowWriter.finish();
writer.writeBatch();
- this.writer.close();
writer.close();
return root;
}
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
index 24a735e..1aa0f40 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
@@ -22,10 +22,13 @@ import java.sql.Date;
import java.sql.Timestamp;
import java.util.*;
+import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.log4j.Logger;
@@ -45,6 +48,7 @@ import org.apache.carbondata.core.scan.expression.logical.AndExpression;
import org.apache.carbondata.core.scan.expression.logical.OrExpression;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.sdk.file.arrow.ArrowConverter;
+import org.apache.carbondata.sdk.file.arrow.ArrowUtils;
import junit.framework.TestCase;
import org.apache.commons.io.FileUtils;
@@ -2479,8 +2483,14 @@ public class CarbonReaderTest extends TestCase {
CarbonReader.builder(path, "_temp").withRowRecordReader().buildArrowReader();
Schema carbonSchema = CarbonSchemaReader.readSchema(path);
byte[] data = reader.readArrowBatch(carbonSchema);
- ArrowConverter arrowConverter = new ArrowConverter(carbonSchema,0);
- VectorSchemaRoot vectorSchemaRoot = arrowConverter.byteArrayToVector(data);
+ BufferAllocator bufferAllocator = ArrowUtils.rootAllocator.newChildAllocator("toArrowBuffer", 0, Long.MAX_VALUE);
+ ArrowRecordBatch arrowRecordBatch =
+ ArrowConverter.byteArrayToArrowBatch(data, bufferAllocator);
+ VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot
+ .create(ArrowUtils.toArrowSchema(carbonSchema, TimeZone.getDefault().getID()),
+ bufferAllocator);
+ VectorLoader vectorLoader = new VectorLoader(vectorSchemaRoot);
+ vectorLoader.load(arrowRecordBatch);
// check for 10 rows
assertEquals(vectorSchemaRoot.getRowCount(), 10);
List<FieldVector> fieldVectors = vectorSchemaRoot.getFieldVectors();
@@ -2492,6 +2502,9 @@ public class CarbonReaderTest extends TestCase {
for (int i = 0; i < vectorSchemaRoot.getRowCount(); i++) {
assertEquals(((Float4Vector)fieldVectors.get(12)).get(i), (float) 1.23);
}
+ arrowRecordBatch.close();
+ vectorSchemaRoot.close();
+ bufferAllocator.close();
reader.close();
// Read data with address (unsafe memory)
@@ -2501,19 +2514,28 @@ public class CarbonReaderTest extends TestCase {
int length = CarbonUnsafe.getUnsafe().getInt(address);
byte[] data1 = new byte[length];
CarbonUnsafe.getUnsafe().copyMemory(null, address + 4 , data1, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
- ArrowConverter arrowConverter1 = new ArrowConverter(carbonSchema,0);
- VectorSchemaRoot vectorSchemaRoot1 = arrowConverter1.byteArrayToVector(data1);
+ bufferAllocator = ArrowUtils.rootAllocator.newChildAllocator("toArrowBuffer", 0, Long.MAX_VALUE);
+ arrowRecordBatch =
+ ArrowConverter.byteArrayToArrowBatch(data1, bufferAllocator);
+ vectorSchemaRoot = VectorSchemaRoot
+ .create(ArrowUtils.toArrowSchema(carbonSchema, TimeZone.getDefault().getID()),
+ bufferAllocator);
+ vectorLoader = new VectorLoader(vectorSchemaRoot);
+ vectorLoader.load(arrowRecordBatch);
// check for 10 rows
- assertEquals(vectorSchemaRoot1.getRowCount(), 10);
- List<FieldVector> fieldVectors1 = vectorSchemaRoot1.getFieldVectors();
+ assertEquals(vectorSchemaRoot.getRowCount(), 10);
+ List<FieldVector> fieldVectors1 = vectorSchemaRoot.getFieldVectors();
// validate short column
- for (int i = 0; i < vectorSchemaRoot1.getRowCount(); i++) {
+ for (int i = 0; i < vectorSchemaRoot.getRowCount(); i++) {
assertEquals(((SmallIntVector)fieldVectors1.get(6)).get(i), i);
}
// validate float column
- for (int i = 0; i < vectorSchemaRoot1.getRowCount(); i++) {
+ for (int i = 0; i < vectorSchemaRoot.getRowCount(); i++) {
assertEquals(((Float4Vector)fieldVectors1.get(12)).get(i), (float) 1.23);
}
+ arrowRecordBatch.close();
+ vectorSchemaRoot.close();
+ bufferAllocator.close();
// free the unsafe memory
reader1.freeArrowBatchMemory(address);
reader1.close();
@@ -2534,13 +2556,23 @@ public class CarbonReaderTest extends TestCase {
for (int i = 0; i < vectorSchemaRoot2.getRowCount(); i++) {
assertEquals(((Float4Vector)fieldVectors2.get(12)).get(i), (float) 1.23);
}
+ vectorSchemaRoot2.close(); // release the root validated above (vectorSchemaRoot is already closed)
reader2.close();
// Read arrowSchema
byte[] schema = CarbonSchemaReader.getArrowSchemaAsBytes(path);
- ArrowConverter arrowConverter3 = new ArrowConverter(carbonSchema, 0);
- VectorSchemaRoot vectorSchemaRoot3 = arrowConverter3.byteArrayToVector(schema);
- assertEquals(vectorSchemaRoot3.getSchema().getFields().size(), 13);
+ bufferAllocator = ArrowUtils.rootAllocator.newChildAllocator("toArrowBuffer", 0, Long.MAX_VALUE);
+ arrowRecordBatch =
+ ArrowConverter.byteArrayToArrowBatch(schema, bufferAllocator);
+ vectorSchemaRoot = VectorSchemaRoot
+ .create(ArrowUtils.toArrowSchema(carbonSchema, TimeZone.getDefault().getID()),
+ bufferAllocator);
+ vectorLoader = new VectorLoader(vectorSchemaRoot);
+ vectorLoader.load(arrowRecordBatch);
+ assertEquals(vectorSchemaRoot.getSchema().getFields().size(), 13);
+ arrowRecordBatch.close();
+ vectorSchemaRoot.close();
+ bufferAllocator.close();
} catch (Throwable e) {
e.printStackTrace();
Assert.fail(e.getMessage());