You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by wl...@apache.org on 2015/10/29 13:59:30 UTC

incubator-hawq git commit: HAWQ-95. Add a check after submitting the HAWQ AM to YARN; if registration fails, kill the application in YARN and retry

Repository: incubator-hawq
Updated Branches:
  refs/heads/master d0dfbb16d -> 8239aec78


HAWQ-95. Add a check after submitting the HAWQ AM to YARN; if registration fails, kill the application in YARN and retry


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/8239aec7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/8239aec7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/8239aec7

Branch: refs/heads/master
Commit: 8239aec784a2f2fee8904ce7021c2ec6363f1821
Parents: d0dfbb1
Author: Wen Lin <wl...@pivotal.io>
Authored: Thu Oct 29 21:04:31 2015 +0800
Committer: Wen Lin <wl...@pivotal.io>
Committed: Thu Oct 29 21:04:31 2015 +0800

----------------------------------------------------------------------
 depends/libyarn/src/CMakeLists.txt              |  2 +-
 .../libyarn/src/libyarnclient/LibYarnClient.cpp | 54 ++++++++++++++
 .../libyarn/src/libyarnclient/LibYarnClient.h   |  2 +
 .../src/libyarnclient/LibYarnClientC.cpp        | 16 +++++
 .../libyarn/src/libyarnclient/LibYarnClientC.h  |  3 +
 .../resourcebroker_LIBYARN_proc.c               | 75 ++++++++++++++++----
 6 files changed, 138 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8239aec7/depends/libyarn/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/CMakeLists.txt b/depends/libyarn/src/CMakeLists.txt
index d159216..db5ccf5 100644
--- a/depends/libyarn/src/CMakeLists.txt
+++ b/depends/libyarn/src/CMakeLists.txt
@@ -2,7 +2,7 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
 
 SET(libyarn_VERSION_MAJOR 0)
 SET(libyarn_VERSION_MINOR 1)
-SET(libyarn_VERSION_PATCH 5)
+SET(libyarn_VERSION_PATCH 6)
 SET(libyarn_VERSION_STRING "${libyarn_VERSION_MAJOR}.${libyarn_VERSION_MINOR}.${libyarn_VERSION_PATCH}")
 SET(libyarn_VERSION_API 1)
 SET(libyarn_ROOT_SOURCES_DIR ${CMAKE_SOURCE_DIR}/src)

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8239aec7/depends/libyarn/src/libyarnclient/LibYarnClient.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnclient/LibYarnClient.cpp b/depends/libyarn/src/libyarnclient/LibYarnClient.cpp
index eb5718c..aac5025 100644
--- a/depends/libyarn/src/libyarnclient/LibYarnClient.cpp
+++ b/depends/libyarn/src/libyarnclient/LibYarnClient.cpp
@@ -214,6 +214,60 @@ int LibYarnClient::createJob(string &jobName, string &queue,string &jobId) {
     }
 }
 
+int LibYarnClient::forceKillJob(string &jobId) {
+    // Stop the heartbeat thread (if it is running) before tearing the job down.
+#ifndef MOCKTEST
+    if ( keepRun ) {
+        keepRun=false;
+        void *thrc = NULL;
+        int rc = pthread_join(heartbeatThread, &thrc);
+        if ( rc != 0 ) {
+            setErrorMessage("LibYarnClient::forceKillJob, fail to join heart-beat thread.");
+            LOG(INFO, "LibYarnClient::forceKillJob, fail to join heart-beat thread. error code %d", rc);
+            return FR_FAILED;
+        }
+    }
+#endif
+    // Validate the job id, stop every tracked container, then force-kill the application.
+    try{
+        if (jobId != clientJobId) {
+            throw std::invalid_argument("The jobId is wrong, please check the jobId argument");
+        }
+
+        for (map<int,Container*>::iterator it = jobIdContainers.begin(); it != jobIdContainers.end(); ++it) {
+            ostringstream key;
+            Container *container = it->second;
+            key << container->getNodeId().getHost() << ":" << container->getNodeId().getPort();
+            Token nmToken = nmTokenCache[key.str()];
+            ((ContainerManagement*)nmClient)->stopContainer((*container), nmToken);
+            LOG(INFO,"LibYarnClient::forceKillJob, container:%d is stopped",container->getId().getId());
+        }
+
+        ((ApplicationClient*) appClient)->forceKillApplication(clientAppId);
+        LOG(INFO, "LibYarnClient::forceKillJob, forceKillApplication");
+
+        for (map<int,Container*>::iterator it = jobIdContainers.begin(); it != jobIdContainers.end(); ++it) {
+            LOG(INFO,"LibYarnClient::forceKillJob, container:%d in jobIdContainers is deleted",it->second->getId().getId());
+            delete it->second;
+            it->second = NULL;
+        }
+        jobIdContainers.clear();
+        activeFailContainerIds.clear();
+        return FR_SUCCEEDED;
+    } catch(const std::exception& e){
+        stringstream errorMsg;
+        errorMsg << "LibYarnClient::forceKillJob, catch the exception:" << e.what();
+        setErrorMessage(errorMsg.str());
+        return FR_FAILED;
+    } catch (...) {
+        stringstream errorMsg;
+        errorMsg << "LibYarnClient::forceKillJob, catch unexpected exception.";
+        setErrorMessage(errorMsg.str());
+        return FR_FAILED;
+    }
+}
+
+
 void LibYarnClient::dummyAllocate() {
 
     pthread_mutex_lock(&heartbeatLock);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8239aec7/depends/libyarn/src/libyarnclient/LibYarnClient.h
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnclient/LibYarnClient.h b/depends/libyarn/src/libyarnclient/LibYarnClient.h
index ce6a213..ea384fb 100644
--- a/depends/libyarn/src/libyarnclient/LibYarnClient.h
+++ b/depends/libyarn/src/libyarnclient/LibYarnClient.h
@@ -39,6 +39,8 @@ namespace libyarn {
 
 		virtual int createJob(string &jobName, string &queue, string &jobId);
 
+		virtual int forceKillJob(string &jobId);
+
 		virtual void addResourceRequest(Resource capability, int32_t num_containers,
 				   string host, int32_t priority, bool relax_locality);
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8239aec7/depends/libyarn/src/libyarnclient/LibYarnClientC.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnclient/LibYarnClientC.cpp b/depends/libyarn/src/libyarnclient/LibYarnClientC.cpp
index a9aa29e..5db3685 100644
--- a/depends/libyarn/src/libyarnclient/LibYarnClientC.cpp
+++ b/depends/libyarn/src/libyarnclient/LibYarnClientC.cpp
@@ -42,6 +42,10 @@ extern "C" {
 				return client->createJob(jobName, queue, jobId);
 			}
 
+			int forceKillJob(string &jobId) {	// delegates to the wrapped client's forceKillJob()
+				return client->forceKillJob(jobId);
+			}
+
 			int addContainerRequests(string &jobId,
 									Resource &capability,
 									int num_containers,
@@ -165,6 +169,17 @@ extern "C" {
 		}
 	}
 
+	int forceKillJob(LibYarnClient_t *client, char* jobId) {
+		/* The C++ client API takes the job id as a std::string. */
+		string requestedJobId(jobId);
+		if (client->forceKillJob(requestedJobId) == FUNCTION_SUCCEEDED) {
+			return FUNCTION_SUCCEEDED;
+		}
+		/* On failure, expose the client's error text through setErrorMessage(). */
+		setErrorMessage(client->getErrorMessage());
+		return FUNCTION_FAILED;
+	}
+
 	int allocateResources(LibYarnClient_t *client, char *jobId,
 						int32_t priority, int32_t vCores, int32_t memory, int32_t num_containers,
 						char *blackListAdditions[], int blacklistAddsSize,
@@ -375,6 +390,7 @@ exit_err:
 			(*applicationReport)->status = applicationReportCpp.getYarnApplicationState();
 			(*applicationReport)->diagnostics = strdup(applicationReportCpp.getDiagnostics().c_str());
 			(*applicationReport)->startTime = applicationReportCpp.getStartTime();
+			(*applicationReport)->progress = applicationReportCpp.getProgress();
 			return FUNCTION_SUCCEEDED;
 		} else{
 			setErrorMessage(client->getErrorMessage());

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8239aec7/depends/libyarn/src/libyarnclient/LibYarnClientC.h
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnclient/LibYarnClientC.h b/depends/libyarn/src/libyarnclient/LibYarnClientC.h
index 6b02ec7..aac3b29 100644
--- a/depends/libyarn/src/libyarnclient/LibYarnClientC.h
+++ b/depends/libyarn/src/libyarnclient/LibYarnClientC.h
@@ -50,6 +50,7 @@ typedef struct LibYarnApplicationReport_t {
 	enum YarnApplicationState status;
 	char *diagnostics;
 	int64_t startTime;
+	float progress;
 }LibYarnApplicationReport_t;
 
 typedef struct LibYarnContainerReport_t {
@@ -138,6 +139,8 @@ void deleteLibYarnClient(LibYarnClient_t *client);
 
 int createJob(LibYarnClient_t *client, char *jobName, char *queue,char **jobId);
 
+int forceKillJob(LibYarnClient_t *client, char *jobId);
+
 int addContainerRequest(LibYarnNodeInfo_t preferredHosts[], int preferredHostsSize,
 						int32_t priority, bool relax_locality);
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8239aec7/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c b/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c
index 1a44503..4c2cc64 100644
--- a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c
+++ b/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c
@@ -1261,6 +1261,8 @@ int RB2YARN_initializeConnection(void)
 	return FUNCTION_SUCCEEDED;
 }
 
+#define HAWQ_YARN_AM_HEARTBEAT_INTERVAL 5	/* in seconds */
+
 /* Connect and disconnect to the global resource manager. */
 int RB2YARN_connectToYARN(void)
 {
@@ -1276,35 +1278,82 @@ int RB2YARN_connectToYARN(void)
 						       YARNAMPort,
 						       YARNTRKUrl,
 						       &LIBYARNClient,
-						       5*1000 /* Hard code 5 sec */);
+						       HAWQ_YARN_AM_HEARTBEAT_INTERVAL*1000 /* Hard code 5 sec */);
     return yarnres;
 }
 
 int RB2YARN_registerYARNApplication(void)
 {
-	int yarnres = FUNCTION_SUCCEEDED;
+	int retry = 5;
+	int yarnres = FUNCTION_SUCCEEDED, result = FUNCTION_SUCCEEDED;
+	LibYarnApplicationReport_t *applicationReport = NULL;
 	yarnres = createJob(LIBYARNClient,
 					    YARNAppName.Str,
 						YARNQueueName.Str,
 						&YARNJobID);
-	if ( yarnres != FUNCTION_SUCCEEDED ) {
+	if ( yarnres != FUNCTION_SUCCEEDED )
+	{
 		elog(WARNING, "YARN mode resource broker failed to create application "
 					  "in YARN resource manager. %s",
 					  getErrorMessage());
+		return yarnres;
 	}
-	else {
-		elog(LOG, "YARN mode resource broker created job in YARN resource "
-				  "manager %s as new application %s assigned to queue %s.",
-				  YARNJobID,
-				  YARNAppName.Str,
-                  YARNQueueName.Str);
 
-		ResBrokerStartTime = gettime_microsec();
+	elog(LOG, "YARN mode resource broker created job in YARN resource "
+			  "manager %s as new application %s assigned to queue %s.",
+			  YARNJobID,
+			  YARNAppName.Str,
+			  YARNQueueName.Str);
+
+	/* Poll the application report until the AM reports progress >= 0.5 (treated
+	 * as "registered"); after 5 unsuccessful polls, force-kill the application
+	 * in YARN and fail. NOTE(review): reports from earlier polls are not freed.
+	 */
+	while (retry > 0)
+	{
+		result = getApplicationReport(LIBYARNClient, YARNJobID, &applicationReport);
+		if (result != FUNCTION_SUCCEEDED || applicationReport == NULL)
+		{
+			if (retry > 1) {	/* the last attempt falls through to the kill path */
+				retry--;
+				pg_usleep(HAWQ_YARN_AM_HEARTBEAT_INTERVAL*1000*1000L);
+				continue;
+			} else {
+				elog(WARNING, "YARN mode resource broker failed to get application report, "
+							  "so kill it from Hadoop Yarn.");
+				result = forceKillJob(LIBYARNClient, YARNJobID);
+				if (result != FUNCTION_SUCCEEDED)
+					elog(WARNING, "YARN mode resource broker kill job failed.");
+				return FUNCTION_FAILED;
+			}
+		}
 
-		elog(LOG, "YARN mode resource broker registered new "
-				  "YARN application. Start time stamp "UINT64_FORMAT,
-				  ResBrokerStartTime);
+		if (applicationReport->progress < 0.5)
+		{
+			if (retry > 1) {	/* the last attempt falls through to the kill path */
+				retry--;
+				pg_usleep(HAWQ_YARN_AM_HEARTBEAT_INTERVAL*1000*1000L);
+				continue;
+			} else {
+				elog(WARNING, "YARN mode resource broker failed to register itself in Hadoop Yarn. "
+							  "Got progress:%f, and try to kill application from Hadoop Yarn",
+							  applicationReport->progress);
+				result = forceKillJob(LIBYARNClient, YARNJobID);
+				if (result != FUNCTION_SUCCEEDED)
+					elog(WARNING, "YARN mode resource broker kill job failed.");
+				return FUNCTION_FAILED;
+			}
+		} else {
+			break;
+		}
 	}
+
+	ResBrokerStartTime = gettime_microsec();
+
+	elog(LOG, "YARN mode resource broker registered new "
+			  "YARN application. Progress:%f, Start time stamp "UINT64_FORMAT,
+			  applicationReport->progress, ResBrokerStartTime);
+
 	return yarnres;
 }