You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2018/11/16 20:30:35 UTC
[arrow] branch master updated: ARROW-3781: [C++] Implement
BufferedOutputStream::SetBufferSize. Allocate buffer from MemoryPool
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 69ffda1 ARROW-3781: [C++] Implement BufferedOutputStream::SetBufferSize. Allocate buffer from MemoryPool
69ffda1 is described below
commit 69ffda1f7312a8d63195a5df8775a68ff5e0fe5f
Author: Wes McKinney <we...@apache.org>
AuthorDate: Fri Nov 16 15:30:18 2018 -0500
ARROW-3781: [C++] Implement BufferedOutputStream::SetBufferSize. Allocate buffer from MemoryPool
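Since the `BufferedOutputStream` constructor was using `std::string` to allocate its buffer internally, moving that allocation to a `MemoryPool` (which can fail, while a constructor cannot return a Status) meant adding a static constructor, `BufferedOutputStream::Create`, that returns Status and creates the output stream with an arbitrary buffer size.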
Author: Wes McKinney <we...@apache.org>
Closes #2976 from wesm/ARROW-3782 and squashes the following commits:
bfff3b5ba <Wes McKinney> Use constexpr for buffer sizes in io-file-benchmark
c788e2dca <Wes McKinney> SetBufferSize is now threadsafe
bffe5dc02 <Wes McKinney> Add dcheck for buffer size. Pass shared_ptr by value instead of const-ref
d4f4cd58c <Wes McKinney> Implement BufferedOutputStream::SetBufferSize. Use static constructor and allocate buffer from MemoryPool
---
cpp/src/arrow/io/buffered.cc | 60 ++++++++++++++++++++++++++++-------
cpp/src/arrow/io/buffered.h | 18 ++++++++++-
cpp/src/arrow/io/io-buffered-test.cc | 32 ++++++++++++++++---
cpp/src/arrow/io/io-file-benchmark.cc | 22 ++++++++-----
4 files changed, 107 insertions(+), 25 deletions(-)
diff --git a/cpp/src/arrow/io/buffered.cc b/cpp/src/arrow/io/buffered.cc
index 47f7d02..2493080 100644
--- a/cpp/src/arrow/io/buffered.cc
+++ b/cpp/src/arrow/io/buffered.cc
@@ -16,8 +16,6 @@
// under the License.
#include "arrow/io/buffered.h"
-#include "arrow/status.h"
-#include "arrow/util/logging.h"
#include <cstring>
#include <memory>
@@ -25,6 +23,10 @@
#include <string>
#include <utility>
+#include "arrow/buffer.h"
+#include "arrow/status.h"
+#include "arrow/util/logging.h"
+
namespace arrow {
namespace io {
@@ -34,11 +36,10 @@ namespace io {
class BufferedOutputStream::Impl {
public:
explicit Impl(std::shared_ptr<OutputStream> raw)
- : raw_(raw),
+ : raw_(std::move(raw)),
is_open_(true),
- buffer_(std::string(BUFFER_SIZE, '\0')),
- buffer_data_(const_cast<char*>(buffer_.data())),
buffer_pos_(0),
+ buffer_size_(0),
raw_pos_(-1) {}
~Impl() { DCHECK(Close().ok()); }
@@ -77,15 +78,15 @@ class BufferedOutputStream::Impl {
if (nbytes == 0) {
return Status::OK();
}
- if (nbytes + buffer_pos_ >= BUFFER_SIZE) {
+ if (nbytes + buffer_pos_ >= buffer_size_) {
RETURN_NOT_OK(FlushUnlocked());
DCHECK_EQ(buffer_pos_, 0);
- if (nbytes >= BUFFER_SIZE) {
+ if (nbytes >= buffer_size_) {
// Direct write
return raw_->Write(data, nbytes);
}
}
- DCHECK_LE(buffer_pos_ + nbytes, BUFFER_SIZE);
+ DCHECK_LE(buffer_pos_ + nbytes, buffer_size_);
std::memcpy(buffer_data_ + buffer_pos_, data, nbytes);
buffer_pos_ += nbytes;
return Status::OK();
@@ -108,15 +109,34 @@ class BufferedOutputStream::Impl {
std::shared_ptr<OutputStream> raw() const { return raw_; }
- private:
- // This size chosen so that memcpy() remains cheap
- static const int64_t BUFFER_SIZE = 4096;
+ // Resize the internal write buffer. Bytes still pending in the buffer are
+ // preserved; they are flushed to raw_ first only if they would not fit.
+ Status SetBufferSize(int64_t new_buffer_size) {
+ std::lock_guard<std::mutex> guard(lock_);
+ // Checked in all builds: DCHECK is debug-only and would let <= 0 through
+ if (new_buffer_size <= 0) return Status::Invalid("Buffer size should be positive");
+ if (buffer_pos_ >= new_buffer_size) {
+ // Shrinking below the pending byte count: flush first (resets buffer_pos_)
+ RETURN_NOT_OK(FlushUnlocked());
+ }
+ RETURN_NOT_OK(buffer_ ? buffer_->Resize(new_buffer_size)
+ : AllocateResizableBuffer(new_buffer_size, &buffer_));
+ buffer_data_ = reinterpret_cast<char*>(buffer_->mutable_data());
+ // buffer_pos_ is deliberately not reset: Resize() keeps the pending bytes
+ buffer_size_ = new_buffer_size;
+ return Status::OK();
+ }
+
+ int64_t buffer_size() const { return buffer_size_; }
+ private:
std::shared_ptr<OutputStream> raw_;
bool is_open_;
- std::string buffer_;
+
+ std::shared_ptr<ResizableBuffer> buffer_;
char* buffer_data_;
int64_t buffer_pos_;
+ int64_t buffer_size_;
mutable int64_t raw_pos_;
mutable std::mutex lock_;
};
@@ -124,8 +144,24 @@ class BufferedOutputStream::Impl {
BufferedOutputStream::BufferedOutputStream(std::shared_ptr<OutputStream> raw)
: impl_(new BufferedOutputStream::Impl(std::move(raw))) {}
+Status BufferedOutputStream::Create(std::shared_ptr<OutputStream> raw,
+ int64_t buffer_size,
+ std::shared_ptr<BufferedOutputStream>* out) {
+ auto result =
+ std::shared_ptr<BufferedOutputStream>(new BufferedOutputStream(std::move(raw)));
+ RETURN_NOT_OK(result->SetBufferSize(buffer_size));
+ *out = std::move(result);
+ return Status::OK();
+}
+
BufferedOutputStream::~BufferedOutputStream() {}
+Status BufferedOutputStream::SetBufferSize(int64_t new_buffer_size) {
+ return impl_->SetBufferSize(new_buffer_size);
+}
+
+int64_t BufferedOutputStream::buffer_size() const { return impl_->buffer_size(); }
+
Status BufferedOutputStream::Close() { return impl_->Close(); }
bool BufferedOutputStream::closed() const { return impl_->closed(); }
diff --git a/cpp/src/arrow/io/buffered.h b/cpp/src/arrow/io/buffered.h
index 19aab89..e028607 100644
--- a/cpp/src/arrow/io/buffered.h
+++ b/cpp/src/arrow/io/buffered.h
@@ -37,7 +37,21 @@ class ARROW_EXPORT BufferedOutputStream : public OutputStream {
~BufferedOutputStream() override;
/// \brief Create a buffered output stream wrapping the given output stream.
- explicit BufferedOutputStream(std::shared_ptr<OutputStream> raw);
+ /// \param[in] raw another OutputStream
+ /// \param[in] buffer_size the size of the temporary buffer. Allocates from
+ /// the default memory pool
+ /// \param[out] out the created BufferedOutputStream
+ /// \return Status
+ static Status Create(std::shared_ptr<OutputStream> raw, int64_t buffer_size,
+ std::shared_ptr<BufferedOutputStream>* out);
+
+ /// \brief Resize internal buffer
+ /// \param[in] new_buffer_size the new buffer size
+ /// \return Status
+ Status SetBufferSize(int64_t new_buffer_size);
+
+ /// \brief Return the current size of the internal buffer
+ int64_t buffer_size() const;
// OutputStream interface
@@ -56,6 +70,8 @@ class ARROW_EXPORT BufferedOutputStream : public OutputStream {
std::shared_ptr<OutputStream> raw() const;
private:
+ explicit BufferedOutputStream(std::shared_ptr<OutputStream> raw);
+
class ARROW_NO_EXPORT Impl;
std::unique_ptr<Impl> impl_;
};
diff --git a/cpp/src/arrow/io/io-buffered-test.cc b/cpp/src/arrow/io/io-buffered-test.cc
index 40d1b1c..8146d1a 100644
--- a/cpp/src/arrow/io/io-buffered-test.cc
+++ b/cpp/src/arrow/io/io-buffered-test.cc
@@ -76,9 +76,11 @@ class FileTestFixture : public ::testing::Test {
// ----------------------------------------------------------------------
// File output tests
+constexpr int64_t kDefaultBufferSize = 4096;
+
class TestBufferedOutputStream : public FileTestFixture {
public:
- void OpenBuffered(bool append = false) {
+ void OpenBuffered(int64_t buffer_size = kDefaultBufferSize, bool append = false) {
stream_.reset();
std::shared_ptr<FileOutputStream> file;
ASSERT_OK(FileOutputStream::Open(path_, append, &file));
@@ -91,7 +93,7 @@ class TestBufferedOutputStream : public FileTestFixture {
lseek(fd_, 0, SEEK_END);
#endif
}
- stream_ = std::make_shared<BufferedOutputStream>(std::move(file));
+ ASSERT_OK(BufferedOutputStream::Create(file, buffer_size, &stream_));
}
void WriteChunkwise(const std::string& datastr, const std::valarray<int64_t>& sizes) {
@@ -123,7 +125,7 @@ class TestBufferedOutputStream : public FileTestFixture {
protected:
int fd_;
- std::shared_ptr<OutputStream> stream_;
+ std::shared_ptr<BufferedOutputStream> stream_;
};
TEST_F(TestBufferedOutputStream, DestructorClosesFile) {
@@ -217,6 +219,28 @@ TEST_F(TestBufferedOutputStream, Flush) {
ASSERT_OK(stream_->Close());
}
+TEST_F(TestBufferedOutputStream, SetBufferSize) {
+ OpenBuffered(20);
+
+ ASSERT_EQ(20, stream_->buffer_size());
+
+ const std::string datastr = "1234568790abcdefghij";
+ const char* data = datastr.data();
+
+ // Write part of the data, then shrink buffer size to make sure it gets
+ // flushed
+ ASSERT_OK(stream_->Write(data, 10));
+ ASSERT_OK(stream_->SetBufferSize(10));
+
+ ASSERT_EQ(10, stream_->buffer_size());
+
+ ASSERT_OK(stream_->Write(data + 10, 10));
+ ASSERT_OK(stream_->Flush());
+
+ AssertFileContents(path_, datastr);
+ ASSERT_OK(stream_->Close());
+}
+
TEST_F(TestBufferedOutputStream, Tell) {
OpenBuffered();
@@ -229,7 +253,7 @@ TEST_F(TestBufferedOutputStream, Tell) {
ASSERT_OK(stream_->Close());
- OpenBuffered(true /* append */);
+ OpenBuffered(kDefaultBufferSize, true /* append */);
AssertTell(100100);
WriteChunkwise(std::string(90, 'x'), {1, 1, 2, 3, 5, 8});
AssertTell(100190);
diff --git a/cpp/src/arrow/io/io-file-benchmark.cc b/cpp/src/arrow/io/io-file-benchmark.cc
index e5a326e..c57fa6d 100644
--- a/cpp/src/arrow/io/io-file-benchmark.cc
+++ b/cpp/src/arrow/io/io-file-benchmark.cc
@@ -41,6 +41,8 @@ std::string GetNullFile() { return "/dev/null"; }
const std::valarray<int64_t> small_sizes = {8, 24, 33, 1, 32, 192, 16, 40};
const std::valarray<int64_t> large_sizes = {8192, 100000};
+constexpr int64_t kBufferSize = 4096;
+
class BackgroundReader {
// A class that reads data in the background from a file descriptor
@@ -157,11 +159,12 @@ static void BM_FileOutputStreamSmallWritesToNull(
static void BM_BufferedOutputStreamSmallWritesToNull(
benchmark::State& state) { // NOLINT non-const reference
- std::shared_ptr<io::OutputStream> stream;
- ABORT_NOT_OK(io::FileOutputStream::Open(GetNullFile(), &stream));
- stream = std::make_shared<io::BufferedOutputStream>(std::move(stream));
+ std::shared_ptr<io::OutputStream> file;
+ ABORT_NOT_OK(io::FileOutputStream::Open(GetNullFile(), &file));
- BenchmarkStreamingWrites(state, small_sizes, stream.get());
+ std::shared_ptr<io::BufferedOutputStream> buffered_file;
+ ABORT_NOT_OK(io::BufferedOutputStream::Create(file, kBufferSize, &buffered_file));
+ BenchmarkStreamingWrites(state, small_sizes, buffered_file.get());
}
// Benchmark writing a pipe
@@ -191,9 +194,10 @@ static void BM_BufferedOutputStreamSmallWritesToPipe(
std::shared_ptr<io::OutputStream> stream;
std::shared_ptr<BackgroundReader> reader;
SetupPipeWriter(&stream, &reader);
- stream = std::make_shared<io::BufferedOutputStream>(std::move(stream));
- BenchmarkStreamingWrites(state, small_sizes, stream.get(), reader.get());
+ std::shared_ptr<io::BufferedOutputStream> buffered_stream;
+ ABORT_NOT_OK(io::BufferedOutputStream::Create(stream, kBufferSize, &buffered_stream));
+ BenchmarkStreamingWrites(state, small_sizes, buffered_stream.get(), reader.get());
}
static void BM_BufferedOutputStreamLargeWritesToPipe(
@@ -201,9 +205,11 @@ static void BM_BufferedOutputStreamLargeWritesToPipe(
std::shared_ptr<io::OutputStream> stream;
std::shared_ptr<BackgroundReader> reader;
SetupPipeWriter(&stream, &reader);
- stream = std::make_shared<io::BufferedOutputStream>(std::move(stream));
- BenchmarkStreamingWrites(state, large_sizes, stream.get(), reader.get());
+ std::shared_ptr<io::BufferedOutputStream> buffered_stream;
+ ABORT_NOT_OK(io::BufferedOutputStream::Create(stream, kBufferSize, &buffered_stream));
+
+ BenchmarkStreamingWrites(state, large_sizes, buffered_stream.get(), reader.get());
}
// We use real time as we don't want to count CPU time spent in the