You are viewing a plain-text version of this content. The link to the canonical (original) version was not preserved in this plain-text copy.
Posted to commits@hawq.apache.org by wl...@apache.org on 2015/12/22 03:16:38 UTC

incubator-hawq git commit: HAWQ-267. Add check for AM heartbeat thread status

Repository: incubator-hawq
Updated Branches:
  refs/heads/master 72c63be31 -> b68130f32


HAWQ-267. Add check for AM heartbeat thread status


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/b68130f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/b68130f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/b68130f3

Branch: refs/heads/master
Commit: b68130f32fe69467a877b19bfa9fc9fafcf16acd
Parents: 72c63be
Author: Wen Lin <wl...@pivotal.io>
Authored: Tue Dec 22 10:24:53 2015 +0800
Committer: Wen Lin <wl...@pivotal.io>
Committed: Tue Dec 22 10:24:53 2015 +0800

----------------------------------------------------------------------
 depends/libyarn/src/CMakeLists.txt              |  2 +-
 .../libyarn/src/libyarnclient/LibYarnClient.cpp | 49 +++++++++++++++++---
 .../libyarn/src/libyarnclient/LibYarnClient.h   |  1 +
 .../src/libyarnclient/LibYarnClientC.cpp        |  2 +-
 .../libyarnserver/ApplicationClientProtocol.cpp |  4 +-
 .../resourcebroker_LIBYARN_proc.c               |  3 +-
 6 files changed, 50 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b68130f3/depends/libyarn/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/CMakeLists.txt b/depends/libyarn/src/CMakeLists.txt
index e9657cc..d19b3b2 100644
--- a/depends/libyarn/src/CMakeLists.txt
+++ b/depends/libyarn/src/CMakeLists.txt
@@ -2,7 +2,7 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
 
 SET(libyarn_VERSION_MAJOR 0)
 SET(libyarn_VERSION_MINOR 1)
-SET(libyarn_VERSION_PATCH 9)
+SET(libyarn_VERSION_PATCH 10)
 SET(libyarn_VERSION_STRING "${libyarn_VERSION_MAJOR}.${libyarn_VERSION_MINOR}.${libyarn_VERSION_PATCH}")
 SET(libyarn_VERSION_API 1)
 SET(libyarn_ROOT_SOURCES_DIR ${CMAKE_SOURCE_DIR}/src)

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b68130f3/depends/libyarn/src/libyarnclient/LibYarnClient.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnclient/LibYarnClient.cpp b/depends/libyarn/src/libyarnclient/LibYarnClient.cpp
index 3fb7e5b..5208e05 100644
--- a/depends/libyarn/src/libyarnclient/LibYarnClient.cpp
+++ b/depends/libyarn/src/libyarnclient/LibYarnClient.cpp
@@ -42,7 +42,7 @@ LibYarnClient::LibYarnClient(string &user, string &rmHost, string &rmPort,
 		amUser(user), schedHost(schedHost), schedPort(schedPort), amHost(amHost),
 		amPort(amPort), am_tracking_url(am_tracking_url),
 		heartbeatInterval(heartbeatInterval),response_id(0),clientJobId(""),
-		keepRun(false){
+		keepRun(false), needHeartbeatAlive(false){
         pthread_mutex_init( &(heartbeatLock), NULL );
 
 		amrmClient = NULL;
@@ -220,6 +220,7 @@ int LibYarnClient::createJob(string &jobName, string &queue,string &jobId) {
                       "error code %d", rc);
             throw std::runtime_error( "Fail to create heart-beat thread.");
         }
+        needHeartbeatAlive = true;
 #endif
 
         LOG(INFO,"LibYarnClient::createJob, after AM register to RM, a heartbeat thread has been started");
@@ -272,6 +273,7 @@ int LibYarnClient::forceKillJob(string &jobId) {
             throw std::invalid_argument("The jobId is wrong, please check the jobId argument");
         }
 
+        needHeartbeatAlive = false;
         for (map<int64_t,Container*>::iterator it = jobIdContainers.begin(); it != jobIdContainers. end(); it++) {
             ostringstream key;
             Container *container = it->second;
@@ -484,6 +486,10 @@ int LibYarnClient::allocateResources(string &jobId,
 			throw std::invalid_argument("The jobId is wrong, check the jobId argument");
 		}
 
+        if (!keepRun && needHeartbeatAlive) {
+            throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped.");
+        }
+
         ApplicationMaster*    amrmClientAlias = (ApplicationMaster*) amrmClient;
         list<Container>       allocatedContainerCache;
         list<ContainerReport> preContainerReports;
@@ -622,6 +628,11 @@ int LibYarnClient::releaseResources(string &jobId,int64_t releaseContainerIds[],
 		if (jobId != clientJobId) {
 			throw std::invalid_argument("The jobId is wrong,please check the jobId argument");
 		}
+
+        if (!keepRun && needHeartbeatAlive) {
+            throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped.");
+        }
+
         ApplicationMaster* amrmClientAlias = (ApplicationMaster*) amrmClient;
         //1) asksBlank
         list<ResourceRequest> asksBlank;
@@ -705,6 +716,10 @@ int LibYarnClient::activeResources(string &jobId,int64_t activeContainerIds[],in
 		if (jobId != clientJobId) {
 			throw std::invalid_argument("The jobId is wrong,please check the jobId argument");
 		}
+        if (!keepRun && needHeartbeatAlive) {
+            throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped.");
+        }
+
 	    LOG(INFO, "LibYarnClient::activeResources, activeResources started");
 
         for (int i = 0; i < activeContainerSize; i++){
@@ -772,12 +787,13 @@ int LibYarnClient::finishJob(string &jobId, FinalApplicationStatus finalStatus)
     }
 #endif
 
-	try{
-		if (jobId != clientJobId) {
-			throw std::invalid_argument("The jobId is wrong,please check the jobId argument");
-		}
+    try{
+        if (jobId != clientJobId) {
+            throw std::invalid_argument("The jobId is wrong,please check the jobId argument");
+        }
+        needHeartbeatAlive = false;
+
         //1. we should stop all containers related with this job
-        //ContainerManagement cmgmt;
         for (map<int64_t,Container*>::iterator it = jobIdContainers.begin(); it != jobIdContainers. end(); it++) {
             ostringstream key;
             Container *container = it->second;
@@ -820,6 +836,11 @@ int LibYarnClient::getApplicationReport(string &jobId,ApplicationReport &applica
 		if (jobId != clientJobId) {
 			throw std::invalid_argument("The jobId is wrong,please check the jobId argument");
 		}
+
+        if (!keepRun && needHeartbeatAlive) {
+            throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped.");
+        }
+
 		LOG(INFO,"LibYarnClient::getApplicationReport, appId[cluster_timestamp:%lld,id:%d]",
 				clientAppId.getClusterTimestamp(), clientAppId.getId());
 		applicationReport = ((ApplicationClient*) appClient)->getApplicationReport(clientAppId);
@@ -848,6 +869,11 @@ int LibYarnClient::getContainerReports(string &jobId,list<ContainerReport> &cont
 		if (jobId != clientJobId) {
 			throw std::invalid_argument("The jobId is wrong,please check the jobId argument");
 		}
+
+        if (!keepRun && needHeartbeatAlive) {
+            throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped.");
+        }
+
 		LOG(INFO,"LibYarnClient::getContainerReports, appId[cluster_timestamp:%lld,id:%d]",
 				clientAppId.getClusterTimestamp(), clientAppId.getId());
 		containerReports = ((ApplicationClient*) appClient)->getContainers(clientAppAttempId);
@@ -872,6 +898,10 @@ int LibYarnClient::getContainerStatuses(string &jobId,int64_t containerIds[],int
 			throw std::invalid_argument("The jobId is wrong,please check the jobId argument");
 		}
 
+        if (!keepRun && needHeartbeatAlive) {
+            throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped.");
+        }
+
 		for (int i = 0; i < containerSize; i++) {
 			int64_t containerId = containerIds[i];
 			map<int64_t, Container*>::iterator it = jobIdContainers.find(containerId);
@@ -908,6 +938,10 @@ int LibYarnClient::getContainerStatuses(string &jobId,int64_t containerIds[],int
 int LibYarnClient::getQueueInfo(string &queue, bool includeApps,
 		bool includeChildQueues, bool recursive,QueueInfo &queueInfo) {
     try{
+        if (!keepRun && needHeartbeatAlive) {
+            throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped.");
+        }
+
         queueInfo = ((ApplicationClient*) appClient)->getQueueInfo(queue, includeApps,
                 includeChildQueues, recursive);
         return FR_SUCCEEDED;
@@ -927,6 +961,9 @@ int LibYarnClient::getQueueInfo(string &queue, bool includeApps,
 
 int LibYarnClient::getClusterNodes(list<NodeState> &states,list<NodeReport> &nodeReports) {
     try{
+        if (!keepRun && needHeartbeatAlive) {
+            throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped.");
+        }
         nodeReports =  ((ApplicationClient*) appClient)->getClusterNodes(states);
         return FR_SUCCEEDED;
     }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b68130f3/depends/libyarn/src/libyarnclient/LibYarnClient.h
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnclient/LibYarnClient.h b/depends/libyarn/src/libyarnclient/LibYarnClient.h
index 9ce51be..24ccb6d 100644
--- a/depends/libyarn/src/libyarnclient/LibYarnClient.h
+++ b/depends/libyarn/src/libyarnclient/LibYarnClient.h
@@ -142,6 +142,7 @@ namespace libyarn {
 		list<ResourceRequest> askRequests;
 
 		volatile bool keepRun;
+		bool needHeartbeatAlive;
 #ifdef MOCKTEST
 	private:
     /*

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b68130f3/depends/libyarn/src/libyarnclient/LibYarnClientC.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnclient/LibYarnClientC.cpp b/depends/libyarn/src/libyarnclient/LibYarnClientC.cpp
index 51971fd..c558e31 100644
--- a/depends/libyarn/src/libyarnclient/LibYarnClientC.cpp
+++ b/depends/libyarn/src/libyarnclient/LibYarnClientC.cpp
@@ -263,7 +263,7 @@ extern "C" {
 			goto exit_err;
 
 		preferredAllocatedSize = allocatedPreferredList.size();
-		preferredAllocatedArray = (LibYarnResource_t *)(sizeof(LibYarnResource_t) * preferredAllocatedSize);
+		preferredAllocatedArray = (LibYarnResource_t *)malloc(sizeof(LibYarnResource_t) * preferredAllocatedSize);
 		if(preferredAllocatedArray == NULL) {
 			setErrorMessage("LibYarnClientC::fail to allocate memory for resource array");
 			goto exit_err;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b68130f3/depends/libyarn/src/libyarnserver/ApplicationClientProtocol.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnserver/ApplicationClientProtocol.cpp b/depends/libyarn/src/libyarnserver/ApplicationClientProtocol.cpp
index 9669152..ac0f026 100644
--- a/depends/libyarn/src/libyarnserver/ApplicationClientProtocol.cpp
+++ b/depends/libyarn/src/libyarnserver/ApplicationClientProtocol.cpp
@@ -170,11 +170,11 @@ GetApplicationReportResponse ApplicationClientProtocol::getApplicationReport(
  rpc getContainers (GetContainersRequestProto) returns (GetContainersResponseProto);
 
 message GetContainersRequestProto {
-  optional ApplicationIdProto application_id = 1;
+  optional ApplicationIdProto application_attempt_id = 1;
 }
 
 message GetContainersResponseProto {
-  repeated ContainerReportProto containers_reports = 1;
+  repeated ContainerReportProto containers = 1;
 }
  */
 GetContainersResponse ApplicationClientProtocol::getContainers(GetContainersRequest &request){

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b68130f3/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c b/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c
index 814a578..d3028c3 100644
--- a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c
+++ b/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c
@@ -1787,7 +1787,8 @@ int RB2YARN_getContainerReport(RB_GRMContainerStat *ctnstats, int *size)
 		freeContainerStatusArray(ctnstatarr, ctnstatsize);
 
 	}
-	freeContainerReportArray(ctnrparr, arrsize);
+	if(ctnrparr != NULL && arrsize > 0)
+		freeContainerReportArray(ctnrparr, arrsize);
 	return yarnres;
 }