You are viewing a plain-text version of this content. (The canonical link to the original message was a hyperlink that is not preserved in this copy.)
Posted to commits@arrow.apache.org by ap...@apache.org on 2018/04/23 12:49:07 UTC

[arrow] branch master updated: ARROW-2427: [C++] Implement ReadAt properly

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 03251e9  ARROW-2427: [C++] Implement ReadAt properly
03251e9 is described below

commit 03251e97154c32e56ec86d362ba1599a4e3f4822
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Mon Apr 23 14:48:47 2018 +0200

    ARROW-2427: [C++] Implement ReadAt properly
    
    Allow for concurrent I/O by avoiding locking and seeking.
    
    **Caveat**: this changes `ReadAt` to not advance the file pointer (under POSIX). Since `ReadAt` is meant for multithreaded I/O, relying on the file pointer doesn't make sense. Unfortunately, some I/O code in Arrow mixes calls to `ReadAt()` with file-pointer-dependent operations (see diff).
    
    Author: Antoine Pitrou <an...@python.org>
    
    Closes #1867 from pitrou/ARROW-2427-readat and squashes the following commits:
    
    370ae78 <Antoine Pitrou> Reformat
    9de9b4b <Antoine Pitrou> ARROW-2427:  Implement ReadAt properly
---
 cpp/src/arrow/io/file.cc                | 58 ++++++++++++++++++---------------
 cpp/src/arrow/io/interfaces.h           |  6 ++--
 cpp/src/arrow/io/io-file-test.cc        | 32 ++++++------------
 cpp/src/arrow/io/memory.cc              | 33 ++++++++++---------
 cpp/src/arrow/io/memory.h               |  4 +--
 cpp/src/arrow/ipc/feather.cc            |  2 +-
 cpp/src/arrow/ipc/message.cc            | 20 +++++++++++-
 cpp/src/arrow/ipc/message.h             | 12 +++++++
 cpp/src/arrow/python/arrow_to_python.cc |  4 +--
 9 files changed, 98 insertions(+), 73 deletions(-)

diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc
index ba012be..e3d6f84 100644
--- a/cpp/src/arrow/io/file.cc
+++ b/cpp/src/arrow/io/file.cc
@@ -125,9 +125,8 @@ class OSFile {
   }
 
   Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
-    std::lock_guard<std::mutex> guard(lock_);
-    RETURN_NOT_OK(Seek(position));
-    return Read(nbytes, bytes_read, out);
+    return internal::FileReadAt(fd_, reinterpret_cast<uint8_t*>(out), position, nbytes,
+                                bytes_read);
   }
 
   Status Seek(int64_t pos) {
@@ -203,6 +202,19 @@ class ReadableFile::ReadableFileImpl : public OSFile {
     return Status::OK();
   }
 
+  Status ReadBufferAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
+    std::shared_ptr<ResizableBuffer> buffer;
+    RETURN_NOT_OK(AllocateResizableBuffer(pool_, nbytes, &buffer));
+
+    int64_t bytes_read = 0;
+    RETURN_NOT_OK(ReadAt(position, nbytes, &bytes_read, buffer->mutable_data()));
+    if (bytes_read < nbytes) {
+      RETURN_NOT_OK(buffer->Resize(bytes_read));
+    }
+    *out = buffer;
+    return Status::OK();
+  }
+
  private:
   MemoryPool* pool_;
 };
@@ -247,9 +259,7 @@ Status ReadableFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_rea
 
 Status ReadableFile::ReadAt(int64_t position, int64_t nbytes,
                             std::shared_ptr<Buffer>* out) {
-  std::lock_guard<std::mutex> guard(impl_->lock());
-  RETURN_NOT_OK(Seek(position));
-  return impl_->ReadBuffer(nbytes, out);
+  return impl_->ReadBufferAt(position, nbytes, out);
 }
 
 Status ReadableFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
@@ -459,42 +469,38 @@ Status MemoryMappedFile::Close() {
   return Status::OK();
 }
 
-Status MemoryMappedFile::Read(int64_t nbytes, int64_t* bytes_read, void* out) {
-  nbytes = std::max<int64_t>(
-      0, std::min(nbytes, memory_map_->size() - memory_map_->position()));
+Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
+                                void* out) {
+  nbytes = std::max<int64_t>(0, std::min(nbytes, memory_map_->size() - position));
   if (nbytes > 0) {
-    std::memcpy(out, memory_map_->head(), static_cast<size_t>(nbytes));
+    std::memcpy(out, memory_map_->data() + position, static_cast<size_t>(nbytes));
   }
   *bytes_read = nbytes;
-  memory_map_->advance(nbytes);
   return Status::OK();
 }
 
-Status MemoryMappedFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
-  nbytes = std::max<int64_t>(
-      0, std::min(nbytes, memory_map_->size() - memory_map_->position()));
+Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes,
+                                std::shared_ptr<Buffer>* out) {
+  nbytes = std::max<int64_t>(0, std::min(nbytes, memory_map_->size() - position));
 
   if (nbytes > 0) {
-    *out = SliceBuffer(memory_map_, memory_map_->position(), nbytes);
+    *out = SliceBuffer(memory_map_, position, nbytes);
   } else {
     *out = std::make_shared<Buffer>(nullptr, 0);
   }
-  memory_map_->advance(nbytes);
   return Status::OK();
 }
 
-Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
-                                void* out) {
-  std::lock_guard<std::mutex> guard(memory_map_->lock());
-  RETURN_NOT_OK(Seek(position));
-  return Read(nbytes, bytes_read, out);
+Status MemoryMappedFile::Read(int64_t nbytes, int64_t* bytes_read, void* out) {
+  RETURN_NOT_OK(ReadAt(memory_map_->position(), nbytes, bytes_read, out));
+  memory_map_->advance(*bytes_read);
+  return Status::OK();
 }
 
-Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes,
-                                std::shared_ptr<Buffer>* out) {
-  std::lock_guard<std::mutex> guard(memory_map_->lock());
-  RETURN_NOT_OK(Seek(position));
-  return Read(nbytes, out);
+Status MemoryMappedFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
+  RETURN_NOT_OK(ReadAt(memory_map_->position(), nbytes, out));
+  memory_map_->advance((*out)->size());
+  return Status::OK();
 }
 
 bool MemoryMappedFile::supports_zero_copy() const { return true; }
diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h
index 09536a4..743621c 100644
--- a/cpp/src/arrow/io/interfaces.h
+++ b/cpp/src/arrow/io/interfaces.h
@@ -128,7 +128,8 @@ class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable {
   virtual bool supports_zero_copy() const = 0;
 
   /// \brief Read nbytes at position, provide default implementations using Read(...), but
-  /// can be overridden. Default implementation is thread-safe.
+  /// can be overridden. Default implementation is thread-safe.  It is unspecified
+  /// whether this method updates the file position or not.
   ///
   /// \note Child classes must explicitly call this implementation or provide their own.
   ///
@@ -141,7 +142,8 @@ class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable {
                         void* out) = 0;
 
   /// \brief Read nbytes at position, provide default implementations using Read(...), but
-  /// can be overridden. Default implementation is thread-safe.
+  /// can be overridden. Default implementation is thread-safe.  It is unspecified
+  /// whether this method updates the file position or not.
   ///
   /// \note Child classes must explicitly call this implementation or provide their own.
   ///
diff --git a/cpp/src/arrow/io/io-file-test.cc b/cpp/src/arrow/io/io-file-test.cc
index d3ef908..b661fb6 100644
--- a/cpp/src/arrow/io/io-file-test.cc
+++ b/cpp/src/arrow/io/io-file-test.cc
@@ -359,36 +359,23 @@ TEST_F(TestReadableFile, ReadAt) {
   OpenFile();
 
   int64_t bytes_read;
-  int64_t position;
 
   ASSERT_OK(file_->ReadAt(0, 4, &bytes_read, buffer));
   ASSERT_EQ(4, bytes_read);
   ASSERT_EQ(0, std::memcmp(buffer, "test", 4));
 
-  // position advanced
-  ASSERT_OK(file_->Tell(&position));
-  ASSERT_EQ(4, position);
-
-  ASSERT_OK(file_->ReadAt(4, 10, &bytes_read, buffer));
-  ASSERT_EQ(4, bytes_read);
-  ASSERT_EQ(0, std::memcmp(buffer, "data", 4));
-
-  // position advanced to EOF
-  ASSERT_OK(file_->Tell(&position));
-  ASSERT_EQ(8, position);
+  ASSERT_OK(file_->ReadAt(1, 10, &bytes_read, buffer));
+  ASSERT_EQ(7, bytes_read);
+  ASSERT_EQ(0, std::memcmp(buffer, "estdata", 7));
 
   // Check buffer API
   std::shared_ptr<Buffer> buffer2;
 
-  ASSERT_OK(file_->ReadAt(0, 4, &buffer2));
-  ASSERT_EQ(4, buffer2->size());
+  ASSERT_OK(file_->ReadAt(2, 5, &buffer2));
+  ASSERT_EQ(5, buffer2->size());
 
-  Buffer expected(reinterpret_cast<const uint8_t*>(test_data), 4);
+  Buffer expected(reinterpret_cast<const uint8_t*>(test_data + 2), 5);
   ASSERT_TRUE(buffer2->Equals(expected));
-
-  // position advanced
-  ASSERT_OK(file_->Tell(&position));
-  ASSERT_EQ(4, position);
 }
 
 TEST_F(TestReadableFile, NonExistentFile) {
@@ -457,14 +444,15 @@ TEST_F(TestReadableFile, ThreadSafety) {
   ASSERT_OK(ReadableFile::Open(path_, &pool, &file_));
 
   std::atomic<int> correct_count(0);
-  int niter = 10000;
+  int niter = 30000;
 
   auto ReadData = [&correct_count, &data, &niter, this]() {
     std::shared_ptr<Buffer> buffer;
 
     for (int i = 0; i < niter; ++i) {
-      ASSERT_OK(file_->ReadAt(0, 3, &buffer));
-      if (0 == memcmp(data.c_str(), buffer->data(), 3)) {
+      const int offset = i % 3;
+      ASSERT_OK(file_->ReadAt(offset, 3, &buffer));
+      if (0 == memcmp(data.c_str() + offset, buffer->data(), 3)) {
         correct_count += 1;
       }
     }
diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc
index 512e7f5..54cf8e4 100644
--- a/cpp/src/arrow/io/memory.cc
+++ b/cpp/src/arrow/io/memory.cc
@@ -256,42 +256,43 @@ Status BufferReader::Tell(int64_t* position) const {
 
 bool BufferReader::supports_zero_copy() const { return true; }
 
-Status BufferReader::Read(int64_t nbytes, int64_t* bytes_read, void* buffer) {
+Status BufferReader::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
+                            void* buffer) {
   if (nbytes < 0) {
     return Status::IOError("Cannot read a negative number of bytes from BufferReader.");
   }
-  *bytes_read = std::min(nbytes, size_ - position_);
+  *bytes_read = std::min(nbytes, size_ - position);
   if (*bytes_read) {
-    memcpy(buffer, data_ + position_, *bytes_read);
-    position_ += *bytes_read;
+    memcpy(buffer, data_ + position, *bytes_read);
   }
   return Status::OK();
 }
 
-Status BufferReader::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
+Status BufferReader::ReadAt(int64_t position, int64_t nbytes,
+                            std::shared_ptr<Buffer>* out) {
   if (nbytes < 0) {
     return Status::IOError("Cannot read a negative number of bytes from BufferReader.");
   }
-  int64_t size = std::min(nbytes, size_ - position_);
+  int64_t size = std::min(nbytes, size_ - position);
 
   if (size > 0 && buffer_ != nullptr) {
-    *out = SliceBuffer(buffer_, position_, size);
+    *out = SliceBuffer(buffer_, position, size);
   } else {
-    *out = std::make_shared<Buffer>(data_ + position_, size);
+    *out = std::make_shared<Buffer>(data_ + position, size);
   }
-
-  position_ += size;
   return Status::OK();
 }
 
-Status BufferReader::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
-                            void* out) {
-  return RandomAccessFile::ReadAt(position, nbytes, bytes_read, out);
+Status BufferReader::Read(int64_t nbytes, int64_t* bytes_read, void* buffer) {
+  RETURN_NOT_OK(ReadAt(position_, nbytes, bytes_read, buffer));
+  position_ += *bytes_read;
+  return Status::OK();
 }
 
-Status BufferReader::ReadAt(int64_t position, int64_t nbytes,
-                            std::shared_ptr<Buffer>* out) {
-  return RandomAccessFile::ReadAt(position, nbytes, out);
+Status BufferReader::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
+  RETURN_NOT_OK(ReadAt(position_, nbytes, out));
+  position_ += (*out)->size();
+  return Status::OK();
 }
 
 Status BufferReader::GetSize(int64_t* size) {
diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h
index cf370b3..7757c18 100644
--- a/cpp/src/arrow/io/memory.h
+++ b/cpp/src/arrow/io/memory.h
@@ -113,13 +113,11 @@ class ARROW_EXPORT BufferReader : public RandomAccessFile {
   Status Close() override;
   Status Tell(int64_t* position) const override;
   Status Read(int64_t nbytes, int64_t* bytes_read, void* buffer) override;
-
   // Zero copy read
   Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) override;
+
   Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
                 void* out) override;
-
-  /// Default implementation is thread-safe
   Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) override;
 
   Status GetSize(int64_t* size) override;
diff --git a/cpp/src/arrow/ipc/feather.cc b/cpp/src/arrow/ipc/feather.cc
index df5c799..7e762ac 100644
--- a/cpp/src/arrow/ipc/feather.cc
+++ b/cpp/src/arrow/ipc/feather.cc
@@ -283,7 +283,7 @@ class TableReader::TableReaderImpl {
     }
 
     std::shared_ptr<Buffer> buffer;
-    RETURN_NOT_OK(source->Read(magic_size, &buffer));
+    RETURN_NOT_OK(source->ReadAt(0, magic_size, &buffer));
 
     if (memcmp(buffer->data(), kFeatherMagicBytes, magic_size)) {
       return Status::Invalid("Not a feather file");
diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc
index 0a5bcdc..4ee0c34 100644
--- a/cpp/src/arrow/ipc/message.cc
+++ b/cpp/src/arrow/ipc/message.cc
@@ -157,6 +157,24 @@ Status Message::ReadFrom(const std::shared_ptr<Buffer>& metadata, io::InputStrea
   return Message::Open(metadata, body, out);
 }
 
+Status Message::ReadFrom(const int64_t offset, const std::shared_ptr<Buffer>& metadata,
+                         io::RandomAccessFile* file, std::unique_ptr<Message>* out) {
+  auto fb_message = flatbuf::GetMessage(metadata->data());
+
+  int64_t body_length = fb_message->bodyLength();
+
+  std::shared_ptr<Buffer> body;
+  RETURN_NOT_OK(file->ReadAt(offset, body_length, &body));
+  if (body->size() < body_length) {
+    std::stringstream ss;
+    ss << "Expected to be able to read " << body_length << " bytes for message body, got "
+       << body->size();
+    return Status::IOError(ss.str());
+  }
+
+  return Message::Open(metadata, body, out);
+}
+
 Status Message::SerializeTo(io::OutputStream* file, int64_t* output_length) const {
   int32_t metadata_length = 0;
   RETURN_NOT_OK(internal::WriteMessage(*metadata(), file, &metadata_length));
@@ -210,7 +228,7 @@ Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile
   }
 
   auto metadata = SliceBuffer(buffer, 4, buffer->size() - 4);
-  return Message::ReadFrom(metadata, file, message);
+  return Message::ReadFrom(offset + metadata_length, metadata, file, message);
 }
 
 Status ReadMessage(io::InputStream* file, bool aligned,
diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h
index e5ea409..d150eab 100644
--- a/cpp/src/arrow/ipc/message.h
+++ b/cpp/src/arrow/ipc/message.h
@@ -97,6 +97,18 @@ class ARROW_EXPORT Message {
   static Status ReadFrom(const std::shared_ptr<Buffer>& metadata, io::InputStream* stream,
                          std::unique_ptr<Message>* out);
 
+  /// \brief Read message body from position in file, and create Message given
+  /// the Flatbuffer metadata
+  /// \param[in] offset the position in the file where the message body starts.
+  /// \param[in] metadata containing a serialized Message flatbuffer
+  /// \param[in] file the seekable file interface to read from
+  /// \param[out] out the created Message
+  /// \return Status
+  ///
+  /// \note If file supports zero-copy, this is zero-copy
+  static Status ReadFrom(const int64_t offset, const std::shared_ptr<Buffer>& metadata,
+                         io::RandomAccessFile* file, std::unique_ptr<Message>* out);
+
   /// \brief Return true if message type and contents are equal
   ///
   /// \param other another message
diff --git a/cpp/src/arrow/python/arrow_to_python.cc b/cpp/src/arrow/python/arrow_to_python.cc
index 3fdc5f1..6ae5d43 100644
--- a/cpp/src/arrow/python/arrow_to_python.cc
+++ b/cpp/src/arrow/python/arrow_to_python.cc
@@ -268,11 +268,11 @@ Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject* out)
     int64_t size;
     RETURN_NOT_OK(src->ReadAt(offset, sizeof(int64_t), &bytes_read,
                               reinterpret_cast<uint8_t*>(&size)));
-    RETURN_NOT_OK(src->Tell(&offset));
+    offset += sizeof(int64_t);
     std::shared_ptr<Buffer> buffer;
     RETURN_NOT_OK(src->ReadAt(offset, size, &buffer));
     out->buffers.push_back(buffer);
-    RETURN_NOT_OK(src->Tell(&offset));
+    offset += size;
   }
 
   return Status::OK();

-- 
To stop receiving notification emails like this one, please contact
apitrou@apache.org.