You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this text version.
Posted to commits@arrow.apache.org by uw...@apache.org on 2017/04/04 06:40:46 UTC

arrow git commit: ARROW-656: [C++] Add random access writer for a mutable buffer. Rename WriteableFileInterface to WriteableFile for better consistency

Repository: arrow
Updated Branches:
  refs/heads/master d0cd03d78 -> d560e3077


ARROW-656: [C++] Add random access writer for a mutable buffer. Rename WriteableFileInterface to WriteableFile for better consistency

Author: Wes McKinney <we...@twosigma.com>

Closes #486 from wesm/ARROW-656 and squashes the following commits:

be0d4bc [Wes McKinney] Fix glib after renaming class
042f533 [Wes McKinney] Add random access writer for a mutable buffer. Rename WriteableFileInterface to WriteableFile for better consistency


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/d560e307
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/d560e307
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/d560e307

Branch: refs/heads/master
Commit: d560e307749a2397810962db1a5af4fb65675f17
Parents: d0cd03d
Author: Wes McKinney <we...@twosigma.com>
Authored: Tue Apr 4 08:40:40 2017 +0200
Committer: Uwe L. Korn <uw...@xhochy.com>
Committed: Tue Apr 4 08:40:40 2017 +0200

----------------------------------------------------------------------
 c_glib/arrow-glib/io-memory-mapped-file.cpp |  2 +-
 c_glib/arrow-glib/io-writeable-file.cpp     |  2 +-
 c_glib/arrow-glib/io-writeable-file.h       |  2 +-
 c_glib/arrow-glib/io-writeable-file.hpp     |  8 ++---
 cpp/src/arrow/io/interfaces.h               |  6 ++--
 cpp/src/arrow/io/io-memory-test.cc          | 27 ++++++++++++++
 cpp/src/arrow/io/memory.cc                  | 45 ++++++++++++++++++++++++
 cpp/src/arrow/io/memory.h                   | 23 ++++++++++++
 python/pyarrow/includes/libarrow.pxd        |  4 +--
 9 files changed, 107 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/d560e307/c_glib/arrow-glib/io-memory-mapped-file.cpp
----------------------------------------------------------------------
Review note: only the C++ return type is renamed in this file; the GLib-side
helper garrow_io_memory_mapped_file_get_raw_writeable_file_interface keeps its
"_interface" suffix.
diff --git a/c_glib/arrow-glib/io-memory-mapped-file.cpp b/c_glib/arrow-glib/io-memory-mapped-file.cpp
index 12c9a6c..e2e255c 100644
--- a/c_glib/arrow-glib/io-memory-mapped-file.cpp
+++ b/c_glib/arrow-glib/io-memory-mapped-file.cpp
@@ -127,7 +127,7 @@ garrow_io_writeable_interface_init(GArrowIOWriteableInterface *iface)
   iface->get_raw = garrow_io_memory_mapped_file_get_raw_writeable_interface;
 }
 
-static std::shared_ptr<arrow::io::WriteableFileInterface>
+static std::shared_ptr<arrow::io::WriteableFile>
 garrow_io_memory_mapped_file_get_raw_writeable_file_interface(GArrowIOWriteableFile *file)
 {
   auto memory_mapped_file = GARROW_IO_MEMORY_MAPPED_FILE(file);

http://git-wip-us.apache.org/repos/asf/arrow/blob/d560e307/c_glib/arrow-glib/io-writeable-file.cpp
----------------------------------------------------------------------
Review note: return-type rename only; the name and parameter of the GLib
accessor garrow_io_writeable_file_get_raw() are unchanged.
diff --git a/c_glib/arrow-glib/io-writeable-file.cpp b/c_glib/arrow-glib/io-writeable-file.cpp
index 3de42dd..41b682a 100644
--- a/c_glib/arrow-glib/io-writeable-file.cpp
+++ b/c_glib/arrow-glib/io-writeable-file.cpp
@@ -76,7 +76,7 @@ garrow_io_writeable_file_write_at(GArrowIOWriteableFile *writeable_file,
 
 G_END_DECLS
 
-std::shared_ptr<arrow::io::WriteableFileInterface>
+std::shared_ptr<arrow::io::WriteableFile>
 garrow_io_writeable_file_get_raw(GArrowIOWriteableFile *writeable_file)
 {
   auto *iface = GARROW_IO_WRITEABLE_FILE_GET_IFACE(writeable_file);

http://git-wip-us.apache.org/repos/asf/arrow/blob/d560e307/c_glib/arrow-glib/io-writeable-file.h
----------------------------------------------------------------------
Review note: besides the rename, this corrects the GARROW_IO_WRITEABLE_FILE()
instance-cast macro: G_TYPE_CHECK_INSTANCE_CAST now names the instance type
GArrowIOWriteableFile instead of the interface struct
GArrowIOWriteableFileInterface.
diff --git a/c_glib/arrow-glib/io-writeable-file.h b/c_glib/arrow-glib/io-writeable-file.h
index 4a4dee5..d1ebdbe 100644
--- a/c_glib/arrow-glib/io-writeable-file.h
+++ b/c_glib/arrow-glib/io-writeable-file.h
@@ -28,7 +28,7 @@ G_BEGIN_DECLS
 #define GARROW_IO_WRITEABLE_FILE(obj)                           \
   (G_TYPE_CHECK_INSTANCE_CAST((obj),                            \
                               GARROW_IO_TYPE_WRITEABLE_FILE,    \
-                              GArrowIOWriteableFileInterface))
+                              GArrowIOWriteableFile))
 #define GARROW_IO_IS_WRITEABLE_FILE(obj)                        \
   (G_TYPE_CHECK_INSTANCE_TYPE((obj),                            \
                               GARROW_IO_TYPE_WRITEABLE_FILE))

http://git-wip-us.apache.org/repos/asf/arrow/blob/d560e307/c_glib/arrow-glib/io-writeable-file.hpp
----------------------------------------------------------------------
Review note: amended so the gtk-doc block keeps the identifier
GArrowIOWriteableFileInterface, because it documents
struct _GArrowIOWriteableFileInterface; only the wrapped C++ class name
(arrow::io::WriteableFile) changes. The diffstat above and the index line
below describe the originally committed diff.
diff --git a/c_glib/arrow-glib/io-writeable-file.hpp b/c_glib/arrow-glib/io-writeable-file.hpp
index 2043007..aba95b2 100644
--- a/c_glib/arrow-glib/io-writeable-file.hpp
+++ b/c_glib/arrow-glib/io-writeable-file.hpp
@@ -24,15 +24,15 @@
 #include <arrow-glib/io-writeable-file.h>
 
 /**
  * GArrowIOWriteableFileInterface:
  *
- * It wraps `arrow::io::WriteableFileInterface`.
+ * It wraps `arrow::io::WriteableFile`.
  */
 struct _GArrowIOWriteableFileInterface
 {
   GTypeInterface parent_iface;
 
-  std::shared_ptr<arrow::io::WriteableFileInterface> (*get_raw)(GArrowIOWriteableFile *file);
+  std::shared_ptr<arrow::io::WriteableFile> (*get_raw)(GArrowIOWriteableFile *file);
 };
 
-std::shared_ptr<arrow::io::WriteableFileInterface> garrow_io_writeable_file_get_raw(GArrowIOWriteableFile *writeable_file);
+std::shared_ptr<arrow::io::WriteableFile> garrow_io_writeable_file_get_raw(GArrowIOWriteableFile *writeable_file);

http://git-wip-us.apache.org/repos/asf/arrow/blob/d560e307/cpp/src/arrow/io/interfaces.h
----------------------------------------------------------------------
Review note: the WriteableFile constructor below (unchanged apart from the
rename) calls set_mode(FileMode::READ) on a writeable file.
NOTE(review): this looks like it should be FileMode::WRITE -- confirm against
the FileMode definition and the users of mode() before changing it.
diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h
index 258a315..b5a0bd8 100644
--- a/cpp/src/arrow/io/interfaces.h
+++ b/cpp/src/arrow/io/interfaces.h
@@ -121,16 +121,16 @@ class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable {
   RandomAccessFile();
 };
 
-class ARROW_EXPORT WriteableFileInterface : public OutputStream, public Seekable {
+class ARROW_EXPORT WriteableFile : public OutputStream, public Seekable {
  public:
   virtual Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) = 0;
 
  protected:
-  WriteableFileInterface() { set_mode(FileMode::READ); }
+  WriteableFile() { set_mode(FileMode::READ); }
 };
 
 class ARROW_EXPORT ReadWriteFileInterface : public RandomAccessFile,
-                                            public WriteableFileInterface {
+                                            public WriteableFile {
  protected:
   ReadWriteFileInterface() { RandomAccessFile::set_mode(FileMode::READWRITE); }
 };

http://git-wip-us.apache.org/repos/asf/arrow/blob/d560e307/cpp/src/arrow/io/io-memory-test.cc
----------------------------------------------------------------------
Review note: this test does not exercise Write() past the end of the buffer
or WriteAt().
diff --git a/cpp/src/arrow/io/io-memory-test.cc b/cpp/src/arrow/io/io-memory-test.cc
index 442cd0c..4704fe8 100644
--- a/cpp/src/arrow/io/io-memory-test.cc
+++ b/cpp/src/arrow/io/io-memory-test.cc
@@ -66,6 +66,33 @@ TEST_F(TestBufferOutputStream, CloseResizes) {
   ASSERT_EQ(static_cast<int64_t>(K * data.size()), buffer_->size());
 }
 
+TEST(TestFixedSizeBufferWriter, Basics) {
+  std::shared_ptr<MutableBuffer> buffer;
+  ASSERT_OK(AllocateBuffer(default_memory_pool(), 1024, &buffer));  // fixed 1024-byte target
+
+  FixedSizeBufferWriter writer(buffer);
+
+  int64_t position;
+  ASSERT_OK(writer.Tell(&position));
+  ASSERT_EQ(0, position);  // a new writer starts at offset 0
+
+  std::string data = "data123456";
+  auto nbytes = static_cast<int64_t>(data.size());
+  ASSERT_OK(writer.Write(reinterpret_cast<const uint8_t*>(data.c_str()), nbytes));
+
+  ASSERT_OK(writer.Tell(&position));
+  ASSERT_EQ(nbytes, position);  // Write() advances the position
+
+  ASSERT_OK(writer.Seek(4));
+  ASSERT_OK(writer.Tell(&position));
+  ASSERT_EQ(4, position);
+
+  ASSERT_RAISES(IOError, writer.Seek(-1));  // negative offsets are rejected
+  ASSERT_RAISES(IOError, writer.Seek(1024));  // 1024 == buffer size: seeking to the end is rejected
+
+  ASSERT_OK(writer.Close());  // no-op for this writer
+}
+
 TEST(TestBufferReader, RetainParentReference) {
   // ARROW-387
   std::string data = "data123456";

http://git-wip-us.apache.org/repos/asf/arrow/blob/d560e307/cpp/src/arrow/io/memory.cc
----------------------------------------------------------------------
Review note: amended so FixedSizeBufferWriter::Write() rejects writes that
would run past the end of the fixed-size buffer (the memcpy previously had no
bounds check). The diffstat above and the index line below describe the
originally committed diff.
diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc
index 5b5c864..2e701e1 100644
--- a/cpp/src/arrow/io/memory.cc
+++ b/cpp/src/arrow/io/memory.cc
@@ -99,6 +99,59 @@ Status BufferOutputStream::Reserve(int64_t nbytes) {
 }
 
 // ----------------------------------------------------------------------
+// In-memory buffer writer
+
+/// Input buffer must be mutable. This is only checked with DCHECK, which may be
+/// compiled out in release builds, so callers must guarantee it.
+FixedSizeBufferWriter::FixedSizeBufferWriter(const std::shared_ptr<Buffer>& buffer) {
+  buffer_ = buffer;
+  DCHECK(buffer->is_mutable()) << "Must pass mutable buffer";
+  mutable_data_ = buffer->mutable_data();
+  size_ = buffer->size();
+  position_ = 0;
+}
+
+FixedSizeBufferWriter::~FixedSizeBufferWriter() {}
+
+Status FixedSizeBufferWriter::Close() {
+  // No-op
+  return Status::OK();
+}
+
+Status FixedSizeBufferWriter::Seek(int64_t position) {
+  // Valid positions are [0, size_); seeking exactly to the end is rejected.
+  if (position < 0 || position >= size_) {
+    return Status::IOError("position out of bounds");
+  }
+  position_ = position;
+  return Status::OK();
+}
+
+Status FixedSizeBufferWriter::Tell(int64_t* position) {
+  *position = position_;
+  return Status::OK();
+}
+
+Status FixedSizeBufferWriter::Write(const uint8_t* data, int64_t nbytes) {
+  // The buffer has a fixed size: reject negative lengths and writes that would
+  // run past its end instead of overflowing the allocation.
+  if (nbytes < 0 || nbytes > size_ - position_) {
+    return Status::IOError("write out of bounds");
+  }
+  std::memcpy(mutable_data_ + position_, data, nbytes);
+  position_ += nbytes;
+  return Status::OK();
+}
+
+Status FixedSizeBufferWriter::WriteAt(
+    int64_t position, const uint8_t* data, int64_t nbytes) {
+  // Hold the lock so Seek() + Write() are not interleaved with another WriteAt().
+  std::lock_guard<std::mutex> guard(lock_);
+  RETURN_NOT_OK(Seek(position));
+  return Write(data, nbytes);
+}
+
+// ----------------------------------------------------------------------
 // In-memory buffer reader
 
 BufferReader::BufferReader(const std::shared_ptr<Buffer>& buffer)

http://git-wip-us.apache.org/repos/asf/arrow/blob/d560e307/cpp/src/arrow/io/memory.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h
index eb2a509..fbb186b 100644
--- a/cpp/src/arrow/io/memory.h
+++ b/cpp/src/arrow/io/memory.h
@@ -22,6 +22,7 @@
 
 #include <cstdint>
 #include <memory>
+#include <mutex>
 #include <string>
 
 #include "arrow/io/interfaces.h"
@@ -66,6 +67,28 @@ class ARROW_EXPORT BufferOutputStream : public OutputStream {
   uint8_t* mutable_data_;
 };
 
+/// \brief Enables random writes into a fixed-size mutable buffer
+/// Seek() accepts positions in [0, size); this writer never resizes the buffer.
+class ARROW_EXPORT FixedSizeBufferWriter : public WriteableFile {
+ public:
+  /// Input buffer must be mutable; only checked with DCHECK, which may be compiled out in release builds
+  explicit FixedSizeBufferWriter(const std::shared_ptr<Buffer>& buffer);
+  ~FixedSizeBufferWriter();
+
+  Status Close() override;
+  Status Seek(int64_t position) override;
+  Status Tell(int64_t* position) override;
+  Status Write(const uint8_t* data, int64_t nbytes) override;
+  Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) override;  // Seek() + Write() under lock_
+
+ private:
+  std::mutex lock_;  // taken by WriteAt() only
+  std::shared_ptr<Buffer> buffer_;  // keeps the target memory alive
+  uint8_t* mutable_data_;  // buffer_->mutable_data()
+  int64_t size_;  // buffer_->size(), fixed at construction
+  int64_t position_;  // current write offset
+};
+
 class ARROW_EXPORT BufferReader : public RandomAccessFile {
  public:
   explicit BufferReader(const std::shared_ptr<Buffer>& buffer);

http://git-wip-us.apache.org/repos/asf/arrow/blob/d560e307/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 67d6af9..2a0488f 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -342,12 +342,12 @@ cdef extern from "arrow/io/interfaces.h" namespace "arrow::io" nogil:
         CStatus ReadAt(int64_t position, int64_t nbytes,
                        int64_t* bytes_read, shared_ptr[CBuffer]* out)
 
-    cdef cppclass WriteableFileInterface(OutputStream, Seekable):
+    cdef cppclass WriteableFile(OutputStream, Seekable):
         CStatus WriteAt(int64_t position, const uint8_t* data,
                         int64_t nbytes)
 
     cdef cppclass ReadWriteFileInterface(RandomAccessFile,
-                                         WriteableFileInterface):
+                                         WriteableFile):
         pass