You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by wl...@apache.org on 2015/10/20 04:45:13 UTC
incubator-hawq git commit: HAWQ-38. Support HA for libyarn
Repository: incubator-hawq
Updated Branches:
refs/heads/master cf81985f7 -> 413b6647b
HAWQ-38. Support HA for libyarn
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/413b6647
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/413b6647
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/413b6647
Branch: refs/heads/master
Commit: 413b6647bb0caaf05ad98ebf975a56338316bfe2
Parents: cf81985
Author: Wen Lin <wl...@pivotal.io>
Authored: Tue Oct 20 10:49:59 2015 +0800
Committer: Wen Lin <wl...@pivotal.io>
Committed: Tue Oct 20 10:49:59 2015 +0800
----------------------------------------------------------------------
depends/libyarn/src/CMakeLists.txt | 2 +-
depends/libyarn/src/common/Exception.cpp | 2 +-
depends/libyarn/src/common/Exception.h | 23 +-
depends/libyarn/src/common/XmlConfig.h | 12 +-
.../src/libyarnclient/ApplicationClient.cpp | 298 ++++++++++----
.../src/libyarnclient/ApplicationClient.h | 80 +++-
.../src/libyarnclient/ApplicationMaster.cpp | 173 +++++++--
.../src/libyarnclient/ApplicationMaster.h | 38 +-
.../libyarnserver/ApplicationClientProtocol.cpp | 388 ++++++++++---------
.../libyarnserver/ApplicationMasterProtocol.cpp | 114 +++---
.../libyarnserver/ApplicationMasterProtocol.h | 2 +-
depends/libyarn/src/rpc/RpcChannel.cpp | 4 +-
12 files changed, 727 insertions(+), 409 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/413b6647/depends/libyarn/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/CMakeLists.txt b/depends/libyarn/src/CMakeLists.txt
index 1ecf6ba..d159216 100644
--- a/depends/libyarn/src/CMakeLists.txt
+++ b/depends/libyarn/src/CMakeLists.txt
@@ -2,7 +2,7 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
SET(libyarn_VERSION_MAJOR 0)
SET(libyarn_VERSION_MINOR 1)
-SET(libyarn_VERSION_PATCH 4)
+SET(libyarn_VERSION_PATCH 5)
SET(libyarn_VERSION_STRING "${libyarn_VERSION_MAJOR}.${libyarn_VERSION_MINOR}.${libyarn_VERSION_PATCH}")
SET(libyarn_VERSION_API 1)
SET(libyarn_ROOT_SOURCES_DIR ${CMAKE_SOURCE_DIR}/src)
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/413b6647/depends/libyarn/src/common/Exception.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/common/Exception.cpp b/depends/libyarn/src/common/Exception.cpp
index e0f084e..0a07e07 100644
--- a/depends/libyarn/src/common/Exception.cpp
+++ b/depends/libyarn/src/common/Exception.cpp
@@ -47,7 +47,7 @@ const char * UnsupportedOperationException::ReflexName =
const char * ReplicaNotFoundException::ReflexName =
"org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException";
-const char * NameNodeStandbyException::ReflexName =
+const char * ResourceManagerStandbyException::ReflexName =
"org.apache.hadoop.ipc.StandbyException";
const char * YarnInvalidBlockToken::ReflexName =
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/413b6647/depends/libyarn/src/common/Exception.h
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/common/Exception.h b/depends/libyarn/src/common/Exception.h
index a40b185..e804194 100644
--- a/depends/libyarn/src/common/Exception.h
+++ b/depends/libyarn/src/common/Exception.h
@@ -4,8 +4,8 @@
*
* Author: Zhanwei Wang
********************************************************************/
-#ifndef _HDFS_LIBHDFS3_COMMON_EXCEPTION_H_
-#define _HDFS_LIBHDFS3_COMMON_EXCEPTION_H_
+#ifndef _YARN_LIBYARN_COMMON_EXCEPTION_H_
+#define _YARN_LIBYARN_COMMON_EXCEPTION_H_
#include <stdexcept>
#include <string>
@@ -42,6 +42,17 @@ public:
static const char * ReflexName;
};
+/**
+ * Raised when a client has no ResourceManager protocol object left to use.
+ */
+class YarnResourceManagerClosed : public YarnException {
+public:
+    /**
+     * The arguments are passed straight through to the YarnException base.
+     */
+    YarnResourceManagerClosed(const std::string & arg,
+                              const char * file,
+                              int line,
+                              const char * stack)
+        : YarnException(arg, file, line, stack) {}
+
+    ~YarnResourceManagerClosed() throw () {}
+};
+
class YarnNetworkException: public YarnIOException {
public:
YarnNetworkException(const std::string & arg, const char * file, int line,
@@ -456,14 +467,14 @@ public:
static const char * ReflexName;
};
-class NameNodeStandbyException: public YarnException {
+class ResourceManagerStandbyException: public YarnException {
public:
- NameNodeStandbyException(const std::string & arg, const char * file,
+ ResourceManagerStandbyException(const std::string & arg, const char * file,
int line, const char * stack) :
YarnException(arg, file, line, stack) {
}
- ~NameNodeStandbyException() throw () {
+ ~ResourceManagerStandbyException() throw () {
}
public:
@@ -473,4 +484,4 @@ public:
}
-#endif /* _HDFS_LIBHDFS3_COMMON_EXCEPTION_H_ */
+#endif /* _YARN_LIBYARN_COMMON_EXCEPTION_H_ */
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/413b6647/depends/libyarn/src/common/XmlConfig.h
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/common/XmlConfig.h b/depends/libyarn/src/common/XmlConfig.h
index 3d04c56..2a3cb2c 100644
--- a/depends/libyarn/src/common/XmlConfig.h
+++ b/depends/libyarn/src/common/XmlConfig.h
@@ -42,7 +42,7 @@ public:
* Get a string with given configure key.
* @param key The key of the configure item.
* @return The value of configure item.
- * @throw HdfsConfigNotFound
+ * @throw YarnConfigNotFound
*/
const char * getString(const char * key) const;
@@ -59,7 +59,7 @@ public:
* Get a string with given configure key.
* @param key The key of the configure item.
* @return The value of configure item.
- * @throw HdfsConfigNotFound
+ * @throw YarnConfigNotFound
*/
const char * getString(const std::string & key) const;
@@ -77,7 +77,7 @@ public:
* Get a 64 bit integer with given configure key.
* @param key The key of the configure item.
* @return The value of configure item.
- * @throw HdfsConfigNotFound
+ * @throw YarnConfigNotFound
*/
int64_t getInt64(const char * key) const;
@@ -94,7 +94,7 @@ public:
* Get a 32 bit integer with given configure key.
* @param key The key of the configure item.
* @return The value of configure item.
- * @throw HdfsConfigNotFound
+ * @throw YarnConfigNotFound
*/
int32_t getInt32(const char * key) const;
@@ -111,7 +111,7 @@ public:
* Get a double with given configure key.
* @param key The key of the configure item.
* @return The value of configure item.
- * @throw HdfsConfigNotFound
+ * @throw YarnConfigNotFound
*/
double getDouble(const char * key) const;
@@ -128,7 +128,7 @@ public:
* Get a boolean with given configure key.
* @param key The key of the configure item.
* @return The value of configure item.
- * @throw HdfsConfigNotFound
+ * @throw YarnConfigNotFound
*/
bool getBool(const char * key) const;
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/413b6647/depends/libyarn/src/libyarnclient/ApplicationClient.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnclient/ApplicationClient.cpp b/depends/libyarn/src/libyarnclient/ApplicationClient.cpp
index cc72c5b..fd4b11f 100644
--- a/depends/libyarn/src/libyarnclient/ApplicationClient.cpp
+++ b/depends/libyarn/src/libyarnclient/ApplicationClient.cpp
@@ -4,29 +4,145 @@
#include "rpc/RpcAuth.h"
#include "common/XmlConfig.h"
#include "common/SessionConfig.h"
-
+#include "Exception.h"
+#include "ExceptionInternal.h"
#include "ApplicationClient.h"
+#include "StringUtil.h"
namespace libyarn {
-ApplicationClient::ApplicationClient(string &user, string &host, string &port) {
- std::string tokenService = "";
- Yarn::Internal::shared_ptr<Yarn::Config> conf = DefaultConfig().getConfig();
- Yarn::Internal::SessionConfig sessionConfig(*conf);
- LOG(INFO, "ApplicationClient session auth method : %s", sessionConfig.getRpcAuthMethod().c_str());
- appClient = (void*) new ApplicationClientProtocol(user, host, port, tokenService, sessionConfig);
+RMInfo::RMInfo() {
}
-ApplicationClient::ApplicationClient(ApplicationClientProtocol *appclient){
- appClient = (void*)appclient;
+const char * YARN_RESOURCEMANAGER_HA = "yarn.resourcemanager.ha";
+
+/**
+ * Build the list of ResourceManager endpoints from a comma-separated list of
+ * "host:port" entries stored in the configuration.
+ *
+ * @param conf The configuration to read from.
+ * @param name The key whose value lists the endpoints.
+ * @return One RMInfo per well-formed entry, in configuration order. The
+ *         result is empty when the key is not configured.
+ */
+std::vector<RMInfo> RMInfo::getHARMInfo(const Yarn::Config & conf, const char* name) {
+    std::vector<RMInfo> retval;
+
+    try {
+        std::string strHA = StringTrim(conf.getString(std::string(name)));
+        std::vector<std::string> strRMs = StringSplit(strHA, ",");
+        retval.reserve(strRMs.size());
+
+        for (size_t i = 0; i < strRMs.size(); ++i) {
+            std::vector<std::string> rm = StringSplit(strRMs[i], ":");
+
+            /*
+             * An entry that does not split into at least a host and a port
+             * cannot be used; indexing rm[1] for it would read past the end
+             * of the vector, so report it and move on.
+             */
+            if (rm.size() < 2) {
+                LOG(WARNING, "Ignore invalid RM address \"%s\" in %s, expected host:port.",
+                    strRMs[i].c_str(), name);
+                continue;
+            }
+
+            RMInfo info;
+            info.setHost(rm[0]);
+            info.setPort(rm[1]);
+            retval.push_back(info);
+        }
+    } catch (const Yarn::YarnConfigNotFound &e) {
+        LOG(INFO, "Yarn RM HA is not configured.");
+    }
+
+    return retval;
+}
+
+/**
+ * Set up the protocol object(s) used to talk to the ResourceManager.
+ *
+ * If the "yarn.resourcemanager.ha" setting yields more than one RM address,
+ * one ApplicationClientProtocol is created per address and host/port are not
+ * used. Otherwise a single protocol object is created for host:port.
+ *
+ * @param user User handed to every ApplicationClientProtocol.
+ * @param host RM host used when HA is not enabled.
+ * @param port RM port used when HA is not enabled.
+ */
+ApplicationClient::ApplicationClient(string &user, string &host, string &port) {
+    std::string tokenService = "";
+    Yarn::Internal::shared_ptr<Yarn::Config> conf = DefaultConfig().getConfig();
+    Yarn::Internal::SessionConfig sessionConfig(*conf);
+    LOG(INFO, "ApplicationClient session auth method : %s", sessionConfig.getRpcAuthMethod().c_str());
+
+    std::vector<RMInfo> rmInfos = RMInfo::getHARMInfo(*conf, YARN_RESOURCEMANAGER_HA);
+
+    if (rmInfos.size() <= 1) {
+        LOG(INFO, "ApplicationClient Resource Manager HA is disable.");
+        enableRMHA = false;
+        maxRMHARetry = 0;
+    } else {
+        /* %d consumes an int; size() yields size_t, so convert explicitly. */
+        LOG(INFO, "ApplicationClient Resource Manager HA is enable. Number of RM: %d",
+            static_cast<int>(rmInfos.size()));
+        enableRMHA = true;
+        maxRMHARetry = sessionConfig.getRpcMaxHaRetry();
+    }
+
+    if (!enableRMHA) {
+        appClientProtos.push_back(
+            std::shared_ptr<ApplicationClientProtocol>(
+                new ApplicationClientProtocol(user, host, port, tokenService, sessionConfig)));
+    } else {
+        /*
+         * Create one ApplicationClientProtocol per configured RM, in
+         * configuration order.
+         */
+        for (size_t i = 0; i < rmInfos.size(); ++i) {
+            appClientProtos.push_back(
+                std::shared_ptr<ApplicationClientProtocol>(
+                    new ApplicationClientProtocol(
+                        user, rmInfos[i].getHost(), rmInfos[i].getPort(), tokenService, sessionConfig)));
+            LOG(INFO, "ApplicationClient finds a standby RM, host:%s, port:%s",
+                rmInfos[i].getHost().c_str(), rmInfos[i].getPort().c_str());
+        }
+    }
+
+    /* Calls start with the first protocol object in the list. */
+    currentAppClientProto = 0;
+}
ApplicationClient::~ApplicationClient() {
- if (appClient != NULL){
- delete (ApplicationClientProtocol*) appClient;
- }
}
+/**
+ * Return the protocol object currently selected for RPC calls.
+ *
+ * @param oldValue Receives the index that was current at the time of the
+ *                 call; pass it to failoverToNextAppClientProto() later.
+ * @return The protocol object at the current index.
+ * @throw Yarn::YarnResourceManagerClosed if no protocol object exists.
+ */
+std::shared_ptr<ApplicationClientProtocol> ApplicationClient::getActiveAppClientProto(
+        uint32_t & oldValue) {
+    lock_guard<mutex> guard(mut);
+
+    LOG(INFO, "ApplicationClient::getActiveAppClientProto is called.");
+
+    const bool noProtocol = appClientProtos.empty();
+    if (noProtocol) {
+        LOG(WARNING, "The vector of ApplicationClientProtocol is empty.");
+        THROW(Yarn::YarnResourceManagerClosed, "ApplicationClientProtocol is closed.");
+    }
+
+    oldValue = currentAppClientProto;
+    LOG(INFO, "ApplicationClient::getActiveAppClientProto, current is %d.", currentAppClientProto);
+
+    const size_t slot = currentAppClientProto % appClientProtos.size();
+    return appClientProtos[slot];
+}
+
+/**
+ * Select the next protocol object after a call through the current one failed.
+ *
+ * Nothing changes when oldValue no longer matches the current index, or when
+ * exactly one protocol object exists.
+ *
+ * @param oldValue Index previously reported by getActiveAppClientProto().
+ */
+void ApplicationClient::failoverToNextAppClientProto(uint32_t oldValue) {
+    lock_guard<mutex> guard(mut);
+
+    const bool indexUnchanged = (oldValue == currentAppClientProto);
+    if (indexUnchanged && appClientProtos.size() != 1) {
+        /* Step forward and wrap around at the end of the list. */
+        const uint32_t next = currentAppClientProto + 1;
+        currentAppClientProto = next % appClientProtos.size();
+        LOG(INFO, "ApplicationClient::failoverToNextAppClientProto, current is %d.", currentAppClientProto);
+    }
+}
+/*
+ * Called by the retry macro when a YarnFailoverException can no longer be
+ * answered with a failover: it is replaced by a Yarn::YarnRpcException whose
+ * message is e.what().
+ *
+ * rethrow_if_nested(e) is invoked inside a try block so that the catch-all
+ * handler runs NESTED_THROW while that exception is in flight.
+ * NOTE(review): presumably NESTED_THROW records the in-flight exception as
+ * the nested cause -- confirm against ExceptionInternal.h.
+ * NOTE(review): if rethrow_if_nested(e) returns without throwing, control
+ * falls through to abort() below.
+ */
+static void HandleYarnFailoverException(const Yarn::YarnFailoverException & e) {
+ try {
+ rethrow_if_nested(e);
+ } catch (...) {
+ NESTED_THROW(Yarn::YarnRpcException, "%s", e.what());
+ }
+
+ // Should not reach here: the handler above is expected to throw.
+ abort();
+}
+
+/*
+ * RESOURCEMANAGER_HA_RETRY_BEGIN() / RESOURCEMANAGER_HA_RETRY_END() bracket
+ * the statement(s) of one RPC call inside an ApplicationClient member
+ * function. Between the two macros the local variable appClientProto holds
+ * the protocol object returned by getActiveAppClientProto().
+ *
+ * BEGIN opens an outer do/while(0), a failure counter, an endless inner loop
+ * and a try block; the trailing "(void)0" absorbs the semicolon written
+ * after the macro invocation.
+ *
+ * NOTE(review): __count and __oldValue contain a double underscore; such
+ * identifiers are reserved for the implementation in C++.
+ */
+#define RESOURCEMANAGER_HA_RETRY_BEGIN() \
+ do { \
+ int __count = 0; \
+ do { \
+ uint32_t __oldValue = 0; \
+ std::shared_ptr<ApplicationClientProtocol> appClientProto = getActiveAppClientProto(__oldValue); \
+ try { \
+ (void)0
+/*
+ * END leaves the inner loop when the bracketed statements did not throw.
+ * A ResourceManagerStandbyException is rethrown, and a YarnFailoverException
+ * is passed to HandleYarnFailoverException(), when HA is disabled or when
+ * __count (tested before it is incremented) already exceeds maxRMHARetry.
+ * With HA enabled that is the (maxRMHARetry + 2)-th failure. Otherwise
+ * failoverToNextAppClientProto() is called and the loop runs again.
+ */
+#define RESOURCEMANAGER_HA_RETRY_END() \
+ break; \
+ } catch (const Yarn::ResourceManagerStandbyException & e) { \
+ if (!enableRMHA || __count++ > maxRMHARetry) { \
+ throw; \
+ } \
+ } catch (const Yarn::YarnFailoverException & e) { \
+ if (!enableRMHA || __count++ > maxRMHARetry) { \
+ HandleYarnFailoverException(e); \
+ } \
+ } \
+ failoverToNextAppClientProto(__oldValue); \
+ } while (true); \
+ } while (0)
+
/*
rpc getNewApplication (GetNewApplicationRequestProto) returns (GetNewApplicationResponseProto);
@@ -39,11 +155,13 @@ ApplicationClient::~ApplicationClient() {
}
*/
ApplicationID ApplicationClient::getNewApplication() {
- ApplicationClientProtocol* appClientAlias =
- ((ApplicationClientProtocol*) appClient);
- GetNewApplicationRequest request;
- GetNewApplicationResponse response = appClientAlias->getNewApplication(request);
- return response.getApplicationId();
+ GetNewApplicationRequest request;
+ GetNewApplicationResponse response;
+
+ RESOURCEMANAGER_HA_RETRY_BEGIN();
+ response = appClientProto->getNewApplication(request);
+ RESOURCEMANAGER_HA_RETRY_END();
+ return response.getApplicationId();
}
/*
@@ -58,12 +176,13 @@ ApplicationID ApplicationClient::getNewApplication() {
*/
void ApplicationClient::submitApplication(
- ApplicationSubmissionContext &appContext) {
- ApplicationClientProtocol* appClientAlias =
- ((ApplicationClientProtocol*) appClient);
- SubmitApplicationRequest request;
- request.setApplicationSubmissionContext(appContext);
- appClientAlias->submitApplication(request);
+ ApplicationSubmissionContext &appContext) {
+ SubmitApplicationRequest request;
+ request.setApplicationSubmissionContext(appContext);
+
+ RESOURCEMANAGER_HA_RETRY_BEGIN();
+ appClientProto->submitApplication(request);
+ RESOURCEMANAGER_HA_RETRY_END();
}
/*
@@ -79,85 +198,102 @@ void ApplicationClient::submitApplication(
*/
ApplicationReport ApplicationClient::getApplicationReport(
- ApplicationID &appId) {
- ApplicationClientProtocol* appClientAlias =
- ((ApplicationClientProtocol*) appClient);
- GetApplicationReportRequest request;
- request.setApplicationId(appId);
- GetApplicationReportResponse response =
- appClientAlias->getApplicationReport(request);
- /*ApplicationReport report = response.getApplicationReport();
- if (report.getYarnApplicationState() == YarnApplicationState::ACCEPTED) {
- Token token = report.getAMRMToken();
- LOG(INFO,"%s",report.getClientToAMToken().getIdentifier());
- }*/
- return response.getApplicationReport();
+ ApplicationID &appId) {
+ GetApplicationReportRequest request;
+ GetApplicationReportResponse response;
+
+ request.setApplicationId(appId);
+ RESOURCEMANAGER_HA_RETRY_BEGIN();
+ response = appClientProto->getApplicationReport(request);
+ RESOURCEMANAGER_HA_RETRY_END();
+
+ return response.getApplicationReport();
}
list<ContainerReport> ApplicationClient::getContainers(ApplicationAttemptId &appAttempId){
- ApplicationClientProtocol* appClientAlias =
- ((ApplicationClientProtocol*) appClient);
- GetContainersRequest request;
- request.setApplicationAttemptId(appAttempId);
- /*LOG(INFO,
- "ApplicationClient::getContainers, appId[cluster_timestamp:%lld,id:%d]",
- request.getApplicationId().getClusterTimestamp(), request.getApplicationId().getId());
- */
- GetContainersResponse response =
- appClientAlias->getContainers(request);
- return response.getcontainersReportList();
+ GetContainersRequest request;
+ GetContainersResponse response;
+
+ request.setApplicationAttemptId(appAttempId);
+ RESOURCEMANAGER_HA_RETRY_BEGIN();
+ response = appClientProto->getContainers(request);
+ RESOURCEMANAGER_HA_RETRY_END();
+
+ return response.getcontainersReportList();
}
list<NodeReport> ApplicationClient::getClusterNodes(list<NodeState> &states) {
- GetClusterNodesRequest request;
- request.setNodeStates(states);
- GetClusterNodesResponse response =
- ((ApplicationClientProtocol*) appClient)->getClusterNodes(request);
- return response.getNodeReports();
+ GetClusterNodesRequest request;
+ GetClusterNodesResponse response;
+ request.setNodeStates(states);
+
+ RESOURCEMANAGER_HA_RETRY_BEGIN();
+ response = appClientProto->getClusterNodes(request);
+ RESOURCEMANAGER_HA_RETRY_END();
+
+ return response.getNodeReports();
}
QueueInfo ApplicationClient::getQueueInfo(string &queue, bool includeApps,
- bool includeChildQueues, bool recursive) {
- GetQueueInfoRequest request;
- request.setQueueName(queue);
- request.setIncludeApplications(includeApps);
- request.setIncludeChildQueues(includeChildQueues);
- request.setRecursive(recursive);
- GetQueueInfoResponse response =
- ((ApplicationClientProtocol*) appClient)->getQueueInfo(request);
- return response.getQueueInfo();
+ bool includeChildQueues, bool recursive) {
+ GetQueueInfoRequest request;
+ GetQueueInfoResponse response;
+ request.setQueueName(queue);
+ request.setIncludeApplications(includeApps);
+ request.setIncludeChildQueues(includeChildQueues);
+ request.setRecursive(recursive);
+
+ RESOURCEMANAGER_HA_RETRY_BEGIN();
+ response = appClientProto->getQueueInfo(request);
+ RESOURCEMANAGER_HA_RETRY_END();
+
+ return response.getQueueInfo();
}
void ApplicationClient::forceKillApplication(ApplicationID &appId) {
- KillApplicationRequest request;
- request.setApplicationId(appId);
- ((ApplicationClientProtocol*) appClient)->forceKillApplication(request);
+ KillApplicationRequest request;
+ request.setApplicationId(appId);
+
+ RESOURCEMANAGER_HA_RETRY_BEGIN();
+ appClientProto->forceKillApplication(request);
+ RESOURCEMANAGER_HA_RETRY_END();
}
YarnClusterMetrics ApplicationClient::getClusterMetrics() {
- GetClusterMetricsRequest request;
- GetClusterMetricsResponse response =
- ((ApplicationClientProtocol*) appClient)->getClusterMetrics(
- request);
- return response.getClusterMetrics();
+ GetClusterMetricsRequest request;
+ GetClusterMetricsResponse response;
+
+ RESOURCEMANAGER_HA_RETRY_BEGIN();
+ response = appClientProto->getClusterMetrics(request);
+ RESOURCEMANAGER_HA_RETRY_END();
+
+ return response.getClusterMetrics();
}
list<ApplicationReport> ApplicationClient::getApplications(
- list<string> &applicationTypes,
- list<YarnApplicationState> &applicationStates) {
- GetApplicationsRequest request;
- request.setApplicationStates(applicationStates);
- request.setApplicationTypes(applicationTypes);
- GetApplicationsResponse response =
- ((ApplicationClientProtocol*) appClient)->getApplications(request);
- return response.getApplicationList();
+ list<string> &applicationTypes,
+ list<YarnApplicationState> &applicationStates) {
+ GetApplicationsRequest request;
+ GetApplicationsResponse response;
+ request.setApplicationStates(applicationStates);
+ request.setApplicationTypes(applicationTypes);
+
+ RESOURCEMANAGER_HA_RETRY_BEGIN();
+ response = appClientProto->getApplications(request);
+ RESOURCEMANAGER_HA_RETRY_END();
+
+ return response.getApplicationList();
}
list<QueueUserACLInfo> ApplicationClient::getQueueAclsInfo() {
- GetQueueUserAclsInfoRequest request;
- GetQueueUserAclsInfoResponse response =
- ((ApplicationClientProtocol*) appClient)->getQueueAclsInfo(request);
- return response.getUserAclsInfoList();
+ GetQueueUserAclsInfoRequest request;
+ GetQueueUserAclsInfoResponse response;
+
+ RESOURCEMANAGER_HA_RETRY_BEGIN();
+ response = appClientProto->getQueueAclsInfo(request);
+ RESOURCEMANAGER_HA_RETRY_END();
+
+ return response.getUserAclsInfoList();
}
} /* namespace libyarn */
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/413b6647/depends/libyarn/src/libyarnclient/ApplicationClient.h
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnclient/ApplicationClient.h b/depends/libyarn/src/libyarnclient/ApplicationClient.h
index ae81ea0..49aacb7 100644
--- a/depends/libyarn/src/libyarnclient/ApplicationClient.h
+++ b/depends/libyarn/src/libyarnclient/ApplicationClient.h
@@ -10,6 +10,7 @@
#include "records/ApplicationSubmissionContext.h"
#include "records/YarnClusterMetrics.h"
#include "records/QueueUserACLInfo.h"
+#include "Thread.h"
using namespace std;
@@ -64,44 +65,85 @@ private:
Yarn::Internal::shared_ptr<Yarn::Config> conf;
};
+/**
+ * Host/port pair identifying one ResourceManager endpoint.
+ */
+class RMInfo {
+public:
+    RMInfo();
+
+    /**
+     * Create an entry from a host and a port. Both strings are copied, so
+     * const strings and temporaries are accepted.
+     * @param rmHost The host of the endpoint.
+     * @param rmPort The port of the endpoint, kept as text.
+     */
+    RMInfo(const std::string & rmHost, const std::string & rmPort) :
+        host(rmHost), port(rmPort) {
+    }
+
+    const std::string & getHost() const {
+        return host;
+    }
+
+    void setHost(const std::string & rmHost) {
+        host = rmHost;
+    }
+
+    const std::string & getPort() const {
+        return port;
+    }
+
+    void setPort(const std::string & rmPort) {
+        port = rmPort;
+    }
+
+    /**
+     * Read the value stored under the given key and return the endpoints it lists.
+     * @param conf The configuration to read from.
+     * @param name The key of the configure item.
+     * @return The endpoints found; empty if the key is not configured.
+     */
+    static std::vector<RMInfo> getHARMInfo(const Yarn::Config & conf, const char* name);
+
+private:
+    std::string host;
+    std::string port;
+};
+
class ApplicationClient {
public:
- ApplicationClient(string &user, string &host, string &port);
+ ApplicationClient(string &user, string &host, string &port);
- ApplicationClient(ApplicationClientProtocol *appclient);
+ virtual ~ApplicationClient();
- virtual ~ApplicationClient();
+ virtual ApplicationID getNewApplication();
- virtual ApplicationID getNewApplication();
+ virtual void submitApplication(ApplicationSubmissionContext &appContext);
- virtual void submitApplication(ApplicationSubmissionContext &appContext);
+ virtual ApplicationReport getApplicationReport(ApplicationID &appId);
- virtual ApplicationReport getApplicationReport(ApplicationID &appId);
+ virtual list<ContainerReport> getContainers(ApplicationAttemptId &appAttempId);
- virtual list<ContainerReport> getContainers(ApplicationAttemptId &appAttempId);
+ virtual list<NodeReport> getClusterNodes(list<NodeState> &state);
- virtual list<NodeReport> getClusterNodes(list<NodeState> &state);
+ virtual QueueInfo getQueueInfo(string &queue, bool includeApps,
+ bool includeChildQueues, bool recursive);
- virtual QueueInfo getQueueInfo(string &queue, bool includeApps,
- bool includeChildQueues, bool recursive);
+ virtual void forceKillApplication(ApplicationID &appId);
- virtual void forceKillApplication(ApplicationID &appId);
+ virtual YarnClusterMetrics getClusterMetrics();
- virtual YarnClusterMetrics getClusterMetrics();
+ virtual list<ApplicationReport> getApplications(list<string> &applicationTypes,
+ list<YarnApplicationState> &applicationStates);
- virtual list<ApplicationReport> getApplications(list<string> &applicationTypes,
- list<YarnApplicationState> &applicationStates);
+ virtual list<QueueUserACLInfo> getQueueAclsInfo();
- virtual list<QueueUserACLInfo> getQueueAclsInfo();
+ const std::string & getUser(){uint32_t old=0; return getActiveAppClientProto(old)->getUser();};
- const std::string & getUser() const {return ((ApplicationClientProtocol*)appClient)->getUser();};
+ const AuthMethod getMethod(){uint32_t old=0; return getActiveAppClientProto(old)->getMethod();};
- AuthMethod getMethod() const {return ((ApplicationClientProtocol*)appClient)->getMethod();};
+ const std::string getPrincipal(){uint32_t old=0; return getActiveAppClientProto(old)->getPrincipal();};
- const std::string getPrincipal() const {return ((ApplicationClientProtocol*)appClient)->getPrincipal();};
+private:
+ std::shared_ptr<ApplicationClientProtocol> getActiveAppClientProto(uint32_t & oldValue);
+ void failoverToNextAppClientProto(uint32_t oldValue);
private:
- void *appClient;
+ bool enableRMHA;
+ int maxRMHARetry;
+ mutex mut;
+ /**
+ * Each ApplicationClientProtocol object stands for a connection to one of the configured
+ * resource managers. If the application client fails to connect to the active resource
+ * manager, it will try the next one in the list.
+ */
+ std::vector<std::shared_ptr<ApplicationClientProtocol>> appClientProtos;
+ uint32_t currentAppClientProto;
};
} /* namespace libyarn */
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/413b6647/depends/libyarn/src/libyarnclient/ApplicationMaster.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnclient/ApplicationMaster.cpp b/depends/libyarn/src/libyarnclient/ApplicationMaster.cpp
index 0d26082..d7c4dc6 100644
--- a/depends/libyarn/src/libyarnclient/ApplicationMaster.cpp
+++ b/depends/libyarn/src/libyarnclient/ApplicationMaster.cpp
@@ -8,24 +8,113 @@
#include "ApplicationClient.h"
namespace libyarn {
+
+const char * YARN_RESOURCEMANAGER_SCHEDULER_HA = "yarn.resourcemanager.scheduler.ha";
+
ApplicationMaster::ApplicationMaster(string &schedHost, string &schedPort,
- UserInfo &user, const string &tokenService) {
- Yarn::Internal::shared_ptr<Yarn::Config> conf = DefaultConfig().getConfig();
- Yarn::Internal::SessionConfig sessionConfig(*conf);
- RpcAuth rpcAuth(user, AuthMethod::TOKEN);
- rmClient = (void*) new ApplicationMasterProtocol(schedHost,
- schedPort, tokenService, sessionConfig, rpcAuth);
+ UserInfo &user, const string &tokenService) {
+ Yarn::Internal::shared_ptr<Yarn::Config> conf = DefaultConfig().getConfig();
+ Yarn::Internal::SessionConfig sessionConfig(*conf);
+ RpcAuth rpcAuth(user, AuthMethod::TOKEN);
+
+ std::vector<RMInfo> rmInfos = RMInfo::getHARMInfo(*conf, YARN_RESOURCEMANAGER_SCHEDULER_HA);
+
+ if (rmInfos.size() <= 1) {
+ LOG(INFO, "ApplicationClient RM Scheduler HA is disable.");
+ enableRMSchedulerHA = false;
+ maxRMHARetry = 0;
+ } else {
+ LOG(INFO, "ApplicationClient RM Scheduler HA is enable. Number of RM scheduler: %d", rmInfos.size());
+ enableRMSchedulerHA = true;
+ maxRMHARetry = sessionConfig.getRpcMaxHaRetry();
+ }
+
+ if (!enableRMSchedulerHA)
+ {
+ appMasterProtos.push_back(
+ std::shared_ptr<ApplicationMasterProtocol>(
+ new ApplicationMasterProtocol(schedHost, schedPort, tokenService, sessionConfig, rpcAuth)));
+ }
+ else {
+ /*
+ * iterate RMInfo vector and create 1-1 applicationMasterProtocol for each standby RM scheduler.
+ */
+ for (size_t i = 0; i < rmInfos.size(); ++i) {
+ appMasterProtos.push_back(
+ std::shared_ptr<ApplicationMasterProtocol>(
+ new ApplicationMasterProtocol(rmInfos[i].getHost(),
+ rmInfos[i].getPort(), tokenService, sessionConfig, rpcAuth)));
+ LOG(INFO, "ApplicationMaster finds a standby RM scheduler, host:%s, port:%s",
+ rmInfos[i].getHost().c_str(), rmInfos[i].getPort().c_str());
+ }
+ }
+ currentAppMasterProto = 0;
}
-ApplicationMaster::ApplicationMaster(ApplicationMasterProtocol *rmclient){
- rmClient = (void*)rmclient;
+ApplicationMaster::~ApplicationMaster() {
}
+std::shared_ptr<ApplicationMasterProtocol>
+ ApplicationMaster::getActiveAppMasterProto(uint32_t & oldValue) {
+ lock_guard<mutex> lock(this->mut);
-ApplicationMaster::~ApplicationMaster() {
- delete (ApplicationMasterProtocol*) rmClient;
+ if (appMasterProtos.empty()) {
+ LOG(WARNING, "The vector of ApplicationMasterProtocol is empty.");
+ THROW(Yarn::YarnResourceManagerClosed, "ApplicationMasterProtocol is closed.");
+ }
+
+ oldValue = currentAppMasterProto;
+ LOG(INFO, "ApplicationMaster::getActiveAppMasterProto, current is %d.", currentAppMasterProto);
+ return appMasterProtos[currentAppMasterProto % appMasterProtos.size()];
+}
+
+/**
+ * Select the next protocol object after a call through the current one failed.
+ *
+ * Nothing changes when oldValue no longer matches the current index, or when
+ * exactly one protocol object exists.
+ *
+ * @param oldValue Index previously reported by getActiveAppMasterProto().
+ */
+void ApplicationMaster::failoverToNextAppMasterProto(uint32_t oldValue) {
+    lock_guard<mutex> guard(mut);
+
+    const bool indexUnchanged = (oldValue == currentAppMasterProto);
+    if (indexUnchanged && appMasterProtos.size() != 1) {
+        /* Step forward and wrap around at the end of the list. */
+        const uint32_t next = currentAppMasterProto + 1;
+        currentAppMasterProto = next % appMasterProtos.size();
+        LOG(INFO, "ApplicationMaster::failoverToNextAppMasterProto, current is %d.", currentAppMasterProto);
+    }
+}
+
+static void HandleYarnFailoverException(const Yarn::YarnFailoverException & e) {
+ try {
+ rethrow_if_nested(e);
+ } catch (...) {
+ NESTED_THROW(Yarn::YarnRpcException, "%s", e.what());
+ }
+
+ //should not reach here
+ abort();
}
+#define RESOURCEMANAGER_SCHEDULER_HA_RETRY_BEGIN() \
+ do { \
+ int __count = 0; \
+ do { \
+ uint32_t __oldValue = 0; \
+ std::shared_ptr<ApplicationMasterProtocol> appMasterProto = getActiveAppMasterProto(__oldValue); \
+ try { \
+ (void)0
+/*
+ * RESOURCEMANAGER_SCHEDULER_HA_RETRY_BEGIN() (above) and
+ * RESOURCEMANAGER_SCHEDULER_HA_RETRY_END() (below) bracket the statement(s)
+ * of one RPC call inside an ApplicationMaster member function. Between the
+ * two macros the local variable appMasterProto holds the protocol object
+ * returned by getActiveAppMasterProto().
+ *
+ * BEGIN opens an outer do/while(0), a failure counter, an endless inner loop
+ * and a try block; its trailing "(void)0" absorbs the semicolon written
+ * after the macro invocation.
+ *
+ * END leaves the inner loop when the bracketed statements did not throw.
+ * A ResourceManagerStandbyException is rethrown, and a YarnFailoverException
+ * is passed to HandleYarnFailoverException(), when scheduler HA is disabled
+ * or when __count (tested before it is incremented) already exceeds
+ * maxRMHARetry. With HA enabled that is the (maxRMHARetry + 2)-th failure.
+ * Otherwise failoverToNextAppMasterProto() is called and the loop runs again.
+ *
+ * NOTE(review): __count and __oldValue contain a double underscore; such
+ * identifiers are reserved for the implementation in C++.
+ */
+#define RESOURCEMANAGER_SCHEDULER_HA_RETRY_END() \
+ break; \
+ } catch (const Yarn::ResourceManagerStandbyException & e) { \
+ if (!enableRMSchedulerHA || __count++ > maxRMHARetry) { \
+ throw; \
+ } \
+ } catch (const Yarn::YarnFailoverException & e) { \
+ if (!enableRMSchedulerHA || __count++ > maxRMHARetry) { \
+ HandleYarnFailoverException(e); \
+ } \
+ } \
+ failoverToNextAppMasterProto(__oldValue); \
+ } while (true); \
+ } while (0)
+
/*
rpc registerApplicationMaster (RegisterApplicationMasterRequestProto) returns (RegisterApplicationMasterResponseProto);
@@ -42,13 +131,17 @@ message RegisterApplicationMasterResponseProto {
}
*/
RegisterApplicationMasterResponse ApplicationMaster::registerApplicationMaster(
- string &amHost, int32_t amPort, string &am_tracking_url) {
- RegisterApplicationMasterRequest request;
- request.setHost(amHost);
- request.setRpcPort(amPort);
- request.setTrackingUrl(am_tracking_url);
- ApplicationMasterProtocol* rmClientAlias = (ApplicationMasterProtocol*) rmClient;
- return rmClientAlias->registerApplicationMaster(request);
+ string &amHost, int32_t amPort, string &am_tracking_url) {
+ RegisterApplicationMasterRequest request;
+ RegisterApplicationMasterResponse response;
+ request.setHost(amHost);
+ request.setRpcPort(amPort);
+ request.setTrackingUrl(am_tracking_url);
+
+ RESOURCEMANAGER_SCHEDULER_HA_RETRY_BEGIN();
+ response = appMasterProto->registerApplicationMaster(request);
+ RESOURCEMANAGER_SCHEDULER_HA_RETRY_END();
+ return response;
}
/*
@@ -74,16 +167,20 @@ message AllocateResponseProto {
*/
AllocateResponse ApplicationMaster::allocate(list<ResourceRequest> &asks,
- list<ContainerId> &releases, ResourceBlacklistRequest &blacklistRequest,
- int32_t responseId, float progress) {
- AllocateRequest request;
- request.setAsks(asks);
- request.setReleases(releases);
- request.setBlacklistRequest(blacklistRequest);
- request.setResponseId(responseId);
- request.setProgress(progress);
-
- return ((ApplicationMasterProtocol*) rmClient)->allocate(request);
+ list<ContainerId> &releases, ResourceBlacklistRequest &blacklistRequest,
+ int32_t responseId, float progress) {
+ AllocateRequest request;
+ AllocateResponse response;
+ request.setAsks(asks);
+ request.setReleases(releases);
+ request.setBlacklistRequest(blacklistRequest);
+ request.setResponseId(responseId);
+ request.setProgress(progress);
+
+ RESOURCEMANAGER_SCHEDULER_HA_RETRY_BEGIN();
+ response = appMasterProto->allocate(request);
+ RESOURCEMANAGER_SCHEDULER_HA_RETRY_END();
+ return response;
}
/*
@@ -101,17 +198,17 @@ message FinishApplicationMasterResponseProto {
*/
bool ApplicationMaster::finishApplicationMaster(string &diagnostics,
- string &trackingUrl, FinalApplicationStatus finalstatus) {
- ApplicationMasterProtocol* rmClientAlias = (ApplicationMasterProtocol*) rmClient;
-
- FinishApplicationMasterRequest request;
- request.setDiagnostics(diagnostics);
- request.setTrackingUrl(trackingUrl);
- request.setFinalApplicationStatus(finalstatus);
-
- FinishApplicationMasterResponse response = rmClientAlias->finishApplicationMaster(request);
-
- return response.getIsUnregistered();
+ string &trackingUrl, FinalApplicationStatus finalstatus) {
+ FinishApplicationMasterRequest request;
+ FinishApplicationMasterResponse response;
+ request.setDiagnostics(diagnostics);
+ request.setTrackingUrl(trackingUrl);
+ request.setFinalApplicationStatus(finalstatus);
+
+ RESOURCEMANAGER_SCHEDULER_HA_RETRY_BEGIN();
+ response = appMasterProto->finishApplicationMaster(request);
+ RESOURCEMANAGER_SCHEDULER_HA_RETRY_END();
+ return response.getIsUnregistered();
}
} /* namespace libyarn */
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/413b6647/depends/libyarn/src/libyarnclient/ApplicationMaster.h
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnclient/ApplicationMaster.h b/depends/libyarn/src/libyarnclient/ApplicationMaster.h
index 7df1e46..999c146 100644
--- a/depends/libyarn/src/libyarnclient/ApplicationMaster.h
+++ b/depends/libyarn/src/libyarnclient/ApplicationMaster.h
@@ -32,26 +32,36 @@ namespace libyarn {
class ApplicationMaster {
public:
- ApplicationMaster(string &schedHost, string &schedPort,
- UserInfo &user, const string &tokenService);
+ ApplicationMaster(string &schedHost, string &schedPort,
+ UserInfo &user, const string &tokenService);
- ApplicationMaster(ApplicationMasterProtocol *rmclient);
+ virtual ~ApplicationMaster();
- virtual ~ApplicationMaster();
+ virtual RegisterApplicationMasterResponse registerApplicationMaster(string &amHost,
+ int32_t amPort, string &am_tracking_url);
- virtual RegisterApplicationMasterResponse registerApplicationMaster(string &amHost,
- int32_t amPort, string &am_tracking_url);
+ virtual AllocateResponse allocate(list<ResourceRequest> &asks,
+ list<ContainerId> &releases,
+ ResourceBlacklistRequest &blacklistRequest, int32_t responseId,
+ float progress);
- virtual AllocateResponse allocate(list<ResourceRequest> &asks,
- list<ContainerId> &releases,
- ResourceBlacklistRequest &blacklistRequest, int32_t responseId,
- float progress);
-
- virtual bool finishApplicationMaster(string &diagnostics, string &trackingUrl,
- FinalApplicationStatus finalstatus);
+ virtual bool finishApplicationMaster(string &diagnostics, string &trackingUrl,
+ FinalApplicationStatus finalstatus);
+private:
+ std::shared_ptr<ApplicationMasterProtocol> getActiveAppMasterProto(uint32_t & oldValue);
+ void failoverToNextAppMasterProto(uint32_t oldValue);
private:
- void *rmClient;
+ bool enableRMSchedulerHA;
+ int maxRMHARetry;
+ mutex mut;
+ /**
+ * Each ApplicationMasterProtocol object stands for a connection to a standby RM scheduler.
+ * If the application master fails to connect to the active RM scheduler, it will try the
+ * next one in the list.
+ */
+ std::vector<std::shared_ptr<ApplicationMasterProtocol>> appMasterProtos;
+ uint32_t currentAppMasterProto;
};
} /* namespace libyarn */
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/413b6647/depends/libyarn/src/libyarnserver/ApplicationClientProtocol.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnserver/ApplicationClientProtocol.cpp b/depends/libyarn/src/libyarnserver/ApplicationClientProtocol.cpp
index 71bc3e9..620d48f 100644
--- a/depends/libyarn/src/libyarnserver/ApplicationClientProtocol.cpp
+++ b/depends/libyarn/src/libyarnserver/ApplicationClientProtocol.cpp
@@ -16,43 +16,43 @@ using namespace Yarn;
namespace libyarn {
ApplicationClientProtocol::ApplicationClientProtocol(const string &rmUser,
- const string & rmHost, const string & rmPort,
- const string & tokenService,const SessionConfig & c) :
- client(RpcClient::getClient()), conf(c),
- protocol(APP_CLIENT_PROTOCOL_VERSION, APP_CLIENT_PROTOCOL,APP_CLIENT_DELEGATION_TOKEN_KIND),
- server(tokenService, rmHost, rmPort) {
-
- /* create RpcAuth for rpc method,
- * can be SIMPLE or KERBEROS
- * */
- if (RpcAuth::ParseMethod(c.getRpcAuthMethod()) == KERBEROS) {
- /*
- * If using KERBEROS, rmUser should be principal name.
- */
- Yarn::Internal::UserInfo user(rmUser);
- user.setRealUser(user.getEffectiveUser());
- Yarn::Internal::RpcAuth rpcAuth(user, KERBEROS);
- auth = rpcAuth;
- } else {
- Yarn::Internal::UserInfo user = Yarn::Internal::UserInfo::LocalUser();
- Yarn::Internal::RpcAuth rpcAuth(user, SIMPLE);
- auth = rpcAuth;
- }
+ const string & rmHost, const string & rmPort,
+ const string & tokenService,const SessionConfig & c) :
+ client(RpcClient::getClient()), conf(c),
+ protocol(APP_CLIENT_PROTOCOL_VERSION, APP_CLIENT_PROTOCOL,APP_CLIENT_DELEGATION_TOKEN_KIND),
+ server(tokenService, rmHost, rmPort) {
+
+ /* Create the RpcAuth for the configured RPC authentication method,
+ * which is either SIMPLE or KERBEROS.
+ */
+ if (RpcAuth::ParseMethod(c.getRpcAuthMethod()) == KERBEROS) {
+ /*
+ * If using KERBEROS, rmUser should be the principal name.
+ */
+ Yarn::Internal::UserInfo user(rmUser);
+ user.setRealUser(user.getEffectiveUser());
+ Yarn::Internal::RpcAuth rpcAuth(user, KERBEROS);
+ auth = rpcAuth;
+ } else {
+ Yarn::Internal::UserInfo user = Yarn::Internal::UserInfo::LocalUser();
+ Yarn::Internal::RpcAuth rpcAuth(user, SIMPLE);
+ auth = rpcAuth;
+ }
}
ApplicationClientProtocol::~ApplicationClientProtocol() {
}
void ApplicationClientProtocol::invoke(const RpcCall & call) {
- try {
- channel = &client.getChannel(auth, protocol, server, conf);
- channel->invoke(call);
- channel->close(false);
- }
- catch (...) {
- channel->close(false);
- throw;
- }
+ try {
+ channel = &client.getChannel(auth, protocol, server, conf);
+ channel->invoke(call);
+ channel->close(false);
+ }
+ catch (...) {
+ channel->close(false);
+ throw;
+ }
}
/*
@@ -68,21 +68,23 @@ void ApplicationClientProtocol::invoke(const RpcCall & call) {
*/
GetNewApplicationResponse ApplicationClientProtocol::getNewApplication(
- GetNewApplicationRequest &request) {
- try {
- GetNewApplicationResponseProto responseProto;
- GetNewApplicationRequestProto requestProto = request.getProto();
- invoke(RpcCall(true, "getNewApplication", &requestProto, &responseProto));
- return GetNewApplicationResponse(responseProto);
- } catch (const YarnRpcServerException & e) {
- UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
- unwrapper.unwrap(__FILE__, __LINE__);
- } catch (...) {
- THROW(YarnIOException,
- "Unexpected exception: when calling "
- "ApplicationClientProtocol::getNewApplication in %s: %d",
- __FILE__, __LINE__);
- }
+ GetNewApplicationRequest &request) {
+ try {
+ GetNewApplicationResponseProto responseProto;
+ GetNewApplicationRequestProto requestProto = request.getProto();
+ invoke(RpcCall(true, "getNewApplication", &requestProto, &responseProto));
+ return GetNewApplicationResponse(responseProto);
+ } catch (const YarnFailoverException & e) {
+ throw;
+ } catch (const YarnRpcServerException & e) {
+ UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
+ unwrapper.unwrap(__FILE__, __LINE__);
+ } catch (...) {
+ THROW(YarnIOException,
+ "Unexpected exception: when calling "
+ "ApplicationClientProtocol::getNewApplication in %s: %d",
+ __FILE__, __LINE__);
+ }
}
/*
@@ -97,20 +99,22 @@ GetNewApplicationResponse ApplicationClientProtocol::getNewApplication(
*/
void ApplicationClientProtocol::submitApplication(
- SubmitApplicationRequest &request) {
- try {
- SubmitApplicationResponseProto responseProto;
- SubmitApplicationRequestProto requestProto = request.getProto();
- invoke(RpcCall(true, "submitApplication", &requestProto, &responseProto));
- } catch (const YarnRpcServerException & e) {
- UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
- unwrapper.unwrap(__FILE__, __LINE__);
- } catch (...) {
- THROW(YarnIOException,
- "Unexpected exception: when calling "
- "ApplicationClientProtocol::submitApplication in %s: %d",
- __FILE__, __LINE__);
- }
+ SubmitApplicationRequest &request) {
+ try {
+ SubmitApplicationResponseProto responseProto;
+ SubmitApplicationRequestProto requestProto = request.getProto();
+ invoke(RpcCall(true, "submitApplication", &requestProto, &responseProto));
+ } catch (const YarnFailoverException & e) {
+ throw;
+ } catch (const YarnRpcServerException & e) {
+ UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
+ unwrapper.unwrap(__FILE__, __LINE__);
+ } catch (...) {
+ THROW(YarnIOException,
+ "Unexpected exception: when calling "
+ "ApplicationClientProtocol::submitApplication in %s: %d",
+ __FILE__, __LINE__);
+ }
}
/*
rpc getApplicationReport (GetApplicationReportRequestProto) returns (GetApplicationReportResponseProto);
@@ -124,22 +128,24 @@ void ApplicationClientProtocol::submitApplication(
}
*/
GetApplicationReportResponse ApplicationClientProtocol::getApplicationReport(
- GetApplicationReportRequest &request) {
- try {
- GetApplicationReportResponseProto responseProto;
- GetApplicationReportRequestProto requestProto = request.getProto();
- invoke(RpcCall(true, "getApplicationReport", &requestProto, &responseProto));
- return GetApplicationReportResponse(responseProto);
- } catch (const YarnRpcServerException & e) {
- UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
- unwrapper.unwrap(__FILE__, __LINE__);
- } catch (...) {
- THROW(YarnIOException,
- "Unexpected exception: when calling "
- "ApplicationClientProtocol::getApplicationReport in %s: %d",
- __FILE__, __LINE__);
- }
-}
+ GetApplicationReportRequest &request) {
+ try {
+ GetApplicationReportResponseProto responseProto;
+ GetApplicationReportRequestProto requestProto = request.getProto();
+ invoke(RpcCall(true, "getApplicationReport", &requestProto, &responseProto));
+ return GetApplicationReportResponse(responseProto);
+ } catch (const YarnFailoverException & e) {
+ throw;
+ } catch (const YarnRpcServerException & e) {
+ UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
+ unwrapper.unwrap(__FILE__, __LINE__);
+ } catch (...) {
+ THROW(YarnIOException,
+ "Unexpected exception: when calling "
+ "ApplicationClientProtocol::getApplicationReport in %s: %d",
+ __FILE__, __LINE__);
+ }
+ }
/*
rpc getContainers (GetContainersRequestProto) returns (GetContainersResponseProto);
@@ -153,20 +159,22 @@ message GetContainersResponseProto {
}
*/
GetContainersResponse ApplicationClientProtocol::getContainers(GetContainersRequest &request){
- try {
- GetContainersResponseProto responseProto;
- GetContainersRequestProto requestProto = request.getProto();
- invoke(RpcCall(true, "getContainers", &requestProto,&responseProto));
- return GetContainersResponse(responseProto);
- } catch (const YarnRpcServerException & e) {
- UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
- unwrapper.unwrap(__FILE__, __LINE__);
- } catch (...) {
- THROW(YarnIOException,
- "Unexpected exception: when calling "
- "ApplicationClientProtocol::getContainers in %s: %d",
- __FILE__, __LINE__);
- }
+ try {
+ GetContainersResponseProto responseProto;
+ GetContainersRequestProto requestProto = request.getProto();
+ invoke(RpcCall(true, "getContainers", &requestProto,&responseProto));
+ return GetContainersResponse(responseProto);
+ } catch (const YarnFailoverException & e) {
+ throw;
+ } catch (const YarnRpcServerException & e) {
+ UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
+ unwrapper.unwrap(__FILE__, __LINE__);
+ } catch (...) {
+ THROW(YarnIOException,
+ "Unexpected exception: when calling "
+ "ApplicationClientProtocol::getContainers in %s: %d",
+ __FILE__, __LINE__);
+ }
}
/*
@@ -181,21 +189,23 @@ GetContainersResponse ApplicationClientProtocol::getContainers(GetContainersRequ
}
*/
GetClusterNodesResponse ApplicationClientProtocol::getClusterNodes(
- GetClusterNodesRequest &request) {
- try {
- GetClusterNodesResponseProto responseProto;
- GetClusterNodesRequestProto requestProto = request.getProto();
- invoke(RpcCall(true, "getClusterNodes", &requestProto, &responseProto));
- return GetClusterNodesResponse(responseProto);
- } catch (const YarnRpcServerException & e) {
- UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
- unwrapper.unwrap(__FILE__, __LINE__);
- } catch (...) {
- THROW(YarnIOException,
- "Unexpected exception: when calling "
- "ApplicationClientProtocol::getClusterNodes in %s: %d",
- __FILE__, __LINE__);
- }
+ GetClusterNodesRequest &request) {
+ try {
+ GetClusterNodesResponseProto responseProto;
+ GetClusterNodesRequestProto requestProto = request.getProto();
+ invoke(RpcCall(true, "getClusterNodes", &requestProto, &responseProto));
+ return GetClusterNodesResponse(responseProto);
+ } catch (const YarnFailoverException & e) {
+ throw;
+ } catch (const YarnRpcServerException & e) {
+ UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
+ unwrapper.unwrap(__FILE__, __LINE__);
+ } catch (...) {
+ THROW(YarnIOException,
+ "Unexpected exception: when calling "
+ "ApplicationClientProtocol::getClusterNodes in %s: %d",
+ __FILE__, __LINE__);
+ }
}
/*
@@ -223,21 +233,23 @@ GetClusterNodesResponse ApplicationClientProtocol::getClusterNodes(
}
*/
GetQueueInfoResponse ApplicationClientProtocol::getQueueInfo(
- GetQueueInfoRequest &request) {
- try {
- GetQueueInfoResponseProto responseProto;
- GetQueueInfoRequestProto requestProto = request.getProto();
- invoke(RpcCall(true, "getQueueInfo", &requestProto, &responseProto));
- return GetQueueInfoResponse(responseProto);
- } catch (const YarnRpcServerException & e) {
- UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
- unwrapper.unwrap(__FILE__, __LINE__);
- } catch (...) {
- THROW(YarnIOException,
- "Unexpected exception: when calling "
- "ApplicationClientProtocol::getQueueInfo in %s: %d",
- __FILE__, __LINE__);
- }
+ GetQueueInfoRequest &request) {
+ try {
+ GetQueueInfoResponseProto responseProto;
+ GetQueueInfoRequestProto requestProto = request.getProto();
+ invoke(RpcCall(true, "getQueueInfo", &requestProto, &responseProto));
+ return GetQueueInfoResponse(responseProto);
+ } catch (const YarnFailoverException & e) {
+ throw;
+ } catch (const YarnRpcServerException & e) {
+ UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
+ unwrapper.unwrap(__FILE__, __LINE__);
+ } catch (...) {
+ THROW(YarnIOException,
+ "Unexpected exception: when calling "
+ "ApplicationClientProtocol::getQueueInfo in %s: %d",
+ __FILE__, __LINE__);
+ }
}
//rpc getClusterMetrics (GetClusterMetricsRequestProto) returns (GetClusterMetricsResponseProto);
@@ -246,81 +258,85 @@ GetQueueInfoResponse ApplicationClientProtocol::getQueueInfo(
//}
GetClusterMetricsResponse ApplicationClientProtocol::getClusterMetrics(
- GetClusterMetricsRequest &request) {
- try {
- GetClusterMetricsResponseProto responseProto;
- GetClusterMetricsRequestProto requestProto = request.getProto();
- invoke(RpcCall(true, "getClusterMetrics", &requestProto, &responseProto));
- return GetClusterMetricsResponse(responseProto);
- }
- catch (const YarnRpcServerException & e) {
- UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
- unwrapper.unwrap(__FILE__, __LINE__);
- } catch (...) {
- THROW(YarnIOException,
- "Unexpected exception: when calling "
- "ApplicationClientProtocol::getClusterMetrics in %s: %d",
- __FILE__, __LINE__);
- }
+ GetClusterMetricsRequest &request) {
+ try {
+ GetClusterMetricsResponseProto responseProto;
+ GetClusterMetricsRequestProto requestProto = request.getProto();
+ invoke(RpcCall(true, "getClusterMetrics", &requestProto, &responseProto));
+ return GetClusterMetricsResponse(responseProto);
+ } catch (const YarnFailoverException & e) {
+ throw;
+ } catch (const YarnRpcServerException & e) {
+ UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
+ unwrapper.unwrap(__FILE__, __LINE__);
+ } catch (...) {
+ THROW(YarnIOException,
+ "Unexpected exception: when calling "
+ "ApplicationClientProtocol::getClusterMetrics in %s: %d",
+ __FILE__, __LINE__);
+ }
}
KillApplicationResponse ApplicationClientProtocol::forceKillApplication(
- KillApplicationRequest &request) {
- try {
- KillApplicationResponseProto responseProto;
- KillApplicationRequestProto requestProto = request.getProto();
- invoke(
- RpcCall(true, "forceKillApplication", &requestProto,
- &responseProto));
- return KillApplicationResponse(responseProto);
- }
- catch (const YarnRpcServerException & e) {
- UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
- unwrapper.unwrap(__FILE__, __LINE__);
- } catch (...) {
- THROW(YarnIOException,
- "Unexpected exception: when calling "
- "ApplicationClientProtocol::forceKillApplication in %s: %d",
- __FILE__, __LINE__);
- }
+ KillApplicationRequest &request) {
+ try {
+ KillApplicationResponseProto responseProto;
+ KillApplicationRequestProto requestProto = request.getProto();
+ invoke(
+ RpcCall(true, "forceKillApplication", &requestProto,
+ &responseProto));
+ return KillApplicationResponse(responseProto);
+ } catch (const YarnFailoverException & e) {
+ throw;
+ } catch (const YarnRpcServerException & e) {
+ UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
+ unwrapper.unwrap(__FILE__, __LINE__);
+ } catch (...) {
+ THROW(YarnIOException,
+ "Unexpected exception: when calling "
+ "ApplicationClientProtocol::forceKillApplication in %s: %d",
+ __FILE__, __LINE__);
+ }
}
GetApplicationsResponse ApplicationClientProtocol::getApplications(
- GetApplicationsRequest &request) {
- try {
- GetApplicationsResponseProto responseProto;
- GetApplicationsRequestProto requestProto = request.getProto();
- invoke(RpcCall(true, "getApplications", &requestProto, &responseProto));
- return GetApplicationsResponse(responseProto);
- }
- catch (const YarnRpcServerException & e) {
- UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
- unwrapper.unwrap(__FILE__, __LINE__);
- } catch (...) {
- THROW(YarnIOException,
- "Unexpected exception: when calling "
- "ApplicationClientProtocol::getApplications in %s: %d",
- __FILE__, __LINE__);
- }
+ GetApplicationsRequest &request) {
+ try {
+ GetApplicationsResponseProto responseProto;
+ GetApplicationsRequestProto requestProto = request.getProto();
+ invoke(RpcCall(true, "getApplications", &requestProto, &responseProto));
+ return GetApplicationsResponse(responseProto);
+ } catch (const YarnFailoverException & e) {
+ throw;
+ } catch (const YarnRpcServerException & e) {
+ UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
+ unwrapper.unwrap(__FILE__, __LINE__);
+ } catch (...) {
+ THROW(YarnIOException,
+ "Unexpected exception: when calling "
+ "ApplicationClientProtocol::getApplications in %s: %d",
+ __FILE__, __LINE__);
+ }
}
GetQueueUserAclsInfoResponse ApplicationClientProtocol::getQueueAclsInfo(
- GetQueueUserAclsInfoRequest &request) {
- try {
- GetQueueUserAclsInfoResponseProto responseProto;
- GetQueueUserAclsInfoRequestProto requestProto = request.getProto();
- invoke(RpcCall(true, "getQueueUserAcls", &requestProto, &responseProto));
- return GetQueueUserAclsInfoResponse(responseProto);
- }
- catch (const YarnRpcServerException & e) {
- UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
- unwrapper.unwrap(__FILE__, __LINE__);
- } catch (...) {
- THROW(YarnIOException,
- "Unexpected exception: when calling "
- "ApplicationClientProtocol::getQueueAclsInfo in %s: %d",
- __FILE__, __LINE__);
- }
+ GetQueueUserAclsInfoRequest &request) {
+ try {
+ GetQueueUserAclsInfoResponseProto responseProto;
+ GetQueueUserAclsInfoRequestProto requestProto = request.getProto();
+ invoke(RpcCall(true, "getQueueUserAcls", &requestProto, &responseProto));
+ return GetQueueUserAclsInfoResponse(responseProto);
+ } catch (const YarnFailoverException & e) {
+ throw;
+ } catch (const YarnRpcServerException & e) {
+ UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
+ unwrapper.unwrap(__FILE__, __LINE__);
+ } catch (...) {
+ THROW(YarnIOException,
+ "Unexpected exception: when calling "
+ "ApplicationClientProtocol::getQueueAclsInfo in %s: %d",
+ __FILE__, __LINE__);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/413b6647/depends/libyarn/src/libyarnserver/ApplicationMasterProtocol.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnserver/ApplicationMasterProtocol.cpp b/depends/libyarn/src/libyarnserver/ApplicationMasterProtocol.cpp
index 6b1bf89..56b358f 100644
--- a/depends/libyarn/src/libyarnserver/ApplicationMasterProtocol.cpp
+++ b/depends/libyarn/src/libyarnserver/ApplicationMasterProtocol.cpp
@@ -19,39 +19,41 @@ using namespace Yarn;
namespace libyarn {
-ApplicationMasterProtocol::ApplicationMasterProtocol(std::string & schedHost,
- std::string & schedPort, const std::string & tokenService,
- const SessionConfig & c, const RpcAuth & a) :
- auth(a), client(RpcClient::getClient()), conf(c), protocol(
- APPLICATION_MASTER_VERSION, APPLICATION_MASTER_PROTOCOL,
- AMRM_TOKEN_KIND), server(tokenService, schedHost, schedPort) {
+ApplicationMasterProtocol::ApplicationMasterProtocol(const std::string & schedHost,
+ const std::string & schedPort, const std::string & tokenService,
+ const SessionConfig & c, const RpcAuth & a) :
+ auth(a), client(RpcClient::getClient()), conf(c), protocol(
+ APPLICATION_MASTER_VERSION, APPLICATION_MASTER_PROTOCOL,
+ AMRM_TOKEN_KIND), server(tokenService, schedHost, schedPort) {
}
ApplicationMasterProtocol::~ApplicationMasterProtocol() {
}
void ApplicationMasterProtocol::invoke(const RpcCall & call) {
- try {
- channel = &client.getChannel(auth, protocol, server, conf);
- channel->invoke(call);
- channel->close(false);
- } catch (...) {
- channel->close(false);
- throw;
- }
+ try {
+ channel = &client.getChannel(auth, protocol, server, conf);
+ channel->invoke(call);
+ channel->close(false);
+ } catch (...) {
+ channel->close(false);
+ throw;
+ }
}
RegisterApplicationMasterResponse ApplicationMasterProtocol::registerApplicationMaster(
- RegisterApplicationMasterRequest &request) {
- try {
- RegisterApplicationMasterResponseProto responseProto;
- RegisterApplicationMasterRequestProto requestProto = request.getProto();
- invoke(RpcCall(true, "registerApplicationMaster", &requestProto, &responseProto));
- return RegisterApplicationMasterResponse(responseProto);
- } catch (const YarnRpcServerException & e) {
- UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
- unwrapper.unwrap(__FILE__, __LINE__);
- } catch (...) {
+ RegisterApplicationMasterRequest &request) {
+ try {
+ RegisterApplicationMasterResponseProto responseProto;
+ RegisterApplicationMasterRequestProto requestProto = request.getProto();
+ invoke(RpcCall(true, "registerApplicationMaster", &requestProto, &responseProto));
+ return RegisterApplicationMasterResponse(responseProto);
+ } catch (const YarnFailoverException & e) {
+ throw;
+ } catch (const YarnRpcServerException & e) {
+ UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
+ unwrapper.unwrap(__FILE__, __LINE__);
+ } catch (...) {
THROW(YarnIOException,
"Unexpected exception: when calling "
"ApplicationMasterProtocol::registerApplicationMaster in %s: %d",
@@ -61,39 +63,43 @@ RegisterApplicationMasterResponse ApplicationMasterProtocol::registerApplication
AllocateResponse ApplicationMasterProtocol::allocate(AllocateRequest &request) {
- try {
- AllocateRequestProto requestProto = request.getProto();
- AllocateResponseProto responseProto;
- invoke(RpcCall(true, "allocate", &requestProto, &responseProto));
- return AllocateResponse(responseProto);
- } catch (const YarnRpcServerException & e) {
- UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
- unwrapper.unwrap(__FILE__, __LINE__);
- }
- catch (...) {
- THROW(YarnIOException,
- "Unexpected exception: when calling "
- "ApplicationMasterProtocol::allocate in %s: %d",
- __FILE__, __LINE__);
- }
+ try {
+ AllocateRequestProto requestProto = request.getProto();
+ AllocateResponseProto responseProto;
+ invoke(RpcCall(true, "allocate", &requestProto, &responseProto));
+ return AllocateResponse(responseProto);
+ } catch (const YarnFailoverException & e) {
+ throw;
+ } catch (const YarnRpcServerException & e) {
+ UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
+ unwrapper.unwrap(__FILE__, __LINE__);
+ }
+ catch (...) {
+ THROW(YarnIOException,
+ "Unexpected exception: when calling "
+ "ApplicationMasterProtocol::allocate in %s: %d",
+ __FILE__, __LINE__);
+ }
}
FinishApplicationMasterResponse ApplicationMasterProtocol::finishApplicationMaster(
- FinishApplicationMasterRequest &request) {
- try {
- FinishApplicationMasterRequestProto requestProto = request.getProto();
- FinishApplicationMasterResponseProto responseProto;
- invoke(RpcCall(true, "finishApplicationMaster", &requestProto, &responseProto));
- return FinishApplicationMasterResponse(responseProto);
- } catch (const YarnRpcServerException & e) {
- UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
- unwrapper.unwrap(__FILE__, __LINE__);
- } catch (...) {
- THROW(YarnIOException,
- "Unexpected exception: when calling "
- "ApplicationMasterProtocol::finishApplicationMaster in %s: %d",
- __FILE__, __LINE__);
- }
+ FinishApplicationMasterRequest &request) {
+ try {
+ FinishApplicationMasterRequestProto requestProto = request.getProto();
+ FinishApplicationMasterResponseProto responseProto;
+ invoke(RpcCall(true, "finishApplicationMaster", &requestProto, &responseProto));
+ return FinishApplicationMasterResponse(responseProto);
+ } catch (const YarnFailoverException & e) {
+ throw;
+ } catch (const YarnRpcServerException & e) {
+ UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
+ unwrapper.unwrap(__FILE__, __LINE__);
+ } catch (...) {
+ THROW(YarnIOException,
+ "Unexpected exception: when calling "
+ "ApplicationMasterProtocol::finishApplicationMaster in %s: %d",
+ __FILE__, __LINE__);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/413b6647/depends/libyarn/src/libyarnserver/ApplicationMasterProtocol.h
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnserver/ApplicationMasterProtocol.h b/depends/libyarn/src/libyarnserver/ApplicationMasterProtocol.h
index ca637fa..70c352e 100644
--- a/depends/libyarn/src/libyarnserver/ApplicationMasterProtocol.h
+++ b/depends/libyarn/src/libyarnserver/ApplicationMasterProtocol.h
@@ -46,7 +46,7 @@ namespace libyarn {
class ApplicationMasterProtocol {
public:
- ApplicationMasterProtocol(std::string & schedHost, std::string & schedPort,
+ ApplicationMasterProtocol(const std::string & schedHost, const std::string & schedPort,
const std::string & tokenService, const SessionConfig & c,
const RpcAuth & a);
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/413b6647/depends/libyarn/src/rpc/RpcChannel.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/rpc/RpcChannel.cpp b/depends/libyarn/src/rpc/RpcChannel.cpp
index 00f45b1..a688d7b 100644
--- a/depends/libyarn/src/rpc/RpcChannel.cpp
+++ b/depends/libyarn/src/rpc/RpcChannel.cpp
@@ -711,12 +711,12 @@ static exception_ptr HandlerRpcResponseException(exception_ptr e) {
try {
rethrow_exception(e);
} catch (const YarnRpcServerException & e) {
- UnWrapper < NameNodeStandbyException, UnsupportedOperationException,
+ UnWrapper < ResourceManagerStandbyException, UnsupportedOperationException,
AccessControlException, SafeModeException, SaslException > unwrapper(e);
try {
unwrapper.unwrap(__FILE__, __LINE__);
- } catch (const NameNodeStandbyException & e) {
+ } catch (const ResourceManagerStandbyException & e) {
retval = current_exception();
} catch (const UnsupportedOperationException & e) {
retval = current_exception();