You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/06/03 15:22:58 UTC
[arrow] branch master updated: ARROW-5395: [C++] Utilize stream EOS in File format
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 7bd9ad2 ARROW-5395: [C++] Utilize stream EOS in File format
7bd9ad2 is described below
commit 7bd9ad236ddef9c66b4f8e0911dea59cb3e8906f
Author: John Muehlhausen <jm...@kershnertrading.com>
AuthorDate: Mon Jun 3 10:22:50 2019 -0500
ARROW-5395: [C++] Utilize stream EOS in File format
We currently do not write an EOS (end-of-stream) marker at the end of the Message stream inside the File format. As a result, the file cannot be parsed sequentially. This change prepares for other implementations or future reference features that parse a File sequentially, i.e., without access to seek().
Author: John Muehlhausen <jm...@kershnertrading.com>
Closes #4372 from jgm-ktg/master and squashes the following commits:
2ce546cc3 <John Muehlhausen> remove JS/Java for separate JIRA issues
350177829 <John Muehlhausen> ARROW-5395: Python test case
9e83053d0 <John Muehlhausen> incorporate first suggestions for writing EOS in File
901dfeab0 <John Muehlhausen> ARROW-5395: Utilize stream EOS in File format
---
cpp/src/arrow/ipc/writer.cc | 7 ++++++-
docs/source/format/IPC.rst | 2 +-
python/pyarrow/tests/test_ipc.py | 17 +++++++++++++++++
3 files changed, 24 insertions(+), 2 deletions(-)
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 94edb4d..d7d129e 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -999,6 +999,9 @@ class StreamBookKeeper {
int64_t position_;
};
+// End of stream marker
+constexpr int32_t kEos = 0;
+
/// A IpcPayloadWriter implementation that writes to a IPC stream
/// (with an end-of-stream marker)
class PayloadStreamWriter : public internal::IpcPayloadWriter,
@@ -1022,7 +1025,6 @@ class PayloadStreamWriter : public internal::IpcPayloadWriter,
Status Close() override {
// Write 0 EOS message
- const int32_t kEos = 0;
return Write(&kEos, sizeof(int32_t));
}
};
@@ -1076,6 +1078,9 @@ class PayloadFileWriter : public internal::IpcPayloadWriter, protected StreamBoo
}
Status Close() override {
+ // Write 0 EOS message for compatibility with sequential readers
+ RETURN_NOT_OK(Write(&kEos, sizeof(int32_t)));
+
// Write file footer
RETURN_NOT_OK(UpdatePosition());
int64_t initial_position = position_;
diff --git a/docs/source/format/IPC.rst b/docs/source/format/IPC.rst
index 62a1237..16567a6 100644
--- a/docs/source/format/IPC.rst
+++ b/docs/source/format/IPC.rst
@@ -104,7 +104,7 @@ Schematically we have: ::
<magic number "ARROW1">
<empty padding bytes [to 8 byte boundary]>
- <STREAMING FORMAT>
+ <STREAMING FORMAT with EOS>
<FOOTER>
<FOOTER SIZE: int32>
<magic number "ARROW1">
diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py
index 3eb2cdc..92b1613 100644
--- a/python/pyarrow/tests/test_ipc.py
+++ b/python/pyarrow/tests/test_ipc.py
@@ -475,6 +475,23 @@ def test_socket_read_all(socket_fixture):
# ----------------------------------------------------------------------
# Miscellaneous IPC tests
+def test_ipc_file_stream_has_eos():
+ # ARROW-5395
+
+ df = pd.DataFrame({'foo': [1.5]})
+ batch = pa.RecordBatch.from_pandas(df)
+ sink = pa.BufferOutputStream()
+ write_file(batch, sink)
+ buffer = sink.getvalue()
+
+ # skip the file magic
+ reader = pa.ipc.open_stream(buffer[8:])
+
+ # will fail if encounters footer data instead of eos
+ rdf = reader.read_pandas()
+
+ assert_frame_equal(df, rdf)
+
def test_ipc_zero_copy_numpy():
df = pd.DataFrame({'foo': [1.5]})