You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ab...@apache.org on 2020/03/04 09:35:11 UTC
[kudu] 01/02: subprocess: plumb Java metrics into C++
This is an automated email from the ASF dual-hosted git repository.
abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit f4176b1ad00c3fa01b808bbdd58f08f0aa4ab471
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Mon Mar 2 21:18:41 2020 -0800
subprocess: plumb Java metrics into C++
This plumbs metrics from the Java subprocess into C++ and encapsulates the
common code used to interact with a SubprocessServer (including the
metrics parsing) into the new SubprocessProxy template class.
This template is specialized for Echo{Request,Response}PB messages as
the new test-only EchoSubprocess, and adds echo-specific histogram
metrics based on those returned by the Echo Java subprocess.
Change-Id: I7260ea13717dfd4af0138f77dfb6e5d239b3bee2
Reviewed-on: http://gerrit.cloudera.org:8080/15344
Tested-by: Kudu Jenkins
Reviewed-by: Hao Hao <ha...@cloudera.com>
Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
src/kudu/subprocess/CMakeLists.txt | 1 +
src/kudu/subprocess/subprocess_proxy-test.cc | 167 +++++++++++++++++++++++++++
src/kudu/subprocess/subprocess_proxy.h | 102 ++++++++++++++++
3 files changed, 270 insertions(+)
diff --git a/src/kudu/subprocess/CMakeLists.txt b/src/kudu/subprocess/CMakeLists.txt
index 62f277c..8f284f5 100644
--- a/src/kudu/subprocess/CMakeLists.txt
+++ b/src/kudu/subprocess/CMakeLists.txt
@@ -72,4 +72,5 @@ if (NOT NO_TESTS)
)
endif()
+# Exercises SubprocessProxy against the Echo Java subprocess.
+ADD_KUDU_TEST(subprocess_proxy-test)
ADD_KUDU_TEST(subprocess_server-test)
diff --git a/src/kudu/subprocess/subprocess_proxy-test.cc b/src/kudu/subprocess/subprocess_proxy-test.cc
new file mode 100644
index 0000000..6d26268
--- /dev/null
+++ b/src/kudu/subprocess/subprocess_proxy-test.cc
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/subprocess/subprocess_proxy.h"
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/casts.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/subprocess/subprocess.pb.h"
+#include "kudu/util/env.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using std::make_shared;
+using std::shared_ptr;
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+// Test-only histograms for the per-request metrics reported by the Echo
+// subprocess. They are instantiated by EchoSubprocessMetrics and queried
+// directly by the tests in this file.
+METRIC_DEFINE_histogram(server, echo_subprocess_inbound_queue_length,
+    "Echo subprocess inbound queue length",
+    kudu::MetricUnit::kMessages,
+    "Number of request messages in the Echo subprocess' inbound request queue",
+    kudu::MetricLevel::kInfo,
+    1000, 1);
+METRIC_DEFINE_histogram(server, echo_subprocess_outbound_queue_length,
+    "Echo subprocess outbound queue length",
+    kudu::MetricUnit::kMessages,
+    "Number of response messages in the Echo subprocess' outbound response queue",
+    kudu::MetricLevel::kInfo,
+    1000, 1);
+METRIC_DEFINE_histogram(server, echo_subprocess_inbound_queue_time_ms,
+    "Echo subprocess inbound queue time (ms)",
+    kudu::MetricUnit::kMilliseconds,
+    "Duration of time in ms spent in the Echo subprocess' inbound request queue",
+    kudu::MetricLevel::kInfo,
+    60000LU, 1);
+METRIC_DEFINE_histogram(server, echo_subprocess_outbound_queue_time_ms,
+    "Echo subprocess outbound queue time (ms)",
+    kudu::MetricUnit::kMilliseconds,
+    "Duration of time in ms spent in the Echo subprocess' outbound response queue",
+    kudu::MetricLevel::kInfo,
+    60000LU, 1);
+METRIC_DEFINE_histogram(server, echo_subprocess_execution_time_ms,
+    "Echo subprocess execution time (ms)",
+    kudu::MetricUnit::kMilliseconds,
+    "Duration of time in ms spent executing the Echo subprocess request, excluding "
+    "time spent in the subprocess queues",
+    kudu::MetricLevel::kInfo,
+    60000LU, 1);
+
+
+namespace kudu {
+namespace subprocess {
+
+
+// Binds 'member' to a histogram instantiated from METRIC_<x> on 'entity'.
+// Only meant for use inside the constructor below; undefined right after it.
+#define HISTINIT(member, x) member = METRIC_##x.Instantiate(entity)
+// Subprocess metrics for the Echo subprocess: every SubprocessMetrics
+// histogram is backed by one of the echo-specific histograms defined above.
+struct EchoSubprocessMetrics : public SubprocessMetrics {
+  explicit EchoSubprocessMetrics(const scoped_refptr<MetricEntity>& entity) {
+    HISTINIT(inbound_queue_length, echo_subprocess_inbound_queue_length);
+    HISTINIT(outbound_queue_length, echo_subprocess_outbound_queue_length);
+    HISTINIT(inbound_queue_time_ms, echo_subprocess_inbound_queue_time_ms);
+    HISTINIT(outbound_queue_time_ms, echo_subprocess_outbound_queue_time_ms);
+    HISTINIT(execution_time_ms, echo_subprocess_execution_time_ms);
+  }
+};
+#undef HISTINIT
+
+// Proxy that speaks Echo{Request,Response}PB to the Echo Java subprocess.
+using EchoSubprocess = SubprocessProxy<EchoRequestPB, EchoResponsePB, EchoSubprocessMetrics>;
+
+// Test fixture that starts an Echo Java subprocess before each test. The
+// subprocess reports its metrics into a metric entity owned by the fixture.
+class EchoSubprocessTest : public KuduTest {
+ public:
+  EchoSubprocessTest()
+      : metric_entity_(METRIC_ENTITY_server.Instantiate(
+            &metric_registry_, "subprocess_proxy-test")) {}
+
+  void SetUp() override {
+    KuduTest::SetUp();
+    ASSERT_OK(ResetEchoSubprocess());
+  }
+
+  // (Re)creates 'echo_subprocess_' and starts it. The Java home is located
+  // relative to this test binary's directory, and the echo jar is expected to
+  // sit next to the binary.
+  Status ResetEchoSubprocess() {
+    string test_binary_path;
+    RETURN_NOT_OK(env_->GetExecutablePath(&test_binary_path));
+    const string binary_dir = DirName(test_binary_path);
+
+    string java_home_dir;
+    RETURN_NOT_OK(FindHomeDir("java", binary_dir, &java_home_dir));
+    const string java_bin = Substitute("$0/bin/java", java_home_dir);
+    const string echo_jar = Substitute("$0/kudu-subprocess-echo.jar", binary_dir);
+
+    vector<string> command_line = { java_bin, "-jar", echo_jar };
+    echo_subprocess_ = make_shared<EchoSubprocess>(std::move(command_line), metric_entity_);
+    return echo_subprocess_->Start();
+  }
+
+ protected:
+  // Declared before 'metric_entity_' so that it is constructed first.
+  MetricRegistry metric_registry_;
+  scoped_refptr<MetricEntity> metric_entity_;
+  shared_ptr<EchoSubprocess> echo_subprocess_;
+};
+
+// Sends a single echo request that sleeps for a known duration and verifies
+// that each echo-specific histogram recorded exactly one sample with
+// plausible values.
+TEST_F(EchoSubprocessTest, TestBasicMetrics) {
+  const string kMessage = "don't catch you slippin' now";
+  const int64_t kSleepMs = 1000;
+  EchoRequestPB req;
+  req.set_data(kMessage);
+  req.set_sleep_ms(kSleepMs);
+  EchoResponsePB resp;
+  ASSERT_OK(echo_subprocess_->Execute(req, &resp));
+  ASSERT_EQ(kMessage, resp.data());
+
+  // There shouldn't be anything in the subprocess queues.
+  Histogram* in_len_hist = down_cast<Histogram*>(metric_entity_->FindOrNull(
+      METRIC_echo_subprocess_inbound_queue_length).get());
+  ASSERT_EQ(1, in_len_hist->TotalCount());
+  ASSERT_EQ(0, in_len_hist->MaxValueForTests());
+  Histogram* out_len_hist = down_cast<Histogram*>(metric_entity_->FindOrNull(
+      METRIC_echo_subprocess_outbound_queue_length).get());
+  ASSERT_EQ(1, out_len_hist->TotalCount());
+  ASSERT_EQ(0, out_len_hist->MaxValueForTests());
+
+  // We should have some non-negative queue times.
+  Histogram* out_hist = down_cast<Histogram*>(metric_entity_->FindOrNull(
+      METRIC_echo_subprocess_outbound_queue_time_ms).get());
+  ASSERT_EQ(1, out_hist->TotalCount());
+  ASSERT_LE(0, out_hist->MaxValueForTests());
+  Histogram* in_hist = down_cast<Histogram*>(metric_entity_->FindOrNull(
+      METRIC_echo_subprocess_inbound_queue_time_ms).get());
+  ASSERT_EQ(1, in_hist->TotalCount());
+  ASSERT_LE(0, in_hist->MaxValueForTests());
+
+  // The execution should've taken at least our sleep time. The comparison is
+  // non-strict: an execution that took exactly kSleepMs is still valid.
+  Histogram* exec_hist = down_cast<Histogram*>(metric_entity_->FindOrNull(
+      METRIC_echo_subprocess_execution_time_ms).get());
+  ASSERT_EQ(1, exec_hist->TotalCount());
+  ASSERT_LE(kSleepMs, exec_hist->MaxValueForTests());
+}
+
+} // namespace subprocess
+} // namespace kudu
diff --git a/src/kudu/subprocess/subprocess_proxy.h b/src/kudu/subprocess/subprocess_proxy.h
new file mode 100644
index 0000000..3d60311
--- /dev/null
+++ b/src/kudu/subprocess/subprocess_proxy.h
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/common/wire_protocol.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/subprocess/server.h"
+#include "kudu/subprocess/subprocess.pb.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace subprocess {
+
+// Histograms fed from the SubprocessMetricsPB attached to a subprocess
+// response. SubprocessProxy dereferences every member unconditionally when a
+// response carries metrics, so subclasses must instantiate all of them.
+//
+// TODO(awong): add server metrics.
+struct SubprocessMetrics {
+  // Sampled from SubprocessMetricsPB::inbound_queue_length.
+  scoped_refptr<Histogram> inbound_queue_length;
+  // Sampled from SubprocessMetricsPB::outbound_queue_length.
+  scoped_refptr<Histogram> outbound_queue_length;
+  // Sampled from SubprocessMetricsPB::inbound_queue_time_ms.
+  scoped_refptr<Histogram> inbound_queue_time_ms;
+  // Sampled from SubprocessMetricsPB::outbound_queue_time_ms.
+  scoped_refptr<Histogram> outbound_queue_time_ms;
+  // Sampled from SubprocessMetricsPB::execution_time_ms.
+  scoped_refptr<Histogram> execution_time_ms;
+};
+
+// Template that wraps a SubprocessServer, exposing only the underlying ReqPB
+// and RespPB as an interface. 'MetricsType' is a SubprocessMetrics subclass
+// constructed from the given MetricEntity; it lets each specialization
+// register its own subprocess-specific metrics.
+template<class ReqPB, class RespPB, class MetricsType>
+class SubprocessProxy {
+ public:
+  SubprocessProxy(std::vector<std::string> argv,
+                  const scoped_refptr<MetricEntity>& entity)
+      : server_(std::move(argv)),
+        metrics_(entity) {}
+
+  // Starts the underlying subprocess.
+  Status Start() {
+    return server_.Init();
+  }
+
+  // Executes the given request and populates the given response. Returns a
+  // non-OK Status if there was an error sending the request (e.g. timed out),
+  // if the subprocess reported an error, or if the response could not be
+  // unpacked into 'resp'.
+  Status Execute(const ReqPB& req, RespPB* resp) {
+    SubprocessRequestPB sreq;
+    sreq.mutable_request()->PackFrom(req);
+    SubprocessResponsePB sresp;
+    RETURN_NOT_OK(server_.Execute(&sreq, &sresp));
+    // The subprocess metrics should still be valid regardless of whether there
+    // was an error, so record them before checking for errors or unpacking.
+    if (sresp.has_metrics()) {
+      ParseMetricsPB(sresp.metrics());
+    }
+    // Surface the subprocess's own error before unpacking. An errored response
+    // may not carry a RespPB payload, and reporting that as corruption would
+    // mask the real failure.
+    if (sresp.has_error()) {
+      return StatusFromPB(sresp.error());
+    }
+    if (!sresp.response().UnpackTo(resp)) {
+      LOG(ERROR) << strings::Substitute("unable to unpack response: $0",
+                                        pb_util::SecureDebugString(sresp));
+      return Status::Corruption("unable to unpack response");
+    }
+    return Status::OK();
+  }
+
+ private:
+  // Records the values carried in 'pb' into the corresponding histograms in
+  // 'metrics_'. Every field is expected to be set by the subprocess; this is
+  // only checked in debug builds.
+  void ParseMetricsPB(const SubprocessMetricsPB& pb) {
+    DCHECK(pb.has_inbound_queue_length());
+    DCHECK(pb.has_outbound_queue_length());
+    DCHECK(pb.has_inbound_queue_time_ms());
+    DCHECK(pb.has_outbound_queue_time_ms());
+    DCHECK(pb.has_execution_time_ms());
+    metrics_.inbound_queue_length->Increment(pb.inbound_queue_length());
+    metrics_.outbound_queue_length->Increment(pb.outbound_queue_length());
+    metrics_.inbound_queue_time_ms->Increment(pb.inbound_queue_time_ms());
+    metrics_.outbound_queue_time_ms->Increment(pb.outbound_queue_time_ms());
+    metrics_.execution_time_ms->Increment(pb.execution_time_ms());
+  }
+
+  SubprocessServer server_;
+  MetricsType metrics_;
+};
+
+} // namespace subprocess
+} // namespace kudu