You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2016/11/08 18:29:43 UTC
arrow git commit: ARROW-312: [Java] IPC file round trip tool for integration testing
Repository: arrow
Updated Branches:
refs/heads/master 79344b335 -> 6996c17f7
ARROW-312: [Java] IPC file round trip tool for integration testing
Author: Julien Le Dem <ju...@dremio.com>
Author: Wes McKinney <we...@twosigma.com>
Closes #186 from wesm/roundtrip-tool and squashes the following commits:
aee552a [Julien Le Dem] missing file
9d5c078 [Julien Le Dem] fix read-write bug
7f20b36 [Julien Le Dem] simple roundtrip
a04091f [Wes McKinney] Drafting file round trip helper executable
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/6996c17f
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/6996c17f
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/6996c17f
Branch: refs/heads/master
Commit: 6996c17f70dc13659c37dfaa39bc28e7777ca6a6
Parents: 79344b3
Author: Julien Le Dem <ju...@dremio.com>
Authored: Tue Nov 8 13:29:34 2016 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Tue Nov 8 13:29:34 2016 -0500
----------------------------------------------------------------------
.../src/main/java/io/netty/buffer/ArrowBuf.java | 7 +-
.../apache/arrow/memory/TestBaseAllocator.java | 24 ++-
java/pom.xml | 1 +
java/tools/pom.xml | 73 +++++++++
.../org/apache/arrow/tools/FileRoundtrip.java | 135 ++++++++++++++++
.../apache/arrow/tools/TestFileRoundtrip.java | 159 +++++++++++++++++++
java/vector/pom.xml | 32 ++--
.../codegen/templates/NullableValueVectors.java | 2 +-
.../org/apache/arrow/vector/VectorLoader.java | 21 ++-
.../apache/arrow/vector/VectorSchemaRoot.java | 140 ++++++++++++++++
.../org/apache/arrow/vector/VectorUnloader.java | 13 +-
.../apache/arrow/vector/schema/ArrowBuffer.java | 6 +
.../arrow/vector/schema/ArrowRecordBatch.java | 8 +
.../arrow/vector/TestVectorUnloadLoad.java | 42 +++--
.../apache/arrow/vector/file/TestArrowFile.java | 149 +++++++++--------
15 files changed, 681 insertions(+), 131 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/6996c17f/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java
index a5989c1..95d2be5 100644
--- a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java
+++ b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java
@@ -179,7 +179,10 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
historicalLog.recordEvent("retain(%s)", target.getName());
}
final BufferLedger otherLedger = this.ledger.getLedgerForAllocator(target);
- return otherLedger.newArrowBuf(offset, length, null);
+ ArrowBuf newArrowBuf = otherLedger.newArrowBuf(offset, length, null);
+ newArrowBuf.readerIndex(this.readerIndex);
+ newArrowBuf.writerIndex(this.writerIndex);
+ return newArrowBuf;
}
/**
@@ -214,6 +217,8 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
final BufferLedger otherLedger = this.ledger.getLedgerForAllocator(target);
final ArrowBuf newBuf = otherLedger.newArrowBuf(offset, length, null);
+ newBuf.readerIndex(this.readerIndex);
+ newBuf.writerIndex(this.writerIndex);
final boolean allocationFit = this.ledger.transferBalance(otherLedger);
return new TransferResult(allocationFit, newBuf);
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/6996c17f/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java
----------------------------------------------------------------------
diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java
index aa6b70c..3c96d57 100644
--- a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java
+++ b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java
@@ -22,16 +22,13 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import io.netty.buffer.ArrowBuf;
-import io.netty.buffer.ArrowBuf.TransferResult;
-import org.apache.arrow.memory.AllocationReservation;
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.OutOfMemoryException;
-import org.apache.arrow.memory.RootAllocator;
import org.junit.Ignore;
import org.junit.Test;
+import io.netty.buffer.ArrowBuf;
+import io.netty.buffer.ArrowBuf.TransferResult;
+
public class TestBaseAllocator {
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestBaseAllocator.class);
@@ -134,6 +131,7 @@ public class TestBaseAllocator {
final ArrowBuf arrowBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 4);
rootAllocator.verify();
TransferResult transferOwnership = arrowBuf1.transferOwnership(childAllocator2);
+ assertEquiv(arrowBuf1, transferOwnership.buffer);
final boolean allocationFit = transferOwnership.allocationFit;
rootAllocator.verify();
assertTrue(allocationFit);
@@ -160,6 +158,7 @@ public class TestBaseAllocator {
rootAllocator.verify();
assertNotNull(arrowBuf2);
assertNotEquals(arrowBuf2, arrowBuf1);
+ assertEquiv(arrowBuf1, arrowBuf2);
// release original buffer (thus transferring ownership to allocator 2. (should leave allocator 1 in empty state)
arrowBuf1.release();
@@ -172,6 +171,7 @@ public class TestBaseAllocator {
assertNotNull(arrowBuf3);
assertNotEquals(arrowBuf3, arrowBuf1);
assertNotEquals(arrowBuf3, arrowBuf2);
+ assertEquiv(arrowBuf1, arrowBuf3);
rootAllocator.verify();
arrowBuf2.release();
@@ -452,8 +452,10 @@ public class TestBaseAllocator {
rootAllocator.verify();
TransferResult result1 = arrowBuf2s.transferOwnership(childAllocator1);
+ assertEquiv(arrowBuf2s, result1.buffer);
rootAllocator.verify();
TransferResult result2 = arrowBuf1s.transferOwnership(childAllocator2);
+ assertEquiv(arrowBuf1s, result2.buffer);
rootAllocator.verify();
result1.buffer.release();
@@ -482,7 +484,9 @@ public class TestBaseAllocator {
rootAllocator.verify();
final ArrowBuf arrowBuf2s1 = arrowBuf2s.retain(childAllocator1);
+ assertEquiv(arrowBuf2s, arrowBuf2s1);
final ArrowBuf arrowBuf1s2 = arrowBuf1s.retain(childAllocator2);
+ assertEquiv(arrowBuf1s, arrowBuf1s2);
rootAllocator.verify();
arrowBuf1s.release(); // releases arrowBuf1
@@ -512,11 +516,13 @@ public class TestBaseAllocator {
rootAllocator.verify();
assertNotNull(arrowBuf2);
assertNotEquals(arrowBuf2, arrowBuf1);
+ assertEquiv(arrowBuf1, arrowBuf2);
TransferResult result = arrowBuf1.transferOwnership(childAllocator3);
allocationFit = result.allocationFit;
final ArrowBuf arrowBuf3 = result.buffer;
assertTrue(allocationFit);
+ assertEquiv(arrowBuf1, arrowBuf3);
rootAllocator.verify();
// Since childAllocator3 now has childAllocator1's buffer, 1, can close
@@ -533,6 +539,7 @@ public class TestBaseAllocator {
allocationFit = result.allocationFit;
final ArrowBuf arrowBuf4 = result2.buffer;
assertTrue(allocationFit);
+ assertEquiv(arrowBuf3, arrowBuf4);
rootAllocator.verify();
arrowBuf3.release();
@@ -645,4 +652,9 @@ public class TestBaseAllocator {
}
}
+
+  /**
+   * Asserts that {@code actual} carries the same reader index and writer index as
+   * {@code expected}; used after retain/transferOwnership to check buffer positions carry over.
+   */
+  public void assertEquiv(ArrowBuf expected, ArrowBuf actual) {
+    final int expectedReaderIndex = expected.readerIndex();
+    assertEquals(expectedReaderIndex, actual.readerIndex());
+    final int expectedWriterIndex = expected.writerIndex();
+    assertEquals(expectedWriterIndex, actual.writerIndex());
+  }
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/6996c17f/java/pom.xml
----------------------------------------------------------------------
diff --git a/java/pom.xml b/java/pom.xml
index 0147de7..7221a14 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -467,5 +467,6 @@
<module>format</module>
<module>memory</module>
<module>vector</module>
+ <module>tools</module>
</modules>
</project>
http://git-wip-us.apache.org/repos/asf/arrow/blob/6996c17f/java/tools/pom.xml
----------------------------------------------------------------------
diff --git a/java/tools/pom.xml b/java/tools/pom.xml
new file mode 100644
index 0000000..84b0b5e
--- /dev/null
+++ b/java/tools/pom.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.arrow</groupId>
+    <artifactId>arrow-java-root</artifactId>
+    <version>0.1.1-SNAPSHOT</version>
+  </parent>
+  <artifactId>arrow-tools</artifactId>
+  <name>Arrow Tools</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-format</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-memory</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-vector</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <!-- Command-line option parsing (PosixParser/Options) used by FileRoundtrip. -->
+    <dependency>
+      <groupId>commons-cli</groupId>
+      <artifactId>commons-cli</artifactId>
+      <version>1.2</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <!-- Also build a jar-with-dependencies during the package phase,
+           presumably so the tools can be run standalone. -->
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <version>2.6</version>
+        <configuration>
+          <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+          </descriptorRefs>
+        </configuration>
+        <executions>
+          <execution>
+            <id>make-assembly</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/arrow/blob/6996c17f/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
----------------------------------------------------------------------
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java b/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
new file mode 100644
index 0000000..db7a1c2
--- /dev/null
+++ b/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.arrow.tools;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.List;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.file.ArrowBlock;
+import org.apache.arrow.vector.file.ArrowFooter;
+import org.apache.arrow.vector.file.ArrowReader;
+import org.apache.arrow.vector.file.ArrowWriter;
+import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FileRoundtrip {
+ private static final Logger LOGGER = LoggerFactory.getLogger(FileRoundtrip.class);
+
+ public static void main(String[] args) {
+ System.exit(new FileRoundtrip(System.out, System.err).run(args));
+ }
+
+ private final Options options;
+ private final PrintStream out;
+ private final PrintStream err;
+
+ FileRoundtrip(PrintStream out, PrintStream err) {
+ this.out = out;
+ this.err = err;
+ this.options = new Options();
+ this.options.addOption("i", "in", true, "input file");
+ this.options.addOption("o", "out", true, "output file");
+
+ }
+
+ private File validateFile(String type, String fileName) {
+ if (fileName == null) {
+ throw new IllegalArgumentException("missing " + type + " file parameter");
+ }
+ File f = new File(fileName);
+ if (!f.exists() || f.isDirectory()) {
+ throw new IllegalArgumentException(type + " file not found: " + f.getAbsolutePath());
+ }
+ return f;
+ }
+
+ int run(String[] args) {
+ try {
+ CommandLineParser parser = new PosixParser();
+ CommandLine cmd = parser.parse(options, args, false);
+
+ String inFileName = cmd.getOptionValue("in");
+ String outFileName = cmd.getOptionValue("out");
+
+ File inFile = validateFile("input", inFileName);
+ File outFile = validateFile("output", outFileName);
+ BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); // TODO: close
+ try(
+ FileInputStream fileInputStream = new FileInputStream(inFile);
+ ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), allocator);) {
+
+ ArrowFooter footer = arrowReader.readFooter();
+ Schema schema = footer.getSchema();
+ LOGGER.debug("Input file size: " + inFile.length());
+ LOGGER.debug("Found schema: " + schema);
+
+ try (
+ FileOutputStream fileOutputStream = new FileOutputStream(outFile);
+ ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);
+ ) {
+
+ // initialize vectors
+
+ List<ArrowBlock> recordBatches = footer.getRecordBatches();
+ for (ArrowBlock rbBlock : recordBatches) {
+ try (ArrowRecordBatch inRecordBatch = arrowReader.readRecordBatch(rbBlock);
+ VectorSchemaRoot root = new VectorSchemaRoot(schema, allocator);) {
+
+ VectorLoader vectorLoader = new VectorLoader(root);
+ vectorLoader.load(inRecordBatch);
+
+ VectorUnloader vectorUnloader = new VectorUnloader(root);
+ ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
+ arrowWriter.writeRecordBatch(recordBatch);
+ }
+ }
+ }
+ LOGGER.debug("Output file size: " + outFile.length());
+ }
+ } catch (ParseException e) {
+ return fatalError("Invalid parameters", e);
+ } catch (IOException e) {
+ return fatalError("Error accessing files", e);
+ }
+ return 0;
+ }
+
+ private int fatalError(String message, Throwable e) {
+ err.println(message);
+ LOGGER.error(message, e);
+ return 1;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/6996c17f/java/tools/src/test/java/org/apache/arrow/tools/TestFileRoundtrip.java
----------------------------------------------------------------------
diff --git a/java/tools/src/test/java/org/apache/arrow/tools/TestFileRoundtrip.java b/java/tools/src/test/java/org/apache/arrow/tools/TestFileRoundtrip.java
new file mode 100644
index 0000000..339725e
--- /dev/null
+++ b/java/tools/src/test/java/org/apache/arrow/tools/TestFileRoundtrip.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.arrow.tools;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.impl.ComplexWriterImpl;
+import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter;
+import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter;
+import org.apache.arrow.vector.complex.writer.BigIntWriter;
+import org.apache.arrow.vector.complex.writer.IntWriter;
+import org.apache.arrow.vector.file.ArrowBlock;
+import org.apache.arrow.vector.file.ArrowFooter;
+import org.apache.arrow.vector.file.ArrowReader;
+import org.apache.arrow.vector.file.ArrowWriter;
+import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestFileRoundtrip {
+ private static final int COUNT = 10;
+
+ @Rule
+ public TemporaryFolder testFolder = new TemporaryFolder();
+
+ private BufferAllocator allocator;
+
+ @Before
+ public void init() {
+ allocator = new RootAllocator(Integer.MAX_VALUE);
+ }
+
+ @After
+ public void tearDown() {
+ allocator.close();
+ }
+
+ private void writeData(int count, MapVector parent) {
+ ComplexWriter writer = new ComplexWriterImpl("root", parent);
+ MapWriter rootWriter = writer.rootAsMap();
+ IntWriter intWriter = rootWriter.integer("int");
+ BigIntWriter bigIntWriter = rootWriter.bigInt("bigInt");
+ for (int i = 0; i < count; i++) {
+ intWriter.setPosition(i);
+ intWriter.writeInt(i);
+ bigIntWriter.setPosition(i);
+ bigIntWriter.writeBigInt(i);
+ }
+ writer.setValueCount(count);
+ }
+
+ @Test
+ public void test() throws Exception {
+ File testInFile = testFolder.newFile("testIn.arrow");
+ File testOutFile = testFolder.newFile("testOut.arrow");
+
+ writeInput(testInFile);
+
+ String[] args = { "-i", testInFile.getAbsolutePath(), "-o", testOutFile.getAbsolutePath()};
+ int result = new FileRoundtrip(System.out, System.err).run(args);
+ assertEquals(0, result);
+
+ validateOutput(testOutFile);
+ }
+
+ private void validateOutput(File testOutFile) throws Exception {
+ // read
+ try (
+ BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+ FileInputStream fileInputStream = new FileInputStream(testOutFile);
+ ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), readerAllocator);
+ BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
+ ) {
+ ArrowFooter footer = arrowReader.readFooter();
+ Schema schema = footer.getSchema();
+
+ // initialize vectors
+ try (VectorSchemaRoot root = new VectorSchemaRoot(schema, readerAllocator)) {
+ VectorLoader vectorLoader = new VectorLoader(root);
+
+ List<ArrowBlock> recordBatches = footer.getRecordBatches();
+ for (ArrowBlock rbBlock : recordBatches) {
+ try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
+ vectorLoader.load(recordBatch);
+ }
+ validateContent(COUNT, root);
+ }
+ }
+ }
+ }
+
+ private void validateContent(int count, VectorSchemaRoot root) {
+ Assert.assertEquals(count, root.getRowCount());
+ for (int i = 0; i < count; i++) {
+ Assert.assertEquals(i, root.getVector("int").getAccessor().getObject(i));
+ Assert.assertEquals(Long.valueOf(i), root.getVector("bigInt").getAccessor().getObject(i));
+ }
+ }
+
+ public void writeInput(File testInFile) throws FileNotFoundException, IOException {
+ int count = COUNT;
+ try (
+ BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
+ MapVector parent = new MapVector("parent", vectorAllocator, null)) {
+ writeData(count, parent);
+ write(parent.getChild("root"), testInFile);
+ }
+ }
+
+ private void write(FieldVector parent, File file) throws FileNotFoundException, IOException {
+ Schema schema = new Schema(parent.getField().getChildren());
+ int valueCount = parent.getAccessor().getValueCount();
+ List<FieldVector> fields = parent.getChildrenFromFields();
+ VectorUnloader vectorUnloader = new VectorUnloader(schema, valueCount, fields);
+ try (
+ FileOutputStream fileOutputStream = new FileOutputStream(file);
+ ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);
+ ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
+ ) {
+ arrowWriter.writeRecordBatch(recordBatch);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/6996c17f/java/vector/pom.xml
----------------------------------------------------------------------
diff --git a/java/vector/pom.xml b/java/vector/pom.xml
index 1d06bde..64b68bf 100644
--- a/java/vector/pom.xml
+++ b/java/vector/pom.xml
@@ -1,13 +1,13 @@
<?xml version="1.0"?>
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
- license agreements. See the NOTICE file distributed with this work for additional
- information regarding copyright ownership. The ASF licenses this file to
- You under the Apache License, Version 2.0 (the "License"); you may not use
- this file except in compliance with the License. You may obtain a copy of
- the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
- by applicable law or agreed to in writing, software distributed under the
- License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
- OF ANY KIND, either express or implied. See the License for the specific
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
@@ -56,8 +56,6 @@
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>
-
-
</dependencies>
<pluginRepositories>
@@ -72,13 +70,13 @@
<enabled>false</enabled>
</snapshots>
</pluginRepository>
- </pluginRepositories>
-
+ </pluginRepositories>
+
<build>
<resources>
<resource>
- <!-- Copy freemarker template and fmpp configuration files of Vector's
+ <!-- Copy freemarker template and fmpp configuration files of Vector's
to allow clients to leverage definitions. -->
<directory>${basedir}/src/main/codegen</directory>
<targetPath>codegen</targetPath>
@@ -129,7 +127,7 @@
</plugins>
<pluginManagement>
<plugins>
- <!--This plugin's configuration is used to store Eclipse m2e settings
+ <!--This plugin's configuration is used to store Eclipse m2e settings
only. It has no influence on the Maven build itself. -->
<plugin>
<groupId>org.eclipse.m2e</groupId>
@@ -160,8 +158,8 @@
</plugin>
</plugins>
</pluginManagement>
-
-
+
+
</build>
http://git-wip-us.apache.org/repos/asf/arrow/blob/6996c17f/java/vector/src/main/codegen/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/codegen/templates/NullableValueVectors.java b/java/vector/src/main/codegen/templates/NullableValueVectors.java
index bafa317..48af7a2 100644
--- a/java/vector/src/main/codegen/templates/NullableValueVectors.java
+++ b/java/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -145,7 +145,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
@Override
public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) {
org.apache.arrow.vector.BaseDataValueVector.load(getFieldInnerVectors(), ownBuffers);
- // TODO: do something with the sizes in fieldNode?
+ // Take the value count of `bits` from the field node's length.
+ // NOTE(review): only bits.valueCount is set here; confirm the inner values vector's count is restored elsewhere.
+ bits.valueCount = fieldNode.getLength();
}
public List<ArrowBuf> getFieldBuffers() {
http://git-wip-us.apache.org/repos/asf/arrow/blob/6996c17f/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
index b7040da..4afd823 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
@@ -27,7 +27,6 @@ import org.apache.arrow.vector.schema.ArrowFieldNode;
import org.apache.arrow.vector.schema.ArrowRecordBatch;
import org.apache.arrow.vector.schema.VectorLayout;
import org.apache.arrow.vector.types.pojo.Field;
-import org.apache.arrow.vector.types.pojo.Schema;
import com.google.common.collect.Iterators;
@@ -37,22 +36,16 @@ import io.netty.buffer.ArrowBuf;
* Loads buffers into vectors
*/
public class VectorLoader {
- private final List<FieldVector> fieldVectors;
- private final List<Field> fields;
+ private final VectorSchemaRoot root;
/**
* Creates a loader that loads record batches into the vectors already held by {@code root}.
* The constructor only keeps a reference to the root; vectors are looked up by field name on each load.
* @param root the root whose field vectors receive the loaded buffers
*/
- public VectorLoader(Schema schema, FieldVector root) {
+ public VectorLoader(VectorSchemaRoot root) {
super();
- this.fields = schema.getFields();
- root.initializeChildrenFromFields(fields);
- this.fieldVectors = root.getChildrenFromFields();
- if (this.fieldVectors.size() != fields.size()) {
- throw new IllegalArgumentException("The root vector did not create the right number of children. found " + fieldVectors.size() + " expected " + fields.size());
- }
+ this.root = root;
}
/**
@@ -63,16 +56,19 @@ public class VectorLoader {
public void load(ArrowRecordBatch recordBatch) {
Iterator<ArrowBuf> buffers = recordBatch.getBuffers().iterator();
Iterator<ArrowFieldNode> nodes = recordBatch.getNodes().iterator();
+ List<Field> fields = root.getSchema().getFields();
for (int i = 0; i < fields.size(); ++i) {
Field field = fields.get(i);
- FieldVector fieldVector = fieldVectors.get(i);
+ FieldVector fieldVector = root.getVector(field.getName());
loadBuffers(fieldVector, field, buffers, nodes);
}
if (nodes.hasNext() || buffers.hasNext()) {
throw new IllegalArgumentException("not all nodes and buffers were consumed. nodes: " + Iterators.toString(nodes) + " buffers: " + Iterators.toString(buffers));
}
+ // Publish the row count only after the batch was fully and consistently consumed.
+ root.setRowCount(recordBatch.getLength());
}
+
private void loadBuffers(FieldVector vector, Field field, Iterator<ArrowBuf> buffers, Iterator<ArrowFieldNode> nodes) {
checkArgument(nodes.hasNext(),
"no more field nodes for for field " + field + " and vector " + vector);
@@ -85,7 +81,7 @@ public class VectorLoader {
try {
vector.loadFieldBuffers(fieldNode, ownBuffers);
} catch (RuntimeException e) {
- throw new IllegalArgumentException("Could not load buffers for field " + field);
+ throw new IllegalArgumentException("Could not load buffers for field " + field, e);
}
List<Field> children = field.getChildren();
if (children.size() > 0) {
@@ -98,4 +94,5 @@ public class VectorLoader {
}
}
}
+
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/6996c17f/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java
new file mode 100644
index 0000000..1cbe187
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+public class VectorSchemaRoot implements AutoCloseable {
+
+ private final Schema schema;
+ private int rowCount;
+ private final List<FieldVector> fieldVectors;
+ private final Map<String, FieldVector> fieldVectorsMap = new HashMap<>();
+
+ public VectorSchemaRoot(FieldVector parent) {
+ this.schema = new Schema(parent.getField().getChildren());
+ this.rowCount = parent.getAccessor().getValueCount();
+ this.fieldVectors = parent.getChildrenFromFields();
+ for (int i = 0; i < schema.getFields().size(); ++i) {
+ Field field = schema.getFields().get(i);
+ FieldVector vector = fieldVectors.get(i);
+ fieldVectorsMap.put(field.getName(), vector);
+ }
+ }
+
+ public VectorSchemaRoot(Schema schema, BufferAllocator allocator) {
+ super();
+ this.schema = schema;
+ List<FieldVector> fieldVectors = new ArrayList<>();
+ for (Field field : schema.getFields()) {
+ MinorType minorType = Types.getMinorTypeForArrowType(field.getType());
+ FieldVector vector = minorType.getNewVector(field.getName(), allocator, null);
+ vector.initializeChildrenFromFields(field.getChildren());
+ fieldVectors.add(vector);
+ fieldVectorsMap.put(field.getName(), vector);
+ }
+ this.fieldVectors = Collections.unmodifiableList(fieldVectors);
+ if (this.fieldVectors.size() != schema.getFields().size()) {
+ throw new IllegalArgumentException("The root vector did not create the right number of children. found " + fieldVectors.size() + " expected " + schema.getFields().size());
+ }
+ }
+
+ public List<FieldVector> getFieldVectors() {
+ return fieldVectors;
+ }
+
+ public FieldVector getVector(String name) {
+ return fieldVectorsMap.get(name);
+ }
+
+ public Schema getSchema() {
+ return schema;
+ }
+
+ public int getRowCount() {
+ return rowCount;
+ }
+
+ public void setRowCount(int rowCount) {
+ this.rowCount = rowCount;
+ }
+
+ @Override
+ public void close() {
+ RuntimeException ex = null;
+ for (FieldVector fieldVector : fieldVectors) {
+ try {
+ fieldVector.close();
+ } catch (RuntimeException e) {
+ ex = chain(ex, e);
+ }
+ }
+ if (ex!= null) {
+ throw ex;
+ }
+ }
+
+ private RuntimeException chain(RuntimeException root, RuntimeException e) {
+ if (root == null) {
+ root = e;
+ } else {
+ root.addSuppressed(e);
+ }
+ return root;
+ }
+
+ private void printRow(StringBuilder sb, List<Object> row) {
+ boolean first = true;
+ for (Object v : row) {
+ if (first) {
+ first = false;
+ } else {
+ sb.append("\t");
+ }
+ sb.append(v);
+ }
+ sb.append("\n");
+ }
+
+ public String contentToTSVString() {
+ StringBuilder sb = new StringBuilder();
+ List<Object> row = new ArrayList<>(schema.getFields().size());
+ for (Field field : schema.getFields()) {
+ row.add(field.getName());
+ }
+ printRow(sb, row);
+ for (int i = 0; i < rowCount; i++) {
+ row.clear();
+ for (FieldVector v : fieldVectors) {
+ row.add(v.getAccessor().getObject(i));
+ }
+ printRow(sb, row);
+ }
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/6996c17f/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
index 3375a7d..e246218 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
@@ -34,11 +34,15 @@ public class VectorUnloader {
private final int valueCount;
private final List<FieldVector> vectors;
- public VectorUnloader(FieldVector parent) {
+ /**
+ * Creates an unloader over an explicit set of vectors.
+ *
+ * @param schema the schema to associate with the vectors (presumably one field per vector,
+ * in the same order; this constructor does not check that)
+ * @param valueCount the value count recorded for the batch
+ * @param vectors the vectors to unload; stored as given, not copied
+ */
+ public VectorUnloader(Schema schema, int valueCount, List<FieldVector> vectors) {
super();
- this.schema = new Schema(parent.getField().getChildren());
- this.valueCount = parent.getAccessor().getValueCount();
- this.vectors = parent.getChildrenFromFields();
+ this.schema = schema;
+ this.valueCount = valueCount;
+ this.vectors = vectors;
+ }
+
+ /**
+ * Creates an unloader over the schema, row count and vectors currently held by {@code root}.
+ *
+ * @param root the vectors to unload
+ */
+ public VectorUnloader(VectorSchemaRoot root) {
+ this(root.getSchema(), root.getRowCount(), root.getFieldVectors());
}
public Schema getSchema() {
@@ -77,4 +81,5 @@ public class VectorUnloader {
appendNodes(child, nodes, buffers);
}
}
+
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/6996c17f/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowBuffer.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowBuffer.java b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowBuffer.java
index 3aa3e52..4e2e200 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowBuffer.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowBuffer.java
@@ -78,4 +78,10 @@ public class ArrowBuffer implements FBSerializable {
public int writeTo(FlatBufferBuilder builder) {
return Buffer.createBuffer(builder, page, offset, size);
}
+
+  /**
+   * Describes this buffer in the form {@code ArrowBuffer [page=..., offset=..., size=...]}.
+   */
+  @Override
+  public String toString() {
+    return new StringBuilder("ArrowBuffer [page=").append(page)
+        .append(", offset=").append(offset)
+        .append(", size=").append(size)
+        .append(']')
+        .toString();
+  }
+
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/6996c17f/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java
index 9162efd..adb99e2 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java
@@ -124,4 +124,12 @@ public class ArrowRecordBatch implements FBSerializable, AutoCloseable {
}
}
+ @Override
+ public String toString() {
+ return "ArrowRecordBatch [length=" + length + ", nodes=" + nodes + ", #buffers=" + buffers.size() + ", buffersLayout="
+ + buffersLayout + ", closed=" + closed + "]";
+ }
+
+
+
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/6996c17f/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java
index 7dcb897..78f69ee 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java
@@ -18,20 +18,18 @@
package org.apache.arrow.vector;
import java.io.IOException;
+import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.complex.MapVector;
-import org.apache.arrow.vector.complex.NullableMapVector;
import org.apache.arrow.vector.complex.impl.ComplexWriterImpl;
-import org.apache.arrow.vector.complex.impl.SingleMapReaderImpl;
-import org.apache.arrow.vector.complex.reader.BaseReader.MapReader;
+import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter;
import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter;
import org.apache.arrow.vector.complex.writer.BigIntWriter;
import org.apache.arrow.vector.complex.writer.IntWriter;
import org.apache.arrow.vector.schema.ArrowRecordBatch;
-import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -42,13 +40,15 @@ public class TestVectorUnloadLoad {
static final BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
@Test
- public void test() throws IOException {
+ public void testUnloadLoad() throws IOException {
int count = 10000;
Schema schema;
try (
BufferAllocator originalVectorsAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
MapVector parent = new MapVector("parent", originalVectorsAllocator, null)) {
+
+ // write some data
ComplexWriter writer = new ComplexWriterImpl("root", parent);
MapWriter rootWriter = writer.rootAsMap();
IntWriter intWriter = rootWriter.integer("int");
@@ -61,28 +61,40 @@ public class TestVectorUnloadLoad {
}
writer.setValueCount(count);
- VectorUnloader vectorUnloader = new VectorUnloader(parent.getChild("root"));
- schema = vectorUnloader.getSchema();
-
+ // unload it
+ FieldVector root = parent.getChild("root");
+ schema = new Schema(root.getField().getChildren());
+ VectorUnloader vectorUnloader = newVectorUnloader(root);
try (
ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
BufferAllocator finalVectorsAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
- MapVector newParent = new MapVector("parent", finalVectorsAllocator, null)) {
- FieldVector root = newParent.addOrGet("root", MinorType.MAP, NullableMapVector.class);
- VectorLoader vectorLoader = new VectorLoader(schema, root);
+ VectorSchemaRoot newRoot = new VectorSchemaRoot(schema, finalVectorsAllocator);
+ ) {
+
+ // load it
+ VectorLoader vectorLoader = new VectorLoader(newRoot);
vectorLoader.load(recordBatch);
- MapReader rootReader = new SingleMapReaderImpl(newParent).reader("root");
+ FieldReader intReader = newRoot.getVector("int").getReader();
+ FieldReader bigIntReader = newRoot.getVector("bigInt").getReader();
for (int i = 0; i < count; i++) {
- rootReader.setPosition(i);
- Assert.assertEquals(i, rootReader.reader("int").readInteger().intValue());
- Assert.assertEquals(i, rootReader.reader("bigInt").readLong().longValue());
+ intReader.setPosition(i);
+ Assert.assertEquals(i, intReader.readInteger().intValue());
+ bigIntReader.setPosition(i);
+ Assert.assertEquals(i, bigIntReader.readLong().longValue());
}
}
}
}
+  /**
+   * Builds a {@link VectorUnloader} over the children of {@code root}: the schema comes from the
+   * children of root's field, the value count from root's accessor, and the vectors from
+   * {@link FieldVector#getChildrenFromFields()}.
+   */
+  public static VectorUnloader newVectorUnloader(FieldVector root) {
+    // Arguments are evaluated left to right: schema, then value count, then child vectors.
+    return new VectorUnloader(
+        new Schema(root.getField().getChildren()),
+        root.getAccessor().getValueCount(),
+        root.getChildrenFromFields());
+  }
+
@AfterClass
public static void afterClass() {
allocator.close();
http://git-wip-us.apache.org/repos/asf/arrow/blob/6996c17f/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
index 0f28d53..e97bc14 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
@@ -17,6 +17,8 @@
*/
package org.apache.arrow.vector.file;
+import static org.apache.arrow.vector.TestVectorUnloadLoad.newVectorUnloader;
+
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
@@ -29,12 +31,12 @@ import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.ValueVector.Accessor;
import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.NullableMapVector;
import org.apache.arrow.vector.complex.impl.ComplexWriterImpl;
-import org.apache.arrow.vector.complex.impl.SingleMapReaderImpl;
-import org.apache.arrow.vector.complex.reader.BaseReader.MapReader;
+import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter;
import org.apache.arrow.vector.complex.writer.BaseWriter.ListWriter;
import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter;
@@ -43,7 +45,6 @@ import org.apache.arrow.vector.complex.writer.IntWriter;
import org.apache.arrow.vector.holders.NullableTimeStampHolder;
import org.apache.arrow.vector.schema.ArrowBuffer;
import org.apache.arrow.vector.schema.ArrowRecordBatch;
-import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.joda.time.DateTimeZone;
import org.junit.After;
@@ -94,8 +95,9 @@ public class TestArrowFile {
BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null)) {
writeComplexData(count, parent);
- validateComplexContent(count, parent);
- write(parent.getChild("root"), file);
+ FieldVector root = parent.getChild("root");
+ validateComplexContent(count, new VectorSchemaRoot(root));
+ write(root, file);
}
}
@@ -174,33 +176,31 @@ public class TestArrowFile {
// initialize vectors
- NullableMapVector root = parent.addOrGet("root", MinorType.MAP, NullableMapVector.class);
-
- VectorLoader vectorLoader = new VectorLoader(schema, root);
-
- List<ArrowBlock> recordBatches = footer.getRecordBatches();
- for (ArrowBlock rbBlock : recordBatches) {
- Assert.assertEquals(0, rbBlock.getOffset() % 8);
- Assert.assertEquals(0, rbBlock.getMetadataLength() % 8);
- try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
- List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
- for (ArrowBuffer arrowBuffer : buffersLayout) {
- Assert.assertEquals(0, arrowBuffer.getOffset() % 8);
+ try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator)) {
+ VectorLoader vectorLoader = new VectorLoader(root);
+
+ List<ArrowBlock> recordBatches = footer.getRecordBatches();
+ for (ArrowBlock rbBlock : recordBatches) {
+ Assert.assertEquals(0, rbBlock.getOffset() % 8);
+ Assert.assertEquals(0, rbBlock.getMetadataLength() % 8);
+ try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
+ List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
+ for (ArrowBuffer arrowBuffer : buffersLayout) {
+ Assert.assertEquals(0, arrowBuffer.getOffset() % 8);
+ }
+ vectorLoader.load(recordBatch);
}
- vectorLoader.load(recordBatch);
- }
- validateContent(count, parent);
+ validateContent(count, root);
+ }
}
}
}
- private void validateContent(int count, MapVector parent) {
- MapReader rootReader = new SingleMapReaderImpl(parent).reader("root");
+ /** Asserts that, for every row i below count, the "int" and "bigInt" columns both hold i. */
+ private void validateContent(int count, VectorSchemaRoot root) {
+ FieldVector intVector = root.getVector("int");
+ FieldVector bigIntVector = root.getVector("bigInt");
for (int i = 0; i < count; i++) {
- rootReader.setPosition(i);
- Assert.assertEquals(i, rootReader.reader("int").readInteger().intValue());
- Assert.assertEquals(i, rootReader.reader("bigInt").readLong().longValue());
+ Assert.assertEquals(Integer.valueOf(i), intVector.getAccessor().getObject(i));
+ Assert.assertEquals(Long.valueOf(i), bigIntVector.getAccessor().getObject(i));
}
}
@@ -231,15 +231,15 @@ public class TestArrowFile {
// initialize vectors
- NullableMapVector root = parent.addOrGet("root", MinorType.MAP, NullableMapVector.class);
- VectorLoader vectorLoader = new VectorLoader(schema, root);
-
- List<ArrowBlock> recordBatches = footer.getRecordBatches();
- for (ArrowBlock rbBlock : recordBatches) {
- try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
- vectorLoader.load(recordBatch);
+ try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator)) {
+ VectorLoader vectorLoader = new VectorLoader(root);
+ List<ArrowBlock> recordBatches = footer.getRecordBatches();
+ for (ArrowBlock rbBlock : recordBatches) {
+ try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
+ vectorLoader.load(recordBatch);
+ }
+ validateComplexContent(count, root);
}
- validateComplexContent(count, parent);
}
}
}
@@ -255,23 +255,23 @@ public class TestArrowFile {
}
}
- private void validateComplexContent(int count, NullableMapVector parent) {
- printVectors(parent.getChildrenFromFields());
-
- MapReader rootReader = new SingleMapReaderImpl(parent).reader("root");
+ /**
+ * Checks the complex test data: the row count equals count, and for each row i the "int" and
+ * "bigInt" columns hold i, the "list" entry has i % 3 elements and "map.timestamp" holds i.
+ */
+ private void validateComplexContent(int count, VectorSchemaRoot root) {
+ Assert.assertEquals(count, root.getRowCount());
+ printVectors(root.getFieldVectors());
+ FieldVector intVector = root.getVector("int");
+ FieldVector bigIntVector = root.getVector("bigInt");
+ FieldVector listVector = root.getVector("list");
+ FieldVector mapVector = root.getVector("map");
for (int i = 0; i < count; i++) {
- rootReader.setPosition(i);
- Assert.assertEquals(i, rootReader.reader("int").readInteger().intValue());
- Assert.assertEquals(i, rootReader.reader("bigInt").readLong().longValue());
- Assert.assertEquals(i % 3, rootReader.reader("list").size());
+ Assert.assertEquals(Integer.valueOf(i), intVector.getAccessor().getObject(i));
+ Assert.assertEquals(Long.valueOf(i), bigIntVector.getAccessor().getObject(i));
+ List<?> listEntry = (List<?>) listVector.getAccessor().getObject(i);
+ Assert.assertEquals(i % 3, listEntry.size());
NullableTimeStampHolder h = new NullableTimeStampHolder();
- rootReader.reader("map").reader("timestamp").read(h);
+ FieldReader mapReader = mapVector.getReader();
+ mapReader.setPosition(i);
+ mapReader.reader("timestamp").read(h);
Assert.assertEquals(i, h.value);
}
}
private void write(FieldVector parent, File file) throws FileNotFoundException, IOException {
- VectorUnloader vectorUnloader = new VectorUnloader(parent);
+ VectorUnloader vectorUnloader = newVectorUnloader(parent);
Schema schema = vectorUnloader.getSchema();
LOGGER.debug("writing schema: " + schema);
try (
@@ -294,7 +294,7 @@ public class TestArrowFile {
MapVector parent = new MapVector("parent", originalVectorAllocator, null);
FileOutputStream fileOutputStream = new FileOutputStream(file);) {
writeData(count, parent);
- VectorUnloader vectorUnloader = new VectorUnloader(parent.getChild("root"));
+ VectorUnloader vectorUnloader = newVectorUnloader(parent.getChild("root"));
Schema schema = vectorUnloader.getSchema();
Assert.assertEquals(2, schema.getFields().size());
try (ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);) {
@@ -320,20 +320,21 @@ public class TestArrowFile {
ArrowFooter footer = arrowReader.readFooter();
Schema schema = footer.getSchema();
LOGGER.debug("reading schema: " + schema);
- NullableMapVector root = parent.addOrGet("root", MinorType.MAP, NullableMapVector.class);
- VectorLoader vectorLoader = new VectorLoader(schema, root);
- List<ArrowBlock> recordBatches = footer.getRecordBatches();
- Assert.assertEquals(2, recordBatches.size());
- for (ArrowBlock rbBlock : recordBatches) {
- Assert.assertEquals(0, rbBlock.getOffset() % 8);
- Assert.assertEquals(0, rbBlock.getMetadataLength() % 8);
- try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
- List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
- for (ArrowBuffer arrowBuffer : buffersLayout) {
- Assert.assertEquals(0, arrowBuffer.getOffset() % 8);
+ try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator);) {
+ VectorLoader vectorLoader = new VectorLoader(root);
+ List<ArrowBlock> recordBatches = footer.getRecordBatches();
+ Assert.assertEquals(2, recordBatches.size());
+ for (ArrowBlock rbBlock : recordBatches) {
+ Assert.assertEquals(0, rbBlock.getOffset() % 8);
+ Assert.assertEquals(0, rbBlock.getMetadataLength() % 8);
+ try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
+ List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
+ for (ArrowBuffer arrowBuffer : buffersLayout) {
+ Assert.assertEquals(0, arrowBuffer.getOffset() % 8);
+ }
+ vectorLoader.load(recordBatch);
+ validateContent(count, root);
}
- vectorLoader.load(recordBatch);
- validateContent(count, parent);
}
}
}
@@ -351,7 +352,7 @@ public class TestArrowFile {
printVectors(parent.getChildrenFromFields());
- validateUnionData(count, parent);
+ validateUnionData(count, new VectorSchemaRoot(parent.getChild("root")));
write(parent.getChild("root"), file);
}
@@ -361,44 +362,42 @@ public class TestArrowFile {
FileInputStream fileInputStream = new FileInputStream(file);
ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), readerAllocator);
BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
- NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null)
) {
ArrowFooter footer = arrowReader.readFooter();
Schema schema = footer.getSchema();
LOGGER.debug("reading schema: " + schema);
// initialize vectors
-
- NullableMapVector root = parent.addOrGet("root", MinorType.MAP, NullableMapVector.class);
- VectorLoader vectorLoader = new VectorLoader(schema, root);
-
- List<ArrowBlock> recordBatches = footer.getRecordBatches();
- for (ArrowBlock rbBlock : recordBatches) {
- try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
- vectorLoader.load(recordBatch);
+ try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator);) {
+ VectorLoader vectorLoader = new VectorLoader(root);
+ List<ArrowBlock> recordBatches = footer.getRecordBatches();
+ for (ArrowBlock rbBlock : recordBatches) {
+ try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
+ vectorLoader.load(recordBatch);
+ }
+ validateUnionData(count, root);
}
- validateUnionData(count, parent);
}
}
}
- public void validateUnionData(int count, MapVector parent) {
- MapReader rootReader = new SingleMapReaderImpl(parent).reader("root");
+ public void validateUnionData(int count, VectorSchemaRoot root) {
+ FieldReader unionReader = root.getVector("union").getReader();
for (int i = 0; i < count; i++) {
- rootReader.setPosition(i);
+ unionReader.setPosition(i);
switch (i % 4) {
case 0:
- Assert.assertEquals(i, rootReader.reader("union").readInteger().intValue());
+ Assert.assertEquals(i, unionReader.readInteger().intValue());
break;
case 1:
- Assert.assertEquals(i, rootReader.reader("union").readLong().longValue());
+ Assert.assertEquals(i, unionReader.readLong().longValue());
break;
case 2:
- Assert.assertEquals(i % 3, rootReader.reader("union").size());
+ Assert.assertEquals(i % 3, unionReader.size());
break;
case 3:
NullableTimeStampHolder h = new NullableTimeStampHolder();
- rootReader.reader("union").reader("timestamp").read(h);
+ unionReader.reader("timestamp").read(h);
Assert.assertEquals(i, h.value);
break;
}