You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/04/04 19:56:27 UTC
[arrow] branch master updated: ARROW-4566: [Flight] Add option to run Flight benchmark against separate server
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new bee8d6f ARROW-4566: [Flight] Add option to run Flight benchmark against separate server
bee8d6f is described below
commit bee8d6f74a1ab35f6ca9d81f7572e42e83a34f33
Author: David Li <Da...@twosigma.com>
AuthorDate: Thu Apr 4 14:56:19 2019 -0500
ARROW-4566: [Flight] Add option to run Flight benchmark against separate server
Author: David Li <Da...@twosigma.com>
Closes #4103 from lihalite/arrow-4566 and squashes the following commits:
9f28340f5 <David Li> Add option to run Flight benchmark against separate server
---
cpp/src/arrow/flight/flight-benchmark.cc | 37 ++++++++++++++++++++++++--------
cpp/src/arrow/flight/perf-server.cc | 1 +
2 files changed, 29 insertions(+), 9 deletions(-)
diff --git a/cpp/src/arrow/flight/flight-benchmark.cc b/cpp/src/arrow/flight/flight-benchmark.cc
index 775662c..8bc1ce3 100644
--- a/cpp/src/arrow/flight/flight-benchmark.cc
+++ b/cpp/src/arrow/flight/flight-benchmark.cc
@@ -35,6 +35,10 @@
#include "arrow/flight/perf.pb.h"
#include "arrow/flight/test-util.h"
+DEFINE_string(server_host, "",
+ "An existing performance server to benchmark against (leave blank to spawn "
+ "one automatically)");
+DEFINE_int32(server_port, 31337, "The port to connect to");
DEFINE_int32(num_servers, 1, "Number of performance servers to run");
DEFINE_int32(num_streams, 4, "Number of streams for each server");
DEFINE_int32(num_threads, 4, "Number of concurrent gets");
@@ -63,7 +67,7 @@ struct PerformanceStats {
}
};
-Status RunPerformanceTest(const int port) {
+Status RunPerformanceTest(const std::string& hostname, const int port) {
// TODO(wesm): Multiple servers
// std::vector<std::unique_ptr<TestServer>> servers;
@@ -75,7 +79,7 @@ Status RunPerformanceTest(const int port) {
// Construct client and plan the query
std::unique_ptr<FlightClient> client;
- RETURN_NOT_OK(FlightClient::Connect("localhost", port, &client));
+ RETURN_NOT_OK(FlightClient::Connect(hostname, port, &client));
FlightDescriptor descriptor;
descriptor.type = FlightDescriptor::CMD;
@@ -89,10 +93,11 @@ Status RunPerformanceTest(const int port) {
RETURN_NOT_OK(plan->GetSchema(&schema));
PerformanceStats stats;
- auto ConsumeStream = [&stats, &schema, &port](const FlightEndpoint& endpoint) {
+ auto ConsumeStream = [&stats, &schema, &hostname,
+ &port](const FlightEndpoint& endpoint) {
// TODO(wesm): Use location from endpoint, same host/port for now
std::unique_ptr<FlightClient> client;
- RETURN_NOT_OK(FlightClient::Connect("localhost", port, &client));
+ RETURN_NOT_OK(FlightClient::Connect(hostname, port, &client));
perf::Token token;
token.ParseFromString(endpoint.ticket.ticket);
@@ -182,12 +187,26 @@ Status RunPerformanceTest(const int port) {
int main(int argc, char** argv) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
- const int port = 31337;
- arrow::flight::TestServer server("arrow-flight-perf-server", port);
- server.Start();
+ std::unique_ptr<arrow::flight::TestServer> server;
+ std::string hostname = "localhost";
+ if (FLAGS_server_host == "") {
+ std::cout << "Using remote server: false" << std::endl;
+ server.reset(
+ new arrow::flight::TestServer("arrow-flight-perf-server", FLAGS_server_port));
+ server->Start();
+ } else {
+ std::cout << "Using remote server: true" << std::endl;
+ hostname = FLAGS_server_host;
+ }
+
+ std::cout << "Server host: " << hostname << std::endl
+ << "Server port: " << FLAGS_server_port << std::endl;
- arrow::Status s = arrow::flight::RunPerformanceTest(port);
- server.Stop();
+ arrow::Status s = arrow::flight::RunPerformanceTest(hostname, FLAGS_server_port);
+
+ if (server) {
+ server->Stop();
+ }
if (!s.ok()) {
std::cerr << "Failed with error: << " << s.ToString() << std::endl;
diff --git a/cpp/src/arrow/flight/perf-server.cc b/cpp/src/arrow/flight/perf-server.cc
index 9ae4813..ebee41f 100644
--- a/cpp/src/arrow/flight/perf-server.cc
+++ b/cpp/src/arrow/flight/perf-server.cc
@@ -197,6 +197,7 @@ int main(int argc, char** argv) {
ARROW_CHECK_OK(g_server->Init(FLAGS_port));
// Exit with a clean error code (0) on SIGTERM
ARROW_CHECK_OK(g_server->SetShutdownOnSignals({SIGTERM}));
+ std::cout << "Server port: " << FLAGS_port << std::endl;
ARROW_CHECK_OK(g_server->Serve());
return 0;
}