You are viewing a plain-text version of this content. The canonical link for it was a hyperlink on the word "here" in the original page; that link is not preserved in this copy.
Posted to commits@heron.apache.org by hu...@apache.org on 2018/10/22 22:03:47 UTC

[incubator-heron] branch master updated: add back stmgr metrics (#3052)

This is an automated email from the ASF dual-hosted git repository.

huijun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 50de70b  add back stmgr metrics (#3052)
50de70b is described below

commit 50de70b20c8671a7b752ab28ad9bfcf2bdd2ed5c
Author: Yao Li <cl...@gmail.com>
AuthorDate: Mon Oct 22 14:55:04 2018 -0700

    add back stmgr metrics (#3052)
    
    * add back stmgr metrics
    
    * refactor and add comments on stmgr-server metrics registration
---
 heron/stmgr/src/cpp/manager/stmgr-client.cpp | 43 ++++++++++++++++++++------
 heron/stmgr/src/cpp/manager/stmgr-server.cpp | 45 ++++++++++++++++++++++++++--
 heron/stmgr/src/cpp/manager/stmgr-server.h   | 12 +++++++-
 heron/stmgr/src/cpp/manager/stmgr.cpp        |  3 +-
 4 files changed, 90 insertions(+), 13 deletions(-)

diff --git a/heron/stmgr/src/cpp/manager/stmgr-client.cpp b/heron/stmgr/src/cpp/manager/stmgr-client.cpp
index 7ba3dfa..213c35f 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-client.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr-client.cpp
@@ -188,20 +188,45 @@ void StMgrClient::SendHelloRequest() {
 }
 
 bool StMgrClient::SendTupleStreamMessage(proto::stmgr::TupleStreamMessage& _msg) {
-  if (!IsConnected()) {
+  proto::system::HeronTupleSet2* tuple_set = nullptr;
+  tuple_set = __global_protobuf_pool_acquire__(tuple_set);
+  tuple_set->ParsePartialFromString(_msg.set());
+
+  if (!IsConnected() || (droptuples_upon_backpressure_ && HasCausedBackPressure())) {
+    stmgr_client_metrics_->scope(METRIC_BYTES_TO_STMGRS_LOST)->incr_by(_msg.ByteSize());
+    if (tuple_set->has_data()) {
+      stmgr_client_metrics_->scope(METRIC_DATA_TUPLES_TO_STMGRS_LOST)
+          ->incr_by(tuple_set->data().tuples_size());
+    } else if (tuple_set->has_control()) {
+      stmgr_client_metrics_->scope(METRIC_ACK_TUPLES_TO_STMGRS_LOST)
+          ->incr_by(tuple_set->control().acks_size());
+      stmgr_client_metrics_->scope(METRIC_FAIL_TUPLES_TO_STMGRS_LOST)
+          ->incr_by(tuple_set->control().fails_size());
+    }
+
     if (++ndropped_messages_ % 100 == 0) {
-      LOG(INFO) << "Dropping " << ndropped_messages_ << "th tuple message to stmgr "
-                << other_stmgr_id_ << " because it is not connected";
+      if (!IsConnected()) {
+        LOG(INFO) << "Dropping " << ndropped_messages_ << "th tuple message to stmgr "
+                  << other_stmgr_id_ << " because it is not connected";
+      } else {
+        LOG(INFO) << "Dropping " << ndropped_messages_ << "th tuple message to stmgr "
+                        << other_stmgr_id_ << " because it is causing backpressure and "
+                        << "droptuples_upon_backpressure is set";
       }
-    return false;
-  } else if (droptuples_upon_backpressure_ && HasCausedBackPressure()) {
-    if (++ndropped_messages_ % 100 == 0) {
-      LOG(INFO) << "Dropping " << ndropped_messages_ << "th tuple message to stmgr "
-                << other_stmgr_id_ << " because it is causing backpressure and "
-                << "droptuples_upon_backpressure is set";
     }
     return false;
   } else {
+    stmgr_client_metrics_->scope(METRIC_BYTES_TO_STMGRS)->incr_by(_msg.ByteSize());
+    if (tuple_set->has_data()) {
+      stmgr_client_metrics_->scope(METRIC_DATA_TUPLES_TO_STMGRS)
+          ->incr_by(tuple_set->data().tuples_size());
+    } else if (tuple_set->has_control()) {
+      stmgr_client_metrics_->scope(METRIC_ACK_TUPLES_TO_STMGRS)
+          ->incr_by(tuple_set->control().acks_size());
+      stmgr_client_metrics_->scope(METRIC_FAIL_TUPLES_TO_STMGRS)
+          ->incr_by(tuple_set->control().fails_size());
+    }
+
     SendMessage(_msg);
     return true;
   }
diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.cpp b/heron/stmgr/src/cpp/manager/stmgr-server.cpp
index 8a5a698..889dfba 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-server.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr-server.cpp
@@ -30,10 +30,13 @@
 #include "network/network.h"
 #include "config/helper.h"
 #include "config/heron-internals-config-reader.h"
+#include "metrics/metrics.h"
 
 namespace heron {
 namespace stmgr {
 
+// The scope the metrics in this file are under
+const sp_string SERVER_SCOPE = "__server/";
 // Num data tuples received from other stream managers
 const sp_string METRIC_DATA_TUPLES_FROM_STMGRS = "__tuples_from_stmgrs";
 // Num ack tuples received from other stream managers
@@ -45,22 +48,48 @@ const sp_string METRIC_BYTES_FROM_STMGRS = "__bytes_from_stmgrs";
 
 StMgrServer::StMgrServer(EventLoop* eventLoop, const NetworkOptions& _options,
                          const sp_string& _topology_name, const sp_string& _topology_id,
-                         const sp_string& _stmgr_id, StMgr* _stmgr)
+                         const sp_string& _stmgr_id, StMgr* _stmgr,
+                         heron::common::MetricsMgrSt* _metrics_manager_client)
     : Server(eventLoop, _options),
       topology_name_(_topology_name),
       topology_id_(_topology_id),
       stmgr_id_(_stmgr_id),
-      stmgr_(_stmgr) {
+      stmgr_(_stmgr),
+      metrics_manager_client_(_metrics_manager_client) {
   // stmgr related handlers
   InstallRequestHandler(&StMgrServer::HandleStMgrHelloRequest);
   InstallMessageHandler(&StMgrServer::HandleTupleStreamMessage);
   InstallMessageHandler(&StMgrServer::HandleStartBackPressureMessage);
   InstallMessageHandler(&StMgrServer::HandleStopBackPressureMessage);
   InstallMessageHandler(&StMgrServer::HandleDownstreamStatefulCheckpointMessage);
+
+  // The metrics need to be registered one by one here because the "__server" scope
+  // is already registered in heron::stmgr::InstanceServer. Duplicated registrations
+  // will only have one successfully registered.
+  tuples_from_stmgrs_metrics_ = new heron::common::CountMetric();
+  metrics_manager_client_->register_metric(SERVER_SCOPE + METRIC_DATA_TUPLES_FROM_STMGRS,
+                                           tuples_from_stmgrs_metrics_);
+  ack_tuples_from_stmgrs_metrics_ = new heron::common::CountMetric();
+  metrics_manager_client_->register_metric(SERVER_SCOPE + METRIC_ACK_TUPLES_FROM_STMGRS,
+                                           ack_tuples_from_stmgrs_metrics_);
+  fail_tuples_from_stmgrs_metrics_ = new heron::common::CountMetric();
+  metrics_manager_client_->register_metric(SERVER_SCOPE + METRIC_FAIL_TUPLES_FROM_STMGRS,
+                                           fail_tuples_from_stmgrs_metrics_);
+  bytes_from_stmgrs_metrics_ = new heron::common::CountMetric();
+  metrics_manager_client_->register_metric(SERVER_SCOPE + METRIC_BYTES_FROM_STMGRS,
+                                           bytes_from_stmgrs_metrics_);
 }
 
 StMgrServer::~StMgrServer() {
   Stop();
+  metrics_manager_client_->unregister_metric(SERVER_SCOPE + METRIC_DATA_TUPLES_FROM_STMGRS);
+  delete tuples_from_stmgrs_metrics_;
+  metrics_manager_client_->unregister_metric(SERVER_SCOPE + METRIC_ACK_TUPLES_FROM_STMGRS);
+  delete ack_tuples_from_stmgrs_metrics_;
+  metrics_manager_client_->unregister_metric(SERVER_SCOPE + METRIC_FAIL_TUPLES_FROM_STMGRS);
+  delete fail_tuples_from_stmgrs_metrics_;
+  metrics_manager_client_->unregister_metric(SERVER_SCOPE + METRIC_BYTES_FROM_STMGRS);
+  delete bytes_from_stmgrs_metrics_;
 }
 
 void StMgrServer::HandleNewConnection(Connection* _conn) {
@@ -135,6 +164,18 @@ void StMgrServer::HandleTupleStreamMessage(Connection* _conn,
     LOG(INFO) << "Recieved Tuple messages from unknown streammanager connection";
     __global_protobuf_pool_release__(_message);
   } else {
+    proto::system::HeronTupleSet2* tuple_set = nullptr;
+    tuple_set = __global_protobuf_pool_acquire__(tuple_set);
+    tuple_set->ParsePartialFromString(_message->set());
+
+    bytes_from_stmgrs_metrics_->incr_by(_message->ByteSize());
+    if (tuple_set->has_data()) {
+      tuples_from_stmgrs_metrics_->incr_by(tuple_set->data().tuples_size());
+    } else if (tuple_set->has_control()) {
+      ack_tuples_from_stmgrs_metrics_->incr_by(tuple_set->control().acks_size());
+      fail_tuples_from_stmgrs_metrics_->incr_by(tuple_set->control().fails_size());
+    }
+
     stmgr_->HandleStreamManagerData(iter->second, _message);
   }
 }
diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.h b/heron/stmgr/src/cpp/manager/stmgr-server.h
index ef5b609..4ba28f4 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-server.h
+++ b/heron/stmgr/src/cpp/manager/stmgr-server.h
@@ -31,6 +31,8 @@
 
 namespace heron {
 namespace common {
+class MetricsMgrSt;
+class CountMetric;
 }
 }
 
@@ -42,7 +44,8 @@ class StMgr;
 class StMgrServer : public Server {
  public:
   StMgrServer(EventLoop* eventLoop, const NetworkOptions& options, const sp_string& _topology_name,
-              const sp_string& _topology_id, const sp_string& _stmgr_id, StMgr* _stmgr);
+              const sp_string& _topology_id, const sp_string& _stmgr_id, StMgr* _stmgr,
+              heron::common::MetricsMgrSt* _metrics_manager_client);
   virtual ~StMgrServer();
 
   // Do back pressure
@@ -93,6 +96,13 @@ class StMgrServer : public Server {
   sp_string topology_id_;
   sp_string stmgr_id_;
   StMgr* stmgr_;
+
+  // Metrics
+  heron::common::MetricsMgrSt* metrics_manager_client_;
+  heron::common::CountMetric* tuples_from_stmgrs_metrics_;
+  heron::common::CountMetric* ack_tuples_from_stmgrs_metrics_;
+  heron::common::CountMetric* fail_tuples_from_stmgrs_metrics_;
+  heron::common::CountMetric* bytes_from_stmgrs_metrics_;
 };
 
 }  // namespace stmgr
diff --git a/heron/stmgr/src/cpp/manager/stmgr.cpp b/heron/stmgr/src/cpp/manager/stmgr.cpp
index 973199c..d990713 100644
--- a/heron/stmgr/src/cpp/manager/stmgr.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr.cpp
@@ -288,7 +288,8 @@ void StMgr::StartStmgrServer() {
       1_MB);
   sops.set_high_watermark(high_watermark_);
   sops.set_low_watermark(low_watermark_);
-  stmgr_server_ = new StMgrServer(eventLoop_, sops, topology_name_, topology_id_, stmgr_id_, this);
+  stmgr_server_ = new StMgrServer(eventLoop_, sops, topology_name_, topology_id_, stmgr_id_, this,
+                                  metrics_manager_client_);
 
   // start the server
   CHECK_EQ(stmgr_server_->Start(), 0);