You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ab...@apache.org on 2020/03/04 09:35:11 UTC

[kudu] 01/02: subprocess: plumb Java metrics into C++

This is an automated email from the ASF dual-hosted git repository.

abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit f4176b1ad00c3fa01b808bbdd58f08f0aa4ab471
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Mon Mar 2 21:18:41 2020 -0800

    subprocess: plumb Java metrics into C++
    
    This plumbs metrics from the Java subprocess into C++ and encapsulates
    the common code used to interact with a SubprocessServer (including the
    metrics parsing) into the new SubprocessProxy template class.
    
    This template is specialized for Echo{Request,Response}PB messages as
    the new test-only EchoSubprocess, and adds echo-specific histogram
    metrics based on those returned by the Echo Java subprocess.
    
    Change-Id: I7260ea13717dfd4af0138f77dfb6e5d239b3bee2
    Reviewed-on: http://gerrit.cloudera.org:8080/15344
    Tested-by: Kudu Jenkins
    Reviewed-by: Hao Hao <ha...@cloudera.com>
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
 src/kudu/subprocess/CMakeLists.txt           |   1 +
 src/kudu/subprocess/subprocess_proxy-test.cc | 167 +++++++++++++++++++++++++++
 src/kudu/subprocess/subprocess_proxy.h       | 102 ++++++++++++++++
 3 files changed, 270 insertions(+)

diff --git a/src/kudu/subprocess/CMakeLists.txt b/src/kudu/subprocess/CMakeLists.txt
index 62f277c..8f284f5 100644
--- a/src/kudu/subprocess/CMakeLists.txt
+++ b/src/kudu/subprocess/CMakeLists.txt
@@ -72,4 +72,5 @@ if (NOT NO_TESTS)
   )
 endif()
 
+ADD_KUDU_TEST(subprocess_proxy-test)
 ADD_KUDU_TEST(subprocess_server-test)
diff --git a/src/kudu/subprocess/subprocess_proxy-test.cc b/src/kudu/subprocess/subprocess_proxy-test.cc
new file mode 100644
index 0000000..6d26268
--- /dev/null
+++ b/src/kudu/subprocess/subprocess_proxy-test.cc
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/subprocess/subprocess_proxy.h"
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/casts.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/subprocess/subprocess.pb.h"
+#include "kudu/util/env.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using std::make_shared;
+using std::shared_ptr;
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+// Histograms backing EchoSubprocessMetrics. Each one is fed from the
+// corresponding field of the SubprocessMetricsPB returned alongside every
+// Echo subprocess response.
+METRIC_DEFINE_histogram(server, echo_subprocess_inbound_queue_length,
+    "Echo subprocess inbound queue length",
+    kudu::MetricUnit::kMessages,
+    "Number of request messages in the Echo subprocess' inbound request queue",
+    kudu::MetricLevel::kInfo,
+    1000, 1);
+METRIC_DEFINE_histogram(server, echo_subprocess_outbound_queue_length,
+    "Echo subprocess outbound queue length",
+    kudu::MetricUnit::kMessages,
+    "Number of response messages in the Echo subprocess' outbound response queue",
+    kudu::MetricLevel::kInfo,
+    1000, 1);
+METRIC_DEFINE_histogram(server, echo_subprocess_inbound_queue_time_ms,
+    "Echo subprocess inbound queue time (ms)",
+    kudu::MetricUnit::kMilliseconds,
+    "Duration of time in ms spent in the Echo subprocess' inbound request queue",
+    kudu::MetricLevel::kInfo,
+    60000LU, 1);
+METRIC_DEFINE_histogram(server, echo_subprocess_outbound_queue_time_ms,
+    "Echo subprocess outbound queue time (ms)",
+    kudu::MetricUnit::kMilliseconds,
+    "Duration of time in ms spent in the Echo subprocess' outbound response queue",
+    kudu::MetricLevel::kInfo,
+    60000LU, 1);
+METRIC_DEFINE_histogram(server, echo_subprocess_execution_time_ms,
+    "Echo subprocess execution time (ms)",
+    kudu::MetricUnit::kMilliseconds,
+    "Duration of time in ms spent executing the Echo subprocess request, excluding "
+    "time spent in the subprocess queues",
+    kudu::MetricLevel::kInfo,
+    60000LU, 1);
+
+
+namespace kudu {
+namespace subprocess {
+
+
+// Instantiates the histogram prototype METRIC_<x> on 'entity' and assigns it
+// to 'member'. Only meaningful inside a constructor taking 'entity'.
+#define HISTINIT(member, x) member = METRIC_##x.Instantiate(entity)
+// Subprocess metrics for the Echo subprocess, backed by the
+// echo_subprocess_* histograms defined in this file.
+struct EchoSubprocessMetrics : public SubprocessMetrics {
+  explicit EchoSubprocessMetrics(const scoped_refptr<MetricEntity>& entity) {
+    HISTINIT(inbound_queue_length, echo_subprocess_inbound_queue_length);
+    HISTINIT(outbound_queue_length, echo_subprocess_outbound_queue_length);
+    HISTINIT(inbound_queue_time_ms, echo_subprocess_inbound_queue_time_ms);
+    HISTINIT(outbound_queue_time_ms, echo_subprocess_outbound_queue_time_ms);
+    HISTINIT(execution_time_ms, echo_subprocess_execution_time_ms);
+  }
+};
+#undef HISTINIT
+
+// A SubprocessProxy speaking the test-only Echo{Request,Response}PB protocol.
+typedef SubprocessProxy<EchoRequestPB, EchoResponsePB, EchoSubprocessMetrics> EchoSubprocess;
+
+// Fixture that launches the Echo Java subprocess (kudu-subprocess-echo.jar,
+// expected next to the test binary) and registers its metrics against a
+// server-level metric entity owned by the fixture.
+class EchoSubprocessTest : public KuduTest {
+ public:
+  EchoSubprocessTest()
+      : metric_entity_(METRIC_ENTITY_server.Instantiate(
+            &metric_registry_, "subprocess_proxy-test")) {}
+
+  void SetUp() override {
+    KuduTest::SetUp();
+    ASSERT_OK(ResetEchoSubprocess());
+  }
+
+  // Builds a fresh EchoSubprocess that runs the echo jar with the JVM found
+  // via FindHomeDir("java", ...), replacing any previous instance, and
+  // starts it.
+  Status ResetEchoSubprocess() {
+    string exe_path;
+    RETURN_NOT_OK(env_->GetExecutablePath(&exe_path));
+    const string bin_dir = DirName(exe_path);
+
+    string java_home;
+    RETURN_NOT_OK(FindHomeDir("java", bin_dir, &java_home));
+
+    const string java_bin = Substitute("$0/bin/java", java_home);
+    const string echo_jar = Substitute("$0/kudu-subprocess-echo.jar", bin_dir);
+    vector<string> argv = { java_bin, "-jar", echo_jar };
+
+    echo_subprocess_ = make_shared<EchoSubprocess>(std::move(argv), metric_entity_);
+    return echo_subprocess_->Start();
+  }
+
+ protected:
+  // Declaration order matters: 'metric_entity_' is built from
+  // 'metric_registry_' in the constructor's initializer list.
+  MetricRegistry metric_registry_;
+  scoped_refptr<MetricEntity> metric_entity_;
+  shared_ptr<EchoSubprocess> echo_subprocess_;
+};
+
+// Sends a single echo request that sleeps for kSleepMs in the subprocess and
+// verifies that every subprocess histogram recorded exactly one sample with
+// a plausible value.
+TEST_F(EchoSubprocessTest, TestBasicMetrics) {
+  const string kMessage = "don't catch you slippin' now";
+  const int64_t kSleepMs = 1000;
+  EchoRequestPB req;
+  req.set_data(kMessage);
+  req.set_sleep_ms(kSleepMs);
+  EchoResponsePB resp;
+  ASSERT_OK(echo_subprocess_->Execute(req, &resp));
+  ASSERT_EQ(kMessage, resp.data());
+
+  // Returns the histogram instantiated on the fixture's metric entity for
+  // 'proto', or null if it was never instantiated.
+  const auto find_hist = [this](const HistogramPrototype& proto) {
+    return down_cast<Histogram*>(metric_entity_->FindOrNull(proto).get());
+  };
+
+  // Only one request was sent, so there shouldn't be anything else in the
+  // subprocess queues.
+  Histogram* in_len_hist = find_hist(METRIC_echo_subprocess_inbound_queue_length);
+  ASSERT_NE(nullptr, in_len_hist);
+  ASSERT_EQ(1, in_len_hist->TotalCount());
+  ASSERT_EQ(0, in_len_hist->MaxValueForTests());
+  Histogram* out_len_hist = find_hist(METRIC_echo_subprocess_outbound_queue_length);
+  ASSERT_NE(nullptr, out_len_hist);
+  ASSERT_EQ(1, out_len_hist->TotalCount());
+  ASSERT_EQ(0, out_len_hist->MaxValueForTests());
+
+  // We should have some non-negative queue times.
+  Histogram* out_hist = find_hist(METRIC_echo_subprocess_outbound_queue_time_ms);
+  ASSERT_NE(nullptr, out_hist);
+  ASSERT_EQ(1, out_hist->TotalCount());
+  ASSERT_LE(0, out_hist->MaxValueForTests());
+  Histogram* in_hist = find_hist(METRIC_echo_subprocess_inbound_queue_time_ms);
+  ASSERT_NE(nullptr, in_hist);
+  ASSERT_EQ(1, in_hist->TotalCount());
+  ASSERT_LE(0, in_hist->MaxValueForTests());
+
+  // The execution should've taken at least our sleep time; an execution time
+  // exactly equal to the sleep time is valid, hence LE rather than LT.
+  Histogram* exec_hist = find_hist(METRIC_echo_subprocess_execution_time_ms);
+  ASSERT_NE(nullptr, exec_hist);
+  ASSERT_EQ(1, exec_hist->TotalCount());
+  ASSERT_LE(kSleepMs, exec_hist->MaxValueForTests());
+}
+
+} // namespace subprocess
+} // namespace kudu
diff --git a/src/kudu/subprocess/subprocess_proxy.h b/src/kudu/subprocess/subprocess_proxy.h
new file mode 100644
index 0000000..3d60311
--- /dev/null
+++ b/src/kudu/subprocess/subprocess_proxy.h
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/common/wire_protocol.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/subprocess/server.h"
+#include "kudu/subprocess/subprocess.pb.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace subprocess {
+
+// Histograms recording the per-response metrics reported by a subprocess.
+// This struct does not instantiate its members itself; a concrete metrics
+// type (e.g. a struct deriving from this one) is expected to populate each
+// histogram from a MetricEntity.
+//
+// TODO(awong): add server metrics.
+struct SubprocessMetrics {
+  // Number of messages sitting in the subprocess' queues.
+  scoped_refptr<Histogram> inbound_queue_length;
+  scoped_refptr<Histogram> outbound_queue_length;
+
+  // Time, in milliseconds, spent queued and executing in the subprocess.
+  scoped_refptr<Histogram> inbound_queue_time_ms;
+  scoped_refptr<Histogram> outbound_queue_time_ms;
+  scoped_refptr<Histogram> execution_time_ms;
+};
+
+// Template that wraps a SubprocessServer, exposing only the underlying ReqPB
+// and RespPB as an interface.
+//
+// 'MetricsType' is constructed from the metric entity handed to the
+// constructor. It must expose the same non-null histogram members as
+// SubprocessMetrics, e.g. by deriving from SubprocessMetrics and instantiating
+// each member. This allows for metrics specific to each specialized
+// SubprocessServer.
+template<class ReqPB, class RespPB, class MetricsType>
+class SubprocessProxy {
+ public:
+  // 'argv' is the command line used to launch the subprocess. Subprocess
+  // metrics are registered against 'entity' through 'MetricsType'.
+  SubprocessProxy(std::vector<std::string> argv, const scoped_refptr<MetricEntity>& entity)
+      : server_(std::move(argv)), metrics_(entity) {}
+
+  // Starts the underlying subprocess.
+  Status Start() {
+    return server_.Init();
+  }
+
+  // Executes the given request and populates the given response.
+  //
+  // Returns a non-OK Status in any of these cases:
+  //  - there was an error sending the request (e.g. timed out);
+  //  - the subprocess reported an error, in which case 'resp' is left untouched;
+  //  - the response payload couldn't be unpacked into a RespPB.
+  // Whenever the underlying server returns a response, any metrics it carries
+  // are recorded, whether or not the request itself succeeded.
+  Status Execute(const ReqPB& req, RespPB* resp) {
+    SubprocessRequestPB sreq;
+    sreq.mutable_request()->PackFrom(req);
+    SubprocessResponsePB sresp;
+    RETURN_NOT_OK(server_.Execute(&sreq, &sresp));
+    // The subprocess metrics should still be valid regardless of whether there
+    // was an error, so record them before inspecting the outcome.
+    if (sresp.has_metrics()) {
+      ParseMetricsPB(sresp.metrics());
+    }
+    // Check for a reported error before unpacking. An errored response may not
+    // carry a RespPB payload, and a failed unpack would otherwise mask the
+    // subprocess' actual error as Corruption.
+    if (sresp.has_error()) {
+      return StatusFromPB(sresp.error());
+    }
+    if (!sresp.response().UnpackTo(resp)) {
+      LOG(ERROR) << strings::Substitute("unable to unpack response: $0",
+                                        pb_util::SecureDebugString(sresp));
+      return Status::Corruption("unable to unpack response");
+    }
+    return Status::OK();
+  }
+
+ private:
+  // Parses the given metrics protobuf and updates 'metrics_' based on its
+  // contents. Every field is expected to be set (DCHECKed in debug builds);
+  // each histogram receives exactly one sample.
+  void ParseMetricsPB(const SubprocessMetricsPB& pb) {
+    DCHECK(pb.has_inbound_queue_length());
+    DCHECK(pb.has_outbound_queue_length());
+    DCHECK(pb.has_inbound_queue_time_ms());
+    DCHECK(pb.has_outbound_queue_time_ms());
+    DCHECK(pb.has_execution_time_ms());
+    metrics_.inbound_queue_length->Increment(pb.inbound_queue_length());
+    metrics_.outbound_queue_length->Increment(pb.outbound_queue_length());
+    metrics_.inbound_queue_time_ms->Increment(pb.inbound_queue_time_ms());
+    metrics_.outbound_queue_time_ms->Increment(pb.outbound_queue_time_ms());
+    metrics_.execution_time_ms->Increment(pb.execution_time_ms());
+  }
+
+  SubprocessServer server_;
+  MetricsType metrics_;
+};
+
+} // namespace subprocess
+} // namespace kudu