You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/01/23 14:11:33 UTC

arrow git commit: ARROW-508: [C++] Add basic threadsafety to normal files and memory maps

Repository: arrow
Updated Branches:
  refs/heads/master 1f81adcc8 -> 282103012


ARROW-508: [C++] Add basic threadsafety to normal files and memory maps

This patch is stacked on ARROW-494, so it will need to be rebased.

* Since the naive `ReadAt` implementation involves a Seek and a Read, the default `ReadAt` now holds a lock until the read is completed.
* Normal file reads block until completion
* File writes block until completion

This covers the threadsafety requirements for parquet-cpp at least. For on-disk files, the following methods are now threadsafe:

* `ArrowInputFile::Read` and `ArrowInputFile::ReadAt`
* `ArrowOutputStream::Write`

parquet-cpp calls `Seek` in a couple of places:

https://github.com/apache/parquet-cpp/blob/master/src/parquet/file/reader-internal.cc#L257

Strictly speaking, if two threads are trying to read the same file from the same input source, these `Seek` calls could cause a race condition in esoteric circumstances. I'm going to report a bug to change these to `ReadAt`, which can be more easily made threadsafe.

Author: Wes McKinney <we...@twosigma.com>

Closes #300 from wesm/ARROW-508 and squashes the following commits:

e57156c [Wes McKinney] Make base ReadableFileInterface::ReadAt and some file functions threadsafe


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/28210301
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/28210301
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/28210301

Branch: refs/heads/master
Commit: 2821030124eb3e884b0e48f09c38b54f00430b13
Parents: 1f81adc
Author: Wes McKinney <we...@twosigma.com>
Authored: Mon Jan 23 09:11:26 2017 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Mon Jan 23 09:11:26 2017 -0500

----------------------------------------------------------------------
 cpp/src/arrow/io/file.cc         | 10 ++++-
 cpp/src/arrow/io/file.h          |  9 ++++-
 cpp/src/arrow/io/interfaces.cc   |  3 ++
 cpp/src/arrow/io/interfaces.h    | 12 +++++-
 cpp/src/arrow/io/io-file-test.cc | 69 +++++++++++++++++++++++++++++++++++
 5 files changed, 98 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/28210301/cpp/src/arrow/io/file.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc
index 3bf8dfa..ff58e53 100644
--- a/cpp/src/arrow/io/file.cc
+++ b/cpp/src/arrow/io/file.cc
@@ -76,6 +76,7 @@
 #include <cstring>
 #include <iostream>
 #include <limits>
+#include <mutex>
 #include <sstream>
 #include <vector>
 
@@ -350,6 +351,7 @@ class OSFile {
   }
 
   Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
+    std::lock_guard<std::mutex> guard(lock_);
     return FileRead(fd_, out, nbytes, bytes_read);
   }
 
@@ -361,6 +363,7 @@ class OSFile {
   Status Tell(int64_t* pos) const { return FileTell(fd_, pos); }
 
   Status Write(const uint8_t* data, int64_t length) {
+    std::lock_guard<std::mutex> guard(lock_);
     if (length < 0) { return Status::IOError("Length must be non-negative"); }
     return FileWrite(fd_, data, length);
   }
@@ -377,6 +380,8 @@ class OSFile {
  protected:
   std::string path_;
 
+  std::mutex lock_;
+
   // File descriptor
   int fd_;
 
@@ -649,6 +654,8 @@ bool MemoryMappedFile::supports_zero_copy() const {
 }
 
 Status MemoryMappedFile::WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) {
+  std::lock_guard<std::mutex> guard(lock_);
+
   if (!memory_map_->opened() || !memory_map_->writable()) {
     return Status::IOError("Unable to write");
   }
@@ -658,13 +665,14 @@ Status MemoryMappedFile::WriteAt(int64_t position, const uint8_t* data, int64_t
 }
 
 Status MemoryMappedFile::Write(const uint8_t* data, int64_t nbytes) {
+  std::lock_guard<std::mutex> guard(lock_);
+
   if (!memory_map_->opened() || !memory_map_->writable()) {
     return Status::IOError("Unable to write");
   }
   if (nbytes + memory_map_->position() > memory_map_->size()) {
     return Status::Invalid("Cannot write past end of memory map");
   }
-
   return WriteInternal(data, nbytes);
 }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/28210301/cpp/src/arrow/io/file.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/file.h b/cpp/src/arrow/io/file.h
index 930346b..fe55e96 100644
--- a/cpp/src/arrow/io/file.h
+++ b/cpp/src/arrow/io/file.h
@@ -50,6 +50,8 @@ class ARROW_EXPORT FileOutputStream : public OutputStream {
   // OutputStream interface
   Status Close() override;
   Status Tell(int64_t* position) override;
+
+  // Write bytes to the stream. Thread-safe
   Status Write(const uint8_t* data, int64_t nbytes) override;
 
   int file_descriptor() const;
@@ -76,6 +78,7 @@ class ARROW_EXPORT ReadableFile : public ReadableFileInterface {
   Status Close() override;
   Status Tell(int64_t* position) override;
 
+  // Read bytes from the file. Thread-safe
   Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;
   Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) override;
 
@@ -112,16 +115,18 @@ class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface {
 
   Status Seek(int64_t position) override;
 
-  // Required by ReadableFileInterface, copies memory into out
+  // Required by ReadableFileInterface, copies memory into out. Not thread-safe
   Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) override;
 
-  // Zero copy read
+  // Zero copy read. Not thread-safe
   Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) override;
 
   bool supports_zero_copy() const override;
 
+  /// Write data at the current position in the file. Thread-safe
   Status Write(const uint8_t* data, int64_t nbytes) override;
 
+  /// Write data at a particular position in the file. Thread-safe
   Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) override;
 
   // @return: the size in bytes of the memory source

http://git-wip-us.apache.org/repos/asf/arrow/blob/28210301/cpp/src/arrow/io/interfaces.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc
index 8040f93..7e78caa 100644
--- a/cpp/src/arrow/io/interfaces.cc
+++ b/cpp/src/arrow/io/interfaces.cc
@@ -19,6 +19,7 @@
 
 #include <cstdint>
 #include <memory>
+#include <mutex>
 
 #include "arrow/buffer.h"
 #include "arrow/status.h"
@@ -34,12 +35,14 @@ ReadableFileInterface::ReadableFileInterface() {
 
 Status ReadableFileInterface::ReadAt(
     int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
+  std::lock_guard<std::mutex> guard(lock_);
   RETURN_NOT_OK(Seek(position));
   return Read(nbytes, bytes_read, out);
 }
 
 Status ReadableFileInterface::ReadAt(
     int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
+  std::lock_guard<std::mutex> guard(lock_);
   RETURN_NOT_OK(Seek(position));
   return Read(nbytes, out);
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/28210301/cpp/src/arrow/io/interfaces.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h
index fdb3788..7868090 100644
--- a/cpp/src/arrow/io/interfaces.h
+++ b/cpp/src/arrow/io/interfaces.h
@@ -20,6 +20,7 @@
 
 #include <cstdint>
 #include <memory>
+#include <mutex>
 #include <string>
 
 #include "arrow/util/macros.h"
@@ -99,14 +100,21 @@ class ARROW_EXPORT ReadableFileInterface : public InputStream, public Seekable {
 
   virtual bool supports_zero_copy() const = 0;
 
-  // Read at position, provide default implementations using Read(...), but can
-  // be overridden
+  /// Read at position, provide default implementations using Read(...), but can
+  /// be overridden
+  ///
+  /// Default implementation is thread-safe
   virtual Status ReadAt(
       int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out);
 
+  /// Default implementation is thread-safe
   virtual Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out);
 
+  std::mutex& lock() { return lock_; }
+
  protected:
+  std::mutex lock_;
+
   ReadableFileInterface();
 };
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/28210301/cpp/src/arrow/io/io-file-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/io-file-test.cc b/cpp/src/arrow/io/io-file-test.cc
index 999b296..86a3287 100644
--- a/cpp/src/arrow/io/io-file-test.cc
+++ b/cpp/src/arrow/io/io-file-test.cc
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <atomic>
 #include <cstdint>
 #include <cstdio>
 #include <cstring>
@@ -25,6 +26,7 @@
 #include <memory>
 #include <sstream>
 #include <string>
+#include <thread>
 
 #include "gtest/gtest.h"
 
@@ -325,6 +327,40 @@ TEST_F(TestReadableFile, CustomMemoryPool) {
   ASSERT_EQ(2, pool.num_allocations());
 }
 
+TEST_F(TestReadableFile, ThreadSafety) {
+  std::string data = "foobar";
+  {
+    std::ofstream stream;
+    stream.open(path_.c_str());
+    stream << data;
+  }
+
+  MyMemoryPool pool;
+  ASSERT_OK(ReadableFile::Open(path_, &pool, &file_));
+
+  std::atomic<int> correct_count(0);
+  const int niter = 10000;
+
+  auto ReadData = [&correct_count, &data, niter, this] () {
+    std::shared_ptr<Buffer> buffer;
+
+    for (int i = 0; i < niter; ++i) {
+      ASSERT_OK(file_->ReadAt(0, 3, &buffer));
+      if (0 == memcmp(data.c_str(), buffer->data(), 3)) {
+        correct_count += 1;
+      }
+    }
+  };
+
+  std::thread thread1(ReadData);
+  std::thread thread2(ReadData);
+
+  thread1.join();
+  thread2.join();
+
+  ASSERT_EQ(niter * 2, correct_count);
+}
+
 // ----------------------------------------------------------------------
 // Memory map tests
 
@@ -455,5 +491,38 @@ TEST_F(TestMemoryMappedFile, CastableToFileInterface) {
   std::shared_ptr<FileInterface> file = memory_mapped_file;
 }
 
+TEST_F(TestMemoryMappedFile, ThreadSafety) {
+  std::string data = "foobar";
+  std::string path = "ipc-multithreading-test";
+  CreateFile(path, static_cast<int>(data.size()));
+
+  std::shared_ptr<MemoryMappedFile> file;
+  ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READWRITE, &file));
+  ASSERT_OK(file->Write(reinterpret_cast<const uint8_t*>(data.c_str()),
+          static_cast<int64_t>(data.size())));
+
+  std::atomic<int> correct_count(0);
+  const int niter = 10000;
+
+  auto ReadData = [&correct_count, &data, niter, &file] () {
+    std::shared_ptr<Buffer> buffer;
+
+    for (int i = 0; i < niter; ++i) {
+      ASSERT_OK(file->ReadAt(0, 3, &buffer));
+      if (0 == memcmp(data.c_str(), buffer->data(), 3)) {
+        correct_count += 1;
+      }
+    }
+  };
+
+  std::thread thread1(ReadData);
+  std::thread thread2(ReadData);
+
+  thread1.join();
+  thread2.join();
+
+  ASSERT_EQ(niter * 2, correct_count);
+}
+
 }  // namespace io
 }  // namespace arrow