You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ak...@apache.org on 2019/06/21 11:27:13 UTC
[carbondata] branch master updated: [CARBONDATA-3365] SDK Arrow
integration document update
This is an automated email from the ASF dual-hosted git repository.
akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 93339ac [CARBONDATA-3365] SDK Arrow integration document update
93339ac is described below
commit 93339ac998a918488499dc9bbeafec4977564072
Author: ajantha-bhat <aj...@gmail.com>
AuthorDate: Tue Jun 18 18:31:05 2019 +0530
[CARBONDATA-3365] SDK Arrow integration document update
SDK Arrow integration document update
This closes #3293
---
docs/sdk-guide.md | 80 ++++++++
.../carbondata/sdk/file/ArrowCarbonReader.java | 14 +-
.../carbondata/sdk/file/arrow/ArrowConverter.java | 6 +-
.../carbondata/sdk/file/ArrowCarbonReaderTest.java | 219 +++++++++++++++++++++
.../carbondata/sdk/file/CarbonReaderTest.java | 167 ----------------
5 files changed, 309 insertions(+), 177 deletions(-)
diff --git a/docs/sdk-guide.md b/docs/sdk-guide.md
index 53b066b..b8e9f51 100644
--- a/docs/sdk-guide.md
+++ b/docs/sdk-guide.md
@@ -614,6 +614,10 @@ reader.close();
Find example code at [CarbonReaderExample](https://github.com/apache/carbondata/blob/master/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java) in the CarbonData repo.
+SDK reader also supports reading carbondata files and filling it to apache arrow vectors.
+Find example code at [ArrowCarbonReaderTest](https://github.com/apache/carbondata/blob/master/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ArrowCarbonReaderTest.java) in the CarbonData repo.
+
+
## API List
### Class org.apache.carbondata.sdk.file.CarbonReader
@@ -687,6 +691,82 @@ public Object[] readNextBatchRow();
public void close();
```
+### Class org.apache.carbondata.sdk.file.ArrowCarbonReader
+```
+/**
+ * Carbon reader will fill the arrow vector after reading the carbondata files.
+ * This arrow byte[] can be used to create arrow table and used for in memory analytics
+ * Note: create a reader at blocklet level, so that arrow byte[] will not exceed INT_MAX
+ *
+ * @param carbonSchema org.apache.carbondata.sdk.file.Schema
+ * @return Serialized byte array
+ * @throws Exception
+ */
+public byte[] readArrowBatch(Schema carbonSchema) throws Exception;
+```
+
+```
+/**
+ * Carbon reader will fill the arrow vector after reading the carbondata files.
+ * This arrow byte[] can be used to create arrow table and used for in memory analytics
+ * Note: create a reader at blocklet level, so that arrow byte[] will not exceed INT_MAX
+ * User need to close the VectorSchemaRoot after usage by calling VectorSchemaRoot.close()
+ *
+ * @param carbonSchema org.apache.carbondata.sdk.file.Schema
+ * @return Arrow VectorSchemaRoot
+ * @throws Exception
+ */
+public VectorSchemaRoot readArrowVectors(Schema carbonSchema) throws Exception;
+```
+
+```
+/**
+ * Carbon reader will fill the arrow vector after reading carbondata files.
+ * Here unsafe memory address will be returned instead of byte[],
+ * so that this address can be sent across java to python or c modules and
+ * can directly read the content from this unsafe memory
+ * Note: Create a carbon reader at blocklet level using CarbonReader.buildWithSplits(split) method,
+ * so that arrow byte[] will not exceed INT_MAX.
+ *
+ * @param carbonSchema org.apache.carbondata.sdk.file.Schema
+ * @return address of the unsafe memory where arrow buffer is stored
+ * @throws Exception
+ */
+public long readArrowBatchAddress(Schema carbonSchema) throws Exception;
+```
+
+```
+/**
+ * Free the unsafe memory allocated, if unsafe arrow batch is used.
+ *
+ * @param address address of the unsafe memory where arrow buffer is stored
+ */
+public void freeArrowBatchMemory(long address)
+```
+
+### Class org.apache.carbondata.sdk.file.arrow.ArrowConverter
+```
+/**
+ * To get the arrow vectors directly after filling from carbondata
+ *
+ * @return Arrow VectorSchemaRoot, which contains an array of arrow vectors.
+ */
+public VectorSchemaRoot getArrowVectors() throws IOException;
+```
+
+```
+/**
+ * Utility API to convert back the arrow byte[] to arrow ArrowRecordBatch.
+ * User need to close the ArrowRecordBatch after usage by calling ArrowRecordBatch.close()
+ *
+ * @param batchBytes input byte array
+ * @param bufferAllocator arrow buffer allocator
+ * @return ArrowRecordBatch
+ * @throws IOException
+ */
+public static ArrowRecordBatch byteArrayToArrowBatch(byte[] batchBytes, BufferAllocator bufferAllocator) throws IOException;
+```
+
### Class org.apache.carbondata.sdk.file.CarbonReaderBuilder
```
/**
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/ArrowCarbonReader.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/ArrowCarbonReader.java
index f30e557..69fe3ee 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/ArrowCarbonReader.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/ArrowCarbonReader.java
@@ -46,8 +46,8 @@ public class ArrowCarbonReader<T> extends CarbonReader<T> {
* This arrow byte[] can be used to create arrow table and used for in memory analytics
* Note: create a reader at blocklet level, so that arrow byte[] will not exceed INT_MAX
*
- * @param carbonSchema
- * @return
+ * @param carbonSchema org.apache.carbondata.sdk.file.Schema
+ * @return Serialized byte array
* @throws Exception
*/
public byte[] readArrowBatch(Schema carbonSchema) throws Exception {
@@ -64,8 +64,8 @@ public class ArrowCarbonReader<T> extends CarbonReader<T> {
* Note: create a reader at blocklet level, so that arrow byte[] will not exceed INT_MAX
* User need to close the VectorSchemaRoot after usage by calling VectorSchemaRoot.close()
*
- * @param carbonSchema
- * @return
+ * @param carbonSchema org.apache.carbondata.sdk.file.Schema
+ * @return Arrow VectorSchemaRoot
* @throws Exception
*/
public VectorSchemaRoot readArrowVectors(Schema carbonSchema) throws Exception {
@@ -84,8 +84,8 @@ public class ArrowCarbonReader<T> extends CarbonReader<T> {
* Note:Create a carbon reader at blocklet level using CarbonReader.buildWithSplits(split) method,
* so that arrow byte[] will not exceed INT_MAX.
*
- * @param carbonSchema
- * @return
+ * @param carbonSchema org.apache.carbondata.sdk.file.Schema
+ * @return address of the unsafe memory where arrow buffer is stored
* @throws Exception
*/
public long readArrowBatchAddress(Schema carbonSchema) throws Exception {
@@ -99,7 +99,7 @@ public class ArrowCarbonReader<T> extends CarbonReader<T> {
/**
* free the unsafe memory allocated , if unsafe arrow batch is used.
*
- * @param address
+ * @param address address of the unsafe memory where arrow buffer is stored
*/
public void freeArrowBatchMemory(long address) {
CarbonUnsafe.getUnsafe().freeMemory(address);
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowConverter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowConverter.java
index 0c08eab..268d06b 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowConverter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowConverter.java
@@ -105,8 +105,8 @@ public class ArrowConverter {
* Utility API to convert back the arrow byte[] to arrow ArrowRecordBatch.
* User need to close the ArrowRecordBatch after usage by calling ArrowRecordBatch.close()
*
- * @param batchBytes
- * @param bufferAllocator
+ * @param batchBytes input byte array
+ * @param bufferAllocator arrow buffer allocator
* @return ArrowRecordBatch
* @throws IOException
*/
@@ -128,7 +128,7 @@ public class ArrowConverter {
/**
* To get the arrow vectors directly after filling from carbondata
*
- * @return
+ * @return Arrow VectorSchemaRoot, which contains an array of arrow vectors.
*/
public VectorSchemaRoot getArrowVectors() throws IOException {
arrowWriter.finish();
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ArrowCarbonReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ArrowCarbonReaderTest.java
new file mode 100644
index 0000000..69ef53b
--- /dev/null
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ArrowCarbonReaderTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.sdk.file.arrow.ArrowConverter;
+import org.apache.carbondata.sdk.file.arrow.ArrowUtils;
+
+import junit.framework.TestCase;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ArrowCarbonReaderTest extends TestCase {
+
+ @Before
+ public void cleanFile() {
+ assert (TestUtil.cleanMdtFile());
+ }
+
+ @After
+ public void verifyDMFile() {
+ assert (!TestUtil.verifyMdtFile());
+ String path = "./testWriteFiles";
+ try {
+ FileUtils.deleteDirectory(new File(path));
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void testArrowReader() {
+ String path = "./carbondata";
+ try {
+ FileUtils.deleteDirectory(new File(path));
+
+ Field[] fields = new Field[13];
+ fields[0] = new Field("stringField", DataTypes.STRING);
+ fields[1] = new Field("shortField", DataTypes.SHORT);
+ fields[2] = new Field("intField", DataTypes.INT);
+ fields[3] = new Field("longField", DataTypes.LONG);
+ fields[4] = new Field("doubleField", DataTypes.DOUBLE);
+ fields[5] = new Field("boolField", DataTypes.BOOLEAN);
+ fields[6] = new Field("dateField", DataTypes.DATE);
+ fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
+ fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
+ fields[9] = new Field("varcharField", DataTypes.VARCHAR);
+ fields[10] = new Field("arrayField", DataTypes.createArrayType(DataTypes.STRING));
+ fields[11] = new Field("floatField", DataTypes.FLOAT);
+ fields[12] = new Field("binaryField", DataTypes.BINARY);
+ Map<String, String> map = new HashMap<>();
+ map.put("complex_delimiter_level_1", "#");
+ CarbonWriter writer = CarbonWriter.builder()
+ .outputPath(path)
+ .withLoadOptions(map)
+ .withCsvInput(new Schema(fields))
+ .writtenBy("CarbonReaderTest")
+ .build();
+ byte[] value = "Binary".getBytes();
+ for (int i = 0; i < 10; i++) {
+ Object[] row2 = new Object[]{
+ "robot" + (i % 10),
+ i % 10000,
+ i,
+ (Long.MAX_VALUE - i),
+ ((double) i / 2),
+ (true),
+ "2019-03-02",
+ "2019-02-12 03:03:34",
+ 12.345,
+ "varchar",
+ "Hello#World#From#Carbon",
+ 1.23,
+ value
+ };
+ writer.write(row2);
+ }
+ writer.close();
+ // Read data
+ ArrowCarbonReader reader =
+ CarbonReader.builder(path, "_temp").withRowRecordReader().buildArrowReader();
+ Schema carbonSchema = CarbonSchemaReader.readSchema(path);
+ byte[] data = reader.readArrowBatch(carbonSchema);
+ BufferAllocator bufferAllocator = ArrowUtils.rootAllocator.newChildAllocator("toArrowBuffer", 0, Long.MAX_VALUE);
+ ArrowRecordBatch arrowRecordBatch =
+ ArrowConverter.byteArrayToArrowBatch(data, bufferAllocator);
+ VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot
+ .create(ArrowUtils.toArrowSchema(carbonSchema, TimeZone.getDefault().getID()),
+ bufferAllocator);
+ VectorLoader vectorLoader = new VectorLoader(vectorSchemaRoot);
+ vectorLoader.load(arrowRecordBatch);
+ // check for 10 rows
+ assertEquals(vectorSchemaRoot.getRowCount(), 10);
+ List<FieldVector> fieldVectors = vectorSchemaRoot.getFieldVectors();
+ // validate short column
+ for (int i = 0; i < vectorSchemaRoot.getRowCount(); i++) {
+ assertEquals(((SmallIntVector)fieldVectors.get(6)).get(i), i);
+ }
+ // validate float column
+ for (int i = 0; i < vectorSchemaRoot.getRowCount(); i++) {
+ assertEquals(((Float4Vector)fieldVectors.get(12)).get(i), (float) 1.23);
+ }
+ arrowRecordBatch.close();
+ vectorSchemaRoot.close();
+ bufferAllocator.close();
+ reader.close();
+
+ // Read data with address (unsafe memory)
+ ArrowCarbonReader reader1 =
+ CarbonReader.builder(path, "_temp").withRowRecordReader().buildArrowReader();
+ long address = reader1.readArrowBatchAddress(carbonSchema);
+ int length = CarbonUnsafe.getUnsafe().getInt(address);
+ byte[] data1 = new byte[length];
+ CarbonUnsafe.getUnsafe().copyMemory(null, address + 4 , data1, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
+ bufferAllocator = ArrowUtils.rootAllocator.newChildAllocator("toArrowBuffer", 0, Long.MAX_VALUE);
+ arrowRecordBatch =
+ ArrowConverter.byteArrayToArrowBatch(data1, bufferAllocator);
+ vectorSchemaRoot = VectorSchemaRoot
+ .create(ArrowUtils.toArrowSchema(carbonSchema, TimeZone.getDefault().getID()),
+ bufferAllocator);
+ vectorLoader = new VectorLoader(vectorSchemaRoot);
+ vectorLoader.load(arrowRecordBatch);
+ // check for 10 rows
+ assertEquals(vectorSchemaRoot.getRowCount(), 10);
+ List<FieldVector> fieldVectors1 = vectorSchemaRoot.getFieldVectors();
+ // validate short column
+ for (int i = 0; i < vectorSchemaRoot.getRowCount(); i++) {
+ assertEquals(((SmallIntVector)fieldVectors1.get(6)).get(i), i);
+ }
+ // validate float column
+ for (int i = 0; i < vectorSchemaRoot.getRowCount(); i++) {
+ assertEquals(((Float4Vector)fieldVectors1.get(12)).get(i), (float) 1.23);
+ }
+ arrowRecordBatch.close();
+ vectorSchemaRoot.close();
+ bufferAllocator.close();
+ // free the unsafe memory
+ reader1.freeArrowBatchMemory(address);
+ reader1.close();
+
+
+ // Read as arrow vector
+ ArrowCarbonReader reader2 =
+ CarbonReader.builder(path, "_temp").withRowRecordReader().buildArrowReader();
+ VectorSchemaRoot vectorSchemaRoot2 = reader2.readArrowVectors(carbonSchema);
+ // check for 10 rows
+ assertEquals(vectorSchemaRoot2.getRowCount(), 10);
+ List<FieldVector> fieldVectors2 = vectorSchemaRoot2.getFieldVectors();
+ // validate short column
+ for (int i = 0; i < vectorSchemaRoot2.getRowCount(); i++) {
+ assertEquals(((SmallIntVector)fieldVectors2.get(6)).get(i), i);
+ }
+ // validate float column
+ for (int i = 0; i < vectorSchemaRoot2.getRowCount(); i++) {
+ assertEquals(((Float4Vector)fieldVectors2.get(12)).get(i), (float) 1.23);
+ }
+ vectorSchemaRoot.close();
+ reader2.close();
+
+ // Read arrowSchema
+ byte[] schema = CarbonSchemaReader.getArrowSchemaAsBytes(path);
+ bufferAllocator = ArrowUtils.rootAllocator.newChildAllocator("toArrowBuffer", 0, Long.MAX_VALUE);
+ arrowRecordBatch =
+ ArrowConverter.byteArrayToArrowBatch(schema, bufferAllocator);
+ vectorSchemaRoot = VectorSchemaRoot
+ .create(ArrowUtils.toArrowSchema(carbonSchema, TimeZone.getDefault().getID()),
+ bufferAllocator);
+ vectorLoader = new VectorLoader(vectorSchemaRoot);
+ vectorLoader.load(arrowRecordBatch);
+ assertEquals(vectorSchemaRoot.getSchema().getFields().size(), 13);
+ arrowRecordBatch.close();
+ vectorSchemaRoot.close();
+ bufferAllocator.close();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ } finally {
+ try {
+ FileUtils.deleteDirectory(new File(path));
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+ }
+}
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
index 1aa0f40..9aee152 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
@@ -22,13 +22,6 @@ import java.sql.Date;
import java.sql.Timestamp;
import java.util.*;
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.FieldVector;
-import org.apache.arrow.vector.Float4Vector;
-import org.apache.arrow.vector.SmallIntVector;
-import org.apache.arrow.vector.VectorLoader;
-import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.log4j.Logger;
@@ -38,7 +31,6 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.memory.CarbonUnsafe;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.scan.expression.ColumnExpression;
@@ -47,8 +39,6 @@ import org.apache.carbondata.core.scan.expression.conditional.*;
import org.apache.carbondata.core.scan.expression.logical.AndExpression;
import org.apache.carbondata.core.scan.expression.logical.OrExpression;
import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.sdk.file.arrow.ArrowConverter;
-import org.apache.carbondata.sdk.file.arrow.ArrowUtils;
import junit.framework.TestCase;
import org.apache.commons.io.FileUtils;
@@ -2431,163 +2421,6 @@ public class CarbonReaderTest extends TestCase {
}
@Test
- public void testArrowReader() {
- String path = "./carbondata";
- try {
- FileUtils.deleteDirectory(new File(path));
-
- Field[] fields = new Field[13];
- fields[0] = new Field("stringField", DataTypes.STRING);
- fields[1] = new Field("shortField", DataTypes.SHORT);
- fields[2] = new Field("intField", DataTypes.INT);
- fields[3] = new Field("longField", DataTypes.LONG);
- fields[4] = new Field("doubleField", DataTypes.DOUBLE);
- fields[5] = new Field("boolField", DataTypes.BOOLEAN);
- fields[6] = new Field("dateField", DataTypes.DATE);
- fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
- fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
- fields[9] = new Field("varcharField", DataTypes.VARCHAR);
- fields[10] = new Field("arrayField", DataTypes.createArrayType(DataTypes.STRING));
- fields[11] = new Field("floatField", DataTypes.FLOAT);
- fields[12] = new Field("binaryField", DataTypes.BINARY);
- Map<String, String> map = new HashMap<>();
- map.put("complex_delimiter_level_1", "#");
- CarbonWriter writer = CarbonWriter.builder()
- .outputPath(path)
- .withLoadOptions(map)
- .withCsvInput(new Schema(fields))
- .writtenBy("CarbonReaderTest")
- .build();
- byte[] value = "Binary".getBytes();
- for (int i = 0; i < 10; i++) {
- Object[] row2 = new Object[]{
- "robot" + (i % 10),
- i % 10000,
- i,
- (Long.MAX_VALUE - i),
- ((double) i / 2),
- (true),
- "2019-03-02",
- "2019-02-12 03:03:34",
- 12.345,
- "varchar",
- "Hello#World#From#Carbon",
- 1.23,
- value
- };
- writer.write(row2);
- }
- writer.close();
- // Read data
- ArrowCarbonReader reader =
- CarbonReader.builder(path, "_temp").withRowRecordReader().buildArrowReader();
- Schema carbonSchema = CarbonSchemaReader.readSchema(path);
- byte[] data = reader.readArrowBatch(carbonSchema);
- BufferAllocator bufferAllocator = ArrowUtils.rootAllocator.newChildAllocator("toArrowBuffer", 0, Long.MAX_VALUE);
- ArrowRecordBatch arrowRecordBatch =
- ArrowConverter.byteArrayToArrowBatch(data, bufferAllocator);
- VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot
- .create(ArrowUtils.toArrowSchema(carbonSchema, TimeZone.getDefault().getID()),
- bufferAllocator);
- VectorLoader vectorLoader = new VectorLoader(vectorSchemaRoot);
- vectorLoader.load(arrowRecordBatch);
- // check for 10 rows
- assertEquals(vectorSchemaRoot.getRowCount(), 10);
- List<FieldVector> fieldVectors = vectorSchemaRoot.getFieldVectors();
- // validate short column
- for (int i = 0; i < vectorSchemaRoot.getRowCount(); i++) {
- assertEquals(((SmallIntVector)fieldVectors.get(6)).get(i), i);
- }
- // validate float column
- for (int i = 0; i < vectorSchemaRoot.getRowCount(); i++) {
- assertEquals(((Float4Vector)fieldVectors.get(12)).get(i), (float) 1.23);
- }
- arrowRecordBatch.close();
- vectorSchemaRoot.close();
- bufferAllocator.close();
- reader.close();
-
- // Read data with address (unsafe memory)
- ArrowCarbonReader reader1 =
- CarbonReader.builder(path, "_temp").withRowRecordReader().buildArrowReader();
- long address = reader1.readArrowBatchAddress(carbonSchema);
- int length = CarbonUnsafe.getUnsafe().getInt(address);
- byte[] data1 = new byte[length];
- CarbonUnsafe.getUnsafe().copyMemory(null, address + 4 , data1, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
- bufferAllocator = ArrowUtils.rootAllocator.newChildAllocator("toArrowBuffer", 0, Long.MAX_VALUE);
- arrowRecordBatch =
- ArrowConverter.byteArrayToArrowBatch(data1, bufferAllocator);
- vectorSchemaRoot = VectorSchemaRoot
- .create(ArrowUtils.toArrowSchema(carbonSchema, TimeZone.getDefault().getID()),
- bufferAllocator);
- vectorLoader = new VectorLoader(vectorSchemaRoot);
- vectorLoader.load(arrowRecordBatch);
- // check for 10 rows
- assertEquals(vectorSchemaRoot.getRowCount(), 10);
- List<FieldVector> fieldVectors1 = vectorSchemaRoot.getFieldVectors();
- // validate short column
- for (int i = 0; i < vectorSchemaRoot.getRowCount(); i++) {
- assertEquals(((SmallIntVector)fieldVectors1.get(6)).get(i), i);
- }
- // validate float column
- for (int i = 0; i < vectorSchemaRoot.getRowCount(); i++) {
- assertEquals(((Float4Vector)fieldVectors1.get(12)).get(i), (float) 1.23);
- }
- arrowRecordBatch.close();
- vectorSchemaRoot.close();
- bufferAllocator.close();
- // free the unsafe memory
- reader1.freeArrowBatchMemory(address);
- reader1.close();
-
-
- // Read as arrow vector
- ArrowCarbonReader reader2 =
- CarbonReader.builder(path, "_temp").withRowRecordReader().buildArrowReader();
- VectorSchemaRoot vectorSchemaRoot2 = reader2.readArrowVectors(carbonSchema);
- // check for 10 rows
- assertEquals(vectorSchemaRoot2.getRowCount(), 10);
- List<FieldVector> fieldVectors2 = vectorSchemaRoot2.getFieldVectors();
- // validate short column
- for (int i = 0; i < vectorSchemaRoot2.getRowCount(); i++) {
- assertEquals(((SmallIntVector)fieldVectors2.get(6)).get(i), i);
- }
- // validate float column
- for (int i = 0; i < vectorSchemaRoot2.getRowCount(); i++) {
- assertEquals(((Float4Vector)fieldVectors2.get(12)).get(i), (float) 1.23);
- }
- vectorSchemaRoot.close();
- reader2.close();
-
- // Read arrowSchema
- byte[] schema = CarbonSchemaReader.getArrowSchemaAsBytes(path);
- bufferAllocator = ArrowUtils.rootAllocator.newChildAllocator("toArrowBuffer", 0, Long.MAX_VALUE);
- arrowRecordBatch =
- ArrowConverter.byteArrayToArrowBatch(schema, bufferAllocator);
- vectorSchemaRoot = VectorSchemaRoot
- .create(ArrowUtils.toArrowSchema(carbonSchema, TimeZone.getDefault().getID()),
- bufferAllocator);
- vectorLoader = new VectorLoader(vectorSchemaRoot);
- vectorLoader.load(arrowRecordBatch);
- assertEquals(vectorSchemaRoot.getSchema().getFields().size(), 13);
- arrowRecordBatch.close();
- vectorSchemaRoot.close();
- bufferAllocator.close();
- } catch (Throwable e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
- } finally {
- try {
- FileUtils.deleteDirectory(new File(path));
- } catch (IOException e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
- }
- }
-
-
- @Test
public void testReadBlocklet() throws IOException, InterruptedException {
String path = "./testWriteFiles/" + System.nanoTime();
FileUtils.deleteDirectory(new File(path));