You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this conversion.
Posted to commits@arrow.apache.org by fs...@apache.org on 2019/07/05 12:54:56 UTC

[arrow] branch master updated: ARROW-4187: [C++] Enable file-benchmark on Windows

This is an automated email from the ASF dual-hosted git repository.

fsaintjacques pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 9cbc42e  ARROW-4187: [C++] Enable file-benchmark on Windows
9cbc42e is described below

commit 9cbc42e48de26d022ab94aa88576179d70ae4521
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Fri Jul 5 08:54:44 2019 -0400

    ARROW-4187: [C++] Enable file-benchmark on Windows
    
    Also fix take-benchmark compilation.
    
    Author: Antoine Pitrou <an...@python.org>
    
    Closes #4809 from pitrou/ARROW-4187-file-benchmark-windows and squashes the following commits:
    
    46defe6af <Antoine Pitrou> ARROW-4187:  Enable file-benchmark on Windows
---
 cpp/src/arrow/compute/kernels/take-benchmark.cc |  9 ++-
 cpp/src/arrow/io/file-benchmark.cc              | 87 +++++++++++++++++++++++--
 2 files changed, 86 insertions(+), 10 deletions(-)

diff --git a/cpp/src/arrow/compute/kernels/take-benchmark.cc b/cpp/src/arrow/compute/kernels/take-benchmark.cc
index 139e183..d28f7af 100644
--- a/cpp/src/arrow/compute/kernels/take-benchmark.cc
+++ b/cpp/src/arrow/compute/kernels/take-benchmark.cc
@@ -48,7 +48,8 @@ static void TakeInt64(benchmark::State& state) {
 
   auto values = rand.Int64(array_size, -100, 100, args.null_proportion);
 
-  auto indices = rand.Int32(array_size, 0, array_size - 1, args.null_proportion);
+  auto indices = rand.Int32(static_cast<int32_t>(array_size), 0,
+                            static_cast<int32_t>(array_size - 1), args.null_proportion);
 
   TakeBenchmark(state, values, indices);
 }
@@ -64,7 +65,8 @@ static void TakeFixedSizeList1Int64(benchmark::State& state) {
       fixed_size_list(int64(), 1), array_size, int_array, int_array->null_bitmap(),
       int_array->null_count());
 
-  auto indices = rand.Int32(array_size, 0, array_size - 1, args.null_proportion);
+  auto indices = rand.Int32(static_cast<int32_t>(array_size), 0,
+                            static_cast<int32_t>(array_size - 1), args.null_proportion);
 
   TakeBenchmark(state, values, indices);
 }
@@ -110,7 +112,8 @@ static void TakeString(benchmark::State& state) {
   auto values = std::static_pointer_cast<StringArray>(rand.String(
       array_size, string_min_length, string_max_length, args.null_proportion));
 
-  auto indices = rand.Int32(array_size, 0, array_size - 1, args.null_proportion);
+  auto indices = rand.Int32(static_cast<int32_t>(array_size), 0,
+                            static_cast<int32_t>(array_size - 1), args.null_proportion);
 
   TakeBenchmark(state, values, indices);
 }
diff --git a/cpp/src/arrow/io/file-benchmark.cc b/cpp/src/arrow/io/file-benchmark.cc
index 74b92cb..b0880fd 100644
--- a/cpp/src/arrow/io/file-benchmark.cc
+++ b/cpp/src/arrow/io/file-benchmark.cc
@@ -20,6 +20,8 @@
 #include "arrow/io/file.h"
 #include "arrow/testing/gtest_util.h"
 #include "arrow/util/io-util.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/windows_compatibility.h"
 
 #include "benchmark/benchmark.h"
 
@@ -30,7 +32,11 @@
 #include <thread>
 #include <valarray>
 
-#ifndef _WIN32
+#ifdef _WIN32
+
+#include <io.h>
+
+#else
 
 #include <fcntl.h>
 #include <poll.h>
@@ -40,17 +46,82 @@
 
 namespace arrow {
 
-#ifndef _WIN32
-
-std::string GetNullFile() { return "/dev/null"; }
+std::string GetNullFile() {
+#ifdef _WIN32
+  return "NUL";
+#else
+  return "/dev/null";
+#endif
+}
 
 const std::valarray<int64_t> small_sizes = {8, 24, 33, 1, 32, 192, 16, 40};
 const std::valarray<int64_t> large_sizes = {8192, 100000};
 
 constexpr int64_t kBufferSize = 4096;
 
+#ifdef _WIN32
+
+class BackgroundReader {
+  // A class that reads data in the background from a file descriptor
+  // (Windows implementation)
+
+ public:
+  static std::shared_ptr<BackgroundReader> StartReader(int fd) {
+    std::shared_ptr<BackgroundReader> reader(new BackgroundReader(fd));
+    reader->worker_.reset(new std::thread([=] { reader->LoopReading(); }));
+    return reader;
+  }
+  void Stop() { ARROW_CHECK(SetEvent(event_)); }
+  void Join() { worker_->join(); }
+
+  ~BackgroundReader() {
+    ABORT_NOT_OK(internal::FileClose(fd_));
+    ARROW_CHECK(CloseHandle(event_));
+  }
+
+ protected:
+  explicit BackgroundReader(int fd) : fd_(fd), total_bytes_(0) {
+    file_handle_ = reinterpret_cast<HANDLE>(_get_osfhandle(fd));
+    ARROW_CHECK_NE(file_handle_, INVALID_HANDLE_VALUE);
+    event_ =
+        CreateEvent(nullptr, /* bManualReset=*/TRUE, /* bInitialState=*/FALSE, nullptr);
+    ARROW_CHECK_NE(event_, INVALID_HANDLE_VALUE);
+  }
+
+  void LoopReading() {
+    const HANDLE handles[] = {file_handle_, event_};
+    while (true) {
+      DWORD ret = WaitForMultipleObjects(2, handles, /* bWaitAll=*/FALSE, INFINITE);
+      ARROW_CHECK_NE(ret, WAIT_FAILED);
+      if (ret == WAIT_OBJECT_0 + 1) {
+        // Got stop request
+        break;
+      } else if (ret == WAIT_OBJECT_0) {
+        // File ready for reading
+        int64_t bytes_read;
+        ARROW_CHECK_OK(internal::FileRead(fd_, buffer_, buffer_size_, &bytes_read));
+        total_bytes_ += bytes_read;
+      } else {
+        ARROW_LOG(FATAL) << "Unexpected WaitForMultipleObjects return value " << ret;
+      }
+    }
+  }
+
+  int fd_;
+  HANDLE file_handle_, event_;
+  int64_t total_bytes_;
+
+  static const int64_t buffer_size_ = 16384;
+  uint8_t buffer_[buffer_size_];
+
+  std::unique_ptr<std::thread> worker_;
+};
+
+#else
+
 class BackgroundReader {
   // A class that reads data in the background from a file descriptor
+  // (Unix implementation)
 
  public:
   static std::shared_ptr<BackgroundReader> StartReader(int fd) {
@@ -116,6 +187,8 @@ class BackgroundReader {
   std::unique_ptr<std::thread> worker_;
 };
 
+#endif
+
 // Set up a pipe with an OutputStream at one end and a BackgroundReader at
 // the other end.
 static void SetupPipeWriter(std::shared_ptr<io::OutputStream>* stream,
@@ -139,6 +212,9 @@ static void BenchmarkStreamingWrites(benchmark::State& state,
       ABORT_NOT_OK(stream->Write(data, size));
     }
   }
+  // For Windows: need to close writer before joining reader thread.
+  ABORT_NOT_OK(stream->Close());
+
   const int64_t total_bytes = static_cast<int64_t>(state.iterations()) * sum_sizes;
   state.SetBytesProcessed(total_bytes);
 
@@ -147,7 +223,6 @@ static void BenchmarkStreamingWrites(benchmark::State& state,
     reader->Stop();
     reader->Join();
   }
-  ABORT_NOT_OK(stream->Close());
 }
 
 // Benchmark writing to /dev/null
@@ -232,6 +307,4 @@ BENCHMARK(BufferedOutputStreamSmallWritesToNull)->UseRealTime();
 BENCHMARK(BufferedOutputStreamSmallWritesToPipe)->UseRealTime();
 BENCHMARK(BufferedOutputStreamLargeWritesToPipe)->UseRealTime();
 
-#endif  // ifndef _WIN32
-
 }  // namespace arrow