You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@arrow.apache.org by uw...@apache.org on 2017/02/02 13:36:38 UTC
arrow git commit: ARROW-497: Integration harness for streaming file format
Repository: arrow
Updated Branches:
refs/heads/master be5d73f2c -> 0ae4d86e5
ARROW-497: Integration harness for streaming file format
These tests pass locally for me. Thanks @nongli for this!
Author: Nong Li <no...@gmail.com>
Author: Wes McKinney <we...@twosigma.com>
Closes #312 from wesm/streaming-integration and squashes the following commits:
8b9ad76 [Wes McKinney] Hook stream<->file tools together and get integration tests working. Quiet test output in TestArrowStreamPipe
c7f0483 [Nong Li] ARROW-XXX: [Java] Add file <=> stream utility tools.
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/0ae4d86e
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/0ae4d86e
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/0ae4d86e
Branch: refs/heads/master
Commit: 0ae4d86e5ef8ee53a8810f4324dce80ec6a9d422
Parents: be5d73f
Author: Nong Li <no...@gmail.com>
Authored: Thu Feb 2 14:36:23 2017 +0100
Committer: Uwe L. Korn <uw...@xhochy.com>
Committed: Thu Feb 2 14:36:23 2017 +0100
----------------------------------------------------------------------
ci/travis_script_integration.sh | 3 +
integration/integration_test.py | 76 ++++++++++++++++----
.../org/apache/arrow/tools/FileToStream.java | 68 ++++++++++++++++++
.../org/apache/arrow/tools/StreamToFile.java | 61 ++++++++++++++++
.../arrow/vector/stream/MessageSerializer.java | 2 +-
.../vector/stream/TestArrowStreamPipe.java | 2 +-
6 files changed, 198 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/0ae4d86e/ci/travis_script_integration.sh
----------------------------------------------------------------------
diff --git a/ci/travis_script_integration.sh b/ci/travis_script_integration.sh
index d93411b..c019a4b 100755
--- a/ci/travis_script_integration.sh
+++ b/ci/travis_script_integration.sh
@@ -28,7 +28,10 @@ pushd $TRAVIS_BUILD_DIR/integration
VERSION=0.1.1-SNAPSHOT
export ARROW_JAVA_INTEGRATION_JAR=$JAVA_DIR/tools/target/arrow-tools-$VERSION-jar-with-dependencies.jar
+
export ARROW_CPP_TESTER=$CPP_BUILD_DIR/debug/json-integration-test
+export ARROW_CPP_STREAM_TO_FILE=$CPP_BUILD_DIR/debug/stream-to-file
+export ARROW_CPP_FILE_TO_STREAM=$CPP_BUILD_DIR/debug/file-to-stream
source $TRAVIS_BUILD_DIR/ci/travis_install_conda.sh
export MINICONDA=$HOME/miniconda
http://git-wip-us.apache.org/repos/asf/arrow/blob/0ae4d86e/integration/integration_test.py
----------------------------------------------------------------------
diff --git a/integration/integration_test.py b/integration/integration_test.py
index 77510da..a622bf2 100644
--- a/integration/integration_test.py
+++ b/integration/integration_test.py
@@ -556,12 +556,25 @@ class IntegrationRunner(object):
consumer.name))
for json_path in self.json_files:
- print('Testing with {0}'.format(json_path))
+ print('Testing file {0}'.format(json_path))
- arrow_path = os.path.join(self.temp_dir, guid())
+ # Make the random access file
+ print('-- Creating binary inputs')
+ producer_file_path = os.path.join(self.temp_dir, guid())
+ producer.json_to_file(json_path, producer_file_path)
- producer.json_to_arrow(json_path, arrow_path)
- consumer.validate(json_path, arrow_path)
+ # Validate the file
+ print('-- Validating file')
+ consumer.validate(json_path, producer_file_path)
+
+ print('-- Validating stream')
+ producer_stream_path = os.path.join(self.temp_dir, guid())
+ consumer_file_path = os.path.join(self.temp_dir, guid())
+ producer.file_to_stream(producer_file_path,
+ producer_stream_path)
+ consumer.stream_to_file(producer_stream_path,
+ consumer_file_path)
+ consumer.validate(json_path, consumer_file_path)
class Tester(object):
@@ -569,7 +582,13 @@ class Tester(object):
def __init__(self, debug=False):
self.debug = debug
- def json_to_arrow(self, json_path, arrow_path):
+ def json_to_file(self, json_path, arrow_path):
+ raise NotImplementedError
+
+ def stream_to_file(self, stream_path, file_path):
+ raise NotImplementedError
+
+ def file_to_stream(self, file_path, stream_path):
raise NotImplementedError
def validate(self, json_path, arrow_path):
@@ -601,21 +620,40 @@ class JavaTester(Tester):
if self.debug:
print(' '.join(cmd))
- return run_cmd(cmd)
+ run_cmd(cmd)
def validate(self, json_path, arrow_path):
return self._run(arrow_path, json_path, 'VALIDATE')
- def json_to_arrow(self, json_path, arrow_path):
+ def json_to_file(self, json_path, arrow_path):
return self._run(arrow_path, json_path, 'JSON_TO_ARROW')
+ def stream_to_file(self, stream_path, file_path):
+ cmd = ['java', '-cp', self.ARROW_TOOLS_JAR,
+ 'org.apache.arrow.tools.StreamToFile',
+ stream_path, file_path]
+ run_cmd(cmd)
+
+ def file_to_stream(self, file_path, stream_path):
+ cmd = ['java', '-cp', self.ARROW_TOOLS_JAR,
+ 'org.apache.arrow.tools.FileToStream',
+ file_path, stream_path]
+ run_cmd(cmd)
+
class CPPTester(Tester):
+ BUILD_PATH = os.path.join(ARROW_HOME, 'cpp/test-build/debug')
CPP_INTEGRATION_EXE = os.environ.get(
- 'ARROW_CPP_TESTER',
- os.path.join(ARROW_HOME,
- 'cpp/test-build/debug/json-integration-test'))
+ 'ARROW_CPP_TESTER', os.path.join(BUILD_PATH, 'json-integration-test'))
+
+ STREAM_TO_FILE = os.environ.get(
+ 'ARROW_CPP_STREAM_TO_FILE',
+ os.path.join(BUILD_PATH, 'stream-to-file'))
+
+ FILE_TO_STREAM = os.environ.get(
+ 'ARROW_CPP_FILE_TO_STREAM',
+ os.path.join(BUILD_PATH, 'file-to-stream'))
name = 'C++'
@@ -633,14 +671,28 @@ class CPPTester(Tester):
if self.debug:
print(' '.join(cmd))
- return run_cmd(cmd)
+ run_cmd(cmd)
def validate(self, json_path, arrow_path):
return self._run(arrow_path, json_path, 'VALIDATE')
- def json_to_arrow(self, json_path, arrow_path):
+ def json_to_file(self, json_path, arrow_path):
return self._run(arrow_path, json_path, 'JSON_TO_ARROW')
+ def stream_to_file(self, stream_path, file_path):
+ cmd = ['cat', stream_path, '|', self.STREAM_TO_FILE, '>', file_path]
+ cmd = ' '.join(cmd)
+ if self.debug:
+ print(cmd)
+ os.system(cmd)
+
+ def file_to_stream(self, file_path, stream_path):
+ cmd = [self.FILE_TO_STREAM, file_path, '>', stream_path]
+ cmd = ' '.join(cmd)
+ if self.debug:
+ print(cmd)
+ os.system(cmd)
+
def get_static_json_files():
glob_pattern = os.path.join(ARROW_HOME, 'integration', 'data', '*.json')
http://git-wip-us.apache.org/repos/asf/arrow/blob/0ae4d86e/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java
----------------------------------------------------------------------
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java b/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java
new file mode 100644
index 0000000..ba6505c
--- /dev/null
+++ b/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.tools;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.file.ArrowBlock;
+import org.apache.arrow.vector.file.ArrowFooter;
+import org.apache.arrow.vector.file.ArrowReader;
+import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.stream.ArrowStreamWriter;
+
+/**
+ * Converts an Arrow file to an Arrow stream. The file should be specified as the
+ * first argument and the output is written to standard out.
+ */
+public class FileToStream {
+ public static void convert(FileInputStream in, OutputStream out) throws IOException {
+ BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
+ try(
+ ArrowReader reader = new ArrowReader(in.getChannel(), allocator);) {
+ ArrowFooter footer = reader.readFooter();
+ try (
+ ArrowStreamWriter writer = new ArrowStreamWriter(out, footer.getSchema());
+ ) {
+ for (ArrowBlock block: footer.getRecordBatches()) {
+ try (ArrowRecordBatch batch = reader.readRecordBatch(block)) {
+ writer.writeRecordBatch(batch);
+ }
+ }
+ }
+ }
+ }
+
+ public static void main(String[] args) throws IOException {
+ if (args.length != 1 && args.length != 2) {
+ System.err.println("Usage: FileToStream <input file> [output file]");
+ System.exit(1);
+ }
+
+ FileInputStream in = new FileInputStream(new File(args[0]));
+ OutputStream out = args.length == 1 ?
+ System.out : new FileOutputStream(new File(args[1]));
+
+ convert(in, out);
+ }
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/0ae4d86e/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java
----------------------------------------------------------------------
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java b/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java
new file mode 100644
index 0000000..c8a5c89
--- /dev/null
+++ b/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.tools;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.file.ArrowWriter;
+import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.stream.ArrowStreamReader;
+
+/**
+ * Converts an Arrow stream to an Arrow file.
+ */
+public class StreamToFile {
+ public static void convert(InputStream in, OutputStream out) throws IOException {
+ BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
+ try (ArrowStreamReader reader = new ArrowStreamReader(in, allocator)) {
+ reader.init();
+ try (ArrowWriter writer = new ArrowWriter(Channels.newChannel(out), reader.getSchema());) {
+ while (true) {
+ ArrowRecordBatch batch = reader.nextRecordBatch();
+ if (batch == null) break;
+ writer.writeRecordBatch(batch);
+ }
+ }
+ }
+ }
+
+ public static void main(String[] args) throws IOException {
+ InputStream in = System.in;
+ OutputStream out = System.out;
+ if (args.length == 2) {
+ in = new FileInputStream(new File(args[0]));
+ out = new FileOutputStream(new File(args[1]));
+ }
+ convert(in, out);
+ }
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/0ae4d86e/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
index 7ab740c..92df250 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
@@ -226,7 +226,7 @@ public class MessageSerializer {
Message.startMessage(builder);
Message.addHeaderType(builder, headerType);
Message.addHeader(builder, headerOffset);
- Message.addVersion(builder, MetadataVersion.V1);
+ Message.addVersion(builder, MetadataVersion.V2);
Message.addBodyLength(builder, bodyLength);
builder.finish(Message.endMessage(builder));
return builder.dataBuffer();
http://git-wip-us.apache.org/repos/asf/arrow/blob/0ae4d86e/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java b/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java
index a0a7ffa..aa0b77e 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java
@@ -113,7 +113,7 @@ public class TestArrowStreamPipe {
// Starts up a producer and consumer thread to read/write batches.
@Test
public void pipeTest() throws IOException, InterruptedException {
- int NUM_BATCHES = 1000;
+ int NUM_BATCHES = 10;
Pipe pipe = Pipe.open();
WriterThread writer = new WriterThread(NUM_BATCHES, pipe.sink());
ReaderThread reader = new ReaderThread(pipe.source());