You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/06/03 15:22:58 UTC

[arrow] branch master updated: ARROW-5395: [C++] Utilize stream EOS in File format

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bd9ad2  ARROW-5395: [C++] Utilize stream EOS in File format
7bd9ad2 is described below

commit 7bd9ad236ddef9c66b4f8e0911dea59cb3e8906f
Author: John Muehlhausen <jm...@kershnertrading.com>
AuthorDate: Mon Jun 3 10:22:50 2019 -0500

    ARROW-5395: [C++] Utilize stream EOS in File format
    
    We currently do not write an EOS marker at the end of the Message stream inside the File format. As a result, the file cannot be parsed sequentially. This change prepares for other implementations, or future features of the reference implementation, that parse a File sequentially, i.e. without access to seek().
    
    Author: John Muehlhausen <jm...@kershnertrading.com>
    
    Closes #4372 from jgm-ktg/master and squashes the following commits:
    
    2ce546cc3 <John Muehlhausen> remove JS/Java for separate JIRA issues
    350177829 <John Muehlhausen> ARROW-5395: Python test case
    9e83053d0 <John Muehlhausen> incorporate first suggestions for writing EOS in File
    901dfeab0 <John Muehlhausen> ARROW-5395: Utilize stream EOS in File format
---
 cpp/src/arrow/ipc/writer.cc      |  7 ++++++-
 docs/source/format/IPC.rst       |  2 +-
 python/pyarrow/tests/test_ipc.py | 17 +++++++++++++++++
 3 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 94edb4d..d7d129e 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -999,6 +999,9 @@ class StreamBookKeeper {
   int64_t position_;
 };
 
+// End of stream marker
+constexpr int32_t kEos = 0;
+
 /// A IpcPayloadWriter implementation that writes to a IPC stream
 /// (with an end-of-stream marker)
 class PayloadStreamWriter : public internal::IpcPayloadWriter,
@@ -1022,7 +1025,6 @@ class PayloadStreamWriter : public internal::IpcPayloadWriter,
 
   Status Close() override {
     // Write 0 EOS message
-    const int32_t kEos = 0;
     return Write(&kEos, sizeof(int32_t));
   }
 };
@@ -1076,6 +1078,9 @@ class PayloadFileWriter : public internal::IpcPayloadWriter, protected StreamBoo
   }
 
   Status Close() override {
+    // Write 0 EOS message for compatibility with sequential readers
+    RETURN_NOT_OK(Write(&kEos, sizeof(int32_t)));
+
     // Write file footer
     RETURN_NOT_OK(UpdatePosition());
     int64_t initial_position = position_;
diff --git a/docs/source/format/IPC.rst b/docs/source/format/IPC.rst
index 62a1237..16567a6 100644
--- a/docs/source/format/IPC.rst
+++ b/docs/source/format/IPC.rst
@@ -104,7 +104,7 @@ Schematically we have: ::
 
     <magic number "ARROW1">
     <empty padding bytes [to 8 byte boundary]>
-    <STREAMING FORMAT>
+    <STREAMING FORMAT with EOS>
     <FOOTER>
     <FOOTER SIZE: int32>
     <magic number "ARROW1">
diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py
index 3eb2cdc..92b1613 100644
--- a/python/pyarrow/tests/test_ipc.py
+++ b/python/pyarrow/tests/test_ipc.py
@@ -475,6 +475,23 @@ def test_socket_read_all(socket_fixture):
 # ----------------------------------------------------------------------
 # Miscellaneous IPC tests
 
+def test_ipc_file_stream_has_eos():
+    # ARROW-5395
+
+    df = pd.DataFrame({'foo': [1.5]})
+    batch = pa.RecordBatch.from_pandas(df)
+    sink = pa.BufferOutputStream()
+    write_file(batch, sink)
+    buffer = sink.getvalue()
+
+    # skip the file magic and its padding to the 8-byte boundary
+    reader = pa.ipc.open_stream(buffer[8:])
+
+    # will fail if the reader encounters footer data instead of EOS
+    rdf = reader.read_pandas()
+
+    assert_frame_equal(df, rdf)
+
 
 def test_ipc_zero_copy_numpy():
     df = pd.DataFrame({'foo': [1.5]})