You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/01/25 19:49:07 UTC

arrow git commit: ARROW-498 [C++] Add command line utilities that convert between stream and file.

Repository: arrow
Updated Branches:
  refs/heads/master 61a54f8a6 -> a68af9d16


ARROW-498 [C++] Add command line utilities that convert between stream and file.

These are in the style of Unix utilities, using stdin/stdout to pass data.
This makes it easy to chain them together, and I think they are useful for getting started
or testing. As an example, this command line tests a round trip:
 $ build/debug/file-to-stream /tmp/arrow-file | build/debug/stream-to-file > /tmp/copy
 $ diff /tmp/arrow-file /tmp/copy

If we had the same utilities in Java, this would make integration testing
pretty convenient.

Author: Nong Li <no...@gmail.com>

Closes #302 from nongli/utils and squashes the following commits:

b970c75 [Nong Li] fix long -> int64_t
a01ef4d [Nong Li] Fix style issues.
da3d98d [Nong Li] ARROW-498 [C++] Add commandline utilities that convert between stream and file.


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/a68af9d1
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/a68af9d1
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/a68af9d1

Branch: refs/heads/master
Commit: a68af9d168e381d1730ae0cb4dc653bef42562d3
Parents: 61a54f8
Author: Nong Li <no...@gmail.com>
Authored: Wed Jan 25 14:49:00 2017 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Wed Jan 25 14:49:00 2017 -0500

----------------------------------------------------------------------
 cpp/CMakeLists.txt                   |  4 ++
 cpp/src/arrow/util/CMakeLists.txt    | 26 +++++++++
 cpp/src/arrow/util/file-to-stream.cc | 60 ++++++++++++++++++++
 cpp/src/arrow/util/io-util.h         | 93 +++++++++++++++++++++++++++++++
 cpp/src/arrow/util/stream-to-file.cc | 58 +++++++++++++++++++
 5 files changed, 241 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/a68af9d1/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 9039ffb..a0f89f3 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -90,6 +90,10 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
   option(ARROW_ALTIVEC
     "Build Arrow with Altivec"
     ON)
+
+  # Gates the file-to-stream / stream-to-file executables defined in
+  # src/arrow/util/CMakeLists.txt.
+  option(ARROW_BUILD_UTILITIES
+    "Build Arrow commandline utilities"
+    ON)
 endif()
 
 if(NOT ARROW_BUILD_TESTS)

http://git-wip-us.apache.org/repos/asf/arrow/blob/a68af9d1/cpp/src/arrow/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/CMakeLists.txt b/cpp/src/arrow/util/CMakeLists.txt
index 8d9afcc..0830ee2 100644
--- a/cpp/src/arrow/util/CMakeLists.txt
+++ b/cpp/src/arrow/util/CMakeLists.txt
@@ -68,4 +68,30 @@ if (ARROW_BUILD_BENCHMARKS)
   endif()
 endif()
 
+if (ARROW_BUILD_UTILITIES)
+  # Libraries shared by every command-line utility.
+  set(UTIL_LINK_LIBS
+    arrow_ipc_static
+    arrow_io_static
+    arrow_static
+    boost_filesystem_static
+    boost_system_static
+    dl)
+  # pthread is linked explicitly only on non-Apple platforms (same as the
+  # previous per-platform lists); appended last so static link order is valid.
+  if (NOT APPLE)
+    list(APPEND UTIL_LINK_LIBS pthread)
+  endif()
+
+  # Each utility is built from a single source file named <utility>.cc.
+  foreach(UTIL_NAME file-to-stream stream-to-file)
+    add_executable(${UTIL_NAME} ${UTIL_NAME}.cc)
+    target_link_libraries(${UTIL_NAME} ${UTIL_LINK_LIBS})
+  endforeach()
+endif()
+
 ADD_ARROW_TEST(bit-util-test)

http://git-wip-us.apache.org/repos/asf/arrow/blob/a68af9d1/cpp/src/arrow/util/file-to-stream.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/file-to-stream.cc b/cpp/src/arrow/util/file-to-stream.cc
new file mode 100644
index 0000000..42c1d55
--- /dev/null
+++ b/cpp/src/arrow/util/file-to-stream.cc
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <iostream>
+#include "arrow/io/file.h"
+#include "arrow/ipc/file.h"
+#include "arrow/ipc/stream.h"
+#include "arrow/status.h"
+
+#include "arrow/util/io-util.h"
+
+namespace arrow {
+
+// Opens the Arrow file at `path` and re-emits each of its record batches, in
+// order, as an Arrow stream written to stdout.
+Status ConvertToStream(const char* path) {
+  std::shared_ptr<io::ReadableFile> source;
+  RETURN_NOT_OK(io::ReadableFile::Open(path, &source));
+
+  std::shared_ptr<ipc::FileReader> file_reader;
+  RETURN_NOT_OK(ipc::FileReader::Open(source, &file_reader));
+
+  io::StdoutStream stdout_sink;
+  std::shared_ptr<ipc::StreamWriter> stream_writer;
+  RETURN_NOT_OK(
+      ipc::StreamWriter::Open(&stdout_sink, file_reader->schema(), &stream_writer));
+
+  const auto batch_count = file_reader->num_record_batches();
+  for (int batch_index = 0; batch_index < batch_count; ++batch_index) {
+    std::shared_ptr<RecordBatch> batch;
+    RETURN_NOT_OK(file_reader->GetRecordBatch(batch_index, &batch));
+    RETURN_NOT_OK(stream_writer->WriteRecordBatch(*batch));
+  }
+  return stream_writer->Close();
+}
+
+}  // namespace arrow
+
+// Entry point: expects exactly one argument, the path of the Arrow file to
+// convert; the stream is written to stdout and errors go to stderr.
+int main(int argc, char** argv) {
+  if (argc == 2) {
+    arrow::Status status = arrow::ConvertToStream(argv[1]);
+    if (status.ok()) { return 0; }
+    std::cerr << "Could not convert to stream: " << status.ToString() << std::endl;
+    return 1;
+  }
+  std::cerr << "Usage: file-to-stream <input arrow file>" << std::endl;
+  return 1;
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/a68af9d1/cpp/src/arrow/util/io-util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/io-util.h b/cpp/src/arrow/util/io-util.h
new file mode 100644
index 0000000..3e5054d
--- /dev/null
+++ b/cpp/src/arrow/util/io-util.h
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_UTIL_IO_UTIL_H
+#define ARROW_UTIL_IO_UTIL_H
+
+#include <cstdint>
+#include <iostream>
+#include <memory>
+
+#include "arrow/buffer.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/status.h"
+
+namespace arrow {
+namespace io {
+
+// Output stream that writes to the process's standard output via std::cout.
+// Tell() reports the number of bytes successfully written so far.
+class StdoutStream : public OutputStream {
+ public:
+  StdoutStream() : pos_(0) {
+    set_mode(FileMode::WRITE);
+  }
+  virtual ~StdoutStream() {}
+
+  // Flushes std::cout so buffered bytes reach the output; std::cout itself is
+  // not closed. Returns IOError if std::cout is in a failed state afterwards.
+  Status Close() {
+    std::cout.flush();
+    if (!std::cout) { return Status::IOError("Failed to flush stdout"); }
+    return Status::OK();
+  }
+
+  Status Tell(int64_t* position) {
+    *position = pos_;
+    return Status::OK();
+  }
+
+  // Writes nbytes from data to std::cout. Returns IOError (instead of silently
+  // reporting success) if std::cout is in a failed state after the write.
+  Status Write(const uint8_t* data, int64_t nbytes) {
+    std::cout.write(reinterpret_cast<const char*>(data), nbytes);
+    if (!std::cout) { return Status::IOError("Failed to write to stdout"); }
+    pos_ += nbytes;
+    return Status::OK();
+  }
+
+ private:
+  int64_t pos_;
+};
+
+// Input stream that reads from the process's standard input via std::cin.
+// Tell() reports the number of bytes read so far.
+class StdinStream : public InputStream {
+ public:
+  StdinStream() : pos_(0) {
+    set_mode(FileMode::READ);
+  }
+  virtual ~StdinStream() {}
+
+  // No-op: std::cin is not closed.
+  Status Close() { return Status::OK(); }
+
+  Status Tell(int64_t* position) {
+    *position = pos_;
+    return Status::OK();
+  }
+
+  // Reads up to nbytes into out and stores the number of bytes actually read
+  // in *bytes_read; a count below nbytes means end of input was reached.
+  // Returns IOError if std::cin reports an unrecoverable (bad) state.
+  virtual Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
+    std::cin.read(reinterpret_cast<char*>(out), nbytes);
+    // A short read (EOF before nbytes) sets failbit, but gcount() still holds
+    // the number of bytes copied into `out`. Report them instead of 0 so that
+    // truncated input is not mistaken for a clean end of stream.
+    *bytes_read = static_cast<int64_t>(std::cin.gcount());
+    pos_ += *bytes_read;
+    if (std::cin.bad()) { return Status::IOError("Failed to read from stdin"); }
+    return Status::OK();
+  }
+
+  // Reads up to nbytes into a newly allocated PoolBuffer, then shrinks the
+  // buffer's size to the number of bytes actually read.
+  virtual Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
+    auto buffer = std::make_shared<PoolBuffer>(nullptr);
+    RETURN_NOT_OK(buffer->Resize(nbytes));
+    int64_t bytes_read;
+    RETURN_NOT_OK(Read(nbytes, &bytes_read, buffer->mutable_data()));
+    RETURN_NOT_OK(buffer->Resize(bytes_read, false));
+    *out = buffer;
+    return Status::OK();
+  }
+
+ private:
+  int64_t pos_;
+};
+
+}  // namespace io
+}  // namespace arrow
+
+#endif // ARROW_UTIL_IO_UTIL_H
+

http://git-wip-us.apache.org/repos/asf/arrow/blob/a68af9d1/cpp/src/arrow/util/stream-to-file.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/stream-to-file.cc b/cpp/src/arrow/util/stream-to-file.cc
new file mode 100644
index 0000000..7a8ec0b
--- /dev/null
+++ b/cpp/src/arrow/util/stream-to-file.cc
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <iostream>
+#include "arrow/io/file.h"
+#include "arrow/ipc/file.h"
+#include "arrow/ipc/stream.h"
+#include "arrow/status.h"
+
+#include "arrow/util/io-util.h"
+
+namespace arrow {
+
+// Converts an Arrow stream read from stdin into an Arrow file written to
+// stdout. A typical usage would be:
+// $ <program that produces streaming output> | stream-to-file > file.arrow
+Status ConvertToFile() {
+  // make_shared instead of a raw `new`; stored as the base type the reader takes.
+  std::shared_ptr<io::InputStream> input = std::make_shared<io::StdinStream>();
+  std::shared_ptr<ipc::StreamReader> reader;
+  RETURN_NOT_OK(ipc::StreamReader::Open(input, &reader));
+
+  io::StdoutStream sink;
+  std::shared_ptr<ipc::FileWriter> writer;
+  RETURN_NOT_OK(ipc::FileWriter::Open(&sink, reader->schema(), &writer));
+
+  // GetNextRecordBatch yields a null batch once the stream is exhausted.
+  std::shared_ptr<RecordBatch> batch;
+  while (true) {
+    RETURN_NOT_OK(reader->GetNextRecordBatch(&batch));
+    if (batch == nullptr) break;
+    RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
+  }
+  return writer->Close();
+}
+
+}  // namespace arrow
+
+// Entry point: takes no arguments. Reads an Arrow stream from stdin and writes
+// the equivalent Arrow file to stdout; errors go to stderr. Unexpected
+// arguments are rejected with a usage message (mirroring file-to-stream)
+// rather than being silently ignored.
+int main(int argc, char** argv) {
+  if (argc != 1) {
+    std::cerr << "Usage: stream-to-file < <input arrow stream> > <output arrow file>"
+              << std::endl;
+    return 1;
+  }
+  arrow::Status status = arrow::ConvertToFile();
+  if (!status.ok()) {
+    std::cerr << "Could not convert to file: " << status.ToString() << std::endl;
+    return 1;
+  }
+  return 0;
+}