You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nl...@apache.org on 2018/06/29 17:48:15 UTC
[incubator-heron] branch master updated: add tmaster physical plan
endpoint (#2941)
This is an automated email from the ASF dual-hosted git repository.
nlu90 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 50a8484 add tmaster physical plan endpoint (#2941)
50a8484 is described below
commit 50a8484f555a8459205d05c98a940898507e8927
Author: Yao Li <cl...@gmail.com>
AuthorDate: Fri Jun 29 10:48:11 2018 -0700
add tmaster physical plan endpoint (#2941)
* add tmaster pplan endpoint
* fix http_server->InstallCallBack
* fix request deletion
---
heron/tmaster/src/cpp/BUILD | 1 +
heron/tmaster/src/cpp/manager/tcontroller.cpp | 38 +++++++++++++++++++++++++++
heron/tmaster/src/cpp/manager/tcontroller.h | 1 +
3 files changed, 40 insertions(+)
diff --git a/heron/tmaster/src/cpp/BUILD b/heron/tmaster/src/cpp/BUILD
index 91cf0b5..e337034 100644
--- a/heron/tmaster/src/cpp/BUILD
+++ b/heron/tmaster/src/cpp/BUILD
@@ -51,6 +51,7 @@ cc_library(
"//heron/statemgrs/src/cpp:statemgrs-cxx",
"//heron/proto:proto-cxx",
"@com_github_jbeder_yaml_cpp//:yaml-cxx",
+ "@com_github_cereal//:cereal-cxx",
],
linkstatic = 1,
)
diff --git a/heron/tmaster/src/cpp/manager/tcontroller.cpp b/heron/tmaster/src/cpp/manager/tcontroller.cpp
index 1afabc0..b7a4ee6 100644
--- a/heron/tmaster/src/cpp/manager/tcontroller.cpp
+++ b/heron/tmaster/src/cpp/manager/tcontroller.cpp
@@ -26,6 +26,7 @@
#include <vector>
#include "basics/basics.h"
#include "basics/strutils.h"
+#include "cereal/external/base64.hpp"
#include "config/topology-config-helper.h"
#include "errors/errors.h"
#include "manager/tmaster.h"
@@ -65,6 +66,12 @@ TController::TController(EventLoop* eventLoop, const NetworkOptions& options, TM
this->HandleUpdateRuntimeConfigRequest(request);
};
http_server_->InstallCallBack("/runtime_config/update", std::move(cbUpdateRuntimeConfg));
+
+ // Get current physical plan
+ auto cbGetCurPPlan = [this](IncomingHTTPRequest* request) {
+ this->HandleGetCurPPlanRequest(request);
+ };
+ http_server_->InstallCallBack("/get_current_physical_plan", std::move(cbGetCurPPlan));
}
TController::~TController() { delete http_server_; }
@@ -262,6 +269,37 @@ void TController::HandleUpdateRuntimeConfigRequestDone(IncomingHTTPRequest* requ
delete request;
}
+void TController::HandleGetCurPPlanRequest(IncomingHTTPRequest* request) {
+ LOG(INFO) << "Got a GetCurPPlan request from " << request->GetRemoteHost() << ":"
+ << request->GetRemotePort();
+
+ // make sure all the stream managers are alive, in case that when container is fail,
+ // physical plan is still available at TMaster but not a valid one.
+ if (tmaster_->GetStmgrsRegSummary()->absent_stmgrs_size() != 0) {
+ http_server_->SendErrorReply(request, 400);
+ delete request;
+ return;
+ }
+
+ if (tmaster_->getPhysicalPlan() == NULL) {
+ http_server_->SendErrorReply(request, 400);
+ } else {
+ std::string pplanString;
+ tmaster_->getPhysicalPlan()->SerializeToString(&pplanString);
+
+ // SerializeToString() returns object in binary format which needs to be encoded
+ const unsigned char * encodeString = (unsigned char *)pplanString.c_str();
+ std::string pplanStringFixed = cereal::base64::encode(encodeString, pplanString.size());
+
+ const std::string message("Get current physical plan");
+ LOG(INFO) << message;
+ OutgoingHTTPResponse* response = new OutgoingHTTPResponse(request);
+ response->AddResponse(pplanStringFixed);
+ http_server_->SendReply(request, 200, response);
+ }
+ delete request;
+}
+
/*
* Validate topology.
* - topology id matches
diff --git a/heron/tmaster/src/cpp/manager/tcontroller.h b/heron/tmaster/src/cpp/manager/tcontroller.h
index beb8fb7..f7e7a48 100644
--- a/heron/tmaster/src/cpp/manager/tcontroller.h
+++ b/heron/tmaster/src/cpp/manager/tcontroller.h
@@ -66,6 +66,7 @@ class TController {
void HandleUpdateRuntimeConfigRequest(IncomingHTTPRequest* request);
void HandleUpdateRuntimeConfigRequestDone(IncomingHTTPRequest* request,
proto::system::StatusCode);
+ void HandleGetCurPPlanRequest(IncomingHTTPRequest* request);
// We are a http server
HTTPServer* http_server_;