You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by hu...@apache.org on 2018/10/22 22:03:47 UTC
[incubator-heron] branch master updated: add back stmgr metrics (#3052)
This is an automated email from the ASF dual-hosted git repository.
huijun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 50de70b add back stmgr metrics (#3052)
50de70b is described below
commit 50de70b20c8671a7b752ab28ad9bfcf2bdd2ed5c
Author: Yao Li <cl...@gmail.com>
AuthorDate: Mon Oct 22 14:55:04 2018 -0700
add back stmgr metrics (#3052)
* add back stmgr metrics
* refactor and add comments on stmgr-server metrics registration
---
heron/stmgr/src/cpp/manager/stmgr-client.cpp | 43 ++++++++++++++++++++------
heron/stmgr/src/cpp/manager/stmgr-server.cpp | 45 ++++++++++++++++++++++++++--
heron/stmgr/src/cpp/manager/stmgr-server.h | 12 +++++++-
heron/stmgr/src/cpp/manager/stmgr.cpp | 3 +-
4 files changed, 90 insertions(+), 13 deletions(-)
diff --git a/heron/stmgr/src/cpp/manager/stmgr-client.cpp b/heron/stmgr/src/cpp/manager/stmgr-client.cpp
index 7ba3dfa..213c35f 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-client.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr-client.cpp
@@ -188,20 +188,45 @@ void StMgrClient::SendHelloRequest() {
}
bool StMgrClient::SendTupleStreamMessage(proto::stmgr::TupleStreamMessage& _msg) {
- if (!IsConnected()) {
+ proto::system::HeronTupleSet2* tuple_set = nullptr;
+ tuple_set = __global_protobuf_pool_acquire__(tuple_set);
+ tuple_set->ParsePartialFromString(_msg.set());
+
+ if (!IsConnected() || (droptuples_upon_backpressure_ && HasCausedBackPressure())) {
+ stmgr_client_metrics_->scope(METRIC_BYTES_TO_STMGRS_LOST)->incr_by(_msg.ByteSize());
+ if (tuple_set->has_data()) {
+ stmgr_client_metrics_->scope(METRIC_DATA_TUPLES_TO_STMGRS_LOST)
+ ->incr_by(tuple_set->data().tuples_size());
+ } else if (tuple_set->has_control()) {
+ stmgr_client_metrics_->scope(METRIC_ACK_TUPLES_TO_STMGRS_LOST)
+ ->incr_by(tuple_set->control().acks_size());
+ stmgr_client_metrics_->scope(METRIC_FAIL_TUPLES_TO_STMGRS_LOST)
+ ->incr_by(tuple_set->control().fails_size());
+ }
+
if (++ndropped_messages_ % 100 == 0) {
- LOG(INFO) << "Dropping " << ndropped_messages_ << "th tuple message to stmgr "
- << other_stmgr_id_ << " because it is not connected";
+ if (!IsConnected()) {
+ LOG(INFO) << "Dropping " << ndropped_messages_ << "th tuple message to stmgr "
+ << other_stmgr_id_ << " because it is not connected";
+ } else {
+ LOG(INFO) << "Dropping " << ndropped_messages_ << "th tuple message to stmgr "
+ << other_stmgr_id_ << " because it is causing backpressure and "
+ << "droptuples_upon_backpressure is set";
}
- return false;
- } else if (droptuples_upon_backpressure_ && HasCausedBackPressure()) {
- if (++ndropped_messages_ % 100 == 0) {
- LOG(INFO) << "Dropping " << ndropped_messages_ << "th tuple message to stmgr "
- << other_stmgr_id_ << " because it is causing backpressure and "
- << "droptuples_upon_backpressure is set";
}
return false;
} else {
+ stmgr_client_metrics_->scope(METRIC_BYTES_TO_STMGRS)->incr_by(_msg.ByteSize());
+ if (tuple_set->has_data()) {
+ stmgr_client_metrics_->scope(METRIC_DATA_TUPLES_TO_STMGRS)
+ ->incr_by(tuple_set->data().tuples_size());
+ } else if (tuple_set->has_control()) {
+ stmgr_client_metrics_->scope(METRIC_ACK_TUPLES_TO_STMGRS)
+ ->incr_by(tuple_set->control().acks_size());
+ stmgr_client_metrics_->scope(METRIC_FAIL_TUPLES_TO_STMGRS)
+ ->incr_by(tuple_set->control().fails_size());
+ }
+
SendMessage(_msg);
return true;
}
diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.cpp b/heron/stmgr/src/cpp/manager/stmgr-server.cpp
index 8a5a698..889dfba 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-server.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr-server.cpp
@@ -30,10 +30,13 @@
#include "network/network.h"
#include "config/helper.h"
#include "config/heron-internals-config-reader.h"
+#include "metrics/metrics.h"
namespace heron {
namespace stmgr {
+// The scope the metrics in this file are under
+const sp_string SERVER_SCOPE = "__server/";
// Num data tuples received from other stream managers
const sp_string METRIC_DATA_TUPLES_FROM_STMGRS = "__tuples_from_stmgrs";
// Num ack tuples received from other stream managers
@@ -45,22 +48,48 @@ const sp_string METRIC_BYTES_FROM_STMGRS = "__bytes_from_stmgrs";
StMgrServer::StMgrServer(EventLoop* eventLoop, const NetworkOptions& _options,
const sp_string& _topology_name, const sp_string& _topology_id,
- const sp_string& _stmgr_id, StMgr* _stmgr)
+ const sp_string& _stmgr_id, StMgr* _stmgr,
+ heron::common::MetricsMgrSt* _metrics_manager_client)
: Server(eventLoop, _options),
topology_name_(_topology_name),
topology_id_(_topology_id),
stmgr_id_(_stmgr_id),
- stmgr_(_stmgr) {
+ stmgr_(_stmgr),
+ metrics_manager_client_(_metrics_manager_client) {
// stmgr related handlers
InstallRequestHandler(&StMgrServer::HandleStMgrHelloRequest);
InstallMessageHandler(&StMgrServer::HandleTupleStreamMessage);
InstallMessageHandler(&StMgrServer::HandleStartBackPressureMessage);
InstallMessageHandler(&StMgrServer::HandleStopBackPressureMessage);
InstallMessageHandler(&StMgrServer::HandleDownstreamStatefulCheckpointMessage);
+
+ // The metrics need to be registered one by one here because the "__server" scope
+ // is already registered in heron::stmgr::InstanceServer. If the same scope is
+ // registered more than once, only one of the registrations succeeds.
+ tuples_from_stmgrs_metrics_ = new heron::common::CountMetric();
+ metrics_manager_client_->register_metric(SERVER_SCOPE + METRIC_DATA_TUPLES_FROM_STMGRS,
+ tuples_from_stmgrs_metrics_);
+ ack_tuples_from_stmgrs_metrics_ = new heron::common::CountMetric();
+ metrics_manager_client_->register_metric(SERVER_SCOPE + METRIC_ACK_TUPLES_FROM_STMGRS,
+ ack_tuples_from_stmgrs_metrics_);
+ fail_tuples_from_stmgrs_metrics_ = new heron::common::CountMetric();
+ metrics_manager_client_->register_metric(SERVER_SCOPE + METRIC_FAIL_TUPLES_FROM_STMGRS,
+ fail_tuples_from_stmgrs_metrics_);
+ bytes_from_stmgrs_metrics_ = new heron::common::CountMetric();
+ metrics_manager_client_->register_metric(SERVER_SCOPE + METRIC_BYTES_FROM_STMGRS,
+ bytes_from_stmgrs_metrics_);
}
StMgrServer::~StMgrServer() {
Stop();
+ metrics_manager_client_->unregister_metric(SERVER_SCOPE + METRIC_DATA_TUPLES_FROM_STMGRS);
+ delete tuples_from_stmgrs_metrics_;
+ metrics_manager_client_->unregister_metric(SERVER_SCOPE + METRIC_ACK_TUPLES_FROM_STMGRS);
+ delete ack_tuples_from_stmgrs_metrics_;
+ metrics_manager_client_->unregister_metric(SERVER_SCOPE + METRIC_FAIL_TUPLES_FROM_STMGRS);
+ delete fail_tuples_from_stmgrs_metrics_;
+ metrics_manager_client_->unregister_metric(SERVER_SCOPE + METRIC_BYTES_FROM_STMGRS);
+ delete bytes_from_stmgrs_metrics_;
}
void StMgrServer::HandleNewConnection(Connection* _conn) {
@@ -135,6 +164,18 @@ void StMgrServer::HandleTupleStreamMessage(Connection* _conn,
LOG(INFO) << "Recieved Tuple messages from unknown streammanager connection";
__global_protobuf_pool_release__(_message);
} else {
+ proto::system::HeronTupleSet2* tuple_set = nullptr;
+ tuple_set = __global_protobuf_pool_acquire__(tuple_set);
+ tuple_set->ParsePartialFromString(_message->set());
+
+ bytes_from_stmgrs_metrics_->incr_by(_message->ByteSize());
+ if (tuple_set->has_data()) {
+ tuples_from_stmgrs_metrics_->incr_by(tuple_set->data().tuples_size());
+ } else if (tuple_set->has_control()) {
+ ack_tuples_from_stmgrs_metrics_->incr_by(tuple_set->control().acks_size());
+ fail_tuples_from_stmgrs_metrics_->incr_by(tuple_set->control().fails_size());
+ }
+
stmgr_->HandleStreamManagerData(iter->second, _message);
}
}
diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.h b/heron/stmgr/src/cpp/manager/stmgr-server.h
index ef5b609..4ba28f4 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-server.h
+++ b/heron/stmgr/src/cpp/manager/stmgr-server.h
@@ -31,6 +31,8 @@
namespace heron {
namespace common {
+class MetricsMgrSt;
+class CountMetric;
}
}
@@ -42,7 +44,8 @@ class StMgr;
class StMgrServer : public Server {
public:
StMgrServer(EventLoop* eventLoop, const NetworkOptions& options, const sp_string& _topology_name,
- const sp_string& _topology_id, const sp_string& _stmgr_id, StMgr* _stmgr);
+ const sp_string& _topology_id, const sp_string& _stmgr_id, StMgr* _stmgr,
+ heron::common::MetricsMgrSt* _metrics_manager_client);
virtual ~StMgrServer();
// Do back pressure
@@ -93,6 +96,13 @@ class StMgrServer : public Server {
sp_string topology_id_;
sp_string stmgr_id_;
StMgr* stmgr_;
+
+ // Metrics
+ heron::common::MetricsMgrSt* metrics_manager_client_;
+ heron::common::CountMetric* tuples_from_stmgrs_metrics_;
+ heron::common::CountMetric* ack_tuples_from_stmgrs_metrics_;
+ heron::common::CountMetric* fail_tuples_from_stmgrs_metrics_;
+ heron::common::CountMetric* bytes_from_stmgrs_metrics_;
};
} // namespace stmgr
diff --git a/heron/stmgr/src/cpp/manager/stmgr.cpp b/heron/stmgr/src/cpp/manager/stmgr.cpp
index 973199c..d990713 100644
--- a/heron/stmgr/src/cpp/manager/stmgr.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr.cpp
@@ -288,7 +288,8 @@ void StMgr::StartStmgrServer() {
1_MB);
sops.set_high_watermark(high_watermark_);
sops.set_low_watermark(low_watermark_);
- stmgr_server_ = new StMgrServer(eventLoop_, sops, topology_name_, topology_id_, stmgr_id_, this);
+ stmgr_server_ = new StMgrServer(eventLoop_, sops, topology_name_, topology_id_, stmgr_id_, this,
+ metrics_manager_client_);
// start the server
CHECK_EQ(stmgr_server_->Start(), 0);