You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2018/04/23 12:49:07 UTC
[arrow] branch master updated: ARROW-2427: [C++] Implement ReadAt properly
This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 03251e9 ARROW-2427: [C++] Implement ReadAt properly
03251e9 is described below
commit 03251e97154c32e56ec86d362ba1599a4e3f4822
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Mon Apr 23 14:48:47 2018 +0200
ARROW-2427: [C++] Implement ReadAt properly
Allow for concurrent I/O by avoiding locking and seeking.
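**Caveat**: this changes `ReadAt` so that it no longer advances the file pointer (for OS files under POSIX; `BufferReader` and `MemoryMappedFile` no longer advance their position either). Since `ReadAt` is meant for multithreaded I/O, relying on the file pointer doesn't make sense. Unfortunately, some I/O code in Arrow mixes calls to `ReadAt()` with file-pointer-dependent operations such as `Read()` and `Tell()`; those call sites are updated in the diff below.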
Author: Antoine Pitrou <an...@python.org>
Closes #1867 from pitrou/ARROW-2427-readat and squashes the following commits:
370ae78 <Antoine Pitrou> Reformat
9de9b4b <Antoine Pitrou> ARROW-2427: Implement ReadAt properly
---
cpp/src/arrow/io/file.cc | 58 ++++++++++++++++++---------------
cpp/src/arrow/io/interfaces.h | 6 ++--
cpp/src/arrow/io/io-file-test.cc | 32 ++++++------------
cpp/src/arrow/io/memory.cc | 33 ++++++++++---------
cpp/src/arrow/io/memory.h | 4 +--
cpp/src/arrow/ipc/feather.cc | 2 +-
cpp/src/arrow/ipc/message.cc | 20 +++++++++++-
cpp/src/arrow/ipc/message.h | 12 +++++++
cpp/src/arrow/python/arrow_to_python.cc | 4 +--
9 files changed, 98 insertions(+), 73 deletions(-)
diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc
index ba012be..e3d6f84 100644
--- a/cpp/src/arrow/io/file.cc
+++ b/cpp/src/arrow/io/file.cc
@@ -125,9 +125,8 @@ class OSFile {
}
Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
- std::lock_guard<std::mutex> guard(lock_);
- RETURN_NOT_OK(Seek(position));
- return Read(nbytes, bytes_read, out);
+ return internal::FileReadAt(fd_, reinterpret_cast<uint8_t*>(out), position, nbytes,
+ bytes_read);
}
Status Seek(int64_t pos) {
@@ -203,6 +202,19 @@ class ReadableFile::ReadableFileImpl : public OSFile {
return Status::OK();
}
+ Status ReadBufferAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
+ std::shared_ptr<ResizableBuffer> buffer;
+ RETURN_NOT_OK(AllocateResizableBuffer(pool_, nbytes, &buffer));
+
+ int64_t bytes_read = 0;
+ RETURN_NOT_OK(ReadAt(position, nbytes, &bytes_read, buffer->mutable_data()));
+ if (bytes_read < nbytes) {
+ RETURN_NOT_OK(buffer->Resize(bytes_read));
+ }
+ *out = buffer;
+ return Status::OK();
+ }
+
private:
MemoryPool* pool_;
};
@@ -247,9 +259,7 @@ Status ReadableFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_rea
Status ReadableFile::ReadAt(int64_t position, int64_t nbytes,
std::shared_ptr<Buffer>* out) {
- std::lock_guard<std::mutex> guard(impl_->lock());
- RETURN_NOT_OK(Seek(position));
- return impl_->ReadBuffer(nbytes, out);
+ return impl_->ReadBufferAt(position, nbytes, out);
}
Status ReadableFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
@@ -459,42 +469,38 @@ Status MemoryMappedFile::Close() {
return Status::OK();
}
-Status MemoryMappedFile::Read(int64_t nbytes, int64_t* bytes_read, void* out) {
- nbytes = std::max<int64_t>(
- 0, std::min(nbytes, memory_map_->size() - memory_map_->position()));
+Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
+ void* out) {
+ nbytes = std::max<int64_t>(0, std::min(nbytes, memory_map_->size() - position));
if (nbytes > 0) {
- std::memcpy(out, memory_map_->head(), static_cast<size_t>(nbytes));
+ std::memcpy(out, memory_map_->data() + position, static_cast<size_t>(nbytes));
}
*bytes_read = nbytes;
- memory_map_->advance(nbytes);
return Status::OK();
}
-Status MemoryMappedFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
- nbytes = std::max<int64_t>(
- 0, std::min(nbytes, memory_map_->size() - memory_map_->position()));
+Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes,
+ std::shared_ptr<Buffer>* out) {
+ nbytes = std::max<int64_t>(0, std::min(nbytes, memory_map_->size() - position));
if (nbytes > 0) {
- *out = SliceBuffer(memory_map_, memory_map_->position(), nbytes);
+ *out = SliceBuffer(memory_map_, position, nbytes);
} else {
*out = std::make_shared<Buffer>(nullptr, 0);
}
- memory_map_->advance(nbytes);
return Status::OK();
}
-Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
- void* out) {
- std::lock_guard<std::mutex> guard(memory_map_->lock());
- RETURN_NOT_OK(Seek(position));
- return Read(nbytes, bytes_read, out);
+Status MemoryMappedFile::Read(int64_t nbytes, int64_t* bytes_read, void* out) {
+ RETURN_NOT_OK(ReadAt(memory_map_->position(), nbytes, bytes_read, out));
+ memory_map_->advance(*bytes_read);
+ return Status::OK();
}
-Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes,
- std::shared_ptr<Buffer>* out) {
- std::lock_guard<std::mutex> guard(memory_map_->lock());
- RETURN_NOT_OK(Seek(position));
- return Read(nbytes, out);
+Status MemoryMappedFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
+ RETURN_NOT_OK(ReadAt(memory_map_->position(), nbytes, out));
+ memory_map_->advance((*out)->size());
+ return Status::OK();
}
bool MemoryMappedFile::supports_zero_copy() const { return true; }
diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h
index 09536a4..743621c 100644
--- a/cpp/src/arrow/io/interfaces.h
+++ b/cpp/src/arrow/io/interfaces.h
@@ -128,7 +128,8 @@ class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable {
virtual bool supports_zero_copy() const = 0;
/// \brief Read nbytes at position, provide default implementations using Read(...), but
- /// can be overridden. Default implementation is thread-safe.
+ /// can be overridden. Default implementation is thread-safe. It is unspecified
+ /// whether this method updates the file position or not.
///
/// \note Child classes must explicitly call this implementation or provide their own.
///
@@ -141,7 +142,8 @@ class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable {
void* out) = 0;
/// \brief Read nbytes at position, provide default implementations using Read(...), but
- /// can be overridden. Default implementation is thread-safe.
+ /// can be overridden. Default implementation is thread-safe. It is unspecified
+ /// whether this method updates the file position or not.
///
/// \note Child classes must explicitly call this implementation or provide their own.
///
diff --git a/cpp/src/arrow/io/io-file-test.cc b/cpp/src/arrow/io/io-file-test.cc
index d3ef908..b661fb6 100644
--- a/cpp/src/arrow/io/io-file-test.cc
+++ b/cpp/src/arrow/io/io-file-test.cc
@@ -359,36 +359,23 @@ TEST_F(TestReadableFile, ReadAt) {
OpenFile();
int64_t bytes_read;
- int64_t position;
ASSERT_OK(file_->ReadAt(0, 4, &bytes_read, buffer));
ASSERT_EQ(4, bytes_read);
ASSERT_EQ(0, std::memcmp(buffer, "test", 4));
- // position advanced
- ASSERT_OK(file_->Tell(&position));
- ASSERT_EQ(4, position);
-
- ASSERT_OK(file_->ReadAt(4, 10, &bytes_read, buffer));
- ASSERT_EQ(4, bytes_read);
- ASSERT_EQ(0, std::memcmp(buffer, "data", 4));
-
- // position advanced to EOF
- ASSERT_OK(file_->Tell(&position));
- ASSERT_EQ(8, position);
+ ASSERT_OK(file_->ReadAt(1, 10, &bytes_read, buffer));
+ ASSERT_EQ(7, bytes_read);
+ ASSERT_EQ(0, std::memcmp(buffer, "estdata", 7));
// Check buffer API
std::shared_ptr<Buffer> buffer2;
- ASSERT_OK(file_->ReadAt(0, 4, &buffer2));
- ASSERT_EQ(4, buffer2->size());
+ ASSERT_OK(file_->ReadAt(2, 5, &buffer2));
+ ASSERT_EQ(5, buffer2->size());
- Buffer expected(reinterpret_cast<const uint8_t*>(test_data), 4);
+ Buffer expected(reinterpret_cast<const uint8_t*>(test_data + 2), 5);
ASSERT_TRUE(buffer2->Equals(expected));
-
- // position advanced
- ASSERT_OK(file_->Tell(&position));
- ASSERT_EQ(4, position);
}
TEST_F(TestReadableFile, NonExistentFile) {
@@ -457,14 +444,15 @@ TEST_F(TestReadableFile, ThreadSafety) {
ASSERT_OK(ReadableFile::Open(path_, &pool, &file_));
std::atomic<int> correct_count(0);
- int niter = 10000;
+ int niter = 30000;
auto ReadData = [&correct_count, &data, &niter, this]() {
std::shared_ptr<Buffer> buffer;
for (int i = 0; i < niter; ++i) {
- ASSERT_OK(file_->ReadAt(0, 3, &buffer));
- if (0 == memcmp(data.c_str(), buffer->data(), 3)) {
+ const int offset = i % 3;
+ ASSERT_OK(file_->ReadAt(offset, 3, &buffer));
+ if (0 == memcmp(data.c_str() + offset, buffer->data(), 3)) {
correct_count += 1;
}
}
diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc
index 512e7f5..54cf8e4 100644
--- a/cpp/src/arrow/io/memory.cc
+++ b/cpp/src/arrow/io/memory.cc
@@ -256,42 +256,43 @@ Status BufferReader::Tell(int64_t* position) const {
bool BufferReader::supports_zero_copy() const { return true; }
-Status BufferReader::Read(int64_t nbytes, int64_t* bytes_read, void* buffer) {
+Status BufferReader::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
+ void* buffer) {
if (nbytes < 0) {
return Status::IOError("Cannot read a negative number of bytes from BufferReader.");
}
- *bytes_read = std::min(nbytes, size_ - position_);
+ *bytes_read = std::min(nbytes, size_ - position);
if (*bytes_read) {
- memcpy(buffer, data_ + position_, *bytes_read);
- position_ += *bytes_read;
+ memcpy(buffer, data_ + position, *bytes_read);
}
return Status::OK();
}
-Status BufferReader::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
+Status BufferReader::ReadAt(int64_t position, int64_t nbytes,
+ std::shared_ptr<Buffer>* out) {
if (nbytes < 0) {
return Status::IOError("Cannot read a negative number of bytes from BufferReader.");
}
- int64_t size = std::min(nbytes, size_ - position_);
+ int64_t size = std::min(nbytes, size_ - position);
if (size > 0 && buffer_ != nullptr) {
- *out = SliceBuffer(buffer_, position_, size);
+ *out = SliceBuffer(buffer_, position, size);
} else {
- *out = std::make_shared<Buffer>(data_ + position_, size);
+ *out = std::make_shared<Buffer>(data_ + position, size);
}
-
- position_ += size;
return Status::OK();
}
-Status BufferReader::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
- void* out) {
- return RandomAccessFile::ReadAt(position, nbytes, bytes_read, out);
+Status BufferReader::Read(int64_t nbytes, int64_t* bytes_read, void* buffer) {
+ RETURN_NOT_OK(ReadAt(position_, nbytes, bytes_read, buffer));
+ position_ += *bytes_read;
+ return Status::OK();
}
-Status BufferReader::ReadAt(int64_t position, int64_t nbytes,
- std::shared_ptr<Buffer>* out) {
- return RandomAccessFile::ReadAt(position, nbytes, out);
+Status BufferReader::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
+ RETURN_NOT_OK(ReadAt(position_, nbytes, out));
+ position_ += (*out)->size();
+ return Status::OK();
}
Status BufferReader::GetSize(int64_t* size) {
diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h
index cf370b3..7757c18 100644
--- a/cpp/src/arrow/io/memory.h
+++ b/cpp/src/arrow/io/memory.h
@@ -113,13 +113,11 @@ class ARROW_EXPORT BufferReader : public RandomAccessFile {
Status Close() override;
Status Tell(int64_t* position) const override;
Status Read(int64_t nbytes, int64_t* bytes_read, void* buffer) override;
-
// Zero copy read
Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) override;
+
Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
void* out) override;
-
- /// Default implementation is thread-safe
Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) override;
Status GetSize(int64_t* size) override;
diff --git a/cpp/src/arrow/ipc/feather.cc b/cpp/src/arrow/ipc/feather.cc
index df5c799..7e762ac 100644
--- a/cpp/src/arrow/ipc/feather.cc
+++ b/cpp/src/arrow/ipc/feather.cc
@@ -283,7 +283,7 @@ class TableReader::TableReaderImpl {
}
std::shared_ptr<Buffer> buffer;
- RETURN_NOT_OK(source->Read(magic_size, &buffer));
+ RETURN_NOT_OK(source->ReadAt(0, magic_size, &buffer));
if (memcmp(buffer->data(), kFeatherMagicBytes, magic_size)) {
return Status::Invalid("Not a feather file");
diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc
index 0a5bcdc..4ee0c34 100644
--- a/cpp/src/arrow/ipc/message.cc
+++ b/cpp/src/arrow/ipc/message.cc
@@ -157,6 +157,24 @@ Status Message::ReadFrom(const std::shared_ptr<Buffer>& metadata, io::InputStrea
return Message::Open(metadata, body, out);
}
+Status Message::ReadFrom(const int64_t offset, const std::shared_ptr<Buffer>& metadata,
+ io::RandomAccessFile* file, std::unique_ptr<Message>* out) {
+ auto fb_message = flatbuf::GetMessage(metadata->data());
+
+ int64_t body_length = fb_message->bodyLength();
+
+ std::shared_ptr<Buffer> body;
+ RETURN_NOT_OK(file->ReadAt(offset, body_length, &body));
+ if (body->size() < body_length) {
+ std::stringstream ss;
+ ss << "Expected to be able to read " << body_length << " bytes for message body, got "
+ << body->size();
+ return Status::IOError(ss.str());
+ }
+
+ return Message::Open(metadata, body, out);
+}
+
Status Message::SerializeTo(io::OutputStream* file, int64_t* output_length) const {
int32_t metadata_length = 0;
RETURN_NOT_OK(internal::WriteMessage(*metadata(), file, &metadata_length));
@@ -210,7 +228,7 @@ Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile
}
auto metadata = SliceBuffer(buffer, 4, buffer->size() - 4);
- return Message::ReadFrom(metadata, file, message);
+ return Message::ReadFrom(offset + metadata_length, metadata, file, message);
}
Status ReadMessage(io::InputStream* file, bool aligned,
diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h
index e5ea409..d150eab 100644
--- a/cpp/src/arrow/ipc/message.h
+++ b/cpp/src/arrow/ipc/message.h
@@ -97,6 +97,18 @@ class ARROW_EXPORT Message {
static Status ReadFrom(const std::shared_ptr<Buffer>& metadata, io::InputStream* stream,
std::unique_ptr<Message>* out);
+ /// \brief Read message body from position in file, and create Message given
+ /// the Flatbuffer metadata
+ /// \param[in] offset the position in the file where the message body starts.
+ /// \param[in] metadata containing a serialized Message flatbuffer
+ /// \param[in] file the seekable file interface to read from
+ /// \param[out] out the created Message
+ /// \return Status
+ ///
+ /// \note If file supports zero-copy, this is zero-copy
+ static Status ReadFrom(const int64_t offset, const std::shared_ptr<Buffer>& metadata,
+ io::RandomAccessFile* file, std::unique_ptr<Message>* out);
+
/// \brief Return true if message type and contents are equal
///
/// \param other another message
diff --git a/cpp/src/arrow/python/arrow_to_python.cc b/cpp/src/arrow/python/arrow_to_python.cc
index 3fdc5f1..6ae5d43 100644
--- a/cpp/src/arrow/python/arrow_to_python.cc
+++ b/cpp/src/arrow/python/arrow_to_python.cc
@@ -268,11 +268,11 @@ Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject* out)
int64_t size;
RETURN_NOT_OK(src->ReadAt(offset, sizeof(int64_t), &bytes_read,
reinterpret_cast<uint8_t*>(&size)));
- RETURN_NOT_OK(src->Tell(&offset));
+ offset += sizeof(int64_t);
std::shared_ptr<Buffer> buffer;
RETURN_NOT_OK(src->ReadAt(offset, size, &buffer));
out->buffers.push_back(buffer);
- RETURN_NOT_OK(src->Tell(&offset));
+ offset += size;
}
return Status::OK();
--
To stop receiving notification emails like this one, please contact apitrou@apache.org.