You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by uw...@apache.org on 2017/02/02 13:36:38 UTC

arrow git commit: ARROW-497: Integration harness for streaming file format

Repository: arrow
Updated Branches:
  refs/heads/master be5d73f2c -> 0ae4d86e5


ARROW-497: Integration harness for streaming file format

These tests pass locally for me. Thanks @nongli for this!

Author: Nong Li <no...@gmail.com>
Author: Wes McKinney <we...@twosigma.com>

Closes #312 from wesm/streaming-integration and squashes the following commits:

8b9ad76 [Wes McKinney] Hook stream<->file tools together and get integration tests working. Quiet test output in TestArrowStreamPipe
c7f0483 [Nong Li] ARROW-XXX: [Java] Add file <=> stream utility tools.


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/0ae4d86e
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/0ae4d86e
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/0ae4d86e

Branch: refs/heads/master
Commit: 0ae4d86e5ef8ee53a8810f4324dce80ec6a9d422
Parents: be5d73f
Author: Nong Li <no...@gmail.com>
Authored: Thu Feb 2 14:36:23 2017 +0100
Committer: Uwe L. Korn <uw...@xhochy.com>
Committed: Thu Feb 2 14:36:23 2017 +0100

----------------------------------------------------------------------
 ci/travis_script_integration.sh                 |  3 +
 integration/integration_test.py                 | 76 ++++++++++++++++----
 .../org/apache/arrow/tools/FileToStream.java    | 68 ++++++++++++++++++
 .../org/apache/arrow/tools/StreamToFile.java    | 61 ++++++++++++++++
 .../arrow/vector/stream/MessageSerializer.java  |  2 +-
 .../vector/stream/TestArrowStreamPipe.java      |  2 +-
 6 files changed, 198 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/0ae4d86e/ci/travis_script_integration.sh
----------------------------------------------------------------------
diff --git a/ci/travis_script_integration.sh b/ci/travis_script_integration.sh
index d93411b..c019a4b 100755
--- a/ci/travis_script_integration.sh
+++ b/ci/travis_script_integration.sh
@@ -28,7 +28,10 @@ pushd $TRAVIS_BUILD_DIR/integration
 
 VERSION=0.1.1-SNAPSHOT
 export ARROW_JAVA_INTEGRATION_JAR=$JAVA_DIR/tools/target/arrow-tools-$VERSION-jar-with-dependencies.jar
+
 export ARROW_CPP_TESTER=$CPP_BUILD_DIR/debug/json-integration-test
+export ARROW_CPP_STREAM_TO_FILE=$CPP_BUILD_DIR/debug/stream-to-file
+export ARROW_CPP_FILE_TO_STREAM=$CPP_BUILD_DIR/debug/file-to-stream
 
 source $TRAVIS_BUILD_DIR/ci/travis_install_conda.sh
 export MINICONDA=$HOME/miniconda

http://git-wip-us.apache.org/repos/asf/arrow/blob/0ae4d86e/integration/integration_test.py
----------------------------------------------------------------------
diff --git a/integration/integration_test.py b/integration/integration_test.py
index 77510da..a622bf2 100644
--- a/integration/integration_test.py
+++ b/integration/integration_test.py
@@ -556,12 +556,25 @@ class IntegrationRunner(object):
                                                            consumer.name))
 
             for json_path in self.json_files:
-                print('Testing with {0}'.format(json_path))
+                print('Testing file {0}'.format(json_path))
 
-                arrow_path = os.path.join(self.temp_dir, guid())
+                # Make the random access file
+                print('-- Creating binary inputs')
+                producer_file_path = os.path.join(self.temp_dir, guid())
+                producer.json_to_file(json_path, producer_file_path)
 
-                producer.json_to_arrow(json_path, arrow_path)
-                consumer.validate(json_path, arrow_path)
+                # Validate the file
+                print('-- Validating file')
+                consumer.validate(json_path, producer_file_path)
+
+                print('-- Validating stream')
+                producer_stream_path = os.path.join(self.temp_dir, guid())
+                consumer_file_path = os.path.join(self.temp_dir, guid())
+                producer.file_to_stream(producer_file_path,
+                                        producer_stream_path)
+                consumer.stream_to_file(producer_stream_path,
+                                        consumer_file_path)
+                consumer.validate(json_path, consumer_file_path)
 
 
 class Tester(object):
@@ -569,7 +582,13 @@ class Tester(object):
     def __init__(self, debug=False):
         self.debug = debug
 
-    def json_to_arrow(self, json_path, arrow_path):
+    def json_to_file(self, json_path, arrow_path):
+        raise NotImplementedError
+
+    def stream_to_file(self, stream_path, file_path):
+        raise NotImplementedError
+
+    def file_to_stream(self, file_path, stream_path):
         raise NotImplementedError
 
     def validate(self, json_path, arrow_path):
@@ -601,21 +620,40 @@ class JavaTester(Tester):
         if self.debug:
             print(' '.join(cmd))
 
-        return run_cmd(cmd)
+        run_cmd(cmd)
 
     def validate(self, json_path, arrow_path):
         return self._run(arrow_path, json_path, 'VALIDATE')
 
-    def json_to_arrow(self, json_path, arrow_path):
+    def json_to_file(self, json_path, arrow_path):
         return self._run(arrow_path, json_path, 'JSON_TO_ARROW')
 
+    def stream_to_file(self, stream_path, file_path):
+        cmd = ['java', '-cp', self.ARROW_TOOLS_JAR,
+               'org.apache.arrow.tools.StreamToFile',
+               stream_path, file_path]
+        run_cmd(cmd)
+
+    def file_to_stream(self, file_path, stream_path):
+        cmd = ['java', '-cp', self.ARROW_TOOLS_JAR,
+               'org.apache.arrow.tools.FileToStream',
+               file_path, stream_path]
+        run_cmd(cmd)
+
 
 class CPPTester(Tester):
 
+    BUILD_PATH = os.path.join(ARROW_HOME, 'cpp/test-build/debug')
     CPP_INTEGRATION_EXE = os.environ.get(
-        'ARROW_CPP_TESTER',
-        os.path.join(ARROW_HOME,
-                     'cpp/test-build/debug/json-integration-test'))
+        'ARROW_CPP_TESTER', os.path.join(BUILD_PATH, 'json-integration-test'))
+
+    STREAM_TO_FILE = os.environ.get(
+        'ARROW_CPP_STREAM_TO_FILE',
+        os.path.join(BUILD_PATH, 'stream-to-file'))
+
+    FILE_TO_STREAM = os.environ.get(
+        'ARROW_CPP_FILE_TO_STREAM',
+        os.path.join(BUILD_PATH, 'file-to-stream'))
 
     name = 'C++'
 
@@ -633,14 +671,28 @@ class CPPTester(Tester):
         if self.debug:
             print(' '.join(cmd))
 
-        return run_cmd(cmd)
+        run_cmd(cmd)
 
     def validate(self, json_path, arrow_path):
         return self._run(arrow_path, json_path, 'VALIDATE')
 
-    def json_to_arrow(self, json_path, arrow_path):
+    def json_to_file(self, json_path, arrow_path):
         return self._run(arrow_path, json_path, 'JSON_TO_ARROW')
 
+    def stream_to_file(self, stream_path, file_path):
+        cmd = ['cat', stream_path, '|', self.STREAM_TO_FILE, '>', file_path]
+        cmd = ' '.join(cmd)
+        if self.debug:
+            print(cmd)
+        os.system(cmd)
+
+    def file_to_stream(self, file_path, stream_path):
+        cmd = [self.FILE_TO_STREAM, file_path, '>', stream_path]
+        cmd = ' '.join(cmd)
+        if self.debug:
+            print(cmd)
+        os.system(cmd)
+
 
 def get_static_json_files():
     glob_pattern = os.path.join(ARROW_HOME, 'integration', 'data', '*.json')

http://git-wip-us.apache.org/repos/asf/arrow/blob/0ae4d86e/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java
----------------------------------------------------------------------
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java b/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java
new file mode 100644
index 0000000..ba6505c
--- /dev/null
+++ b/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.tools;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.file.ArrowBlock;
+import org.apache.arrow.vector.file.ArrowFooter;
+import org.apache.arrow.vector.file.ArrowReader;
+import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.stream.ArrowStreamWriter;
+
+/**
+ * Converts an Arrow file to an Arrow stream. The input file is the first argument;
+ * output goes to the optional second argument, or to standard out if it is omitted.
+ */
+public class FileToStream {
+  public static void convert(FileInputStream in, OutputStream out) throws IOException {
+    BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
+    try(
+        ArrowReader reader = new ArrowReader(in.getChannel(), allocator);) {
+      ArrowFooter footer = reader.readFooter();
+      try (
+        ArrowStreamWriter writer = new ArrowStreamWriter(out, footer.getSchema());
+      ) {
+        for (ArrowBlock block: footer.getRecordBatches()) {
+          try (ArrowRecordBatch batch = reader.readRecordBatch(block)) {
+            writer.writeRecordBatch(batch);
+          }
+        }
+      }
+    }
+  }
+
+  public static void main(String[] args) throws IOException {
+    if (args.length != 1 && args.length != 2) {
+      System.err.println("Usage: FileToStream <input file> [output file]");
+      System.exit(1);
+    }
+
+    FileInputStream in = new FileInputStream(new File(args[0]));
+    OutputStream out = args.length == 1 ?
+        System.out : new FileOutputStream(new File(args[1]));
+
+    convert(in, out);
+  }
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/0ae4d86e/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java
----------------------------------------------------------------------
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java b/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java
new file mode 100644
index 0000000..c8a5c89
--- /dev/null
+++ b/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.tools;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.file.ArrowWriter;
+import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.stream.ArrowStreamReader;
+
+/**
+ * Converts an Arrow stream (stdin, or args[0]) to an Arrow file (stdout, or args[1]).
+ */
+public class StreamToFile {
+  public static void convert(InputStream in, OutputStream out) throws IOException {
+    BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
+    try (ArrowStreamReader reader = new ArrowStreamReader(in, allocator)) {
+      reader.init();
+      try (ArrowWriter writer = new ArrowWriter(Channels.newChannel(out), reader.getSchema());) {
+        while (true) {
+          ArrowRecordBatch batch = reader.nextRecordBatch();
+          if (batch == null) break;
+          writer.writeRecordBatch(batch);
+        }
+      }
+    }
+  }
+
+  public static void main(String[] args) throws IOException {
+    InputStream in = System.in;
+    OutputStream out = System.out;
+    if (args.length == 2) {
+      in = new FileInputStream(new File(args[0]));
+      out = new FileOutputStream(new File(args[1]));
+    }
+    convert(in, out);
+  }
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/0ae4d86e/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
index 7ab740c..92df250 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
@@ -226,7 +226,7 @@ public class MessageSerializer {
     Message.startMessage(builder);
     Message.addHeaderType(builder, headerType);
     Message.addHeader(builder, headerOffset);
-    Message.addVersion(builder, MetadataVersion.V1);
+    Message.addVersion(builder, MetadataVersion.V2);
     Message.addBodyLength(builder, bodyLength);
     builder.finish(Message.endMessage(builder));
     return builder.dataBuffer();

http://git-wip-us.apache.org/repos/asf/arrow/blob/0ae4d86e/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java b/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java
index a0a7ffa..aa0b77e 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java
@@ -113,7 +113,7 @@ public class TestArrowStreamPipe {
   // Starts up a producer and consumer thread to read/write batches.
   @Test
   public void pipeTest() throws IOException, InterruptedException {
-    int NUM_BATCHES = 1000;
+    int NUM_BATCHES = 10;
     Pipe pipe = Pipe.open();
     WriterThread writer = new WriterThread(NUM_BATCHES, pipe.sink());
     ReaderThread reader = new ReaderThread(pipe.source());