You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by wl...@apache.org on 2015/10/20 04:45:13 UTC

incubator-hawq git commit: HAWQ-38. Support HA for libyarn

Repository: incubator-hawq
Updated Branches:
  refs/heads/master cf81985f7 -> 413b6647b


HAWQ-38. Support HA for libyarn


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/413b6647
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/413b6647
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/413b6647

Branch: refs/heads/master
Commit: 413b6647bb0caaf05ad98ebf975a56338316bfe2
Parents: cf81985
Author: Wen Lin <wl...@pivotal.io>
Authored: Tue Oct 20 10:49:59 2015 +0800
Committer: Wen Lin <wl...@pivotal.io>
Committed: Tue Oct 20 10:49:59 2015 +0800

----------------------------------------------------------------------
 depends/libyarn/src/CMakeLists.txt              |   2 +-
 depends/libyarn/src/common/Exception.cpp        |   2 +-
 depends/libyarn/src/common/Exception.h          |  23 +-
 depends/libyarn/src/common/XmlConfig.h          |  12 +-
 .../src/libyarnclient/ApplicationClient.cpp     | 298 ++++++++++----
 .../src/libyarnclient/ApplicationClient.h       |  80 +++-
 .../src/libyarnclient/ApplicationMaster.cpp     | 173 +++++++--
 .../src/libyarnclient/ApplicationMaster.h       |  38 +-
 .../libyarnserver/ApplicationClientProtocol.cpp | 388 ++++++++++---------
 .../libyarnserver/ApplicationMasterProtocol.cpp | 114 +++---
 .../libyarnserver/ApplicationMasterProtocol.h   |   2 +-
 depends/libyarn/src/rpc/RpcChannel.cpp          |   4 +-
 12 files changed, 727 insertions(+), 409 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/413b6647/depends/libyarn/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/CMakeLists.txt b/depends/libyarn/src/CMakeLists.txt
index 1ecf6ba..d159216 100644
--- a/depends/libyarn/src/CMakeLists.txt
+++ b/depends/libyarn/src/CMakeLists.txt
@@ -2,7 +2,7 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
 
 SET(libyarn_VERSION_MAJOR 0)
 SET(libyarn_VERSION_MINOR 1)
-SET(libyarn_VERSION_PATCH 4)
+SET(libyarn_VERSION_PATCH 5)
 SET(libyarn_VERSION_STRING "${libyarn_VERSION_MAJOR}.${libyarn_VERSION_MINOR}.${libyarn_VERSION_PATCH}")
 SET(libyarn_VERSION_API 1)
 SET(libyarn_ROOT_SOURCES_DIR ${CMAKE_SOURCE_DIR}/src)

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/413b6647/depends/libyarn/src/common/Exception.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/common/Exception.cpp b/depends/libyarn/src/common/Exception.cpp
index e0f084e..0a07e07 100644
--- a/depends/libyarn/src/common/Exception.cpp
+++ b/depends/libyarn/src/common/Exception.cpp
@@ -47,7 +47,7 @@ const char * UnsupportedOperationException::ReflexName =
 const char * ReplicaNotFoundException::ReflexName =
     "org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException";
 
-const char * NameNodeStandbyException::ReflexName =
+const char * ResourceManagerStandbyException::ReflexName =
     "org.apache.hadoop.ipc.StandbyException";
 
 const char * YarnInvalidBlockToken::ReflexName =

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/413b6647/depends/libyarn/src/common/Exception.h
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/common/Exception.h b/depends/libyarn/src/common/Exception.h
index a40b185..e804194 100644
--- a/depends/libyarn/src/common/Exception.h
+++ b/depends/libyarn/src/common/Exception.h
@@ -4,8 +4,8 @@
  *
  * Author: Zhanwei Wang
  ********************************************************************/
-#ifndef _HDFS_LIBHDFS3_COMMON_EXCEPTION_H_
-#define _HDFS_LIBHDFS3_COMMON_EXCEPTION_H_
+#ifndef _YARN_LIBYARN_COMMON_EXCEPTION_H_
+#define _YARN_LIBYARN_COMMON_EXCEPTION_H_
 
 #include <stdexcept>
 #include <string>
@@ -42,6 +42,17 @@ public:
     static const char * ReflexName;
 };
 
+class YarnResourceManagerClosed: public YarnException {
+public:
+	YarnResourceManagerClosed(const std::string & arg, const char * file, int line,
+                         const char * stack) :
+        YarnException(arg, file, line, stack) {
+    }
+
+    ~YarnResourceManagerClosed() throw () {
+    }
+};
+
 class YarnNetworkException: public YarnIOException {
 public:
     YarnNetworkException(const std::string & arg, const char * file, int line,
@@ -456,14 +467,14 @@ public:
     static const char * ReflexName;
 };
 
-class NameNodeStandbyException: public YarnException {
+class ResourceManagerStandbyException: public YarnException {
 public:
-    NameNodeStandbyException(const std::string & arg, const char * file,
+	ResourceManagerStandbyException(const std::string & arg, const char * file,
                              int line, const char * stack) :
         YarnException(arg, file, line, stack) {
     }
 
-    ~NameNodeStandbyException() throw () {
+    ~ResourceManagerStandbyException() throw () {
     }
 
 public:
@@ -473,4 +484,4 @@ public:
 
 }
 
-#endif /* _HDFS_LIBHDFS3_COMMON_EXCEPTION_H_ */
+#endif /* _YARN_LIBYARN_COMMON_EXCEPTION_H_ */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/413b6647/depends/libyarn/src/common/XmlConfig.h
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/common/XmlConfig.h b/depends/libyarn/src/common/XmlConfig.h
index 3d04c56..2a3cb2c 100644
--- a/depends/libyarn/src/common/XmlConfig.h
+++ b/depends/libyarn/src/common/XmlConfig.h
@@ -42,7 +42,7 @@ public:
      * Get a string with given configure key.
      * @param key The key of the configure item.
      * @return The value of configure item.
-     * @throw HdfsConfigNotFound
+     * @throw YarnConfigNotFound
      */
     const char * getString(const char * key) const;
 
@@ -59,7 +59,7 @@ public:
      * Get a string with given configure key.
      * @param key The key of the configure item.
      * @return The value of configure item.
-     * @throw HdfsConfigNotFound
+     * @throw YarnConfigNotFound
      */
     const char * getString(const std::string & key) const;
 
@@ -77,7 +77,7 @@ public:
      * Get a 64 bit integer with given configure key.
      * @param key The key of the configure item.
      * @return The value of configure item.
-     * @throw HdfsConfigNotFound
+     * @throw YarnConfigNotFound
      */
     int64_t getInt64(const char * key) const;
 
@@ -94,7 +94,7 @@ public:
      * Get a 32 bit integer with given configure key.
      * @param key The key of the configure item.
      * @return The value of configure item.
-     * @throw HdfsConfigNotFound
+     * @throw YarnConfigNotFound
      */
     int32_t getInt32(const char * key) const;
 
@@ -111,7 +111,7 @@ public:
      * Get a double with given configure key.
      * @param key The key of the configure item.
      * @return The value of configure item.
-     * @throw HdfsConfigNotFound
+     * @throw YarnConfigNotFound
      */
     double getDouble(const char * key) const;
 
@@ -128,7 +128,7 @@ public:
      * Get a boolean with given configure key.
      * @param key The key of the configure item.
      * @return The value of configure item.
-     * @throw HdfsConfigNotFound
+     * @throw YarnConfigNotFound
      */
     bool getBool(const char * key) const;
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/413b6647/depends/libyarn/src/libyarnclient/ApplicationClient.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnclient/ApplicationClient.cpp b/depends/libyarn/src/libyarnclient/ApplicationClient.cpp
index cc72c5b..fd4b11f 100644
--- a/depends/libyarn/src/libyarnclient/ApplicationClient.cpp
+++ b/depends/libyarn/src/libyarnclient/ApplicationClient.cpp
@@ -4,29 +4,145 @@
 #include "rpc/RpcAuth.h"
 #include "common/XmlConfig.h"
 #include "common/SessionConfig.h"
-
+#include "Exception.h"
+#include "ExceptionInternal.h"
 #include "ApplicationClient.h"
+#include "StringUtil.h"
 
 namespace libyarn {
 
-ApplicationClient::ApplicationClient(string &user, string &host, string &port) {
-	std::string tokenService = "";
-	Yarn::Internal::shared_ptr<Yarn::Config> conf = DefaultConfig().getConfig();
-	Yarn::Internal::SessionConfig sessionConfig(*conf);
-	LOG(INFO, "ApplicationClient session auth method : %s", sessionConfig.getRpcAuthMethod().c_str());
-	appClient = (void*) new ApplicationClientProtocol(user, host, port, tokenService, sessionConfig);
+RMInfo::RMInfo() {
 }
 
-ApplicationClient::ApplicationClient(ApplicationClientProtocol *appclient){
-	appClient = (void*)appclient;
+const char * YARN_RESOURCEMANAGER_HA = "yarn.resourcemanager.ha";
+
+std::vector<RMInfo> RMInfo::getHARMInfo(const Yarn::Config & conf, const char* name) {
+    std::vector<RMInfo> retval;
+    /*
+    * Read the config and build a vector of RM addresses.
+    */
+    try{
+        std::string strHA = StringTrim(conf.getString(std::string(name)));
+        std::vector<std::string> strRMs = StringSplit(strHA, ",");
+        retval.resize(strRMs.size());
+        for (size_t i = 0; i < strRMs.size(); ++i) {
+            std::vector<std::string> rm = StringSplit(strRMs[i], ":");
+            retval[i].setHost(rm[0]);
+            retval[i].setPort(rm[1]);
+        }
+    } catch (const Yarn::YarnConfigNotFound &e) {
+        LOG(INFO, "Yarn RM HA is not configured.");
+    }
+
+return retval;
+}
+
+ApplicationClient::ApplicationClient(string &user, string &host, string &port) {
+    std::string tokenService = "";
+    Yarn::Internal::shared_ptr<Yarn::Config> conf = DefaultConfig().getConfig();
+    Yarn::Internal::SessionConfig sessionConfig(*conf);
+    LOG(INFO, "ApplicationClient session auth method : %s", sessionConfig.getRpcAuthMethod().c_str());
+
+    std::vector<RMInfo> rmInfos = RMInfo::getHARMInfo(*conf, YARN_RESOURCEMANAGER_HA);
+
+    if (rmInfos.size() <= 1) {
+        LOG(INFO, "ApplicationClient Resource Manager HA is disable.");
+        enableRMHA = false;
+        maxRMHARetry = 0;
+    } else {
+        LOG(INFO, "ApplicationClient Resource Manager HA is enable. Number of RM: %d", rmInfos.size());
+        enableRMHA = true;
+        maxRMHARetry = sessionConfig.getRpcMaxHaRetry();
+    }
+
+    if (!enableRMHA)
+    {
+        appClientProtos.push_back(
+            std::shared_ptr<ApplicationClientProtocol>(
+                new ApplicationClientProtocol(user, host, port, tokenService, sessionConfig)));
+    } else {
+        /*
+        * Iterate over the RMInfo vector and create one ApplicationClientProtocol for each configured RM.
+        */
+        for (size_t i = 0; i < rmInfos.size(); ++i) {
+            appClientProtos.push_back(
+                std::shared_ptr<ApplicationClientProtocol>(
+                    new ApplicationClientProtocol(
+                        user, rmInfos[i].getHost(),rmInfos[i].getPort(), tokenService, sessionConfig)));
+            LOG(INFO, "ApplicationClient finds a standby RM, host:%s, port:%s",
+                      rmInfos[i].getHost().c_str(), rmInfos[i].getPort().c_str());
+        }
+    }
+    currentAppClientProto = 0;
 }
 
 ApplicationClient::~ApplicationClient() {
-	if (appClient != NULL){
-		delete (ApplicationClientProtocol*) appClient;
-	}
 }
 
+std::shared_ptr<ApplicationClientProtocol>
+    ApplicationClient::getActiveAppClientProto(uint32_t & oldValue) {
+    lock_guard<mutex> lock(this->mut);
+
+    LOG(INFO, "ApplicationClient::getActiveAppClientProto is called.");
+
+    if (appClientProtos.empty()) {
+        LOG(WARNING, "The vector of ApplicationClientProtocol is empty.");
+        THROW(Yarn::YarnResourceManagerClosed, "ApplicationClientProtocol is closed.");
+    }
+
+    oldValue = currentAppClientProto;
+    LOG(INFO, "ApplicationClient::getActiveAppClientProto, current is %d.", currentAppClientProto);
+    return appClientProtos[currentAppClientProto % appClientProtos.size()];
+}
+
+void ApplicationClient::failoverToNextAppClientProto(uint32_t oldValue){
+    lock_guard<mutex> lock(mut);
+
+    if (oldValue != currentAppClientProto || appClientProtos.size() == 1) {
+        return;
+    }
+
+    ++currentAppClientProto;
+    currentAppClientProto = currentAppClientProto % appClientProtos.size();
+    LOG(INFO, "ApplicationClient::failoverToNextAppClientProto, current is %d.", currentAppClientProto);
+}
+
+static void HandleYarnFailoverException(const Yarn::YarnFailoverException & e) {
+    try {
+        rethrow_if_nested(e);
+    } catch (...) {
+        NESTED_THROW(Yarn::YarnRpcException, "%s", e.what());
+    }
+
+    //should not reach here
+    abort();
+}
+
+
+#define RESOURCEMANAGER_HA_RETRY_BEGIN() \
+    do { \
+        int __count = 0; \
+        do { \
+            uint32_t __oldValue = 0; \
+            std::shared_ptr<ApplicationClientProtocol> appClientProto = getActiveAppClientProto(__oldValue); \
+            try { \
+                (void)0
+
+#define RESOURCEMANAGER_HA_RETRY_END() \
+    break; \
+    } catch (const Yarn::ResourceManagerStandbyException & e) { \
+        if (!enableRMHA || __count++ > maxRMHARetry) { \
+            throw; \
+        } \
+    } catch (const Yarn::YarnFailoverException & e) { \
+        if (!enableRMHA || __count++ > maxRMHARetry) { \
+            HandleYarnFailoverException(e); \
+        } \
+    } \
+    failoverToNextAppClientProto(__oldValue); \
+    } while (true); \
+    } while (0)
+
 /*
  rpc getNewApplication (GetNewApplicationRequestProto) returns (GetNewApplicationResponseProto);
 
@@ -39,11 +155,13 @@ ApplicationClient::~ApplicationClient() {
  }
  */
 ApplicationID ApplicationClient::getNewApplication() {
-	ApplicationClientProtocol* appClientAlias =
-			((ApplicationClientProtocol*) appClient);
-	GetNewApplicationRequest request;
-	GetNewApplicationResponse response = appClientAlias->getNewApplication(request);
-	return response.getApplicationId();
+    GetNewApplicationRequest request;
+    GetNewApplicationResponse response;
+
+    RESOURCEMANAGER_HA_RETRY_BEGIN();
+    response = appClientProto->getNewApplication(request);
+    RESOURCEMANAGER_HA_RETRY_END();
+    return response.getApplicationId();
 }
 
 /*
@@ -58,12 +176,13 @@ ApplicationID ApplicationClient::getNewApplication() {
  */
 
 void ApplicationClient::submitApplication(
-		ApplicationSubmissionContext &appContext) {
-	ApplicationClientProtocol* appClientAlias =
-			((ApplicationClientProtocol*) appClient);
-	SubmitApplicationRequest request;
-	request.setApplicationSubmissionContext(appContext);
-	appClientAlias->submitApplication(request);
+        ApplicationSubmissionContext &appContext) {
+    SubmitApplicationRequest request;
+    request.setApplicationSubmissionContext(appContext);
+
+    RESOURCEMANAGER_HA_RETRY_BEGIN();
+    appClientProto->submitApplication(request);
+    RESOURCEMANAGER_HA_RETRY_END();
 }
 
 /*
@@ -79,85 +198,102 @@ void ApplicationClient::submitApplication(
  */
 
 ApplicationReport ApplicationClient::getApplicationReport(
-		ApplicationID &appId) {
-	ApplicationClientProtocol* appClientAlias =
-			((ApplicationClientProtocol*) appClient);
-	GetApplicationReportRequest request;
-	request.setApplicationId(appId);
-	GetApplicationReportResponse response =
-			appClientAlias->getApplicationReport(request);
-	/*ApplicationReport report = response.getApplicationReport();
-	if (report.getYarnApplicationState() == YarnApplicationState::ACCEPTED) {
-			Token token = report.getAMRMToken();
-			LOG(INFO,"%s",report.getClientToAMToken().getIdentifier());
-	}*/
-	return response.getApplicationReport();
+        ApplicationID &appId) {
+    GetApplicationReportRequest request;
+    GetApplicationReportResponse response;
+
+    request.setApplicationId(appId);
+    RESOURCEMANAGER_HA_RETRY_BEGIN();
+    response = appClientProto->getApplicationReport(request);
+    RESOURCEMANAGER_HA_RETRY_END();
+
+    return response.getApplicationReport();
 }
 
 list<ContainerReport> ApplicationClient::getContainers(ApplicationAttemptId &appAttempId){
-	ApplicationClientProtocol* appClientAlias =
-			((ApplicationClientProtocol*) appClient);
-	GetContainersRequest request;
-	request.setApplicationAttemptId(appAttempId);
-	/*LOG(INFO,
-			"ApplicationClient::getContainers, appId[cluster_timestamp:%lld,id:%d]",
-			request.getApplicationId().getClusterTimestamp(), request.getApplicationId().getId());
-	*/
-	GetContainersResponse response =
-			appClientAlias->getContainers(request);
-	return response.getcontainersReportList();
+    GetContainersRequest request;
+    GetContainersResponse response;
+
+    request.setApplicationAttemptId(appAttempId);
+    RESOURCEMANAGER_HA_RETRY_BEGIN();
+    response = appClientProto->getContainers(request);
+    RESOURCEMANAGER_HA_RETRY_END();
+
+    return response.getcontainersReportList();
 }
 
 list<NodeReport> ApplicationClient::getClusterNodes(list<NodeState> &states) {
-	GetClusterNodesRequest request;
-	request.setNodeStates(states);
-	GetClusterNodesResponse response =
-			((ApplicationClientProtocol*) appClient)->getClusterNodes(request);
-	return response.getNodeReports();
+    GetClusterNodesRequest request;
+    GetClusterNodesResponse response;
+    request.setNodeStates(states);
+
+    RESOURCEMANAGER_HA_RETRY_BEGIN();
+    response = appClientProto->getClusterNodes(request);
+    RESOURCEMANAGER_HA_RETRY_END();
+
+    return response.getNodeReports();
 }
 
 QueueInfo ApplicationClient::getQueueInfo(string &queue, bool includeApps,
-		bool includeChildQueues, bool recursive) {
-	GetQueueInfoRequest request;
-	request.setQueueName(queue);
-	request.setIncludeApplications(includeApps);
-	request.setIncludeChildQueues(includeChildQueues);
-	request.setRecursive(recursive);
-	GetQueueInfoResponse response =
-			((ApplicationClientProtocol*) appClient)->getQueueInfo(request);
-	return response.getQueueInfo();
+        bool includeChildQueues, bool recursive) {
+    GetQueueInfoRequest request;
+    GetQueueInfoResponse response;
+    request.setQueueName(queue);
+    request.setIncludeApplications(includeApps);
+    request.setIncludeChildQueues(includeChildQueues);
+    request.setRecursive(recursive);
+
+    RESOURCEMANAGER_HA_RETRY_BEGIN();
+    response = appClientProto->getQueueInfo(request);
+    RESOURCEMANAGER_HA_RETRY_END();
+
+    return response.getQueueInfo();
 }
 
 void ApplicationClient::forceKillApplication(ApplicationID &appId) {
-	KillApplicationRequest request;
-	request.setApplicationId(appId);
-	((ApplicationClientProtocol*) appClient)->forceKillApplication(request);
+    KillApplicationRequest request;
+    request.setApplicationId(appId);
+
+    RESOURCEMANAGER_HA_RETRY_BEGIN();
+    appClientProto->forceKillApplication(request);
+    RESOURCEMANAGER_HA_RETRY_END();
 }
 
 YarnClusterMetrics ApplicationClient::getClusterMetrics() {
-	GetClusterMetricsRequest request;
-	GetClusterMetricsResponse response =
-			((ApplicationClientProtocol*) appClient)->getClusterMetrics(
-					request);
-	return response.getClusterMetrics();
+    GetClusterMetricsRequest request;
+    GetClusterMetricsResponse response;
+
+    RESOURCEMANAGER_HA_RETRY_BEGIN();
+    response = appClientProto->getClusterMetrics(request);
+    RESOURCEMANAGER_HA_RETRY_END();
+
+    return response.getClusterMetrics();
 }
 
 list<ApplicationReport> ApplicationClient::getApplications(
-		list<string> &applicationTypes,
-		list<YarnApplicationState> &applicationStates) {
-	GetApplicationsRequest request;
-	request.setApplicationStates(applicationStates);
-	request.setApplicationTypes(applicationTypes);
-	GetApplicationsResponse response =
-			((ApplicationClientProtocol*) appClient)->getApplications(request);
-	return response.getApplicationList();
+        list<string> &applicationTypes,
+        list<YarnApplicationState> &applicationStates) {
+    GetApplicationsRequest request;
+    GetApplicationsResponse response;
+    request.setApplicationStates(applicationStates);
+    request.setApplicationTypes(applicationTypes);
+
+    RESOURCEMANAGER_HA_RETRY_BEGIN();
+    response = appClientProto->getApplications(request);
+    RESOURCEMANAGER_HA_RETRY_END();
+
+    return response.getApplicationList();
 }
 
 list<QueueUserACLInfo> ApplicationClient::getQueueAclsInfo() {
-	GetQueueUserAclsInfoRequest request;
-	GetQueueUserAclsInfoResponse response =
-			((ApplicationClientProtocol*) appClient)->getQueueAclsInfo(request);
-	return response.getUserAclsInfoList();
+    GetQueueUserAclsInfoRequest request;
+    GetQueueUserAclsInfoResponse response;
+
+    RESOURCEMANAGER_HA_RETRY_BEGIN();
+    response = appClientProto->getQueueAclsInfo(request);
+    RESOURCEMANAGER_HA_RETRY_END();
+
+    return response.getUserAclsInfoList();
 }
 
 } /* namespace libyarn */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/413b6647/depends/libyarn/src/libyarnclient/ApplicationClient.h
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnclient/ApplicationClient.h b/depends/libyarn/src/libyarnclient/ApplicationClient.h
index ae81ea0..49aacb7 100644
--- a/depends/libyarn/src/libyarnclient/ApplicationClient.h
+++ b/depends/libyarn/src/libyarnclient/ApplicationClient.h
@@ -10,6 +10,7 @@
 #include "records/ApplicationSubmissionContext.h"
 #include "records/YarnClusterMetrics.h"
 #include "records/QueueUserACLInfo.h"
+#include "Thread.h"
 
 using namespace std;
 
@@ -64,44 +65,85 @@ private:
     Yarn::Internal::shared_ptr<Yarn::Config> conf;
 };
 
+class RMInfo {
+
+public:
+	RMInfo();
+
+	RMInfo(string &rmHost, string &rmPort) : host(rmHost), port(rmPort){};
+
+    const std::string & getHost() const {
+        return host;
+    }
+
+    void setHost(const std::string & rmHost) {
+        host = rmHost;
+    }
+
+    const std::string & getPort() const {
+        return port;
+    }
+
+    void setPort(const std::string & rmPort) {
+        port = rmPort;
+    }
+
+	static std::vector<RMInfo> getHARMInfo(const Yarn::Config & conf, const char* name);
+
+private:
+	std::string host;
+	std::string port;
+};
+
 class ApplicationClient {
 public:
-	ApplicationClient(string &user, string &host, string &port);
+    ApplicationClient(string &user, string &host, string &port);
 
-	ApplicationClient(ApplicationClientProtocol *appclient);
+    virtual ~ApplicationClient();
 
-	virtual ~ApplicationClient();
+    virtual ApplicationID getNewApplication();
 
-	virtual ApplicationID getNewApplication();
+    virtual void submitApplication(ApplicationSubmissionContext &appContext);
 
-	virtual void submitApplication(ApplicationSubmissionContext &appContext);
+    virtual ApplicationReport getApplicationReport(ApplicationID &appId);
 
-	virtual ApplicationReport getApplicationReport(ApplicationID &appId);
+    virtual list<ContainerReport> getContainers(ApplicationAttemptId &appAttempId);
 
-	virtual list<ContainerReport> getContainers(ApplicationAttemptId &appAttempId);
+    virtual list<NodeReport> getClusterNodes(list<NodeState> &state);
 
-	virtual list<NodeReport> getClusterNodes(list<NodeState> &state);
+    virtual QueueInfo getQueueInfo(string &queue, bool includeApps,
+            bool includeChildQueues, bool recursive);
 
-	virtual QueueInfo getQueueInfo(string &queue, bool includeApps,
-			bool includeChildQueues, bool recursive);
+    virtual void forceKillApplication(ApplicationID &appId);
 
-	virtual void forceKillApplication(ApplicationID &appId);
+    virtual YarnClusterMetrics getClusterMetrics();
 
-	virtual YarnClusterMetrics getClusterMetrics();
+    virtual list<ApplicationReport> getApplications(list<string> &applicationTypes,
+            list<YarnApplicationState> &applicationStates);
 
-	virtual list<ApplicationReport> getApplications(list<string> &applicationTypes,
-			list<YarnApplicationState> &applicationStates);
+    virtual list<QueueUserACLInfo> getQueueAclsInfo();
 
-	virtual list<QueueUserACLInfo> getQueueAclsInfo();
+    const std::string & getUser(){uint32_t old=0; return getActiveAppClientProto(old)->getUser();};
 
-    const std::string & getUser() const {return ((ApplicationClientProtocol*)appClient)->getUser();};
+    const AuthMethod getMethod(){uint32_t old=0; return getActiveAppClientProto(old)->getMethod();};
 
-    AuthMethod getMethod() const {return ((ApplicationClientProtocol*)appClient)->getMethod();};
+    const std::string getPrincipal(){uint32_t old=0; return getActiveAppClientProto(old)->getPrincipal();};
 
-    const std::string getPrincipal() const {return ((ApplicationClientProtocol*)appClient)->getPrincipal();};
+private:
+    std::shared_ptr<ApplicationClientProtocol> getActiveAppClientProto(uint32_t & oldValue);
+    void failoverToNextAppClientProto(uint32_t oldValue);
 
 private:
-	void *appClient;
+    bool enableRMHA;
+    int maxRMHARetry;
+    mutex mut;
+    /**
+     * Each ApplicationClientProtocol object represents a connection to one of the configured
+     * resource managers. If the application client fails to connect to the active resource
+     * manager, it will try the next one in the list.
+     */
+    std::vector<std::shared_ptr<ApplicationClientProtocol>> appClientProtos;
+    uint32_t currentAppClientProto;
 };
 
 } /* namespace libyarn */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/413b6647/depends/libyarn/src/libyarnclient/ApplicationMaster.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnclient/ApplicationMaster.cpp b/depends/libyarn/src/libyarnclient/ApplicationMaster.cpp
index 0d26082..d7c4dc6 100644
--- a/depends/libyarn/src/libyarnclient/ApplicationMaster.cpp
+++ b/depends/libyarn/src/libyarnclient/ApplicationMaster.cpp
@@ -8,24 +8,113 @@
 #include "ApplicationClient.h"
 
 namespace libyarn {
+
+const char * YARN_RESOURCEMANAGER_SCHEDULER_HA = "yarn.resourcemanager.scheduler.ha";
+
 ApplicationMaster::ApplicationMaster(string &schedHost, string &schedPort,
-		UserInfo &user, const string &tokenService) {
-	Yarn::Internal::shared_ptr<Yarn::Config> conf = DefaultConfig().getConfig();
-	Yarn::Internal::SessionConfig sessionConfig(*conf);
-	RpcAuth rpcAuth(user, AuthMethod::TOKEN);
-	rmClient = (void*) new ApplicationMasterProtocol(schedHost,
-			schedPort, tokenService, sessionConfig, rpcAuth);
+        UserInfo &user, const string &tokenService) {
+    Yarn::Internal::shared_ptr<Yarn::Config> conf = DefaultConfig().getConfig();
+    Yarn::Internal::SessionConfig sessionConfig(*conf);
+    RpcAuth rpcAuth(user, AuthMethod::TOKEN);
+
+    std::vector<RMInfo> rmInfos = RMInfo::getHARMInfo(*conf, YARN_RESOURCEMANAGER_SCHEDULER_HA);
+
+    if (rmInfos.size() <= 1) {
+        LOG(INFO, "ApplicationClient RM Scheduler HA is disable.");
+        enableRMSchedulerHA = false;
+        maxRMHARetry = 0;
+    } else {
+        LOG(INFO, "ApplicationClient RM Scheduler HA is enable. Number of RM scheduler: %d", rmInfos.size());
+        enableRMSchedulerHA = true;
+        maxRMHARetry = sessionConfig.getRpcMaxHaRetry();
+    }
+
+    if (!enableRMSchedulerHA)
+    {
+        appMasterProtos.push_back(
+            std::shared_ptr<ApplicationMasterProtocol>(
+                new ApplicationMasterProtocol(schedHost, schedPort, tokenService, sessionConfig, rpcAuth)));
+    }
+    else {
+        /*
+         * Iterate over the RMInfo vector and create one ApplicationMasterProtocol for each configured RM scheduler.
+         */
+        for (size_t i = 0; i < rmInfos.size(); ++i) {
+            appMasterProtos.push_back(
+                std::shared_ptr<ApplicationMasterProtocol>(
+                    new ApplicationMasterProtocol(rmInfos[i].getHost(),
+                        rmInfos[i].getPort(), tokenService, sessionConfig, rpcAuth)));
+            LOG(INFO, "ApplicationMaster finds a standby RM scheduler, host:%s, port:%s",
+                      rmInfos[i].getHost().c_str(), rmInfos[i].getPort().c_str());
+        }
+    }
+    currentAppMasterProto = 0;
 }
 
-ApplicationMaster::ApplicationMaster(ApplicationMasterProtocol *rmclient){
-	rmClient = (void*)rmclient;
+ApplicationMaster::~ApplicationMaster() {
 }
 
+std::shared_ptr<ApplicationMasterProtocol>
+    ApplicationMaster::getActiveAppMasterProto(uint32_t & oldValue) {
+    lock_guard<mutex> lock(this->mut);
 
-ApplicationMaster::~ApplicationMaster() {
-	delete (ApplicationMasterProtocol*) rmClient;
+    if (appMasterProtos.empty()) {
+        LOG(WARNING, "The vector of ApplicationMasterProtocol is empty.");
+        THROW(Yarn::YarnResourceManagerClosed, "ApplicationMasterProtocol is closed.");
+    }
+
+    oldValue = currentAppMasterProto;
+    LOG(INFO, "ApplicationMaster::getActiveAppMasterProto, current is %d.", currentAppMasterProto);
+    return appMasterProtos[currentAppMasterProto % appMasterProtos.size()];
+}
+
+void ApplicationMaster::failoverToNextAppMasterProto(uint32_t oldValue){
+    lock_guard<mutex> lock(mut);
+
+    if (oldValue != currentAppMasterProto || appMasterProtos.size() == 1) {
+        return;
+    }
+
+    ++currentAppMasterProto;
+    currentAppMasterProto = currentAppMasterProto % appMasterProtos.size();
+    LOG(INFO, "ApplicationMaster::failoverToNextAppMasterProto, current is %d.", currentAppMasterProto);
+}
+
+static void HandleYarnFailoverException(const Yarn::YarnFailoverException & e) {
+    try {
+        rethrow_if_nested(e);
+    } catch (...) {
+        NESTED_THROW(Yarn::YarnRpcException, "%s", e.what());
+    }
+
+    //should not reach here
+    abort();
 }
 
+#define RESOURCEMANAGER_SCHEDULER_HA_RETRY_BEGIN() \
+    do { \
+        int __count = 0; \
+        do { \
+            uint32_t __oldValue = 0; \
+            std::shared_ptr<ApplicationMasterProtocol> appMasterProto = getActiveAppMasterProto(__oldValue); \
+            try { \
+                (void)0
+
+#define RESOURCEMANAGER_SCHEDULER_HA_RETRY_END() \
+    break; \
+    } catch (const Yarn::ResourceManagerStandbyException & e) { \
+        if (!enableRMSchedulerHA || __count++ > maxRMHARetry) { \
+            throw; \
+        } \
+    } catch (const Yarn::YarnFailoverException & e) { \
+        if (!enableRMSchedulerHA || __count++ > maxRMHARetry) { \
+            HandleYarnFailoverException(e); \
+        } \
+    } \
+    failoverToNextAppMasterProto(__oldValue); \
+    } while (true); \
+    } while (0)
+
 /*
 rpc registerApplicationMaster (RegisterApplicationMasterRequestProto) returns (RegisterApplicationMasterResponseProto);
 
@@ -42,13 +131,17 @@ message RegisterApplicationMasterResponseProto {
 }
  */
 RegisterApplicationMasterResponse ApplicationMaster::registerApplicationMaster(
-		string &amHost, int32_t amPort, string &am_tracking_url) {
-	RegisterApplicationMasterRequest request;
-	request.setHost(amHost);
-	request.setRpcPort(amPort);
-	request.setTrackingUrl(am_tracking_url);
-	ApplicationMasterProtocol* rmClientAlias = (ApplicationMasterProtocol*) rmClient;
-	return rmClientAlias->registerApplicationMaster(request);
+    string &amHost, int32_t amPort, string &am_tracking_url) {
+    RegisterApplicationMasterRequest request;
+    RegisterApplicationMasterResponse response;
+    request.setHost(amHost);
+    request.setRpcPort(amPort);
+    request.setTrackingUrl(am_tracking_url);
+
+    RESOURCEMANAGER_SCHEDULER_HA_RETRY_BEGIN();
+    response = appMasterProto->registerApplicationMaster(request);
+    RESOURCEMANAGER_SCHEDULER_HA_RETRY_END();
+    return response;
 }
 
 /*
@@ -74,16 +167,20 @@ message AllocateResponseProto {
 */
 
 AllocateResponse ApplicationMaster::allocate(list<ResourceRequest> &asks,
-		list<ContainerId> &releases, ResourceBlacklistRequest &blacklistRequest,
-		int32_t responseId, float progress) {
-	AllocateRequest request;
-	request.setAsks(asks);
-	request.setReleases(releases);
-	request.setBlacklistRequest(blacklistRequest);
-	request.setResponseId(responseId);
-	request.setProgress(progress);
-
-	return ((ApplicationMasterProtocol*) rmClient)->allocate(request);
+        list<ContainerId> &releases, ResourceBlacklistRequest &blacklistRequest,
+        int32_t responseId, float progress) {
+    AllocateRequest request;
+    AllocateResponse response;
+    request.setAsks(asks);
+    request.setReleases(releases);
+    request.setBlacklistRequest(blacklistRequest);
+    request.setResponseId(responseId);
+    request.setProgress(progress);
+
+    RESOURCEMANAGER_SCHEDULER_HA_RETRY_BEGIN();
+    response = appMasterProto->allocate(request);
+    RESOURCEMANAGER_SCHEDULER_HA_RETRY_END();
+    return response;
 }
 
 /*
@@ -101,17 +198,17 @@ message FinishApplicationMasterResponseProto {
 */
 
 bool ApplicationMaster::finishApplicationMaster(string &diagnostics,
-		string &trackingUrl, FinalApplicationStatus finalstatus) {
-	ApplicationMasterProtocol* rmClientAlias = (ApplicationMasterProtocol*) rmClient;
-
-	FinishApplicationMasterRequest request;
-	request.setDiagnostics(diagnostics);
-	request.setTrackingUrl(trackingUrl);
-	request.setFinalApplicationStatus(finalstatus);
-
-	FinishApplicationMasterResponse response = rmClientAlias->finishApplicationMaster(request);
-
-	return response.getIsUnregistered();
+        string &trackingUrl, FinalApplicationStatus finalstatus) {
+    FinishApplicationMasterRequest request;
+    FinishApplicationMasterResponse response;
+    request.setDiagnostics(diagnostics);
+    request.setTrackingUrl(trackingUrl);
+    request.setFinalApplicationStatus(finalstatus);
+
+    RESOURCEMANAGER_SCHEDULER_HA_RETRY_BEGIN();
+    response = appMasterProto->finishApplicationMaster(request);
+    RESOURCEMANAGER_SCHEDULER_HA_RETRY_END();
+    return response.getIsUnregistered();
 }
 
 } /* namespace libyarn */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/413b6647/depends/libyarn/src/libyarnclient/ApplicationMaster.h
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnclient/ApplicationMaster.h b/depends/libyarn/src/libyarnclient/ApplicationMaster.h
index 7df1e46..999c146 100644
--- a/depends/libyarn/src/libyarnclient/ApplicationMaster.h
+++ b/depends/libyarn/src/libyarnclient/ApplicationMaster.h
@@ -32,26 +32,36 @@ namespace libyarn {
 
 class ApplicationMaster {
 public:
-	ApplicationMaster(string &schedHost, string &schedPort,
-			UserInfo &user, const string &tokenService);
+    ApplicationMaster(string &schedHost, string &schedPort,
+            UserInfo &user, const string &tokenService);
 
-	ApplicationMaster(ApplicationMasterProtocol *rmclient);
+    virtual ~ApplicationMaster();
 
-	virtual ~ApplicationMaster();
+    virtual RegisterApplicationMasterResponse registerApplicationMaster(string &amHost,
+            int32_t amPort, string &am_tracking_url);
 
-	virtual RegisterApplicationMasterResponse registerApplicationMaster(string &amHost,
-			int32_t amPort, string &am_tracking_url);
+    virtual AllocateResponse allocate(list<ResourceRequest> &asks,
+            list<ContainerId> &releases,
+            ResourceBlacklistRequest &blacklistRequest, int32_t responseId,
+            float progress);
 
-	virtual AllocateResponse allocate(list<ResourceRequest> &asks,
-			list<ContainerId> &releases,
-			ResourceBlacklistRequest &blacklistRequest, int32_t responseId,
-			float progress);
-
-	virtual bool finishApplicationMaster(string &diagnostics, string &trackingUrl,
-			FinalApplicationStatus finalstatus);
+    virtual bool finishApplicationMaster(string &diagnostics, string &trackingUrl,
+            FinalApplicationStatus finalstatus);
+private:
+    std::shared_ptr<ApplicationMasterProtocol> getActiveAppMasterProto(uint32_t & oldValue);
+    void failoverToNextAppMasterProto(uint32_t oldValue);
 
 private:
-	void *rmClient;
+    bool enableRMSchedulerHA;
+    int maxRMHARetry;
+    mutex mut;
+    /**
+     * Each ApplicationMasterProtocol object stands for a connection to a standby RM scheduler.
+     * If the application master fails to connect to the active RM scheduler, it will try the
+     * next one in the list.
+     */
+    std::vector<std::shared_ptr<ApplicationMasterProtocol>> appMasterProtos;
+    uint32_t currentAppMasterProto;
 };
 
 } /* namespace libyarn */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/413b6647/depends/libyarn/src/libyarnserver/ApplicationClientProtocol.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnserver/ApplicationClientProtocol.cpp b/depends/libyarn/src/libyarnserver/ApplicationClientProtocol.cpp
index 71bc3e9..620d48f 100644
--- a/depends/libyarn/src/libyarnserver/ApplicationClientProtocol.cpp
+++ b/depends/libyarn/src/libyarnserver/ApplicationClientProtocol.cpp
@@ -16,43 +16,43 @@ using namespace Yarn;
 namespace libyarn {
 
 ApplicationClientProtocol::ApplicationClientProtocol(const string &rmUser,
-			const string & rmHost, const string & rmPort,
-			const string & tokenService,const SessionConfig & c) :
-			client(RpcClient::getClient()), conf(c),
-			protocol(APP_CLIENT_PROTOCOL_VERSION, APP_CLIENT_PROTOCOL,APP_CLIENT_DELEGATION_TOKEN_KIND),
-			server(tokenService, rmHost, rmPort) {
-
-	/* create RpcAuth for rpc method,
-	 * can be SIMPLE or KERBEROS
-	 * */
-	if (RpcAuth::ParseMethod(c.getRpcAuthMethod()) == KERBEROS) {
-		/*
-		 * If using KERBEROS, rmUser should be principal name.
-		 */
-		Yarn::Internal::UserInfo user(rmUser);
-		user.setRealUser(user.getEffectiveUser());
-		Yarn::Internal::RpcAuth rpcAuth(user, KERBEROS);
-		auth = rpcAuth;
-	} else {
-		Yarn::Internal::UserInfo user = Yarn::Internal::UserInfo::LocalUser();
-		Yarn::Internal::RpcAuth rpcAuth(user, SIMPLE);
-		auth = rpcAuth;
-	}
+            const string & rmHost, const string & rmPort,
+            const string & tokenService,const SessionConfig & c) :
+            client(RpcClient::getClient()), conf(c),
+            protocol(APP_CLIENT_PROTOCOL_VERSION, APP_CLIENT_PROTOCOL,APP_CLIENT_DELEGATION_TOKEN_KIND),
+            server(tokenService, rmHost, rmPort) {
+
+    /* create RpcAuth for rpc method,
+     * can be SIMPLE or KERBEROS
+     */
+    if (RpcAuth::ParseMethod(c.getRpcAuthMethod()) == KERBEROS) {
+        /*
+         * If using KERBEROS, rmUser should be principal name.
+         */
+        Yarn::Internal::UserInfo user(rmUser);
+        user.setRealUser(user.getEffectiveUser());
+        Yarn::Internal::RpcAuth rpcAuth(user, KERBEROS);
+        auth = rpcAuth;
+    } else {
+        Yarn::Internal::UserInfo user = Yarn::Internal::UserInfo::LocalUser();
+        Yarn::Internal::RpcAuth rpcAuth(user, SIMPLE);
+        auth = rpcAuth;
+    }
 }
 
 ApplicationClientProtocol::~ApplicationClientProtocol() {
 }
 
 void ApplicationClientProtocol::invoke(const RpcCall & call) {
-	try {
-		channel = &client.getChannel(auth, protocol, server, conf);
-		channel->invoke(call);
-		channel->close(false);
-	}
-	catch (...) {
-		channel->close(false);
-		throw;
-	}
+    /* Run one RPC call on a channel from the client; close the channel on success and on failure. */
+    channel = &client.getChannel(auth, protocol, server, conf); /* outside try: nothing to close if this throws */
+    try {
+        channel->invoke(call);
+        channel->close(false);
+    } catch (...) {
+        channel->close(false); /* release the channel, then let the caller see the original exception */
+        throw;
+    }
 }
 
 /*
@@ -68,21 +68,23 @@ void ApplicationClientProtocol::invoke(const RpcCall & call) {
  */
 
 GetNewApplicationResponse ApplicationClientProtocol::getNewApplication(
-		GetNewApplicationRequest &request) {
-	try {
-		GetNewApplicationResponseProto responseProto;
-		GetNewApplicationRequestProto requestProto = request.getProto();
-		invoke(RpcCall(true, "getNewApplication", &requestProto, &responseProto));
-		return GetNewApplicationResponse(responseProto);
-	} catch (const YarnRpcServerException & e) {
-		UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
-		unwrapper.unwrap(__FILE__, __LINE__);
-	} catch (...) {
-		THROW(YarnIOException,
-			  "Unexpected exception: when calling "
-			  "ApplicationClientProtocol::getNewApplication in %s: %d",
-			  __FILE__, __LINE__);
-	}
+        GetNewApplicationRequest &request) {
+    try {
+        GetNewApplicationResponseProto responseProto;
+        GetNewApplicationRequestProto requestProto = request.getProto();
+        invoke(RpcCall(true, "getNewApplication", &requestProto, &responseProto));
+        return GetNewApplicationResponse(responseProto);
+    } catch (const YarnFailoverException & e) {
+         throw;
+    } catch (const YarnRpcServerException & e) {
+        UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    } catch (...) {
+        THROW(YarnIOException,
+              "Unexpected exception: when calling "
+              "ApplicationClientProtocol::getNewApplication in %s: %d",
+              __FILE__, __LINE__);
+    }
 }
 
 /*
@@ -97,20 +99,22 @@ GetNewApplicationResponse ApplicationClientProtocol::getNewApplication(
  */
 
 void ApplicationClientProtocol::submitApplication(
-		SubmitApplicationRequest &request) {
-	try {
-		SubmitApplicationResponseProto responseProto;
-		SubmitApplicationRequestProto requestProto = request.getProto();
-		invoke(RpcCall(true, "submitApplication", &requestProto, &responseProto));
-	} catch (const YarnRpcServerException & e) {
-		UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
-		unwrapper.unwrap(__FILE__, __LINE__);
-	} catch (...) {
-		THROW(YarnIOException,
-			  "Unexpected exception: when calling "
-			  "ApplicationClientProtocol::submitApplication in %s: %d",
-			  __FILE__, __LINE__);
-	}
+        SubmitApplicationRequest &request) {
+    try {
+        SubmitApplicationResponseProto responseProto;
+        SubmitApplicationRequestProto requestProto = request.getProto();
+        invoke(RpcCall(true, "submitApplication", &requestProto, &responseProto));
+    } catch (const YarnFailoverException & e) {
+         throw;
+    } catch (const YarnRpcServerException & e) {
+        UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    } catch (...) {
+        THROW(YarnIOException,
+              "Unexpected exception: when calling "
+              "ApplicationClientProtocol::submitApplication in %s: %d",
+              __FILE__, __LINE__);
+    }
 }
 /*
  rpc getApplicationReport (GetApplicationReportRequestProto) returns (GetApplicationReportResponseProto);
@@ -124,22 +128,24 @@ void ApplicationClientProtocol::submitApplication(
  }
  */
 GetApplicationReportResponse ApplicationClientProtocol::getApplicationReport(
-		GetApplicationReportRequest &request) {
-	try {
-		GetApplicationReportResponseProto responseProto;
-		GetApplicationReportRequestProto requestProto = request.getProto();
-		invoke(RpcCall(true, "getApplicationReport", &requestProto, &responseProto));
-		return GetApplicationReportResponse(responseProto);
-	} catch (const YarnRpcServerException & e) {
-		UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
-		unwrapper.unwrap(__FILE__, __LINE__);
-	} catch (...) {
-		THROW(YarnIOException,
-			  "Unexpected exception: when calling "
-			  "ApplicationClientProtocol::getApplicationReport in %s: %d",
-			  __FILE__, __LINE__);
-	}
-}
+        GetApplicationReportRequest &request) {
+    try {
+        GetApplicationReportResponseProto responseProto;
+        GetApplicationReportRequestProto requestProto = request.getProto();
+        invoke(RpcCall(true, "getApplicationReport", &requestProto, &responseProto));
+        return GetApplicationReportResponse(responseProto);
+    } catch (const YarnFailoverException & e) {
+         throw;
+    } catch (const YarnRpcServerException & e) {
+        UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    } catch (...) {
+        THROW(YarnIOException,
+              "Unexpected exception: when calling "
+              "ApplicationClientProtocol::getApplicationReport in %s: %d",
+              __FILE__, __LINE__);
+    }
+    }
 
 /*
  rpc getContainers (GetContainersRequestProto) returns (GetContainersResponseProto);
@@ -153,20 +159,22 @@ message GetContainersResponseProto {
 }
  */
 GetContainersResponse ApplicationClientProtocol::getContainers(GetContainersRequest &request){
-	try {
-		GetContainersResponseProto responseProto;
-		GetContainersRequestProto requestProto = request.getProto();
-		invoke(RpcCall(true, "getContainers", &requestProto,&responseProto));
-		return GetContainersResponse(responseProto);
-	} catch (const YarnRpcServerException & e) {
-		UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
-		unwrapper.unwrap(__FILE__, __LINE__);
-	} catch (...) {
-		THROW(YarnIOException,
-			  "Unexpected exception: when calling "
-			  "ApplicationClientProtocol::getContainers in %s: %d",
-			  __FILE__, __LINE__);
-	}
+    try {
+        GetContainersResponseProto responseProto;
+        GetContainersRequestProto requestProto = request.getProto();
+        invoke(RpcCall(true, "getContainers", &requestProto,&responseProto));
+        return GetContainersResponse(responseProto);
+    } catch (const YarnFailoverException & e) {
+         throw;
+    } catch (const YarnRpcServerException & e) {
+        UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    } catch (...) {
+        THROW(YarnIOException,
+              "Unexpected exception: when calling "
+              "ApplicationClientProtocol::getContainers in %s: %d",
+              __FILE__, __LINE__);
+    }
 }
 
 /*
@@ -181,21 +189,23 @@ GetContainersResponse ApplicationClientProtocol::getContainers(GetContainersRequ
  }
  */
 GetClusterNodesResponse ApplicationClientProtocol::getClusterNodes(
-		GetClusterNodesRequest &request) {
-	try {
-		GetClusterNodesResponseProto responseProto;
-		GetClusterNodesRequestProto requestProto = request.getProto();
-		invoke(RpcCall(true, "getClusterNodes", &requestProto, &responseProto));
-		return GetClusterNodesResponse(responseProto);
-	} catch (const YarnRpcServerException & e) {
-		UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
-		unwrapper.unwrap(__FILE__, __LINE__);
-	} catch (...) {
-		THROW(YarnIOException,
-			  "Unexpected exception: when calling "
-			  "ApplicationClientProtocol::getClusterNodes in %s: %d",
-			  __FILE__, __LINE__);
-	}
+        GetClusterNodesRequest &request) {
+    try {
+        GetClusterNodesResponseProto responseProto;
+        GetClusterNodesRequestProto requestProto = request.getProto();
+        invoke(RpcCall(true, "getClusterNodes", &requestProto, &responseProto));
+        return GetClusterNodesResponse(responseProto);
+    } catch (const YarnFailoverException & e) {
+         throw;
+    } catch (const YarnRpcServerException & e) {
+        UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    } catch (...) {
+        THROW(YarnIOException,
+              "Unexpected exception: when calling "
+              "ApplicationClientProtocol::getClusterNodes in %s: %d",
+              __FILE__, __LINE__);
+    }
 }
 
 /*
@@ -223,21 +233,23 @@ GetClusterNodesResponse ApplicationClientProtocol::getClusterNodes(
  }
  */
 GetQueueInfoResponse ApplicationClientProtocol::getQueueInfo(
-		GetQueueInfoRequest &request) {
-	try {
-		GetQueueInfoResponseProto responseProto;
-		GetQueueInfoRequestProto requestProto = request.getProto();
-		invoke(RpcCall(true, "getQueueInfo", &requestProto, &responseProto));
-		return GetQueueInfoResponse(responseProto);
-	} catch (const YarnRpcServerException & e) {
-		UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
-		unwrapper.unwrap(__FILE__, __LINE__);
-	} catch (...) {
-		THROW(YarnIOException,
-			  "Unexpected exception: when calling "
-			  "ApplicationClientProtocol::getQueueInfo in %s: %d",
-			  __FILE__, __LINE__);
-	}
+        GetQueueInfoRequest &request) {
+    try {
+        GetQueueInfoResponseProto responseProto;
+        GetQueueInfoRequestProto requestProto = request.getProto();
+        invoke(RpcCall(true, "getQueueInfo", &requestProto, &responseProto));
+        return GetQueueInfoResponse(responseProto);
+    } catch (const YarnFailoverException & e) {
+         throw;
+    } catch (const YarnRpcServerException & e) {
+        UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    } catch (...) {
+        THROW(YarnIOException,
+              "Unexpected exception: when calling "
+              "ApplicationClientProtocol::getQueueInfo in %s: %d",
+              __FILE__, __LINE__);
+    }
 }
 
 //rpc getClusterMetrics (GetClusterMetricsRequestProto) returns (GetClusterMetricsResponseProto);
@@ -246,81 +258,85 @@ GetQueueInfoResponse ApplicationClientProtocol::getQueueInfo(
 //}
 
 GetClusterMetricsResponse ApplicationClientProtocol::getClusterMetrics(
-		GetClusterMetricsRequest &request) {
-	try {
-		GetClusterMetricsResponseProto responseProto;
-		GetClusterMetricsRequestProto requestProto = request.getProto();
-		invoke(RpcCall(true, "getClusterMetrics", &requestProto, &responseProto));
-		return GetClusterMetricsResponse(responseProto);
-	}
-	catch (const YarnRpcServerException & e) {
-		UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
-		unwrapper.unwrap(__FILE__, __LINE__);
-	} catch (...) {
-		THROW(YarnIOException,
-			  "Unexpected exception: when calling "
-			  "ApplicationClientProtocol::getClusterMetrics in %s: %d",
-			  __FILE__, __LINE__);
-	}
+        GetClusterMetricsRequest &request) {
+    try {
+        GetClusterMetricsResponseProto responseProto;
+        GetClusterMetricsRequestProto requestProto = request.getProto();
+        invoke(RpcCall(true, "getClusterMetrics", &requestProto, &responseProto));
+        return GetClusterMetricsResponse(responseProto);
+    } catch (const YarnFailoverException & e) {
+         throw;
+    } catch (const YarnRpcServerException & e) {
+        UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    } catch (...) {
+        THROW(YarnIOException,
+              "Unexpected exception: when calling "
+              "ApplicationClientProtocol::getClusterMetrics in %s: %d",
+              __FILE__, __LINE__);
+    }
 }
 
 KillApplicationResponse ApplicationClientProtocol::forceKillApplication(
-		KillApplicationRequest &request) {
-	try {
-		KillApplicationResponseProto responseProto;
-		KillApplicationRequestProto requestProto = request.getProto();
-		invoke(
-				RpcCall(true, "forceKillApplication", &requestProto,
-						&responseProto));
-		return KillApplicationResponse(responseProto);
-	}
-	catch (const YarnRpcServerException & e) {
-		UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
-		unwrapper.unwrap(__FILE__, __LINE__);
-	} catch (...) {
-		THROW(YarnIOException,
-			  "Unexpected exception: when calling "
-			  "ApplicationClientProtocol::forceKillApplication in %s: %d",
-			  __FILE__, __LINE__);
-	}
+        KillApplicationRequest &request) {
+    try {
+        KillApplicationResponseProto responseProto;
+        KillApplicationRequestProto requestProto = request.getProto();
+        invoke(
+                RpcCall(true, "forceKillApplication", &requestProto,
+                        &responseProto));
+        return KillApplicationResponse(responseProto);
+    } catch (const YarnFailoverException & e) {
+         throw;
+    } catch (const YarnRpcServerException & e) {
+        UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    } catch (...) {
+        THROW(YarnIOException,
+              "Unexpected exception: when calling "
+              "ApplicationClientProtocol::forceKillApplication in %s: %d",
+              __FILE__, __LINE__);
+    }
 }
 
 GetApplicationsResponse ApplicationClientProtocol::getApplications(
-		GetApplicationsRequest &request) {
-	try {
-		GetApplicationsResponseProto responseProto;
-		GetApplicationsRequestProto requestProto = request.getProto();
-		invoke(RpcCall(true, "getApplications", &requestProto, &responseProto));
-		return GetApplicationsResponse(responseProto);
-	}
-	catch (const YarnRpcServerException & e) {
-		UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
-		unwrapper.unwrap(__FILE__, __LINE__);
-	} catch (...) {
-		THROW(YarnIOException,
-			  "Unexpected exception: when calling "
-			  "ApplicationClientProtocol::getApplications in %s: %d",
-			  __FILE__, __LINE__);
-	}
+        GetApplicationsRequest &request) {
+    try {
+        GetApplicationsResponseProto responseProto;
+        GetApplicationsRequestProto requestProto = request.getProto();
+        invoke(RpcCall(true, "getApplications", &requestProto, &responseProto));
+        return GetApplicationsResponse(responseProto);
+    } catch (const YarnFailoverException & e) {
+         throw;
+    } catch (const YarnRpcServerException & e) {
+        UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    } catch (...) {
+        THROW(YarnIOException,
+              "Unexpected exception: when calling "
+              "ApplicationClientProtocol::getApplications in %s: %d",
+              __FILE__, __LINE__);
+    }
 }
 
 GetQueueUserAclsInfoResponse ApplicationClientProtocol::getQueueAclsInfo(
-		GetQueueUserAclsInfoRequest &request) {
-	try {
-		GetQueueUserAclsInfoResponseProto responseProto;
-		GetQueueUserAclsInfoRequestProto requestProto = request.getProto();
-		invoke(RpcCall(true, "getQueueUserAcls", &requestProto, &responseProto));
-		return GetQueueUserAclsInfoResponse(responseProto);
-	}
-	catch (const YarnRpcServerException & e) {
-		UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
-		unwrapper.unwrap(__FILE__, __LINE__);
-	} catch (...) {
-		THROW(YarnIOException,
-			  "Unexpected exception: when calling "
-			  "ApplicationClientProtocol::getQueueAclsInfo in %s: %d",
-			  __FILE__, __LINE__);
-	}
+        GetQueueUserAclsInfoRequest &request) {
+    try {
+        GetQueueUserAclsInfoResponseProto responseProto;
+        GetQueueUserAclsInfoRequestProto requestProto = request.getProto();
+        invoke(RpcCall(true, "getQueueUserAcls", &requestProto, &responseProto));
+        return GetQueueUserAclsInfoResponse(responseProto);
+    } catch (const YarnFailoverException & e) {
+         throw;
+    } catch (const YarnRpcServerException & e) {
+        UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    } catch (...) {
+        THROW(YarnIOException,
+              "Unexpected exception: when calling "
+              "ApplicationClientProtocol::getQueueAclsInfo in %s: %d",
+              __FILE__, __LINE__);
+    }
 }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/413b6647/depends/libyarn/src/libyarnserver/ApplicationMasterProtocol.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnserver/ApplicationMasterProtocol.cpp b/depends/libyarn/src/libyarnserver/ApplicationMasterProtocol.cpp
index 6b1bf89..56b358f 100644
--- a/depends/libyarn/src/libyarnserver/ApplicationMasterProtocol.cpp
+++ b/depends/libyarn/src/libyarnserver/ApplicationMasterProtocol.cpp
@@ -19,39 +19,41 @@ using namespace Yarn;
 
 namespace libyarn {
 
-ApplicationMasterProtocol::ApplicationMasterProtocol(std::string & schedHost,
-		std::string & schedPort, const std::string & tokenService,
-		const SessionConfig & c, const RpcAuth & a) :
-		auth(a), client(RpcClient::getClient()), conf(c), protocol(
-		APPLICATION_MASTER_VERSION, APPLICATION_MASTER_PROTOCOL,
-		AMRM_TOKEN_KIND), server(tokenService, schedHost, schedPort) {
+ApplicationMasterProtocol::ApplicationMasterProtocol(const std::string & schedHost,
+        const std::string & schedPort, const std::string & tokenService,
+        const SessionConfig & c, const RpcAuth & a) :
+        auth(a), client(RpcClient::getClient()), conf(c), protocol(
+        APPLICATION_MASTER_VERSION, APPLICATION_MASTER_PROTOCOL,
+        AMRM_TOKEN_KIND), server(tokenService, schedHost, schedPort) {
 }
 
 ApplicationMasterProtocol::~ApplicationMasterProtocol() {
 }
 
 void ApplicationMasterProtocol::invoke(const RpcCall & call) {
-	try {
-		channel = &client.getChannel(auth, protocol, server, conf);
-		channel->invoke(call);
-		channel->close(false);
-	} catch (...) {
-		channel->close(false);
-		throw;
-	}
+    channel = &client.getChannel(auth, protocol, server, conf); /* outside try: nothing to close if this throws */
+    try { /* run one RPC call; the channel is closed on success and on failure */
+        channel->invoke(call);
+        channel->close(false);
+    } catch (...) {
+        channel->close(false); /* release the channel, then let the caller see the original exception */
+        throw;
+    }
 }
 
 RegisterApplicationMasterResponse ApplicationMasterProtocol::registerApplicationMaster(
-		RegisterApplicationMasterRequest &request) {
-	try {
-		RegisterApplicationMasterResponseProto responseProto;
-		RegisterApplicationMasterRequestProto requestProto = request.getProto();
-		invoke(RpcCall(true, "registerApplicationMaster", &requestProto, &responseProto));
-		return RegisterApplicationMasterResponse(responseProto);
-	} catch (const YarnRpcServerException & e) {
-		UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
-		unwrapper.unwrap(__FILE__, __LINE__);
-	} catch (...) {
+        RegisterApplicationMasterRequest &request) {
+    try {
+        RegisterApplicationMasterResponseProto responseProto;
+        RegisterApplicationMasterRequestProto requestProto = request.getProto();
+        invoke(RpcCall(true, "registerApplicationMaster", &requestProto, &responseProto));
+        return RegisterApplicationMasterResponse(responseProto);
+    } catch (const YarnFailoverException & e) {
+         throw;
+    } catch (const YarnRpcServerException & e) {
+        UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    } catch (...) {
         THROW(YarnIOException,
               "Unexpected exception: when calling "
               "ApplicationMasterProtocol::registerApplicationMaster in %s: %d",
@@ -61,39 +63,43 @@ RegisterApplicationMasterResponse ApplicationMasterProtocol::registerApplication
 
 
 AllocateResponse ApplicationMasterProtocol::allocate(AllocateRequest &request) {
-	try {
-		AllocateRequestProto requestProto = request.getProto();
-		AllocateResponseProto responseProto;
-		invoke(RpcCall(true, "allocate", &requestProto, &responseProto));
-		return AllocateResponse(responseProto);
-	} catch (const YarnRpcServerException & e) {
-		UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
-		unwrapper.unwrap(__FILE__, __LINE__);
-	}
-	catch (...) {
-		THROW(YarnIOException,
-			  "Unexpected exception: when calling "
-			  "ApplicationMasterProtocol::allocate in %s: %d",
-			  __FILE__, __LINE__);
-	}
+    try {
+        AllocateRequestProto requestProto = request.getProto();
+        AllocateResponseProto responseProto;
+        invoke(RpcCall(true, "allocate", &requestProto, &responseProto));
+        return AllocateResponse(responseProto);
+    } catch (const YarnFailoverException & e) {
+         throw;
+    } catch (const YarnRpcServerException & e) {
+        UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    }
+    catch (...) {
+        THROW(YarnIOException,
+              "Unexpected exception: when calling "
+              "ApplicationMasterProtocol::allocate in %s: %d",
+              __FILE__, __LINE__);
+    }
 }
 
 FinishApplicationMasterResponse ApplicationMasterProtocol::finishApplicationMaster(
-		FinishApplicationMasterRequest &request) {
-	try {
-		FinishApplicationMasterRequestProto requestProto = request.getProto();
-		FinishApplicationMasterResponseProto responseProto;
-		invoke(RpcCall(true, "finishApplicationMaster", &requestProto, &responseProto));
-		return FinishApplicationMasterResponse(responseProto);
-	} catch (const YarnRpcServerException & e) {
-		UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
-		unwrapper.unwrap(__FILE__, __LINE__);
-	}	catch (...) {
-		THROW(YarnIOException,
-			  "Unexpected exception: when calling "
-			  "ApplicationMasterProtocol::finishApplicationMaster in %s: %d",
-			  __FILE__, __LINE__);
-	}
+        FinishApplicationMasterRequest &request) {
+    try {
+        FinishApplicationMasterRequestProto requestProto = request.getProto();
+        FinishApplicationMasterResponseProto responseProto;
+        invoke(RpcCall(true, "finishApplicationMaster", &requestProto, &responseProto));
+        return FinishApplicationMasterResponse(responseProto);
+    } catch (const YarnFailoverException & e) {
+         throw;
+    } catch (const YarnRpcServerException & e) {
+        UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    }	catch (...) {
+        THROW(YarnIOException,
+              "Unexpected exception: when calling "
+              "ApplicationMasterProtocol::finishApplicationMaster in %s: %d",
+              __FILE__, __LINE__);
+    }
 }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/413b6647/depends/libyarn/src/libyarnserver/ApplicationMasterProtocol.h
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnserver/ApplicationMasterProtocol.h b/depends/libyarn/src/libyarnserver/ApplicationMasterProtocol.h
index ca637fa..70c352e 100644
--- a/depends/libyarn/src/libyarnserver/ApplicationMasterProtocol.h
+++ b/depends/libyarn/src/libyarnserver/ApplicationMasterProtocol.h
@@ -46,7 +46,7 @@ namespace libyarn {
 
 class ApplicationMasterProtocol {
 public:
-	ApplicationMasterProtocol(std::string & schedHost, std::string & schedPort,
+	ApplicationMasterProtocol(const std::string & schedHost, const std::string & schedPort,
 			const std::string & tokenService, const SessionConfig & c,
 			const RpcAuth & a);
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/413b6647/depends/libyarn/src/rpc/RpcChannel.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/rpc/RpcChannel.cpp b/depends/libyarn/src/rpc/RpcChannel.cpp
index 00f45b1..a688d7b 100644
--- a/depends/libyarn/src/rpc/RpcChannel.cpp
+++ b/depends/libyarn/src/rpc/RpcChannel.cpp
@@ -711,12 +711,12 @@ static exception_ptr HandlerRpcResponseException(exception_ptr e) {
     try {
         rethrow_exception(e);
     } catch (const YarnRpcServerException & e) {
-        UnWrapper < NameNodeStandbyException, UnsupportedOperationException,
+        UnWrapper < ResourceManagerStandbyException, UnsupportedOperationException,
                   AccessControlException, SafeModeException, SaslException > unwrapper(e);
 
         try {
             unwrapper.unwrap(__FILE__, __LINE__);
-        } catch (const NameNodeStandbyException & e) {
+        } catch (const ResourceManagerStandbyException & e) {
             retval = current_exception();
         } catch (const UnsupportedOperationException & e) {
             retval = current_exception();