You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2018/11/16 20:30:35 UTC

[arrow] branch master updated: ARROW-3781: [C++] Implement BufferedOutputStream::SetBufferSize. Allocate buffer from MemoryPool

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 69ffda1  ARROW-3781: [C++] Implement BufferedOutputStream::SetBufferSize. Allocate buffer from MemoryPool
69ffda1 is described below

commit 69ffda1f7312a8d63195a5df8775a68ff5e0fe5f
Author: Wes McKinney <we...@apache.org>
AuthorDate: Fri Nov 16 15:30:18 2018 -0500

    ARROW-3781: [C++] Implement BufferedOutputStream::SetBufferSize. Allocate buffer from MemoryPool
    
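    The `BufferedOutputStream` constructor previously used a `std::string` as its internal buffer. Because the buffer is now allocated from a MemoryPool, and that allocation reports failure through a `Status`, the stream is now created with a static factory method (`Create`) that returns a `Status` and accepts an arbitrary buffer size.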
    
    Author: Wes McKinney <we...@apache.org>
    
    Closes #2976 from wesm/ARROW-3782 and squashes the following commits:
    
    bfff3b5ba <Wes McKinney> Use constexpr for buffer sizes in io-file-benchmark
    c788e2dca <Wes McKinney> SetBufferSize is now threadsafe
    bffe5dc02 <Wes McKinney> Add dcheck for buffer size. Pass shared_ptr by value instead of const-ref
    d4f4cd58c <Wes McKinney> Implement BufferedOutputStream::SetBufferSize. Use static constructor and allocate buffer from MemoryPool
---
 cpp/src/arrow/io/buffered.cc          | 60 ++++++++++++++++++++++++++++-------
 cpp/src/arrow/io/buffered.h           | 18 ++++++++++-
 cpp/src/arrow/io/io-buffered-test.cc  | 32 ++++++++++++++++---
 cpp/src/arrow/io/io-file-benchmark.cc | 22 ++++++++-----
 4 files changed, 107 insertions(+), 25 deletions(-)

diff --git a/cpp/src/arrow/io/buffered.cc b/cpp/src/arrow/io/buffered.cc
index 47f7d02..2493080 100644
--- a/cpp/src/arrow/io/buffered.cc
+++ b/cpp/src/arrow/io/buffered.cc
@@ -16,8 +16,6 @@
 // under the License.
 
 #include "arrow/io/buffered.h"
-#include "arrow/status.h"
-#include "arrow/util/logging.h"
 
 #include <cstring>
 #include <memory>
@@ -25,6 +23,10 @@
 #include <string>
 #include <utility>
 
+#include "arrow/buffer.h"
+#include "arrow/status.h"
+#include "arrow/util/logging.h"
+
 namespace arrow {
 namespace io {
 
@@ -34,11 +36,10 @@ namespace io {
 class BufferedOutputStream::Impl {
  public:
   explicit Impl(std::shared_ptr<OutputStream> raw)
-      : raw_(raw),
+      : raw_(std::move(raw)),
         is_open_(true),
-        buffer_(std::string(BUFFER_SIZE, '\0')),
-        buffer_data_(const_cast<char*>(buffer_.data())),
         buffer_pos_(0),
+        buffer_size_(0),
         raw_pos_(-1) {}
 
   ~Impl() { DCHECK(Close().ok()); }
@@ -77,15 +78,15 @@ class BufferedOutputStream::Impl {
     if (nbytes == 0) {
       return Status::OK();
     }
-    if (nbytes + buffer_pos_ >= BUFFER_SIZE) {
+    if (nbytes + buffer_pos_ >= buffer_size_) {
       RETURN_NOT_OK(FlushUnlocked());
       DCHECK_EQ(buffer_pos_, 0);
-      if (nbytes >= BUFFER_SIZE) {
+      if (nbytes >= buffer_size_) {
         // Direct write
         return raw_->Write(data, nbytes);
       }
     }
-    DCHECK_LE(buffer_pos_ + nbytes, BUFFER_SIZE);
+    DCHECK_LE(buffer_pos_ + nbytes, buffer_size_);
     std::memcpy(buffer_data_ + buffer_pos_, data, nbytes);
     buffer_pos_ += nbytes;
     return Status::OK();
@@ -108,15 +109,34 @@ class BufferedOutputStream::Impl {
 
   std::shared_ptr<OutputStream> raw() const { return raw_; }
 
- private:
-  // This size chosen so that memcpy() remains cheap
-  static const int64_t BUFFER_SIZE = 4096;
+  Status SetBufferSize(int64_t new_buffer_size) {
+    std::lock_guard<std::mutex> guard(lock_);
+    DCHECK_GT(new_buffer_size, 0);
+    if (!buffer_) {
+      RETURN_NOT_OK(AllocateResizableBuffer(new_buffer_size, &buffer_));
+    } else {
+      if (buffer_pos_ >= new_buffer_size) {
+        // If the buffer is shrinking, first flush to the raw OutputStream
+        RETURN_NOT_OK(FlushUnlocked());
+      }
+      RETURN_NOT_OK(buffer_->Resize(new_buffer_size));
+    }
+    buffer_data_ = reinterpret_cast<char*>(buffer_->mutable_data());
+    buffer_pos_ = 0;
+    buffer_size_ = new_buffer_size;
+    return Status::OK();
+  }
+
+  int64_t buffer_size() const { return buffer_size_; }
 
+ private:
   std::shared_ptr<OutputStream> raw_;
   bool is_open_;
-  std::string buffer_;
+
+  std::shared_ptr<ResizableBuffer> buffer_;
   char* buffer_data_;
   int64_t buffer_pos_;
+  int64_t buffer_size_;
   mutable int64_t raw_pos_;
   mutable std::mutex lock_;
 };
@@ -124,8 +144,24 @@ class BufferedOutputStream::Impl {
 BufferedOutputStream::BufferedOutputStream(std::shared_ptr<OutputStream> raw)
     : impl_(new BufferedOutputStream::Impl(std::move(raw))) {}
 
+Status BufferedOutputStream::Create(std::shared_ptr<OutputStream> raw,
+                                    int64_t buffer_size,
+                                    std::shared_ptr<BufferedOutputStream>* out) {
+  auto result =
+      std::shared_ptr<BufferedOutputStream>(new BufferedOutputStream(std::move(raw)));
+  RETURN_NOT_OK(result->SetBufferSize(buffer_size));
+  *out = std::move(result);
+  return Status::OK();
+}
+
 BufferedOutputStream::~BufferedOutputStream() {}
 
+Status BufferedOutputStream::SetBufferSize(int64_t new_buffer_size) {
+  return impl_->SetBufferSize(new_buffer_size);
+}
+
+int64_t BufferedOutputStream::buffer_size() const { return impl_->buffer_size(); }
+
 Status BufferedOutputStream::Close() { return impl_->Close(); }
 
 bool BufferedOutputStream::closed() const { return impl_->closed(); }
diff --git a/cpp/src/arrow/io/buffered.h b/cpp/src/arrow/io/buffered.h
index 19aab89..e028607 100644
--- a/cpp/src/arrow/io/buffered.h
+++ b/cpp/src/arrow/io/buffered.h
@@ -37,7 +37,21 @@ class ARROW_EXPORT BufferedOutputStream : public OutputStream {
   ~BufferedOutputStream() override;
 
   /// \brief Create a buffered output stream wrapping the given output stream.
-  explicit BufferedOutputStream(std::shared_ptr<OutputStream> raw);
+  /// \param[in] raw another OutputStream
+  /// \param[in] buffer_size the size of the temporary buffer. Allocates from
+  /// the default memory pool
+  /// \param[out] out the created BufferedOutputStream
+  /// \return Status
+  static Status Create(std::shared_ptr<OutputStream> raw, int64_t buffer_size,
+                       std::shared_ptr<BufferedOutputStream>* out);
+
+  /// \brief Resize internal buffer
+  /// \param[in] new_buffer_size the new buffer size
+  /// \return Status
+  Status SetBufferSize(int64_t new_buffer_size);
+
+  /// \brief Return the current size of the internal buffer
+  int64_t buffer_size() const;
 
   // OutputStream interface
 
@@ -56,6 +70,8 @@ class ARROW_EXPORT BufferedOutputStream : public OutputStream {
   std::shared_ptr<OutputStream> raw() const;
 
  private:
+  explicit BufferedOutputStream(std::shared_ptr<OutputStream> raw);
+
   class ARROW_NO_EXPORT Impl;
   std::unique_ptr<Impl> impl_;
 };
diff --git a/cpp/src/arrow/io/io-buffered-test.cc b/cpp/src/arrow/io/io-buffered-test.cc
index 40d1b1c..8146d1a 100644
--- a/cpp/src/arrow/io/io-buffered-test.cc
+++ b/cpp/src/arrow/io/io-buffered-test.cc
@@ -76,9 +76,11 @@ class FileTestFixture : public ::testing::Test {
 // ----------------------------------------------------------------------
 // File output tests
 
+constexpr int64_t kDefaultBufferSize = 4096;
+
 class TestBufferedOutputStream : public FileTestFixture {
  public:
-  void OpenBuffered(bool append = false) {
+  void OpenBuffered(int64_t buffer_size = kDefaultBufferSize, bool append = false) {
     stream_.reset();
     std::shared_ptr<FileOutputStream> file;
     ASSERT_OK(FileOutputStream::Open(path_, append, &file));
@@ -91,7 +93,7 @@ class TestBufferedOutputStream : public FileTestFixture {
       lseek(fd_, 0, SEEK_END);
 #endif
     }
-    stream_ = std::make_shared<BufferedOutputStream>(std::move(file));
+    ASSERT_OK(BufferedOutputStream::Create(file, buffer_size, &stream_));
   }
 
   void WriteChunkwise(const std::string& datastr, const std::valarray<int64_t>& sizes) {
@@ -123,7 +125,7 @@ class TestBufferedOutputStream : public FileTestFixture {
 
  protected:
   int fd_;
-  std::shared_ptr<OutputStream> stream_;
+  std::shared_ptr<BufferedOutputStream> stream_;
 };
 
 TEST_F(TestBufferedOutputStream, DestructorClosesFile) {
@@ -217,6 +219,28 @@ TEST_F(TestBufferedOutputStream, Flush) {
   ASSERT_OK(stream_->Close());
 }
 
+TEST_F(TestBufferedOutputStream, SetBufferSize) {
+  OpenBuffered(20);
+
+  ASSERT_EQ(20, stream_->buffer_size());
+
+  const std::string datastr = "1234568790abcdefghij";
+  const char* data = datastr.data();
+
+  // Write part of the data, then shrink buffer size to make sure it gets
+  // flushed
+  ASSERT_OK(stream_->Write(data, 10));
+  ASSERT_OK(stream_->SetBufferSize(10));
+
+  ASSERT_EQ(10, stream_->buffer_size());
+
+  ASSERT_OK(stream_->Write(data + 10, 10));
+  ASSERT_OK(stream_->Flush());
+
+  AssertFileContents(path_, datastr);
+  ASSERT_OK(stream_->Close());
+}
+
 TEST_F(TestBufferedOutputStream, Tell) {
   OpenBuffered();
 
@@ -229,7 +253,7 @@ TEST_F(TestBufferedOutputStream, Tell) {
 
   ASSERT_OK(stream_->Close());
 
-  OpenBuffered(true /* append */);
+  OpenBuffered(kDefaultBufferSize, true /* append */);
   AssertTell(100100);
   WriteChunkwise(std::string(90, 'x'), {1, 1, 2, 3, 5, 8});
   AssertTell(100190);
diff --git a/cpp/src/arrow/io/io-file-benchmark.cc b/cpp/src/arrow/io/io-file-benchmark.cc
index e5a326e..c57fa6d 100644
--- a/cpp/src/arrow/io/io-file-benchmark.cc
+++ b/cpp/src/arrow/io/io-file-benchmark.cc
@@ -41,6 +41,8 @@ std::string GetNullFile() { return "/dev/null"; }
 const std::valarray<int64_t> small_sizes = {8, 24, 33, 1, 32, 192, 16, 40};
 const std::valarray<int64_t> large_sizes = {8192, 100000};
 
+constexpr int64_t kBufferSize = 4096;
+
 class BackgroundReader {
   // A class that reads data in the background from a file descriptor
 
@@ -157,11 +159,12 @@ static void BM_FileOutputStreamSmallWritesToNull(
 
 static void BM_BufferedOutputStreamSmallWritesToNull(
     benchmark::State& state) {  // NOLINT non-const reference
-  std::shared_ptr<io::OutputStream> stream;
-  ABORT_NOT_OK(io::FileOutputStream::Open(GetNullFile(), &stream));
-  stream = std::make_shared<io::BufferedOutputStream>(std::move(stream));
+  std::shared_ptr<io::OutputStream> file;
+  ABORT_NOT_OK(io::FileOutputStream::Open(GetNullFile(), &file));
 
-  BenchmarkStreamingWrites(state, small_sizes, stream.get());
+  std::shared_ptr<io::BufferedOutputStream> buffered_file;
+  ABORT_NOT_OK(io::BufferedOutputStream::Create(file, kBufferSize, &buffered_file));
+  BenchmarkStreamingWrites(state, small_sizes, buffered_file.get());
 }
 
 // Benchmark writing a pipe
@@ -191,9 +194,10 @@ static void BM_BufferedOutputStreamSmallWritesToPipe(
   std::shared_ptr<io::OutputStream> stream;
   std::shared_ptr<BackgroundReader> reader;
   SetupPipeWriter(&stream, &reader);
-  stream = std::make_shared<io::BufferedOutputStream>(std::move(stream));
 
-  BenchmarkStreamingWrites(state, small_sizes, stream.get(), reader.get());
+  std::shared_ptr<io::BufferedOutputStream> buffered_stream;
+  ABORT_NOT_OK(io::BufferedOutputStream::Create(stream, kBufferSize, &buffered_stream));
+  BenchmarkStreamingWrites(state, small_sizes, buffered_stream.get(), reader.get());
 }
 
 static void BM_BufferedOutputStreamLargeWritesToPipe(
@@ -201,9 +205,11 @@ static void BM_BufferedOutputStreamLargeWritesToPipe(
   std::shared_ptr<io::OutputStream> stream;
   std::shared_ptr<BackgroundReader> reader;
   SetupPipeWriter(&stream, &reader);
-  stream = std::make_shared<io::BufferedOutputStream>(std::move(stream));
 
-  BenchmarkStreamingWrites(state, large_sizes, stream.get(), reader.get());
+  std::shared_ptr<io::BufferedOutputStream> buffered_stream;
+  ABORT_NOT_OK(io::BufferedOutputStream::Create(stream, kBufferSize, &buffered_stream));
+
+  BenchmarkStreamingWrites(state, large_sizes, buffered_stream.get(), reader.get());
 }
 
 // We use real time as we don't want to count CPU time spent in the