You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by fs...@apache.org on 2019/07/05 12:54:56 UTC
[arrow] branch master updated: ARROW-4187: [C++] Enable file-benchmark on Windows
This is an automated email from the ASF dual-hosted git repository.
fsaintjacques pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 9cbc42e ARROW-4187: [C++] Enable file-benchmark on Windows
9cbc42e is described below
commit 9cbc42e48de26d022ab94aa88576179d70ae4521
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Fri Jul 5 08:54:44 2019 -0400
ARROW-4187: [C++] Enable file-benchmark on Windows
Also fix take-benchmark compilation.
Author: Antoine Pitrou <an...@python.org>
Closes #4809 from pitrou/ARROW-4187-file-benchmark-windows and squashes the following commits:
46defe6af <Antoine Pitrou> ARROW-4187: Enable file-benchmark on Windows
---
cpp/src/arrow/compute/kernels/take-benchmark.cc | 9 ++-
cpp/src/arrow/io/file-benchmark.cc | 87 +++++++++++++++++++++++--
2 files changed, 86 insertions(+), 10 deletions(-)
diff --git a/cpp/src/arrow/compute/kernels/take-benchmark.cc b/cpp/src/arrow/compute/kernels/take-benchmark.cc
index 139e183..d28f7af 100644
--- a/cpp/src/arrow/compute/kernels/take-benchmark.cc
+++ b/cpp/src/arrow/compute/kernels/take-benchmark.cc
@@ -48,7 +48,8 @@ static void TakeInt64(benchmark::State& state) {
auto values = rand.Int64(array_size, -100, 100, args.null_proportion);
- auto indices = rand.Int32(array_size, 0, array_size - 1, args.null_proportion);
+ auto indices = rand.Int32(static_cast<int32_t>(array_size), 0,
+ static_cast<int32_t>(array_size - 1), args.null_proportion);
TakeBenchmark(state, values, indices);
}
@@ -64,7 +65,8 @@ static void TakeFixedSizeList1Int64(benchmark::State& state) {
fixed_size_list(int64(), 1), array_size, int_array, int_array->null_bitmap(),
int_array->null_count());
- auto indices = rand.Int32(array_size, 0, array_size - 1, args.null_proportion);
+ auto indices = rand.Int32(static_cast<int32_t>(array_size), 0,
+ static_cast<int32_t>(array_size - 1), args.null_proportion);
TakeBenchmark(state, values, indices);
}
@@ -110,7 +112,8 @@ static void TakeString(benchmark::State& state) {
auto values = std::static_pointer_cast<StringArray>(rand.String(
array_size, string_min_length, string_max_length, args.null_proportion));
- auto indices = rand.Int32(array_size, 0, array_size - 1, args.null_proportion);
+ auto indices = rand.Int32(static_cast<int32_t>(array_size), 0,
+ static_cast<int32_t>(array_size - 1), args.null_proportion);
TakeBenchmark(state, values, indices);
}
diff --git a/cpp/src/arrow/io/file-benchmark.cc b/cpp/src/arrow/io/file-benchmark.cc
index 74b92cb..b0880fd 100644
--- a/cpp/src/arrow/io/file-benchmark.cc
+++ b/cpp/src/arrow/io/file-benchmark.cc
@@ -20,6 +20,8 @@
#include "arrow/io/file.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/io-util.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/windows_compatibility.h"
#include "benchmark/benchmark.h"
@@ -30,7 +32,11 @@
#include <thread>
#include <valarray>
-#ifndef _WIN32
+#ifdef _WIN32
+
+#include <io.h>
+
+#else
#include <fcntl.h>
#include <poll.h>
@@ -40,17 +46,82 @@
namespace arrow {
-#ifndef _WIN32
-
-std::string GetNullFile() { return "/dev/null"; }
+std::string GetNullFile() {
+#ifdef _WIN32
+ return "NUL";
+#else
+ return "/dev/null";
+#endif
+}
const std::valarray<int64_t> small_sizes = {8, 24, 33, 1, 32, 192, 16, 40};
const std::valarray<int64_t> large_sizes = {8192, 100000};
constexpr int64_t kBufferSize = 4096;
+#ifdef _WIN32
+
+class BackgroundReader {
+ // A class that reads data in the background from a file descriptor
+ // (Windows implementation)
+
+ public:
+ static std::shared_ptr<BackgroundReader> StartReader(int fd) {
+ std::shared_ptr<BackgroundReader> reader(new BackgroundReader(fd));
+ reader->worker_.reset(new std::thread([=] { reader->LoopReading(); }));
+ return reader;
+ }
+ void Stop() { ARROW_CHECK(SetEvent(event_)); }
+ void Join() { worker_->join(); }
+
+ ~BackgroundReader() {
+ ABORT_NOT_OK(internal::FileClose(fd_));
+ ARROW_CHECK(CloseHandle(event_));
+ }
+
+ protected:
+ explicit BackgroundReader(int fd) : fd_(fd), total_bytes_(0) {
+ file_handle_ = reinterpret_cast<HANDLE>(_get_osfhandle(fd));
+ ARROW_CHECK_NE(file_handle_, INVALID_HANDLE_VALUE);
+ event_ =
+ CreateEvent(nullptr, /* bManualReset=*/TRUE, /* bInitialState=*/FALSE, nullptr);
+ ARROW_CHECK_NE(event_, INVALID_HANDLE_VALUE);
+ }
+
+ void LoopReading() {
+ const HANDLE handles[] = {file_handle_, event_};
+ while (true) {
+ DWORD ret = WaitForMultipleObjects(2, handles, /* bWaitAll=*/FALSE, INFINITE);
+ ARROW_CHECK_NE(ret, WAIT_FAILED);
+ if (ret == WAIT_OBJECT_0 + 1) {
+ // Got stop request
+ break;
+ } else if (ret == WAIT_OBJECT_0) {
+ // File ready for reading
+ int64_t bytes_read;
+ ARROW_CHECK_OK(internal::FileRead(fd_, buffer_, buffer_size_, &bytes_read));
+ total_bytes_ += bytes_read;
+ } else {
+ ARROW_LOG(FATAL) << "Unexpected WaitForMultipleObjects return value " << ret;
+ }
+ }
+ }
+
+ int fd_;
+ HANDLE file_handle_, event_;
+ int64_t total_bytes_;
+
+ static const int64_t buffer_size_ = 16384;
+ uint8_t buffer_[buffer_size_];
+
+ std::unique_ptr<std::thread> worker_;
+};
+
+#else
+
class BackgroundReader {
// A class that reads data in the background from a file descriptor
+ // (Unix implementation)
public:
static std::shared_ptr<BackgroundReader> StartReader(int fd) {
@@ -116,6 +187,8 @@ class BackgroundReader {
std::unique_ptr<std::thread> worker_;
};
+#endif
+
// Set up a pipe with an OutputStream at one end and a BackgroundReader at
// the other end.
static void SetupPipeWriter(std::shared_ptr<io::OutputStream>* stream,
@@ -139,6 +212,9 @@ static void BenchmarkStreamingWrites(benchmark::State& state,
ABORT_NOT_OK(stream->Write(data, size));
}
}
+ // For Windows: need to close writer before joining reader thread.
+ ABORT_NOT_OK(stream->Close());
+
const int64_t total_bytes = static_cast<int64_t>(state.iterations()) * sum_sizes;
state.SetBytesProcessed(total_bytes);
@@ -147,7 +223,6 @@ static void BenchmarkStreamingWrites(benchmark::State& state,
reader->Stop();
reader->Join();
}
- ABORT_NOT_OK(stream->Close());
}
// Benchmark writing to /dev/null
@@ -232,6 +307,4 @@ BENCHMARK(BufferedOutputStreamSmallWritesToNull)->UseRealTime();
BENCHMARK(BufferedOutputStreamSmallWritesToPipe)->UseRealTime();
BENCHMARK(BufferedOutputStreamLargeWritesToPipe)->UseRealTime();
-#endif // ifndef _WIN32
-
} // namespace arrow