You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by GitBox <gi...@apache.org> on 2018/10/22 22:03:43 UTC

[GitHub] huijunwu closed pull request #3052: add back stmgr metrics

huijunwu closed pull request #3052: add back stmgr metrics
URL: https://github.com/apache/incubator-heron/pull/3052
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff of a foreign pull request (one from a
fork) once it is merged, the diff is reproduced below for the sake of
provenance:

diff --git a/heron/stmgr/src/cpp/manager/stmgr-client.cpp b/heron/stmgr/src/cpp/manager/stmgr-client.cpp
index 7ba3dfa576..213c35f48d 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-client.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr-client.cpp
@@ -188,20 +188,45 @@ void StMgrClient::SendHelloRequest() {
 }
 
 bool StMgrClient::SendTupleStreamMessage(proto::stmgr::TupleStreamMessage& _msg) {
-  if (!IsConnected()) {
+  proto::system::HeronTupleSet2* tuple_set = nullptr;
+  tuple_set = __global_protobuf_pool_acquire__(tuple_set);
+  tuple_set->ParsePartialFromString(_msg.set());
+
+  if (!IsConnected() || (droptuples_upon_backpressure_ && HasCausedBackPressure())) {
+    stmgr_client_metrics_->scope(METRIC_BYTES_TO_STMGRS_LOST)->incr_by(_msg.ByteSize());
+    if (tuple_set->has_data()) {
+      stmgr_client_metrics_->scope(METRIC_DATA_TUPLES_TO_STMGRS_LOST)
+          ->incr_by(tuple_set->data().tuples_size());
+    } else if (tuple_set->has_control()) {
+      stmgr_client_metrics_->scope(METRIC_ACK_TUPLES_TO_STMGRS_LOST)
+          ->incr_by(tuple_set->control().acks_size());
+      stmgr_client_metrics_->scope(METRIC_FAIL_TUPLES_TO_STMGRS_LOST)
+          ->incr_by(tuple_set->control().fails_size());
+    }
+
     if (++ndropped_messages_ % 100 == 0) {
-      LOG(INFO) << "Dropping " << ndropped_messages_ << "th tuple message to stmgr "
-                << other_stmgr_id_ << " because it is not connected";
+      if (!IsConnected()) {
+        LOG(INFO) << "Dropping " << ndropped_messages_ << "th tuple message to stmgr "
+                  << other_stmgr_id_ << " because it is not connected";
+      } else {
+        LOG(INFO) << "Dropping " << ndropped_messages_ << "th tuple message to stmgr "
+                        << other_stmgr_id_ << " because it is causing backpressure and "
+                        << "droptuples_upon_backpressure is set";
       }
-    return false;
-  } else if (droptuples_upon_backpressure_ && HasCausedBackPressure()) {
-    if (++ndropped_messages_ % 100 == 0) {
-      LOG(INFO) << "Dropping " << ndropped_messages_ << "th tuple message to stmgr "
-                << other_stmgr_id_ << " because it is causing backpressure and "
-                << "droptuples_upon_backpressure is set";
     }
     return false;
   } else {
+    stmgr_client_metrics_->scope(METRIC_BYTES_TO_STMGRS)->incr_by(_msg.ByteSize());
+    if (tuple_set->has_data()) {
+      stmgr_client_metrics_->scope(METRIC_DATA_TUPLES_TO_STMGRS)
+          ->incr_by(tuple_set->data().tuples_size());
+    } else if (tuple_set->has_control()) {
+      stmgr_client_metrics_->scope(METRIC_ACK_TUPLES_TO_STMGRS)
+          ->incr_by(tuple_set->control().acks_size());
+      stmgr_client_metrics_->scope(METRIC_FAIL_TUPLES_TO_STMGRS)
+          ->incr_by(tuple_set->control().fails_size());
+    }
+
     SendMessage(_msg);
     return true;
   }
diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.cpp b/heron/stmgr/src/cpp/manager/stmgr-server.cpp
index 8a5a69888b..889dfbab8b 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-server.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr-server.cpp
@@ -30,10 +30,13 @@
 #include "network/network.h"
 #include "config/helper.h"
 #include "config/heron-internals-config-reader.h"
+#include "metrics/metrics.h"
 
 namespace heron {
 namespace stmgr {
 
+// The scope the metrics in this file are under
+const sp_string SERVER_SCOPE = "__server/";
 // Num data tuples received from other stream managers
 const sp_string METRIC_DATA_TUPLES_FROM_STMGRS = "__tuples_from_stmgrs";
 // Num ack tuples received from other stream managers
@@ -45,22 +48,48 @@ const sp_string METRIC_BYTES_FROM_STMGRS = "__bytes_from_stmgrs";
 
 StMgrServer::StMgrServer(EventLoop* eventLoop, const NetworkOptions& _options,
                          const sp_string& _topology_name, const sp_string& _topology_id,
-                         const sp_string& _stmgr_id, StMgr* _stmgr)
+                         const sp_string& _stmgr_id, StMgr* _stmgr,
+                         heron::common::MetricsMgrSt* _metrics_manager_client)
     : Server(eventLoop, _options),
       topology_name_(_topology_name),
       topology_id_(_topology_id),
       stmgr_id_(_stmgr_id),
-      stmgr_(_stmgr) {
+      stmgr_(_stmgr),
+      metrics_manager_client_(_metrics_manager_client) {
   // stmgr related handlers
   InstallRequestHandler(&StMgrServer::HandleStMgrHelloRequest);
   InstallMessageHandler(&StMgrServer::HandleTupleStreamMessage);
   InstallMessageHandler(&StMgrServer::HandleStartBackPressureMessage);
   InstallMessageHandler(&StMgrServer::HandleStopBackPressureMessage);
   InstallMessageHandler(&StMgrServer::HandleDownstreamStatefulCheckpointMessage);
+
+  // The metrics need to be registered one by one here because the "__server" scope
+  // is already registered in heron::stmgr::InstanceServer. Duplicated registrations
+  // will only have one successfully registered.
+  tuples_from_stmgrs_metrics_ = new heron::common::CountMetric();
+  metrics_manager_client_->register_metric(SERVER_SCOPE + METRIC_DATA_TUPLES_FROM_STMGRS,
+                                           tuples_from_stmgrs_metrics_);
+  ack_tuples_from_stmgrs_metrics_ = new heron::common::CountMetric();
+  metrics_manager_client_->register_metric(SERVER_SCOPE + METRIC_ACK_TUPLES_FROM_STMGRS,
+                                           ack_tuples_from_stmgrs_metrics_);
+  fail_tuples_from_stmgrs_metrics_ = new heron::common::CountMetric();
+  metrics_manager_client_->register_metric(SERVER_SCOPE + METRIC_FAIL_TUPLES_FROM_STMGRS,
+                                           fail_tuples_from_stmgrs_metrics_);
+  bytes_from_stmgrs_metrics_ = new heron::common::CountMetric();
+  metrics_manager_client_->register_metric(SERVER_SCOPE + METRIC_BYTES_FROM_STMGRS,
+                                           bytes_from_stmgrs_metrics_);
 }
 
 StMgrServer::~StMgrServer() {
   Stop();
+  metrics_manager_client_->unregister_metric(SERVER_SCOPE + METRIC_DATA_TUPLES_FROM_STMGRS);
+  delete tuples_from_stmgrs_metrics_;
+  metrics_manager_client_->unregister_metric(SERVER_SCOPE + METRIC_ACK_TUPLES_FROM_STMGRS);
+  delete ack_tuples_from_stmgrs_metrics_;
+  metrics_manager_client_->unregister_metric(SERVER_SCOPE + METRIC_FAIL_TUPLES_FROM_STMGRS);
+  delete fail_tuples_from_stmgrs_metrics_;
+  metrics_manager_client_->unregister_metric(SERVER_SCOPE + METRIC_BYTES_FROM_STMGRS);
+  delete bytes_from_stmgrs_metrics_;
 }
 
 void StMgrServer::HandleNewConnection(Connection* _conn) {
@@ -135,6 +164,18 @@ void StMgrServer::HandleTupleStreamMessage(Connection* _conn,
     LOG(INFO) << "Recieved Tuple messages from unknown streammanager connection";
     __global_protobuf_pool_release__(_message);
   } else {
+    proto::system::HeronTupleSet2* tuple_set = nullptr;
+    tuple_set = __global_protobuf_pool_acquire__(tuple_set);
+    tuple_set->ParsePartialFromString(_message->set());
+
+    bytes_from_stmgrs_metrics_->incr_by(_message->ByteSize());
+    if (tuple_set->has_data()) {
+      tuples_from_stmgrs_metrics_->incr_by(tuple_set->data().tuples_size());
+    } else if (tuple_set->has_control()) {
+      ack_tuples_from_stmgrs_metrics_->incr_by(tuple_set->control().acks_size());
+      fail_tuples_from_stmgrs_metrics_->incr_by(tuple_set->control().fails_size());
+    }
+
     stmgr_->HandleStreamManagerData(iter->second, _message);
   }
 }
diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.h b/heron/stmgr/src/cpp/manager/stmgr-server.h
index ef5b609250..4ba28f47ab 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-server.h
+++ b/heron/stmgr/src/cpp/manager/stmgr-server.h
@@ -31,6 +31,8 @@
 
 namespace heron {
 namespace common {
+class MetricsMgrSt;
+class CountMetric;
 }
 }
 
@@ -42,7 +44,8 @@ class StMgr;
 class StMgrServer : public Server {
  public:
   StMgrServer(EventLoop* eventLoop, const NetworkOptions& options, const sp_string& _topology_name,
-              const sp_string& _topology_id, const sp_string& _stmgr_id, StMgr* _stmgr);
+              const sp_string& _topology_id, const sp_string& _stmgr_id, StMgr* _stmgr,
+              heron::common::MetricsMgrSt* _metrics_manager_client);
   virtual ~StMgrServer();
 
   // Do back pressure
@@ -93,6 +96,13 @@ class StMgrServer : public Server {
   sp_string topology_id_;
   sp_string stmgr_id_;
   StMgr* stmgr_;
+
+  // Metrics
+  heron::common::MetricsMgrSt* metrics_manager_client_;
+  heron::common::CountMetric* tuples_from_stmgrs_metrics_;
+  heron::common::CountMetric* ack_tuples_from_stmgrs_metrics_;
+  heron::common::CountMetric* fail_tuples_from_stmgrs_metrics_;
+  heron::common::CountMetric* bytes_from_stmgrs_metrics_;
 };
 
 }  // namespace stmgr
diff --git a/heron/stmgr/src/cpp/manager/stmgr.cpp b/heron/stmgr/src/cpp/manager/stmgr.cpp
index 973199ca2f..d99071376f 100644
--- a/heron/stmgr/src/cpp/manager/stmgr.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr.cpp
@@ -288,7 +288,8 @@ void StMgr::StartStmgrServer() {
       1_MB);
   sops.set_high_watermark(high_watermark_);
   sops.set_low_watermark(low_watermark_);
-  stmgr_server_ = new StMgrServer(eventLoop_, sops, topology_name_, topology_id_, stmgr_id_, this);
+  stmgr_server_ = new StMgrServer(eventLoop_, sops, topology_name_, topology_id_, stmgr_id_, this,
+                                  metrics_manager_client_);
 
   // start the server
   CHECK_EQ(stmgr_server_->Start(), 0);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services