You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by wl...@apache.org on 2015/12/22 03:16:38 UTC
incubator-hawq git commit: HAWQ-267. Add check for AM heartbeat thread status
Repository: incubator-hawq
Updated Branches:
refs/heads/master 72c63be31 -> b68130f32
HAWQ-267. Add check for AM heartbeat thread status
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/b68130f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/b68130f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/b68130f3
Branch: refs/heads/master
Commit: b68130f32fe69467a877b19bfa9fc9fafcf16acd
Parents: 72c63be
Author: Wen Lin <wl...@pivotal.io>
Authored: Tue Dec 22 10:24:53 2015 +0800
Committer: Wen Lin <wl...@pivotal.io>
Committed: Tue Dec 22 10:24:53 2015 +0800
----------------------------------------------------------------------
depends/libyarn/src/CMakeLists.txt | 2 +-
.../libyarn/src/libyarnclient/LibYarnClient.cpp | 49 +++++++++++++++++---
.../libyarn/src/libyarnclient/LibYarnClient.h | 1 +
.../src/libyarnclient/LibYarnClientC.cpp | 2 +-
.../libyarnserver/ApplicationClientProtocol.cpp | 4 +-
.../resourcebroker_LIBYARN_proc.c | 3 +-
6 files changed, 50 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b68130f3/depends/libyarn/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/CMakeLists.txt b/depends/libyarn/src/CMakeLists.txt
index e9657cc..d19b3b2 100644
--- a/depends/libyarn/src/CMakeLists.txt
+++ b/depends/libyarn/src/CMakeLists.txt
@@ -2,7 +2,7 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
SET(libyarn_VERSION_MAJOR 0)
SET(libyarn_VERSION_MINOR 1)
-SET(libyarn_VERSION_PATCH 9)
+SET(libyarn_VERSION_PATCH 10)
SET(libyarn_VERSION_STRING "${libyarn_VERSION_MAJOR}.${libyarn_VERSION_MINOR}.${libyarn_VERSION_PATCH}")
SET(libyarn_VERSION_API 1)
SET(libyarn_ROOT_SOURCES_DIR ${CMAKE_SOURCE_DIR}/src)
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b68130f3/depends/libyarn/src/libyarnclient/LibYarnClient.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnclient/LibYarnClient.cpp b/depends/libyarn/src/libyarnclient/LibYarnClient.cpp
index 3fb7e5b..5208e05 100644
--- a/depends/libyarn/src/libyarnclient/LibYarnClient.cpp
+++ b/depends/libyarn/src/libyarnclient/LibYarnClient.cpp
@@ -42,7 +42,7 @@ LibYarnClient::LibYarnClient(string &user, string &rmHost, string &rmPort,
amUser(user), schedHost(schedHost), schedPort(schedPort), amHost(amHost),
amPort(amPort), am_tracking_url(am_tracking_url),
heartbeatInterval(heartbeatInterval),response_id(0),clientJobId(""),
- keepRun(false){
+ keepRun(false), needHeartbeatAlive(false){
pthread_mutex_init( &(heartbeatLock), NULL );
amrmClient = NULL;
@@ -220,6 +220,7 @@ int LibYarnClient::createJob(string &jobName, string &queue,string &jobId) {
"error code %d", rc);
throw std::runtime_error( "Fail to create heart-beat thread.");
}
+ needHeartbeatAlive = true;
#endif
LOG(INFO,"LibYarnClient::createJob, after AM register to RM, a heartbeat thread has been started");
@@ -272,6 +273,7 @@ int LibYarnClient::forceKillJob(string &jobId) {
throw std::invalid_argument("The jobId is wrong, please check the jobId argument");
}
+ needHeartbeatAlive = false;
for (map<int64_t,Container*>::iterator it = jobIdContainers.begin(); it != jobIdContainers. end(); it++) {
ostringstream key;
Container *container = it->second;
@@ -484,6 +486,10 @@ int LibYarnClient::allocateResources(string &jobId,
throw std::invalid_argument("The jobId is wrong, check the jobId argument");
}
+ if (!keepRun && needHeartbeatAlive) {
+ throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped.");
+ }
+
ApplicationMaster* amrmClientAlias = (ApplicationMaster*) amrmClient;
list<Container> allocatedContainerCache;
list<ContainerReport> preContainerReports;
@@ -622,6 +628,11 @@ int LibYarnClient::releaseResources(string &jobId,int64_t releaseContainerIds[],
if (jobId != clientJobId) {
throw std::invalid_argument("The jobId is wrong,please check the jobId argument");
}
+
+ if (!keepRun && needHeartbeatAlive) {
+ throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped.");
+ }
+
ApplicationMaster* amrmClientAlias = (ApplicationMaster*) amrmClient;
//1) asksBlank
list<ResourceRequest> asksBlank;
@@ -705,6 +716,10 @@ int LibYarnClient::activeResources(string &jobId,int64_t activeContainerIds[],in
if (jobId != clientJobId) {
throw std::invalid_argument("The jobId is wrong,please check the jobId argument");
}
+ if (!keepRun && needHeartbeatAlive) {
+ throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped.");
+ }
+
LOG(INFO, "LibYarnClient::activeResources, activeResources started");
for (int i = 0; i < activeContainerSize; i++){
@@ -772,12 +787,13 @@ int LibYarnClient::finishJob(string &jobId, FinalApplicationStatus finalStatus)
}
#endif
- try{
- if (jobId != clientJobId) {
- throw std::invalid_argument("The jobId is wrong,please check the jobId argument");
- }
+ try{
+ if (jobId != clientJobId) {
+ throw std::invalid_argument("The jobId is wrong,please check the jobId argument");
+ }
+ needHeartbeatAlive = false;
+
//1. we should stop all containers related with this job
- //ContainerManagement cmgmt;
for (map<int64_t,Container*>::iterator it = jobIdContainers.begin(); it != jobIdContainers. end(); it++) {
ostringstream key;
Container *container = it->second;
@@ -820,6 +836,11 @@ int LibYarnClient::getApplicationReport(string &jobId,ApplicationReport &applica
if (jobId != clientJobId) {
throw std::invalid_argument("The jobId is wrong,please check the jobId argument");
}
+
+ if (!keepRun && needHeartbeatAlive) {
+ throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped.");
+ }
+
LOG(INFO,"LibYarnClient::getApplicationReport, appId[cluster_timestamp:%lld,id:%d]",
clientAppId.getClusterTimestamp(), clientAppId.getId());
applicationReport = ((ApplicationClient*) appClient)->getApplicationReport(clientAppId);
@@ -848,6 +869,11 @@ int LibYarnClient::getContainerReports(string &jobId,list<ContainerReport> &cont
if (jobId != clientJobId) {
throw std::invalid_argument("The jobId is wrong,please check the jobId argument");
}
+
+ if (!keepRun && needHeartbeatAlive) {
+ throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped.");
+ }
+
LOG(INFO,"LibYarnClient::getContainerReports, appId[cluster_timestamp:%lld,id:%d]",
clientAppId.getClusterTimestamp(), clientAppId.getId());
containerReports = ((ApplicationClient*) appClient)->getContainers(clientAppAttempId);
@@ -872,6 +898,10 @@ int LibYarnClient::getContainerStatuses(string &jobId,int64_t containerIds[],int
throw std::invalid_argument("The jobId is wrong,please check the jobId argument");
}
+ if (!keepRun && needHeartbeatAlive) {
+ throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped.");
+ }
+
for (int i = 0; i < containerSize; i++) {
int64_t containerId = containerIds[i];
map<int64_t, Container*>::iterator it = jobIdContainers.find(containerId);
@@ -908,6 +938,10 @@ int LibYarnClient::getContainerStatuses(string &jobId,int64_t containerIds[],int
int LibYarnClient::getQueueInfo(string &queue, bool includeApps,
bool includeChildQueues, bool recursive,QueueInfo &queueInfo) {
try{
+ if (!keepRun && needHeartbeatAlive) {
+ throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped.");
+ }
+
queueInfo = ((ApplicationClient*) appClient)->getQueueInfo(queue, includeApps,
includeChildQueues, recursive);
return FR_SUCCEEDED;
@@ -927,6 +961,9 @@ int LibYarnClient::getQueueInfo(string &queue, bool includeApps,
int LibYarnClient::getClusterNodes(list<NodeState> &states,list<NodeReport> &nodeReports) {
try{
+ if (!keepRun && needHeartbeatAlive) {
+ throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped.");
+ }
nodeReports = ((ApplicationClient*) appClient)->getClusterNodes(states);
return FR_SUCCEEDED;
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b68130f3/depends/libyarn/src/libyarnclient/LibYarnClient.h
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnclient/LibYarnClient.h b/depends/libyarn/src/libyarnclient/LibYarnClient.h
index 9ce51be..24ccb6d 100644
--- a/depends/libyarn/src/libyarnclient/LibYarnClient.h
+++ b/depends/libyarn/src/libyarnclient/LibYarnClient.h
@@ -142,6 +142,7 @@ namespace libyarn {
list<ResourceRequest> askRequests;
volatile bool keepRun;
+ bool needHeartbeatAlive;
#ifdef MOCKTEST
private:
/*
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b68130f3/depends/libyarn/src/libyarnclient/LibYarnClientC.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnclient/LibYarnClientC.cpp b/depends/libyarn/src/libyarnclient/LibYarnClientC.cpp
index 51971fd..c558e31 100644
--- a/depends/libyarn/src/libyarnclient/LibYarnClientC.cpp
+++ b/depends/libyarn/src/libyarnclient/LibYarnClientC.cpp
@@ -263,7 +263,7 @@ extern "C" {
goto exit_err;
preferredAllocatedSize = allocatedPreferredList.size();
- preferredAllocatedArray = (LibYarnResource_t *)(sizeof(LibYarnResource_t) * preferredAllocatedSize);
+ preferredAllocatedArray = (LibYarnResource_t *)malloc(sizeof(LibYarnResource_t) * preferredAllocatedSize);
if(preferredAllocatedArray == NULL) {
setErrorMessage("LibYarnClientC::fail to allocate memory for resource array");
goto exit_err;
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b68130f3/depends/libyarn/src/libyarnserver/ApplicationClientProtocol.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnserver/ApplicationClientProtocol.cpp b/depends/libyarn/src/libyarnserver/ApplicationClientProtocol.cpp
index 9669152..ac0f026 100644
--- a/depends/libyarn/src/libyarnserver/ApplicationClientProtocol.cpp
+++ b/depends/libyarn/src/libyarnserver/ApplicationClientProtocol.cpp
@@ -170,11 +170,11 @@ GetApplicationReportResponse ApplicationClientProtocol::getApplicationReport(
rpc getContainers (GetContainersRequestProto) returns (GetContainersResponseProto);
message GetContainersRequestProto {
- optional ApplicationIdProto application_id = 1;
+ optional ApplicationIdProto application_attempt_id = 1;
}
message GetContainersResponseProto {
- repeated ContainerReportProto containers_reports = 1;
+ repeated ContainerReportProto containers = 1;
}
*/
GetContainersResponse ApplicationClientProtocol::getContainers(GetContainersRequest &request){
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b68130f3/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c b/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c
index 814a578..d3028c3 100644
--- a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c
+++ b/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c
@@ -1787,7 +1787,8 @@ int RB2YARN_getContainerReport(RB_GRMContainerStat *ctnstats, int *size)
freeContainerStatusArray(ctnstatarr, ctnstatsize);
}
- freeContainerReportArray(ctnrparr, arrsize);
+ if(ctnrparr != NULL && arrsize > 0)
+ freeContainerReportArray(ctnrparr, arrsize);
return yarnres;
}