You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/01/25 19:49:07 UTC
arrow git commit: ARROW-498 [C++] Add command line utilities that
convert between stream and file.
Repository: arrow
Updated Branches:
refs/heads/master 61a54f8a6 -> a68af9d16
ARROW-498 [C++] Add command line utilities that convert between stream and file.
These are in the style of Unix utilities, using stdin/stdout to pass data.
This makes it easy to chain them together, and I think they are useful for getting started
or testing. As an example, this command line tests a round trip:
$ build/debug/file-to-stream /tmp/arrow-file | build/debug/stream-to-file > /tmp/copy
$ diff /tmp/arrow-file /tmp/copy
If we had the same utilities in Java, this would make integration testing
pretty convenient.
Author: Nong Li <no...@gmail.com>
Closes #302 from nongli/utils and squashes the following commits:
b970c75 [Nong Li] fix long -> int64_t
a01ef4d [Nong Li] Fix style issues.
da3d98d [Nong Li] ARROW-498 [C++] Add commandline utilities that convert between stream and file.
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/a68af9d1
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/a68af9d1
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/a68af9d1
Branch: refs/heads/master
Commit: a68af9d168e381d1730ae0cb4dc653bef42562d3
Parents: 61a54f8
Author: Nong Li <no...@gmail.com>
Authored: Wed Jan 25 14:49:00 2017 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Wed Jan 25 14:49:00 2017 -0500
----------------------------------------------------------------------
cpp/CMakeLists.txt | 4 ++
cpp/src/arrow/util/CMakeLists.txt | 26 +++++++++
cpp/src/arrow/util/file-to-stream.cc | 60 ++++++++++++++++++++
cpp/src/arrow/util/io-util.h | 93 +++++++++++++++++++++++++++++++
cpp/src/arrow/util/stream-to-file.cc | 58 +++++++++++++++++++
5 files changed, 241 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/a68af9d1/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 9039ffb..a0f89f3 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -90,6 +90,9 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
option(ARROW_ALTIVEC
"Build Arrow with Altivec"
ON)
+
+  # Controls whether the file-to-stream / stream-to-file command-line tools are built.
+  option(ARROW_BUILD_UTILITIES "Build Arrow commandline utilities" ON)
endif()
if(NOT ARROW_BUILD_TESTS)
http://git-wip-us.apache.org/repos/asf/arrow/blob/a68af9d1/cpp/src/arrow/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/CMakeLists.txt b/cpp/src/arrow/util/CMakeLists.txt
index 8d9afcc..0830ee2 100644
--- a/cpp/src/arrow/util/CMakeLists.txt
+++ b/cpp/src/arrow/util/CMakeLists.txt
@@ -68,4 +68,25 @@ if (ARROW_BUILD_BENCHMARKS)
endif()
endif()
+if (ARROW_BUILD_UTILITIES)
+  # Link libraries shared by the command-line utilities. The list is identical
+  # on every platform except that non-Apple builds also link pthread.
+  set(UTIL_LINK_LIBS
+    arrow_ipc_static
+    arrow_io_static
+    arrow_static)
+  if (NOT APPLE)
+    list(APPEND UTIL_LINK_LIBS pthread)
+  endif()
+  list(APPEND UTIL_LINK_LIBS
+    boost_filesystem_static
+    boost_system_static
+    dl)
+
+  add_executable(file-to-stream file-to-stream.cc)
+  target_link_libraries(file-to-stream ${UTIL_LINK_LIBS})
+  add_executable(stream-to-file stream-to-file.cc)
+  target_link_libraries(stream-to-file ${UTIL_LINK_LIBS})
+endif()
+
ADD_ARROW_TEST(bit-util-test)
http://git-wip-us.apache.org/repos/asf/arrow/blob/a68af9d1/cpp/src/arrow/util/file-to-stream.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/file-to-stream.cc b/cpp/src/arrow/util/file-to-stream.cc
new file mode 100644
index 0000000..42c1d55
--- /dev/null
+++ b/cpp/src/arrow/util/file-to-stream.cc
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <iostream>
+#include <memory>
+
+#include "arrow/io/file.h"
+#include "arrow/ipc/file.h"
+#include "arrow/ipc/stream.h"
+#include "arrow/status.h"
+
+#include "arrow/util/io-util.h"
+
+namespace arrow {
+
+// Reads the Arrow file at `path` and writes the same schema and record
+// batches, in order, to stdout using the Arrow stream format.
+Status ConvertToStream(const char* path) {
+  std::shared_ptr<io::ReadableFile> in_file;
+  RETURN_NOT_OK(io::ReadableFile::Open(path, &in_file));
+
+  std::shared_ptr<ipc::FileReader> reader;
+  RETURN_NOT_OK(ipc::FileReader::Open(in_file, &reader));
+
+  // `sink` is passed to the writer by address; declaring it before `writer`
+  // ensures it is destroyed after `writer`.
+  io::StdoutStream sink;
+  std::shared_ptr<ipc::StreamWriter> writer;
+  RETURN_NOT_OK(ipc::StreamWriter::Open(&sink, reader->schema(), &writer));
+
+  for (int i = 0; i < reader->num_record_batches(); ++i) {
+    std::shared_ptr<RecordBatch> chunk;
+    RETURN_NOT_OK(reader->GetRecordBatch(i, &chunk));
+    RETURN_NOT_OK(writer->WriteRecordBatch(*chunk));
+  }
+  RETURN_NOT_OK(writer->Close());
+
+  // Output is buffered by std::cout. Flush it here so that a failed write is
+  // reported through the returned Status instead of being lost at exit.
+  std::cout.flush();
+  if (!std::cout) { return Status::IOError("Failed to write to stdout"); }
+  return Status::OK();
+}
+
+}  // namespace arrow
+
+int main(int argc, char** argv) {
+  if (argc != 2) {
+    std::cerr << "Usage: file-to-stream <input arrow file>" << std::endl;
+    return 1;
+  }
+  arrow::Status status = arrow::ConvertToStream(argv[1]);
+  if (!status.ok()) {
+    std::cerr << "Could not convert to stream: " << status.ToString() << std::endl;
+    return 1;
+  }
+  return 0;
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/a68af9d1/cpp/src/arrow/util/io-util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/io-util.h b/cpp/src/arrow/util/io-util.h
new file mode 100644
index 0000000..3e5054d
--- /dev/null
+++ b/cpp/src/arrow/util/io-util.h
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_UTIL_IO_UTIL_H
+#define ARROW_UTIL_IO_UTIL_H
+
+#include <cstdint>
+#include <iostream>
+#include <memory>
+
+#include "arrow/buffer.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/status.h"
+
+namespace arrow {
+namespace io {
+
+// OutputStream that writes every byte to std::cout.
+class StdoutStream : public OutputStream {
+ public:
+  StdoutStream() : pos_(0) { set_mode(FileMode::WRITE); }
+  virtual ~StdoutStream() {}
+
+  // Flushes std::cout so that buffered write errors are reported here;
+  // std::cout itself is left open.
+  Status Close() {
+    std::cout.flush();
+    if (!std::cout) { return Status::IOError("Failed to flush stdout"); }
+    return Status::OK();
+  }
+
+  // Reports the number of bytes successfully written so far.
+  Status Tell(int64_t* position) {
+    *position = pos_;
+    return Status::OK();
+  }
+
+  // Writes nbytes from data to std::cout, failing if the stream ends up in
+  // an error state.
+  Status Write(const uint8_t* data, int64_t nbytes) {
+    std::cout.write(reinterpret_cast<const char*>(data), nbytes);
+    if (!std::cout) { return Status::IOError("Failed to write to stdout"); }
+    pos_ += nbytes;
+    return Status::OK();
+  }
+
+ private:
+  int64_t pos_;  // bytes written so far
+};
+
+// InputStream that reads from std::cin.
+class StdinStream : public InputStream {
+ public:
+  StdinStream() : pos_(0) { set_mode(FileMode::READ); }
+  virtual ~StdinStream() {}
+
+  Status Close() { return Status::OK(); }
+
+  // Reports the number of bytes read so far.
+  Status Tell(int64_t* position) {
+    *position = pos_;
+    return Status::OK();
+  }
+
+  // Reads up to nbytes into out and stores the count actually read in
+  // *bytes_read. A short count means std::cin reached end-of-file (or failed);
+  // gcount() is used so that bytes from a partial final read are not dropped.
+  virtual Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
+    std::cin.read(reinterpret_cast<char*>(out), nbytes);
+    *bytes_read = static_cast<int64_t>(std::cin.gcount());
+    pos_ += *bytes_read;
+    if (std::cin.bad()) { return Status::IOError("Failed to read from stdin"); }
+    return Status::OK();
+  }
+
+  // Allocates a buffer of nbytes, reads into it, then resizes it down to the
+  // number of bytes actually read.
+  virtual Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
+    auto buffer = std::make_shared<PoolBuffer>(nullptr);
+    RETURN_NOT_OK(buffer->Resize(nbytes));
+    int64_t bytes_read = 0;
+    RETURN_NOT_OK(Read(nbytes, &bytes_read, buffer->mutable_data()));
+    RETURN_NOT_OK(buffer->Resize(bytes_read, false));
+    *out = buffer;
+    return Status::OK();
+  }
+
+ private:
+  int64_t pos_;  // bytes read so far
+};
+
+}  // namespace io
+}  // namespace arrow
+
+#endif  // ARROW_UTIL_IO_UTIL_H
+
http://git-wip-us.apache.org/repos/asf/arrow/blob/a68af9d1/cpp/src/arrow/util/stream-to-file.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/stream-to-file.cc b/cpp/src/arrow/util/stream-to-file.cc
new file mode 100644
index 0000000..7a8ec0b
--- /dev/null
+++ b/cpp/src/arrow/util/stream-to-file.cc
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <iostream>
+#include <memory>
+
+#include "arrow/io/file.h"
+#include "arrow/ipc/file.h"
+#include "arrow/ipc/stream.h"
+#include "arrow/status.h"
+
+#include "arrow/util/io-util.h"
+
+namespace arrow {
+
+// Reads an Arrow stream from stdin and writes it to stdout in the Arrow file
+// format. A typical usage would be:
+//   $ <program that produces streaming output> | stream-to-file > file.arrow
+Status ConvertToFile() {
+  std::shared_ptr<io::InputStream> input = std::make_shared<io::StdinStream>();
+  std::shared_ptr<ipc::StreamReader> reader;
+  RETURN_NOT_OK(ipc::StreamReader::Open(input, &reader));
+
+  // `sink` is passed to the writer by address; declaring it before `writer`
+  // ensures it is destroyed after `writer`.
+  io::StdoutStream sink;
+  std::shared_ptr<ipc::FileWriter> writer;
+  RETURN_NOT_OK(ipc::FileWriter::Open(&sink, reader->schema(), &writer));
+
+  // The reader signals the end of the stream with a null batch.
+  std::shared_ptr<RecordBatch> batch;
+  while (true) {
+    RETURN_NOT_OK(reader->GetNextRecordBatch(&batch));
+    if (batch == nullptr) { break; }
+    RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
+  }
+  RETURN_NOT_OK(writer->Close());
+
+  // Output is buffered by std::cout. Flush it here so that a failed write is
+  // reported through the returned Status instead of being lost at exit.
+  std::cout.flush();
+  if (!std::cout) { return Status::IOError("Failed to write to stdout"); }
+  return Status::OK();
+}
+
+}  // namespace arrow
+
+int main(int argc, char** argv) {
+  // Both input and output go through stdin/stdout, so any argument (e.g. a
+  // file path) is a usage error rather than something to silently ignore.
+  if (argc != 1) {
+    std::cerr << "Usage: stream-to-file < <input arrow stream> > <output arrow file>"
+              << std::endl;
+    return 1;
+  }
+  arrow::Status status = arrow::ConvertToFile();
+  if (!status.ok()) {
+    std::cerr << "Could not convert to file: " << status.ToString() << std::endl;
+    return 1;
+  }
+  return 0;
+}