You are viewing a plain-text version of this content. The canonical (HTML) version was linked "here", but that hyperlink was lost in this plain-text rendering.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/03/16 18:00:03 UTC
[4/4] arrow git commit: ARROW-542: Adding dictionary encoding to FileWriter
ARROW-542: Adding dictionary encoding to FileWriter
WIP for comments
Author: Emilio Lahr-Vivaz <el...@ccri.com>
Author: Wes McKinney <we...@twosigma.com>
Closes #334 from elahrvivaz/ARROW-542 and squashes the following commits:
5339730 [Emilio Lahr-Vivaz] fixing bitvector load of value count, adding struct integration test
00d78d3 [Emilio Lahr-Vivaz] fixing set bit validity value in NullableMapVector load
1679934 [Emilio Lahr-Vivaz] cleaning up license
70639e0 [Emilio Lahr-Vivaz] restoring vector loader test
bde4eee [Wes McKinney] Handle 0-length message indicator for EOS in C++ StreamReader
a24854b [Emilio Lahr-Vivaz] fixing StreamToFile conversion
2ee7cfb [Emilio Lahr-Vivaz] fixing FileToStream conversion
adec200 [Emilio Lahr-Vivaz] making arrow magic static, cleanup
8366288 [Emilio Lahr-Vivaz] making magic array private
127937f [Emilio Lahr-Vivaz] removing qualifier for magic
db9a007 [Emilio Lahr-Vivaz] adding dictionary tests to echo server
95c7b2a [Emilio Lahr-Vivaz] cleanup
45caa02 [Emilio Lahr-Vivaz] reverting basewriter dictionary methods
682db6f [Emilio Lahr-Vivaz] cleanup
a1508b9 [Emilio Lahr-Vivaz] removing dictionary vector method (instead use field.dictionary)
43c28af [Emilio Lahr-Vivaz] adding test for nested dictionary encoded list
92a1e6f [Emilio Lahr-Vivaz] fixing imports
e567564 [Emilio Lahr-Vivaz] adding field size check in vectorschemaroot
568fda5 [Emilio Lahr-Vivaz] imports, formatting
363308e [Emilio Lahr-Vivaz] fixing tests
2f69be1 [Emilio Lahr-Vivaz] not passing around dictionary vectors with dictionary fields, adding dictionary encoding to fields, restoring vector loader/unloader
e5c8e02 [Emilio Lahr-Vivaz] Merging dictionary unloader/loader with arrow writer/reader; Creating base class for stream/file writer; Creating base class with visitors for arrow messages; Indentation fixes; Other cleanup
d095f3f [Emilio Lahr-Vivaz] ARROW-542: Adding dictionary encoding to file and stream writing
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/49f666e7
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/49f666e7
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/49f666e7
Branch: refs/heads/master
Commit: 49f666e740208d1e6167537f141f27b6b78b77cb
Parents: 3b65001
Author: Emilio Lahr-Vivaz <el...@ccri.com>
Authored: Thu Mar 16 13:59:53 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Thu Mar 16 13:59:53 2017 -0400
----------------------------------------------------------------------
cpp/src/arrow/ipc/reader.cc | 6 +
integration/integration_test.py | 4 +
.../java/org/apache/arrow/tools/EchoServer.java | 48 +-
.../org/apache/arrow/tools/FileRoundtrip.java | 48 +-
.../org/apache/arrow/tools/FileToStream.java | 27 +-
.../org/apache/arrow/tools/Integration.java | 83 +--
.../org/apache/arrow/tools/StreamToFile.java | 19 +-
.../arrow/tools/ArrowFileTestFixtures.java | 51 +-
.../org/apache/arrow/tools/EchoServerTest.java | 280 ++++++--
.../org/apache/arrow/tools/TestIntegration.java | 38 +-
java/tools/tmptestfilesio | Bin 0 -> 628 bytes
.../src/main/codegen/templates/MapWriters.java | 8 +-
.../codegen/templates/NullableValueVectors.java | 40 +-
.../src/main/codegen/templates/UnionVector.java | 10 +-
.../java/org/apache/arrow/vector/BitVector.java | 2 +-
.../org/apache/arrow/vector/FieldVector.java | 4 +-
.../org/apache/arrow/vector/VectorLoader.java | 13 +-
.../apache/arrow/vector/VectorSchemaRoot.java | 32 +-
.../org/apache/arrow/vector/VectorUnloader.java | 27 +-
.../vector/complex/AbstractContainerVector.java | 3 +-
.../arrow/vector/complex/AbstractMapVector.java | 9 +-
.../vector/complex/BaseRepeatedValueVector.java | 5 +-
.../arrow/vector/complex/DictionaryVector.java | 229 -------
.../apache/arrow/vector/complex/ListVector.java | 26 +-
.../apache/arrow/vector/complex/MapVector.java | 5 +-
.../arrow/vector/complex/NullableMapVector.java | 9 +-
.../vector/complex/impl/ComplexWriterImpl.java | 6 +-
.../vector/complex/impl/PromotableWriter.java | 5 +-
.../arrow/vector/dictionary/Dictionary.java | 66 ++
.../vector/dictionary/DictionaryEncoder.java | 144 ++++
.../vector/dictionary/DictionaryProvider.java | 47 ++
.../arrow/vector/file/ArrowFileReader.java | 142 ++++
.../arrow/vector/file/ArrowFileWriter.java | 59 ++
.../apache/arrow/vector/file/ArrowFooter.java | 1 -
.../apache/arrow/vector/file/ArrowMagic.java | 37 ++
.../apache/arrow/vector/file/ArrowReader.java | 222 +++++--
.../apache/arrow/vector/file/ArrowWriter.java | 173 +++--
.../apache/arrow/vector/file/ReadChannel.java | 11 +-
.../arrow/vector/file/SeekableReadChannel.java | 39 ++
.../apache/arrow/vector/file/WriteChannel.java | 7 +-
.../arrow/vector/file/json/JsonFileReader.java | 26 +-
.../vector/schema/ArrowDictionaryBatch.java | 60 ++
.../arrow/vector/schema/ArrowMessage.java | 30 +
.../arrow/vector/schema/ArrowRecordBatch.java | 8 +-
.../arrow/vector/stream/ArrowStreamReader.java | 88 +--
.../arrow/vector/stream/ArrowStreamWriter.java | 75 +--
.../arrow/vector/stream/MessageSerializer.java | 164 ++++-
.../apache/arrow/vector/types/Dictionary.java | 40 --
.../org/apache/arrow/vector/types/Types.java | 114 ++--
.../vector/types/pojo/DictionaryEncoding.java | 51 ++
.../apache/arrow/vector/types/pojo/Field.java | 59 +-
.../apache/arrow/vector/TestDecimalVector.java | 2 +-
.../arrow/vector/TestDictionaryVector.java | 82 +--
.../org/apache/arrow/vector/TestListVector.java | 4 +-
.../apache/arrow/vector/TestValueVector.java | 12 +-
.../arrow/vector/TestVectorUnloadLoad.java | 22 +-
.../complex/impl/TestPromotableWriter.java | 2 +-
.../complex/writer/TestComplexWriter.java | 14 +-
.../apache/arrow/vector/file/TestArrowFile.java | 665 ++++++++++++-------
.../vector/file/TestArrowReaderWriter.java | 28 +-
.../arrow/vector/file/TestArrowStream.java | 102 +++
.../arrow/vector/file/TestArrowStreamPipe.java | 163 +++++
.../arrow/vector/file/json/TestJSONFile.java | 4 +-
.../vector/stream/MessageSerializerTest.java | 8 +-
.../arrow/vector/stream/TestArrowStream.java | 96 ---
.../vector/stream/TestArrowStreamPipe.java | 129 ----
66 files changed, 2522 insertions(+), 1511 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/cpp/src/arrow/ipc/reader.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 9734166..4cb5f6c 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -78,6 +78,12 @@ class StreamReader::StreamReaderImpl {
int32_t message_length = *reinterpret_cast<const int32_t*>(buffer->data());
+ if (message_length == 0) {
+ // Optional 0 EOS control message
+ *message = nullptr;
+ return Status::OK();
+ }
+
RETURN_NOT_OK(stream_->Read(message_length, &buffer));
if (buffer->size() != message_length) {
return Status::IOError("Unexpected end of stream trying to read message");
http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/integration/integration_test.py
----------------------------------------------------------------------
diff --git a/integration/integration_test.py b/integration/integration_test.py
index 049436a..5cd63c5 100644
--- a/integration/integration_test.py
+++ b/integration/integration_test.py
@@ -680,12 +680,16 @@ class JavaTester(Tester):
cmd = ['java', '-cp', self.ARROW_TOOLS_JAR,
'org.apache.arrow.tools.StreamToFile',
stream_path, file_path]
+ if self.debug:
+ print(' '.join(cmd))
run_cmd(cmd)
def file_to_stream(self, file_path, stream_path):
cmd = ['java', '-cp', self.ARROW_TOOLS_JAR,
'org.apache.arrow.tools.FileToStream',
file_path, stream_path]
+ if self.debug:
+ print(' '.join(cmd))
run_cmd(cmd)
http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java
----------------------------------------------------------------------
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java b/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java
index c00620e..7c0cadd 100644
--- a/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java
+++ b/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java
@@ -18,23 +18,19 @@
package org.apache.arrow.tools;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
-import java.util.ArrayList;
-import java.util.List;
+
+import com.google.common.base.Preconditions;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.stream.ArrowStreamReader;
import org.apache.arrow.vector.stream.ArrowStreamWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-
public class EchoServer {
private static final Logger LOGGER = LoggerFactory.getLogger(EchoServer.class);
@@ -57,30 +53,28 @@ public class EchoServer {
public void run() throws IOException {
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
- List<ArrowRecordBatch> batches = new ArrayList<ArrowRecordBatch>();
- try (
- InputStream in = socket.getInputStream();
- OutputStream out = socket.getOutputStream();
- ArrowStreamReader reader = new ArrowStreamReader(in, allocator);
- ) {
- // Read the entire input stream.
- reader.init();
- while (true) {
- ArrowRecordBatch batch = reader.nextRecordBatch();
- if (batch == null) break;
- batches.add(batch);
- }
- LOGGER.info(String.format("Received %d batches", batches.size()));
-
- // Write it back
- try (ArrowStreamWriter writer = new ArrowStreamWriter(out, reader.getSchema())) {
- for (ArrowRecordBatch batch: batches) {
- writer.writeRecordBatch(batch);
+ // Read the entire input stream and write it back
+ try (ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator)) {
+ VectorSchemaRoot root = reader.getVectorSchemaRoot();
+ // load the first batch before instantiating the writer so that we have any dictionaries
+ reader.loadNextBatch();
+ try (ArrowStreamWriter writer = new ArrowStreamWriter(root, reader, socket.getOutputStream())) {
+ writer.start();
+ int echoed = 0;
+ while (true) {
+ int rowCount = reader.getVectorSchemaRoot().getRowCount();
+ if (rowCount == 0) {
+ break;
+ } else {
+ writer.writeBatch();
+ echoed += rowCount;
+ reader.loadNextBatch();
+ }
}
writer.end();
Preconditions.checkState(reader.bytesRead() == writer.bytesWritten());
+ LOGGER.info(String.format("Echoed %d records", echoed));
}
- LOGGER.info("Done writing stream back.");
}
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
----------------------------------------------------------------------
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java b/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
index db7a1c2..9fa7b76 100644
--- a/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
+++ b/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
@@ -23,18 +23,12 @@ import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
-import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.VectorUnloader;
-import org.apache.arrow.vector.file.ArrowBlock;
-import org.apache.arrow.vector.file.ArrowFooter;
-import org.apache.arrow.vector.file.ArrowReader;
-import org.apache.arrow.vector.file.ArrowWriter;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.file.ArrowFileReader;
+import org.apache.arrow.vector.file.ArrowFileWriter;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -86,35 +80,27 @@ public class FileRoundtrip {
File inFile = validateFile("input", inFileName);
File outFile = validateFile("output", outFileName);
BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); // TODO: close
- try(
- FileInputStream fileInputStream = new FileInputStream(inFile);
- ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), allocator);) {
+ try (FileInputStream fileInputStream = new FileInputStream(inFile);
+ ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), allocator)) {
- ArrowFooter footer = arrowReader.readFooter();
- Schema schema = footer.getSchema();
+ VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+ Schema schema = root.getSchema();
LOGGER.debug("Input file size: " + inFile.length());
LOGGER.debug("Found schema: " + schema);
- try (
- FileOutputStream fileOutputStream = new FileOutputStream(outFile);
- ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);
- ) {
-
- // initialize vectors
-
- List<ArrowBlock> recordBatches = footer.getRecordBatches();
- for (ArrowBlock rbBlock : recordBatches) {
- try (ArrowRecordBatch inRecordBatch = arrowReader.readRecordBatch(rbBlock);
- VectorSchemaRoot root = new VectorSchemaRoot(schema, allocator);) {
-
- VectorLoader vectorLoader = new VectorLoader(root);
- vectorLoader.load(inRecordBatch);
-
- VectorUnloader vectorUnloader = new VectorUnloader(root);
- ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
- arrowWriter.writeRecordBatch(recordBatch);
+ try (FileOutputStream fileOutputStream = new FileOutputStream(outFile);
+ ArrowFileWriter arrowWriter = new ArrowFileWriter(root, arrowReader, fileOutputStream.getChannel())) {
+ arrowWriter.start();
+ while (true) {
+ arrowReader.loadNextBatch();
+ int loaded = root.getRowCount();
+ if (loaded == 0) {
+ break;
+ } else {
+ arrowWriter.writeBatch();
}
}
+ arrowWriter.end();
}
LOGGER.debug("Output file size: " + outFile.length());
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java
----------------------------------------------------------------------
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java b/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java
index ba6505c..d534553 100644
--- a/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java
+++ b/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java
@@ -25,10 +25,8 @@ import java.io.OutputStream;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.file.ArrowBlock;
-import org.apache.arrow.vector.file.ArrowFooter;
-import org.apache.arrow.vector.file.ArrowReader;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.file.ArrowFileReader;
import org.apache.arrow.vector.stream.ArrowStreamWriter;
/**
@@ -36,19 +34,20 @@ import org.apache.arrow.vector.stream.ArrowStreamWriter;
* first argument and the output is written to standard out.
*/
public class FileToStream {
+
public static void convert(FileInputStream in, OutputStream out) throws IOException {
BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
- try(
- ArrowReader reader = new ArrowReader(in.getChannel(), allocator);) {
- ArrowFooter footer = reader.readFooter();
- try (
- ArrowStreamWriter writer = new ArrowStreamWriter(out, footer.getSchema());
- ) {
- for (ArrowBlock block: footer.getRecordBatches()) {
- try (ArrowRecordBatch batch = reader.readRecordBatch(block)) {
- writer.writeRecordBatch(batch);
- }
+ try (ArrowFileReader reader = new ArrowFileReader(in.getChannel(), allocator)) {
+ VectorSchemaRoot root = reader.getVectorSchemaRoot();
+ // load the first batch before instantiating the writer so that we have any dictionaries
+ reader.loadNextBatch();
+ try (ArrowStreamWriter writer = new ArrowStreamWriter(root, reader, out)) {
+ writer.start();
+ while (root.getRowCount() > 0) {
+ writer.writeBatch();
+ reader.loadNextBatch();
}
+ writer.end();
}
}
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/src/main/java/org/apache/arrow/tools/Integration.java
----------------------------------------------------------------------
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/Integration.java b/java/tools/src/main/java/org/apache/arrow/tools/Integration.java
index 36d4ee5..5d4849c 100644
--- a/java/tools/src/main/java/org/apache/arrow/tools/Integration.java
+++ b/java/tools/src/main/java/org/apache/arrow/tools/Integration.java
@@ -28,16 +28,12 @@ import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.file.ArrowBlock;
-import org.apache.arrow.vector.file.ArrowFooter;
-import org.apache.arrow.vector.file.ArrowReader;
-import org.apache.arrow.vector.file.ArrowWriter;
+import org.apache.arrow.vector.file.ArrowFileReader;
+import org.apache.arrow.vector.file.ArrowFileWriter;
import org.apache.arrow.vector.file.json.JsonFileReader;
import org.apache.arrow.vector.file.json.JsonFileWriter;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.Validator;
import org.apache.commons.cli.CommandLine;
@@ -69,24 +65,18 @@ public class Integration {
ARROW_TO_JSON(true, false) {
@Override
public void execute(File arrowFile, File jsonFile) throws IOException {
- try(
- BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
+ try(BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
FileInputStream fileInputStream = new FileInputStream(arrowFile);
- ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), allocator);) {
- ArrowFooter footer = arrowReader.readFooter();
- Schema schema = footer.getSchema();
+ ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), allocator)) {
+ VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+ Schema schema = root.getSchema();
LOGGER.debug("Input file size: " + arrowFile.length());
LOGGER.debug("Found schema: " + schema);
- try (JsonFileWriter writer = new JsonFileWriter(jsonFile, JsonFileWriter.config().pretty(true));) {
+ try (JsonFileWriter writer = new JsonFileWriter(jsonFile, JsonFileWriter.config().pretty(true))) {
writer.start(schema);
- List<ArrowBlock> recordBatches = footer.getRecordBatches();
- for (ArrowBlock rbBlock : recordBatches) {
- try (ArrowRecordBatch inRecordBatch = arrowReader.readRecordBatch(rbBlock);
- VectorSchemaRoot root = new VectorSchemaRoot(schema, allocator);) {
- VectorLoader vectorLoader = new VectorLoader(root);
- vectorLoader.load(inRecordBatch);
- writer.write(root);
- }
+ for (ArrowBlock rbBlock : arrowReader.getRecordBlocks()) {
+ arrowReader.loadRecordBatch(rbBlock);
+ writer.write(root);
}
}
LOGGER.debug("Output file size: " + jsonFile.length());
@@ -96,27 +86,22 @@ public class Integration {
JSON_TO_ARROW(false, true) {
@Override
public void execute(File arrowFile, File jsonFile) throws IOException {
- try (
- BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
- JsonFileReader reader = new JsonFileReader(jsonFile, allocator);
- ) {
+ try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
+ JsonFileReader reader = new JsonFileReader(jsonFile, allocator)) {
Schema schema = reader.start();
LOGGER.debug("Input file size: " + jsonFile.length());
LOGGER.debug("Found schema: " + schema);
- try (
- FileOutputStream fileOutputStream = new FileOutputStream(arrowFile);
- ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);
- ) {
-
- // initialize vectors
- VectorSchemaRoot root;
- while ((root = reader.read()) != null) {
- VectorUnloader vectorUnloader = new VectorUnloader(root);
- try (ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();) {
- arrowWriter.writeRecordBatch(recordBatch);
- }
- root.close();
+ try (FileOutputStream fileOutputStream = new FileOutputStream(arrowFile);
+ VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
+ // TODO json dictionaries
+ ArrowFileWriter arrowWriter = new ArrowFileWriter(root, null, fileOutputStream.getChannel())) {
+ arrowWriter.start();
+ reader.read(root);
+ while (root.getRowCount() != 0) {
+ arrowWriter.writeBatch();
+ reader.read(root);
}
+ arrowWriter.end();
}
LOGGER.debug("Output file size: " + arrowFile.length());
}
@@ -125,32 +110,26 @@ public class Integration {
VALIDATE(true, true) {
@Override
public void execute(File arrowFile, File jsonFile) throws IOException {
- try (
- BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
- JsonFileReader jsonReader = new JsonFileReader(jsonFile, allocator);
- FileInputStream fileInputStream = new FileInputStream(arrowFile);
- ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), allocator);
- ) {
+ try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
+ JsonFileReader jsonReader = new JsonFileReader(jsonFile, allocator);
+ FileInputStream fileInputStream = new FileInputStream(arrowFile);
+ ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), allocator)) {
Schema jsonSchema = jsonReader.start();
- ArrowFooter footer = arrowReader.readFooter();
- Schema arrowSchema = footer.getSchema();
+ VectorSchemaRoot arrowRoot = arrowReader.getVectorSchemaRoot();
+ Schema arrowSchema = arrowRoot.getSchema();
LOGGER.debug("Arrow Input file size: " + arrowFile.length());
LOGGER.debug("ARROW schema: " + arrowSchema);
LOGGER.debug("JSON Input file size: " + jsonFile.length());
LOGGER.debug("JSON schema: " + jsonSchema);
Validator.compareSchemas(jsonSchema, arrowSchema);
- List<ArrowBlock> recordBatches = footer.getRecordBatches();
+ List<ArrowBlock> recordBatches = arrowReader.getRecordBlocks();
Iterator<ArrowBlock> iterator = recordBatches.iterator();
VectorSchemaRoot jsonRoot;
while ((jsonRoot = jsonReader.read()) != null && iterator.hasNext()) {
ArrowBlock rbBlock = iterator.next();
- try (ArrowRecordBatch inRecordBatch = arrowReader.readRecordBatch(rbBlock);
- VectorSchemaRoot arrowRoot = new VectorSchemaRoot(arrowSchema, allocator);) {
- VectorLoader vectorLoader = new VectorLoader(arrowRoot);
- vectorLoader.load(inRecordBatch);
- Validator.compareVectorSchemaRoot(arrowRoot, jsonRoot);
- }
+ arrowReader.loadRecordBatch(rbBlock);
+ Validator.compareVectorSchemaRoot(arrowRoot, jsonRoot);
jsonRoot.close();
}
boolean hasMoreJSON = jsonRoot != null;
http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java
----------------------------------------------------------------------
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java b/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java
index c8a5c89..3b79d5b 100644
--- a/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java
+++ b/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java
@@ -27,8 +27,8 @@ import java.nio.channels.Channels;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.file.ArrowWriter;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.file.ArrowFileWriter;
import org.apache.arrow.vector.stream.ArrowStreamReader;
/**
@@ -38,13 +38,16 @@ public class StreamToFile {
public static void convert(InputStream in, OutputStream out) throws IOException {
BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
try (ArrowStreamReader reader = new ArrowStreamReader(in, allocator)) {
- reader.init();
- try (ArrowWriter writer = new ArrowWriter(Channels.newChannel(out), reader.getSchema());) {
- while (true) {
- ArrowRecordBatch batch = reader.nextRecordBatch();
- if (batch == null) break;
- writer.writeRecordBatch(batch);
+ VectorSchemaRoot root = reader.getVectorSchemaRoot();
+ // load the first batch before instantiating the writer so that we have any dictionaries
+ reader.loadNextBatch();
+ try (ArrowFileWriter writer = new ArrowFileWriter(root, reader, Channels.newChannel(out))) {
+ writer.start();
+ while (root.getRowCount() > 0) {
+ writer.writeBatch();
+ reader.loadNextBatch();
}
+ writer.end();
}
}
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java
----------------------------------------------------------------------
diff --git a/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java b/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java
index 4cfc52f..f752f7e 100644
--- a/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java
+++ b/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java
@@ -23,13 +23,10 @@ import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.FieldVector;
-import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.impl.ComplexWriterImpl;
import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter;
@@ -37,10 +34,8 @@ import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter;
import org.apache.arrow.vector.complex.writer.BigIntWriter;
import org.apache.arrow.vector.complex.writer.IntWriter;
import org.apache.arrow.vector.file.ArrowBlock;
-import org.apache.arrow.vector.file.ArrowFooter;
-import org.apache.arrow.vector.file.ArrowReader;
-import org.apache.arrow.vector.file.ArrowWriter;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.file.ArrowFileReader;
+import org.apache.arrow.vector.file.ArrowFileWriter;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.Assert;
@@ -63,26 +58,14 @@ public class ArrowFileTestFixtures {
static void validateOutput(File testOutFile, BufferAllocator allocator) throws Exception {
// read
- try (
- BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
- FileInputStream fileInputStream = new FileInputStream(testOutFile);
- ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), readerAllocator);
- BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
- ) {
- ArrowFooter footer = arrowReader.readFooter();
- Schema schema = footer.getSchema();
-
- // initialize vectors
- try (VectorSchemaRoot root = new VectorSchemaRoot(schema, readerAllocator)) {
- VectorLoader vectorLoader = new VectorLoader(root);
-
- List<ArrowBlock> recordBatches = footer.getRecordBatches();
- for (ArrowBlock rbBlock : recordBatches) {
- try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
- vectorLoader.load(recordBatch);
- }
- validateContent(COUNT, root);
- }
+ try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+ FileInputStream fileInputStream = new FileInputStream(testOutFile);
+ ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), readerAllocator)) {
+ VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+ Schema schema = root.getSchema();
+ for (ArrowBlock rbBlock : arrowReader.getRecordBlocks()) {
+ arrowReader.loadRecordBatch(rbBlock);
+ validateContent(COUNT, root);
}
}
}
@@ -96,16 +79,10 @@ public class ArrowFileTestFixtures {
}
static void write(FieldVector parent, File file) throws FileNotFoundException, IOException {
- Schema schema = new Schema(parent.getField().getChildren());
- int valueCount = parent.getAccessor().getValueCount();
- List<FieldVector> fields = parent.getChildrenFromFields();
- VectorUnloader vectorUnloader = new VectorUnloader(schema, valueCount, fields);
- try (
- FileOutputStream fileOutputStream = new FileOutputStream(file);
- ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);
- ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
- ) {
- arrowWriter.writeRecordBatch(recordBatch);
+ VectorSchemaRoot root = new VectorSchemaRoot(parent);
+ try (FileOutputStream fileOutputStream = new FileOutputStream(file);
+ ArrowFileWriter arrowWriter = new ArrowFileWriter(root, null, fileOutputStream.getChannel())) {
+ arrowWriter.writeBatch();
}
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
----------------------------------------------------------------------
diff --git a/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java b/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
index 48d6162..706f8e2 100644
--- a/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
+++ b/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
@@ -24,106 +24,268 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.Socket;
import java.net.UnknownHostException;
-import java.util.ArrayList;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import com.google.common.collect.ImmutableList;
+
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.NullableIntVector;
+import org.apache.arrow.vector.NullableTinyIntVector;
+import org.apache.arrow.vector.NullableVarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.impl.UnionListWriter;
+import org.apache.arrow.vector.dictionary.Dictionary;
+import org.apache.arrow.vector.dictionary.DictionaryProvider;
+import org.apache.arrow.vector.dictionary.DictionaryProvider.MapDictionaryProvider;
import org.apache.arrow.vector.stream.ArrowStreamReader;
import org.apache.arrow.vector.stream.ArrowStreamWriter;
+import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.ArrowType.Int;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.util.Text;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Test;
-import io.netty.buffer.ArrowBuf;
-
public class EchoServerTest {
- public static ArrowBuf buf(BufferAllocator alloc, byte[] bytes) {
- ArrowBuf buffer = alloc.buffer(bytes.length);
- buffer.writeBytes(bytes);
- return buffer;
+
+ private static EchoServer server;
+ private static int serverPort;
+ private static Thread serverThread;
+
+ @BeforeClass
+ public static void startEchoServer() throws IOException {
+ server = new EchoServer(0);
+ serverPort = server.port();
+ serverThread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ server.run();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ };
+ serverThread.start();
}
- public static byte[] array(ArrowBuf buf) {
- byte[] bytes = new byte[buf.readableBytes()];
- buf.readBytes(bytes);
- return bytes;
+ @AfterClass
+ public static void stopEchoServer() throws IOException, InterruptedException {
+ server.close();
+ serverThread.join();
}
- private void testEchoServer(int serverPort, Schema schema, List<ArrowRecordBatch> batches)
+ private void testEchoServer(int serverPort,
+ Field field,
+ NullableTinyIntVector vector,
+ int batches)
throws UnknownHostException, IOException {
BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE);
+ VectorSchemaRoot root = new VectorSchemaRoot(asList(field), asList((FieldVector) vector), 0);
try (Socket socket = new Socket("localhost", serverPort);
- ArrowStreamWriter writer = new ArrowStreamWriter(socket.getOutputStream(), schema);
+ ArrowStreamWriter writer = new ArrowStreamWriter(root, null, socket.getOutputStream());
ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), alloc)) {
- for (ArrowRecordBatch batch: batches) {
- writer.writeRecordBatch(batch);
+ writer.start();
+ for (int i = 0; i < batches; i++) {
+ vector.allocateNew(16);
+ for (int j = 0; j < 8; j++) {
+ vector.getMutator().set(j, j + i);
+ vector.getMutator().set(j + 8, 0, (byte) (j + i));
+ }
+ vector.getMutator().setValueCount(16);
+ root.setRowCount(16);
+ writer.writeBatch();
}
writer.end();
- reader.init();
- assertEquals(schema, reader.getSchema());
- for (int i = 0; i < batches.size(); i++) {
- ArrowRecordBatch result = reader.nextRecordBatch();
- ArrowRecordBatch expected = batches.get(i);
- assertTrue(result != null);
- assertEquals(expected.getBuffers().size(), result.getBuffers().size());
- for (int j = 0; j < expected.getBuffers().size(); j++) {
- assertTrue(expected.getBuffers().get(j).compareTo(result.getBuffers().get(j)) == 0);
+ assertEquals(new Schema(asList(field)), reader.getVectorSchemaRoot().getSchema());
+
+ NullableTinyIntVector readVector = (NullableTinyIntVector) reader.getVectorSchemaRoot().getFieldVectors().get(0);
+ for (int i = 0; i < batches; i++) {
+ reader.loadNextBatch();
+ assertEquals(16, reader.getVectorSchemaRoot().getRowCount());
+ assertEquals(16, readVector.getAccessor().getValueCount());
+ for (int j = 0; j < 8; j++) {
+ assertEquals(j + i, readVector.getAccessor().get(j));
+ assertTrue(readVector.getAccessor().isNull(j + 8));
}
}
- ArrowRecordBatch result = reader.nextRecordBatch();
- assertTrue(result == null);
+ reader.loadNextBatch();
+ assertEquals(0, reader.getVectorSchemaRoot().getRowCount());
assertEquals(reader.bytesRead(), writer.bytesWritten());
}
}
@Test
public void basicTest() throws InterruptedException, IOException {
- final EchoServer server = new EchoServer(0);
- int serverPort = server.port();
- Thread serverThread = new Thread() {
- @Override
- public void run() {
- try {
- server.run();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- };
- serverThread.start();
-
BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE);
- byte[] validity = new byte[] { (byte)255, 0};
- byte[] values = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
- ArrowBuf validityb = buf(alloc, validity);
- ArrowBuf valuesb = buf(alloc, values);
- ArrowRecordBatch batch = new ArrowRecordBatch(
- 16, asList(new ArrowFieldNode(16, 8)), asList(validityb, valuesb));
- Schema schema = new Schema(asList(new Field(
- "testField", true, new ArrowType.Int(8, true), Collections.<Field>emptyList())));
+ Field field = new Field("testField", true, new ArrowType.Int(8, true), Collections.<Field>emptyList());
+ NullableTinyIntVector vector = new NullableTinyIntVector("testField", alloc, null);
+ Schema schema = new Schema(asList(field));
// Try an empty stream, just the header.
- testEchoServer(serverPort, schema, new ArrayList<ArrowRecordBatch>());
+ testEchoServer(serverPort, field, vector, 0);
// Try with one batch.
- List<ArrowRecordBatch> batches = new ArrayList<>();
- batches.add(batch);
- testEchoServer(serverPort, schema, batches);
+ testEchoServer(serverPort, field, vector, 1);
// Try with a few
- for (int i = 0; i < 10; i++) {
- batches.add(batch);
+ testEchoServer(serverPort, field, vector, 10);
+ }
+
+ @Test
+ public void testFlatDictionary() throws IOException {
+ DictionaryEncoding writeEncoding = new DictionaryEncoding(1L, false, null);
+ try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+ NullableIntVector writeVector = new NullableIntVector("varchar", allocator, writeEncoding);
+ NullableVarCharVector writeDictionaryVector = new NullableVarCharVector("dict", allocator, null)) {
+ writeVector.allocateNewSafe();
+ NullableIntVector.Mutator mutator = writeVector.getMutator();
+ mutator.set(0, 0);
+ mutator.set(1, 1);
+ mutator.set(3, 2);
+ mutator.set(4, 1);
+ mutator.set(5, 2);
+ mutator.setValueCount(6);
+
+ writeDictionaryVector.allocateNewSafe();
+ NullableVarCharVector.Mutator dictionaryMutator = writeDictionaryVector.getMutator();
+ dictionaryMutator.set(0, "foo".getBytes(StandardCharsets.UTF_8));
+ dictionaryMutator.set(1, "bar".getBytes(StandardCharsets.UTF_8));
+ dictionaryMutator.set(2, "baz".getBytes(StandardCharsets.UTF_8));
+ dictionaryMutator.setValueCount(3);
+
+ List<Field> fields = ImmutableList.of(writeVector.getField());
+ List<FieldVector> vectors = ImmutableList.of((FieldVector) writeVector);
+ VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 6);
+
+ DictionaryProvider writeProvider = new MapDictionaryProvider(new Dictionary(writeDictionaryVector, writeEncoding));
+
+ try (Socket socket = new Socket("localhost", serverPort);
+ ArrowStreamWriter writer = new ArrowStreamWriter(root, writeProvider, socket.getOutputStream());
+ ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator)) {
+ writer.start();
+ writer.writeBatch();
+ writer.end();
+
+ reader.loadNextBatch();
+ VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot();
+ Assert.assertEquals(6, readerRoot.getRowCount());
+
+ FieldVector readVector = readerRoot.getFieldVectors().get(0);
+ Assert.assertNotNull(readVector);
+
+ DictionaryEncoding readEncoding = readVector.getField().getDictionary();
+ Assert.assertNotNull(readEncoding);
+ Assert.assertEquals(1L, readEncoding.getId());
+
+ FieldVector.Accessor accessor = readVector.getAccessor();
+ Assert.assertEquals(6, accessor.getValueCount());
+ Assert.assertEquals(0, accessor.getObject(0));
+ Assert.assertEquals(1, accessor.getObject(1));
+ Assert.assertEquals(null, accessor.getObject(2));
+ Assert.assertEquals(2, accessor.getObject(3));
+ Assert.assertEquals(1, accessor.getObject(4));
+ Assert.assertEquals(2, accessor.getObject(5));
+
+ Dictionary dictionary = reader.lookup(1L);
+ Assert.assertNotNull(dictionary);
+ NullableVarCharVector.Accessor dictionaryAccessor = ((NullableVarCharVector) dictionary.getVector()).getAccessor();
+ Assert.assertEquals(3, dictionaryAccessor.getValueCount());
+ Assert.assertEquals(new Text("foo"), dictionaryAccessor.getObject(0));
+ Assert.assertEquals(new Text("bar"), dictionaryAccessor.getObject(1));
+ Assert.assertEquals(new Text("baz"), dictionaryAccessor.getObject(2));
+ }
}
- testEchoServer(serverPort, schema, batches);
+ }
- server.close();
- serverThread.join();
+ @Test
+ public void testNestedDictionary() throws IOException {
+ DictionaryEncoding writeEncoding = new DictionaryEncoding(2L, false, null);
+ try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+ NullableVarCharVector writeDictionaryVector = new NullableVarCharVector("dictionary", allocator, null);
+ ListVector writeVector = new ListVector("list", allocator, null, null)) {
+
+ // data being written:
+ // [['foo', 'bar'], ['foo'], ['bar']] -> [[0, 1], [0], [1]]
+
+ writeDictionaryVector.allocateNew();
+ writeDictionaryVector.getMutator().set(0, "foo".getBytes(StandardCharsets.UTF_8));
+ writeDictionaryVector.getMutator().set(1, "bar".getBytes(StandardCharsets.UTF_8));
+ writeDictionaryVector.getMutator().setValueCount(2);
+
+ writeVector.addOrGetVector(MinorType.INT, writeEncoding);
+ writeVector.allocateNew();
+ UnionListWriter listWriter = new UnionListWriter(writeVector);
+ listWriter.startList();
+ listWriter.writeInt(0);
+ listWriter.writeInt(1);
+ listWriter.endList();
+ listWriter.startList();
+ listWriter.writeInt(0);
+ listWriter.endList();
+ listWriter.startList();
+ listWriter.writeInt(1);
+ listWriter.endList();
+ listWriter.setValueCount(3);
+
+ List<Field> fields = ImmutableList.of(writeVector.getField());
+ List<FieldVector> vectors = ImmutableList.of((FieldVector) writeVector);
+ VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 3);
+
+ DictionaryProvider writeProvider = new MapDictionaryProvider(new Dictionary(writeDictionaryVector, writeEncoding));
+
+ try (Socket socket = new Socket("localhost", serverPort);
+ ArrowStreamWriter writer = new ArrowStreamWriter(root, writeProvider, socket.getOutputStream());
+ ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator)) {
+ writer.start();
+ writer.writeBatch();
+ writer.end();
+
+ reader.loadNextBatch();
+ VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot();
+ Assert.assertEquals(3, readerRoot.getRowCount());
+
+ ListVector readVector = (ListVector) readerRoot.getFieldVectors().get(0);
+ Assert.assertNotNull(readVector);
+
+ Assert.assertNull(readVector.getField().getDictionary());
+ DictionaryEncoding readEncoding = readVector.getField().getChildren().get(0).getDictionary();
+ Assert.assertNotNull(readEncoding);
+ Assert.assertEquals(2L, readEncoding.getId());
+
+ Field nestedField = readVector.getField().getChildren().get(0);
+
+ DictionaryEncoding encoding = nestedField.getDictionary();
+ Assert.assertNotNull(encoding);
+ Assert.assertEquals(2L, encoding.getId());
+ Assert.assertEquals(new Int(32, true), encoding.getIndexType());
+
+ ListVector.Accessor accessor = readVector.getAccessor();
+ Assert.assertEquals(3, accessor.getValueCount());
+ Assert.assertEquals(Arrays.asList(0, 1), accessor.getObject(0));
+ Assert.assertEquals(Arrays.asList(0), accessor.getObject(1));
+ Assert.assertEquals(Arrays.asList(1), accessor.getObject(2));
+
+ Dictionary readDictionary = reader.lookup(2L);
+ Assert.assertNotNull(readDictionary);
+ NullableVarCharVector.Accessor dictionaryAccessor = ((NullableVarCharVector) readDictionary.getVector()).getAccessor();
+ Assert.assertEquals(2, dictionaryAccessor.getValueCount());
+ Assert.assertEquals(new Text("foo"), dictionaryAccessor.getObject(0));
+ Assert.assertEquals(new Text("bar"), dictionaryAccessor.getObject(1));
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java
----------------------------------------------------------------------
diff --git a/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java b/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java
index 0ae32be..9d4ef5c 100644
--- a/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java
+++ b/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java
@@ -33,6 +33,11 @@ import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
+import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
+import com.fasterxml.jackson.core.util.DefaultPrettyPrinter.NopIndenter;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.tools.Integration.Command;
@@ -49,11 +54,6 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
-import com.fasterxml.jackson.core.util.DefaultPrettyPrinter.NopIndenter;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-
public class TestIntegration {
@Rule
@@ -128,6 +128,34 @@ public class TestIntegration {
}
}
+ @Test
+ public void testJSONRoundTripWithStruct() throws Exception {
+ File testJSONFile = new File("../../integration/data/struct_example.json");
+ File testOutFile = testFolder.newFile("testOutStruct.arrow");
+ File testRoundTripJSONFile = testFolder.newFile("testOutStruct.json");
+ testOutFile.delete();
+ testRoundTripJSONFile.delete();
+
+ Integration integration = new Integration();
+
+ // convert to arrow
+ String[] args1 = { "-arrow", testOutFile.getAbsolutePath(), "-json", testJSONFile.getAbsolutePath(), "-command", Command.JSON_TO_ARROW.name()};
+ integration.run(args1);
+
+ // convert back to json
+ String[] args2 = { "-arrow", testOutFile.getAbsolutePath(), "-json", testRoundTripJSONFile.getAbsolutePath(), "-command", Command.ARROW_TO_JSON.name()};
+ integration.run(args2);
+
+ BufferedReader orig = readNormalized(testJSONFile);
+ BufferedReader rt = readNormalized(testRoundTripJSONFile);
+ String i, o;
+ int j = 0;
+ while ((i = orig.readLine()) != null && (o = rt.readLine()) != null) {
+ assertEquals("line: " + j, i, o);
+ ++j;
+ }
+ }
+
private ObjectMapper om = new ObjectMapper();
{
DefaultPrettyPrinter prettyPrinter = new DefaultPrettyPrinter();
http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/tmptestfilesio
----------------------------------------------------------------------
diff --git a/java/tools/tmptestfilesio b/java/tools/tmptestfilesio
new file mode 100644
index 0000000..d1b6b6c
Binary files /dev/null and b/java/tools/tmptestfilesio differ
http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/codegen/templates/MapWriters.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/codegen/templates/MapWriters.java b/java/vector/src/main/codegen/templates/MapWriters.java
index 4af6eee..428ce04 100644
--- a/java/vector/src/main/codegen/templates/MapWriters.java
+++ b/java/vector/src/main/codegen/templates/MapWriters.java
@@ -64,7 +64,7 @@ public class ${mode}MapWriter extends AbstractFieldWriter {
list(child.getName());
break;
case UNION:
- UnionWriter writer = new UnionWriter(container.addOrGet(child.getName(), MinorType.UNION, UnionVector.class), getNullableMapWriterFactory());
+ UnionWriter writer = new UnionWriter(container.addOrGet(child.getName(), MinorType.UNION, UnionVector.class, null), getNullableMapWriterFactory());
fields.put(handleCase(child.getName()), writer);
break;
<#list vv.types as type><#list type.minor as minor>
@@ -113,7 +113,7 @@ public class ${mode}MapWriter extends AbstractFieldWriter {
FieldWriter writer = fields.get(finalName);
if(writer == null){
int vectorCount=container.size();
- NullableMapVector vector = container.addOrGet(name, MinorType.MAP, NullableMapVector.class);
+ NullableMapVector vector = container.addOrGet(name, MinorType.MAP, NullableMapVector.class, null);
writer = new PromotableWriter(vector, container, getNullableMapWriterFactory());
if(vectorCount != container.size()) {
writer.allocate();
@@ -157,7 +157,7 @@ public class ${mode}MapWriter extends AbstractFieldWriter {
FieldWriter writer = fields.get(finalName);
int vectorCount = container.size();
if(writer == null) {
- writer = new PromotableWriter(container.addOrGet(name, MinorType.LIST, ListVector.class), container, getNullableMapWriterFactory());
+ writer = new PromotableWriter(container.addOrGet(name, MinorType.LIST, ListVector.class, null), container, getNullableMapWriterFactory());
if (container.size() > vectorCount) {
writer.allocate();
}
@@ -222,7 +222,7 @@ public class ${mode}MapWriter extends AbstractFieldWriter {
if(writer == null) {
ValueVector vector;
ValueVector currentVector = container.getChild(name);
- ${vectName}Vector v = container.addOrGet(name, MinorType.${upperName}, ${vectName}Vector.class<#if minor.class == "Decimal"> , new int[] {precision, scale}</#if>);
+ ${vectName}Vector v = container.addOrGet(name, MinorType.${upperName}, ${vectName}Vector.class, null<#if minor.class == "Decimal"> , new int[] {precision, scale}</#if>);
writer = new PromotableWriter(v, container, getNullableMapWriterFactory());
vector = v;
if (currentVector == null || currentVector != vector) {
http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/codegen/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/codegen/templates/NullableValueVectors.java b/java/vector/src/main/codegen/templates/NullableValueVectors.java
index 6b25fb3..b3e10e3 100644
--- a/java/vector/src/main/codegen/templates/NullableValueVectors.java
+++ b/java/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -65,21 +65,21 @@ public final class ${className} extends BaseDataValueVector implements <#if type
private final int precision;
private final int scale;
- public ${className}(String name, BufferAllocator allocator, int precision, int scale) {
+ public ${className}(String name, BufferAllocator allocator, DictionaryEncoding dictionary, int precision, int scale) {
super(name, allocator);
values = new ${valuesName}(valuesField, allocator, precision, scale);
this.precision = precision;
this.scale = scale;
mutator = new Mutator();
accessor = new Accessor();
- field = new Field(name, true, new Decimal(precision, scale), null);
+ field = new Field(name, true, new Decimal(precision, scale), dictionary, null);
innerVectors = Collections.unmodifiableList(Arrays.<BufferBacked>asList(
bits,
values
));
}
<#else>
- public ${className}(String name, BufferAllocator allocator) {
+ public ${className}(String name, BufferAllocator allocator, DictionaryEncoding dictionary) {
super(name, allocator);
values = new ${valuesName}(valuesField, allocator);
mutator = new Mutator();
@@ -88,38 +88,38 @@ public final class ${className} extends BaseDataValueVector implements <#if type
minor.class == "SmallInt" ||
minor.class == "Int" ||
minor.class == "BigInt">
- field = new Field(name, true, new Int(${type.width} * 8, true), null);
+ field = new Field(name, true, new Int(${type.width} * 8, true), dictionary, null);
<#elseif minor.class == "UInt1" ||
minor.class == "UInt2" ||
minor.class == "UInt4" ||
minor.class == "UInt8">
- field = new Field(name, true, new Int(${type.width} * 8, false), null);
+ field = new Field(name, true, new Int(${type.width} * 8, false), dictionary, null);
<#elseif minor.class == "Date">
- field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Date(), null);
+ field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Date(), dictionary, null);
<#elseif minor.class == "Time">
- field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Time(), null);
+ field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Time(), dictionary, null);
<#elseif minor.class == "Float4">
- field = new Field(name, true, new FloatingPoint(org.apache.arrow.vector.types.FloatingPointPrecision.SINGLE), null);
+ field = new Field(name, true, new FloatingPoint(org.apache.arrow.vector.types.FloatingPointPrecision.SINGLE), dictionary, null);
<#elseif minor.class == "Float8">
- field = new Field(name, true, new FloatingPoint(org.apache.arrow.vector.types.FloatingPointPrecision.DOUBLE), null);
+ field = new Field(name, true, new FloatingPoint(org.apache.arrow.vector.types.FloatingPointPrecision.DOUBLE), dictionary, null);
<#elseif minor.class == "TimeStampSec">
- field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Timestamp(org.apache.arrow.vector.types.TimeUnit.SECOND), null);
+ field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Timestamp(org.apache.arrow.vector.types.TimeUnit.SECOND), dictionary, null);
<#elseif minor.class == "TimeStampMilli">
- field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Timestamp(org.apache.arrow.vector.types.TimeUnit.MILLISECOND), null);
+ field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Timestamp(org.apache.arrow.vector.types.TimeUnit.MILLISECOND), dictionary, null);
<#elseif minor.class == "TimeStampMicro">
- field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Timestamp(org.apache.arrow.vector.types.TimeUnit.MICROSECOND), null);
+ field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Timestamp(org.apache.arrow.vector.types.TimeUnit.MICROSECOND), dictionary, null);
<#elseif minor.class == "TimeStampNano">
- field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Timestamp(org.apache.arrow.vector.types.TimeUnit.NANOSECOND), null);
+ field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Timestamp(org.apache.arrow.vector.types.TimeUnit.NANOSECOND), dictionary, null);
<#elseif minor.class == "IntervalDay">
- field = new Field(name, true, new Interval(org.apache.arrow.vector.types.IntervalUnit.DAY_TIME), null);
+ field = new Field(name, true, new Interval(org.apache.arrow.vector.types.IntervalUnit.DAY_TIME), dictionary, null);
<#elseif minor.class == "IntervalYear">
- field = new Field(name, true, new Interval(org.apache.arrow.vector.types.IntervalUnit.YEAR_MONTH), null);
+ field = new Field(name, true, new Interval(org.apache.arrow.vector.types.IntervalUnit.YEAR_MONTH), dictionary, null);
<#elseif minor.class == "VarChar">
- field = new Field(name, true, new Utf8(), null);
+ field = new Field(name, true, new Utf8(), dictionary, null);
<#elseif minor.class == "VarBinary">
- field = new Field(name, true, new Binary(), null);
+ field = new Field(name, true, new Binary(), dictionary, null);
<#elseif minor.class == "Bit">
- field = new Field(name, true, new Bool(), null);
+ field = new Field(name, true, new Bool(), dictionary, null);
</#if>
innerVectors = Collections.unmodifiableList(Arrays.<BufferBacked>asList(
bits,
@@ -378,9 +378,9 @@ public final class ${className} extends BaseDataValueVector implements <#if type
public TransferImpl(String name, BufferAllocator allocator){
<#if minor.class == "Decimal">
- to = new ${className}(name, allocator, precision, scale);
+ to = new ${className}(name, allocator, field.getDictionary(), precision, scale);
<#else>
- to = new ${className}(name, allocator);
+ to = new ${className}(name, allocator, field.getDictionary());
</#if>
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/codegen/templates/UnionVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/codegen/templates/UnionVector.java b/java/vector/src/main/codegen/templates/UnionVector.java
index 1a6908d..076ed93 100644
--- a/java/vector/src/main/codegen/templates/UnionVector.java
+++ b/java/vector/src/main/codegen/templates/UnionVector.java
@@ -118,11 +118,11 @@ public class UnionVector implements FieldVector {
public List<BufferBacked> getFieldInnerVectors() {
return this.innerVectors;
}
-
+
public NullableMapVector getMap() {
if (mapVector == null) {
int vectorCount = internalMap.size();
- mapVector = internalMap.addOrGet("map", MinorType.MAP, NullableMapVector.class);
+ mapVector = internalMap.addOrGet("map", MinorType.MAP, NullableMapVector.class, null);
if (internalMap.size() > vectorCount) {
mapVector.allocateNew();
if (callBack != null) {
@@ -144,7 +144,7 @@ public class UnionVector implements FieldVector {
public Nullable${name}Vector get${name}Vector() {
if (${uncappedName}Vector == null) {
int vectorCount = internalMap.size();
- ${uncappedName}Vector = internalMap.addOrGet("${lowerCaseName}", MinorType.${name?upper_case}, Nullable${name}Vector.class);
+ ${uncappedName}Vector = internalMap.addOrGet("${lowerCaseName}", MinorType.${name?upper_case}, Nullable${name}Vector.class, null);
if (internalMap.size() > vectorCount) {
${uncappedName}Vector.allocateNew();
if (callBack != null) {
@@ -162,7 +162,7 @@ public class UnionVector implements FieldVector {
public ListVector getList() {
if (listVector == null) {
int vectorCount = internalMap.size();
- listVector = internalMap.addOrGet("list", MinorType.LIST, ListVector.class);
+ listVector = internalMap.addOrGet("list", MinorType.LIST, ListVector.class, null);
if (internalMap.size() > vectorCount) {
listVector.allocateNew();
if (callBack != null) {
@@ -262,7 +262,7 @@ public class UnionVector implements FieldVector {
public FieldVector addVector(FieldVector v) {
String name = v.getMinorType().name().toLowerCase();
Preconditions.checkState(internalMap.getChild(name) == null, String.format("%s vector already exists", name));
- final FieldVector newVector = internalMap.addOrGet(name, v.getMinorType(), v.getClass());
+ final FieldVector newVector = internalMap.addOrGet(name, v.getMinorType(), v.getClass(), v.getField().getDictionary());
v.makeTransferPair(newVector).transfer();
internalMap.putChild(name, newVector);
if (callBack != null) {
http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java
index d1e9abe..179f2ee 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java
@@ -81,6 +81,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
} else {
super.load(fieldNode, data);
}
+ this.valueCount = fieldNode.getLength();
}
@Override
@@ -451,7 +452,6 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
/**
* set count bits to 1 in data starting at firstBitIndex
- * @param data the buffer to set
* @param firstBitIndex the index of the first bit to set
* @param count the number of bits to set
*/
http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java b/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java
index b28433c..0fdbc48 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java
@@ -19,11 +19,10 @@ package org.apache.arrow.vector;
import java.util.List;
+import io.netty.buffer.ArrowBuf;
import org.apache.arrow.vector.schema.ArrowFieldNode;
import org.apache.arrow.vector.types.pojo.Field;
-import io.netty.buffer.ArrowBuf;
-
/**
* A vector corresponding to a Field in the schema
* It has inner vectors backed by buffers (validity, offsets, data, ...)
@@ -61,5 +60,4 @@ public interface FieldVector extends ValueVector {
* @return the inner vectors for this field as defined by the TypeLayout
*/
List<BufferBacked> getFieldInnerVectors();
-
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
index 5c1176c..76de250 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
@@ -36,15 +36,14 @@ import io.netty.buffer.ArrowBuf;
* Loads buffers into vectors
*/
public class VectorLoader {
+
private final VectorSchemaRoot root;
/**
* will create children in root based on schema
- * @param schema the expected schema
* @param root the root to add vectors to based on schema
*/
public VectorLoader(VectorSchemaRoot root) {
- super();
this.root = root;
}
@@ -57,18 +56,16 @@ public class VectorLoader {
Iterator<ArrowBuf> buffers = recordBatch.getBuffers().iterator();
Iterator<ArrowFieldNode> nodes = recordBatch.getNodes().iterator();
List<Field> fields = root.getSchema().getFields();
- for (int i = 0; i < fields.size(); ++i) {
- Field field = fields.get(i);
+ for (Field field: fields) {
FieldVector fieldVector = root.getVector(field.getName());
loadBuffers(fieldVector, field, buffers, nodes);
}
root.setRowCount(recordBatch.getLength());
if (nodes.hasNext() || buffers.hasNext()) {
- throw new IllegalArgumentException("not all nodes and buffers where consumed. nodes: " + Iterators.toString(nodes) + " buffers: " + Iterators.toString(buffers));
+ throw new IllegalArgumentException("not all nodes and buffers were consumed. nodes: " + Iterators.toString(nodes) + " buffers: " + Iterators.toString(buffers));
}
}
-
private void loadBuffers(FieldVector vector, Field field, Iterator<ArrowBuf> buffers, Iterator<ArrowFieldNode> nodes) {
checkArgument(nodes.hasNext(),
"no more field nodes for for field " + field + " and vector " + vector);
@@ -82,7 +79,7 @@ public class VectorLoader {
vector.loadFieldBuffers(fieldNode, ownBuffers);
} catch (RuntimeException e) {
throw new IllegalArgumentException("Could not load buffers for field " +
- field + ". error message: " + e.getMessage(), e);
+ field + ". error message: " + e.getMessage(), e);
}
List<Field> children = field.getChildren();
if (children.size() > 0) {
@@ -96,4 +93,4 @@ public class VectorLoader {
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java
index 1cbe187..7e626fb 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java
@@ -18,7 +18,6 @@
package org.apache.arrow.vector;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -29,6 +28,9 @@ import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
+/**
+ * Holder for a set of vectors to be loaded/unloaded
+ */
public class VectorSchemaRoot implements AutoCloseable {
private final Schema schema;
@@ -37,9 +39,17 @@ public class VectorSchemaRoot implements AutoCloseable {
private final Map<String, FieldVector> fieldVectorsMap = new HashMap<>();
public VectorSchemaRoot(FieldVector parent) {
- this.schema = new Schema(parent.getField().getChildren());
- this.rowCount = parent.getAccessor().getValueCount();
- this.fieldVectors = parent.getChildrenFromFields();
+ this(parent.getField().getChildren(), parent.getChildrenFromFields(), parent.getAccessor().getValueCount());
+ }
+
+ public VectorSchemaRoot(List<Field> fields, List<FieldVector> fieldVectors, int rowCount) {
+ if (fields.size() != fieldVectors.size()) {
+ throw new IllegalArgumentException("Fields must match field vectors. Found " +
+ fieldVectors.size() + " vectors and " + fields.size() + " fields");
+ }
+ this.schema = new Schema(fields);
+ this.rowCount = rowCount;
+ this.fieldVectors = fieldVectors;
for (int i = 0; i < schema.getFields().size(); ++i) {
Field field = schema.getFields().get(i);
FieldVector vector = fieldVectors.get(i);
@@ -47,21 +57,19 @@ public class VectorSchemaRoot implements AutoCloseable {
}
}
- public VectorSchemaRoot(Schema schema, BufferAllocator allocator) {
- super();
- this.schema = schema;
+ public static VectorSchemaRoot create(Schema schema, BufferAllocator allocator) {
List<FieldVector> fieldVectors = new ArrayList<>();
for (Field field : schema.getFields()) {
MinorType minorType = Types.getMinorTypeForArrowType(field.getType());
- FieldVector vector = minorType.getNewVector(field.getName(), allocator, null);
+ FieldVector vector = minorType.getNewVector(field.getName(), allocator, field.getDictionary(), null);
vector.initializeChildrenFromFields(field.getChildren());
fieldVectors.add(vector);
- fieldVectorsMap.put(field.getName(), vector);
}
- this.fieldVectors = Collections.unmodifiableList(fieldVectors);
- if (this.fieldVectors.size() != schema.getFields().size()) {
- throw new IllegalArgumentException("The root vector did not create the right number of children. found " + fieldVectors.size() + " expected " + schema.getFields().size());
+ if (fieldVectors.size() != schema.getFields().size()) {
+ throw new IllegalArgumentException("The root vector did not create the right number of children. found " +
+ fieldVectors.size() + " expected " + schema.getFields().size());
}
+ return new VectorSchemaRoot(schema.getFields(), fieldVectors, 0);
}
public List<FieldVector> getFieldVectors() {
http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
index 92d8cb0..8e9ff6d 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
@@ -20,42 +20,27 @@ package org.apache.arrow.vector;
import java.util.ArrayList;
import java.util.List;
+import io.netty.buffer.ArrowBuf;
import org.apache.arrow.vector.ValueVector.Accessor;
import org.apache.arrow.vector.schema.ArrowFieldNode;
import org.apache.arrow.vector.schema.ArrowRecordBatch;
import org.apache.arrow.vector.schema.ArrowVectorType;
-import org.apache.arrow.vector.types.pojo.Schema;
-
-import io.netty.buffer.ArrowBuf;
public class VectorUnloader {
- private final Schema schema;
- private final int valueCount;
- private final List<FieldVector> vectors;
-
- public VectorUnloader(Schema schema, int valueCount, List<FieldVector> vectors) {
- super();
- this.schema = schema;
- this.valueCount = valueCount;
- this.vectors = vectors;
- }
+ private final VectorSchemaRoot root;
public VectorUnloader(VectorSchemaRoot root) {
- this(root.getSchema(), root.getRowCount(), root.getFieldVectors());
- }
-
- public Schema getSchema() {
- return schema;
+ this.root = root;
}
public ArrowRecordBatch getRecordBatch() {
List<ArrowFieldNode> nodes = new ArrayList<>();
List<ArrowBuf> buffers = new ArrayList<>();
- for (FieldVector vector : vectors) {
+ for (FieldVector vector : root.getFieldVectors()) {
appendNodes(vector, nodes, buffers);
}
- return new ArrowRecordBatch(valueCount, nodes, buffers);
+ return new ArrowRecordBatch(root.getRowCount(), nodes, buffers);
}
private void appendNodes(FieldVector vector, List<ArrowFieldNode> nodes, List<ArrowBuf> buffers) {
@@ -74,4 +59,4 @@ public class VectorUnloader {
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java
index 2f68886..86a5e82 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java
@@ -22,6 +22,7 @@ import org.apache.arrow.memory.OutOfMemoryException;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
import org.apache.arrow.vector.util.CallBack;
/**
@@ -85,7 +86,7 @@ public abstract class AbstractContainerVector implements ValueVector {
public abstract int size();
// add a new vector with the input MajorType or return the existing vector if we already added one with the same type
- public abstract <T extends FieldVector> T addOrGet(String name, MinorType minorType, Class<T> clazz, int... precisionScale);
+ public abstract <T extends FieldVector> T addOrGet(String name, MinorType minorType, Class<T> clazz, DictionaryEncoding dictionary, int... precisionScale);
// return the child vector with the input name
public abstract <T extends FieldVector> T getChild(String name, Class<T> clazz);
http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java
index f030d16..baeeb07 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java
@@ -26,6 +26,7 @@ import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
import org.apache.arrow.vector.util.CallBack;
import org.apache.arrow.vector.util.MapWithOrdinal;
@@ -110,7 +111,7 @@ public abstract class AbstractMapVector extends AbstractContainerVector {
* @return resultant {@link org.apache.arrow.vector.ValueVector}
*/
@Override
- public <T extends FieldVector> T addOrGet(String name, MinorType minorType, Class<T> clazz, int... precisionScale) {
+ public <T extends FieldVector> T addOrGet(String name, MinorType minorType, Class<T> clazz, DictionaryEncoding dictionary, int... precisionScale) {
final ValueVector existing = getChild(name);
boolean create = false;
if (existing == null) {
@@ -122,7 +123,7 @@ public abstract class AbstractMapVector extends AbstractContainerVector {
create = true;
}
if (create) {
- final T vector = clazz.cast(minorType.getNewVector(name, allocator, callBack, precisionScale));
+ final T vector = clazz.cast(minorType.getNewVector(name, allocator, dictionary, callBack, precisionScale));
putChild(name, vector);
if (callBack!=null) {
callBack.doWork();
@@ -162,12 +163,12 @@ public abstract class AbstractMapVector extends AbstractContainerVector {
return typeify(v, clazz);
}
- protected ValueVector add(String name, MinorType minorType, int... precisionScale) {
+ protected ValueVector add(String name, MinorType minorType, DictionaryEncoding dictionary, int... precisionScale) {
final ValueVector existing = getChild(name);
if (existing != null) {
throw new IllegalStateException(String.format("Vector already exists: Existing[%s], Requested[%s] ", existing.getClass().getSimpleName(), minorType));
}
- FieldVector vector = minorType.getNewVector(name, allocator, callBack, precisionScale);
+ FieldVector vector = minorType.getNewVector(name, allocator, dictionary, callBack, precisionScale);
putChild(name, vector);
if (callBack!=null) {
callBack.doWork();
http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java
index 7424df4..eeb8f58 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java
@@ -28,6 +28,7 @@ import org.apache.arrow.vector.UInt4Vector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.ZeroVector;
import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
import org.apache.arrow.vector.util.SchemaChangeRuntimeException;
import com.google.common.base.Preconditions;
@@ -150,10 +151,10 @@ public abstract class BaseRepeatedValueVector extends BaseValueVector implements
return vector == DEFAULT_DATA_VECTOR ? 0:1;
}
- public <T extends ValueVector> AddOrGetResult<T> addOrGetVector(MinorType minorType) {
+ public <T extends ValueVector> AddOrGetResult<T> addOrGetVector(MinorType minorType, DictionaryEncoding dictionary) {
boolean created = false;
if (vector instanceof ZeroVector) {
- vector = minorType.getNewVector(DATA_VECTOR_NAME, allocator, null);
+ vector = minorType.getNewVector(DATA_VECTOR_NAME, allocator, dictionary, null);
// returned vector must have the same field
created = true;
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/complex/DictionaryVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/DictionaryVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/DictionaryVector.java
deleted file mode 100644
index 84760ea..0000000
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/DictionaryVector.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*******************************************************************************
-
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.arrow.vector.complex;
-
-import io.netty.buffer.ArrowBuf;
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.OutOfMemoryException;
-import org.apache.arrow.vector.NullableIntVector;
-import org.apache.arrow.vector.ValueVector;
-import org.apache.arrow.vector.complex.reader.FieldReader;
-import org.apache.arrow.vector.types.Dictionary;
-import org.apache.arrow.vector.types.Types.MinorType;
-import org.apache.arrow.vector.types.pojo.Field;
-import org.apache.arrow.vector.util.TransferPair;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-public class DictionaryVector implements ValueVector {
-
- private ValueVector indices;
- private Dictionary dictionary;
-
- public DictionaryVector(ValueVector indices, Dictionary dictionary) {
- this.indices = indices;
- this.dictionary = dictionary;
- }
-
- /**
- * Dictionary encodes a vector. The dictionary will be built using the values from the vector.
- *
- * @param vector vector to encode
- * @return dictionary encoded vector
- */
- public static DictionaryVector encode(ValueVector vector) {
- validateType(vector.getMinorType());
- Map<Object, Integer> lookUps = new HashMap<>();
- Map<Integer, Integer> transfers = new HashMap<>();
-
- ValueVector.Accessor accessor = vector.getAccessor();
- int count = accessor.getValueCount();
-
- NullableIntVector indices = new NullableIntVector(vector.getField().getName(), vector.getAllocator());
- indices.allocateNew(count);
- NullableIntVector.Mutator mutator = indices.getMutator();
-
- int nextIndex = 0;
- for (int i = 0; i < count; i++) {
- Object value = accessor.getObject(i);
- if (value != null) { // if it's null leave it null
- Integer index = lookUps.get(value);
- if (index == null) {
- index = nextIndex++;
- lookUps.put(value, index);
- transfers.put(i, index);
- }
- mutator.set(i, index);
- }
- }
- mutator.setValueCount(count);
-
- // copy the dictionary values into the dictionary vector
- TransferPair dictionaryTransfer = vector.getTransferPair(vector.getAllocator());
- ValueVector dictionaryVector = dictionaryTransfer.getTo();
- dictionaryVector.allocateNewSafe();
- for (Map.Entry<Integer, Integer> entry: transfers.entrySet()) {
- dictionaryTransfer.copyValueSafe(entry.getKey(), entry.getValue());
- }
- dictionaryVector.getMutator().setValueCount(transfers.size());
- Dictionary dictionary = new Dictionary(dictionaryVector, false);
-
- return new DictionaryVector(indices, dictionary);
- }
-
- /**
- * Dictionary encodes a vector with a provided dictionary. The dictionary must contain all values in the vector.
- *
- * @param vector vector to encode
- * @param dictionary dictionary used for encoding
- * @return dictionary encoded vector
- */
- public static DictionaryVector encode(ValueVector vector, Dictionary dictionary) {
- validateType(vector.getMinorType());
- // load dictionary values into a hashmap for lookup
- ValueVector.Accessor dictionaryAccessor = dictionary.getDictionary().getAccessor();
- Map<Object, Integer> lookUps = new HashMap<>(dictionaryAccessor.getValueCount());
- for (int i = 0; i < dictionaryAccessor.getValueCount(); i++) {
- // for primitive array types we need a wrapper that implements equals and hashcode appropriately
- lookUps.put(dictionaryAccessor.getObject(i), i);
- }
-
- // vector to hold our indices (dictionary encoded values)
- NullableIntVector indices = new NullableIntVector(vector.getField().getName(), vector.getAllocator());
- NullableIntVector.Mutator mutator = indices.getMutator();
-
- ValueVector.Accessor accessor = vector.getAccessor();
- int count = accessor.getValueCount();
-
- indices.allocateNew(count);
-
- for (int i = 0; i < count; i++) {
- Object value = accessor.getObject(i);
- if (value != null) { // if it's null leave it null
- // note: this may fail if value was not included in the dictionary
- mutator.set(i, lookUps.get(value));
- }
- }
- mutator.setValueCount(count);
-
- return new DictionaryVector(indices, dictionary);
- }
-
- /**
- * Decodes a dictionary encoded array using the provided dictionary.
- *
- * @param indices dictionary encoded values, must be int type
- * @param dictionary dictionary used to decode the values
- * @return vector with values restored from dictionary
- */
- public static ValueVector decode(ValueVector indices, Dictionary dictionary) {
- ValueVector.Accessor accessor = indices.getAccessor();
- int count = accessor.getValueCount();
- ValueVector dictionaryVector = dictionary.getDictionary();
- // copy the dictionary values into the decoded vector
- TransferPair transfer = dictionaryVector.getTransferPair(indices.getAllocator());
- transfer.getTo().allocateNewSafe();
- for (int i = 0; i < count; i++) {
- Object index = accessor.getObject(i);
- if (index != null) {
- transfer.copyValueSafe(((Number) index).intValue(), i);
- }
- }
-
- ValueVector decoded = transfer.getTo();
- decoded.getMutator().setValueCount(count);
- return decoded;
- }
-
- private static void validateType(MinorType type) {
- // byte arrays don't work as keys in our dictionary map - we could wrap them with something to
- // implement equals and hashcode if we want that functionality
- if (type == MinorType.VARBINARY || type == MinorType.LIST || type == MinorType.MAP || type == MinorType.UNION) {
- throw new IllegalArgumentException("Dictionary encoding for complex types not implemented");
- }
- }
-
- public ValueVector getIndexVector() { return indices; }
-
- public ValueVector getDictionaryVector() { return dictionary.getDictionary(); }
-
- public Dictionary getDictionary() { return dictionary; }
-
- @Override
- public MinorType getMinorType() { return indices.getMinorType(); }
-
- @Override
- public Field getField() { return indices.getField(); }
-
- // note: dictionary vector is not closed, as it may be shared
- @Override
- public void close() { indices.close(); }
-
- @Override
- public void allocateNew() throws OutOfMemoryException { indices.allocateNew(); }
-
- @Override
- public boolean allocateNewSafe() { return indices.allocateNewSafe(); }
-
- @Override
- public BufferAllocator getAllocator() { return indices.getAllocator(); }
-
- @Override
- public void setInitialCapacity(int numRecords) { indices.setInitialCapacity(numRecords); }
-
- @Override
- public int getValueCapacity() { return indices.getValueCapacity(); }
-
- @Override
- public int getBufferSize() { return indices.getBufferSize(); }
-
- @Override
- public int getBufferSizeFor(int valueCount) { return indices.getBufferSizeFor(valueCount); }
-
- @Override
- public Iterator<ValueVector> iterator() {
- return indices.iterator();
- }
-
- @Override
- public void clear() { indices.clear(); }
-
- @Override
- public TransferPair getTransferPair(BufferAllocator allocator) { return indices.getTransferPair(allocator); }
-
- @Override
- public TransferPair getTransferPair(String ref, BufferAllocator allocator) { return indices.getTransferPair(ref, allocator); }
-
- @Override
- public TransferPair makeTransferPair(ValueVector target) { return indices.makeTransferPair(target); }
-
- @Override
- public Accessor getAccessor() { return indices.getAccessor(); }
-
- @Override
- public Mutator getMutator() { return indices.getMutator(); }
-
- @Override
- public FieldReader getReader() { return indices.getReader(); }
-
- @Override
- public ArrowBuf[] getBuffers(boolean clear) { return indices.getBuffers(clear); }
-}