You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by wl...@apache.org on 2015/10/29 13:59:30 UTC
incubator-hawq git commit: HAWQ-95. Add check after submit Hawq AM to
Yarn, if register failed, kill application from Yarn and retry
Repository: incubator-hawq
Updated Branches:
refs/heads/master d0dfbb16d -> 8239aec78
HAWQ-95. Add check after submit Hawq AM to Yarn, if register failed, kill application from Yarn and retry
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/8239aec7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/8239aec7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/8239aec7
Branch: refs/heads/master
Commit: 8239aec784a2f2fee8904ce7021c2ec6363f1821
Parents: d0dfbb1
Author: Wen Lin <wl...@pivotal.io>
Authored: Thu Oct 29 21:04:31 2015 +0800
Committer: Wen Lin <wl...@pivotal.io>
Committed: Thu Oct 29 21:04:31 2015 +0800
----------------------------------------------------------------------
depends/libyarn/src/CMakeLists.txt | 2 +-
.../libyarn/src/libyarnclient/LibYarnClient.cpp | 54 ++++++++++++++
.../libyarn/src/libyarnclient/LibYarnClient.h | 2 +
.../src/libyarnclient/LibYarnClientC.cpp | 16 +++++
.../libyarn/src/libyarnclient/LibYarnClientC.h | 3 +
.../resourcebroker_LIBYARN_proc.c | 75 ++++++++++++++++----
6 files changed, 138 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8239aec7/depends/libyarn/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/CMakeLists.txt b/depends/libyarn/src/CMakeLists.txt
index d159216..db5ccf5 100644
--- a/depends/libyarn/src/CMakeLists.txt
+++ b/depends/libyarn/src/CMakeLists.txt
@@ -2,7 +2,7 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
SET(libyarn_VERSION_MAJOR 0)
SET(libyarn_VERSION_MINOR 1)
-SET(libyarn_VERSION_PATCH 5)
+SET(libyarn_VERSION_PATCH 6)
SET(libyarn_VERSION_STRING "${libyarn_VERSION_MAJOR}.${libyarn_VERSION_MINOR}.${libyarn_VERSION_PATCH}")
SET(libyarn_VERSION_API 1)
SET(libyarn_ROOT_SOURCES_DIR ${CMAKE_SOURCE_DIR}/src)
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8239aec7/depends/libyarn/src/libyarnclient/LibYarnClient.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnclient/LibYarnClient.cpp b/depends/libyarn/src/libyarnclient/LibYarnClient.cpp
index eb5718c..aac5025 100644
--- a/depends/libyarn/src/libyarnclient/LibYarnClient.cpp
+++ b/depends/libyarn/src/libyarnclient/LibYarnClient.cpp
@@ -214,6 +214,60 @@ int LibYarnClient::createJob(string &jobName, string &queue,string &jobId) {
}
}
+/*
+ * Forcefully terminate the job identified by jobId.
+ *
+ * Stops the heartbeat thread (skipped when built with MOCKTEST), asks the
+ * node managers to stop every container tracked in jobIdContainers, asks
+ * YARN to kill the application (clientAppId), and finally releases the
+ * local container bookkeeping.
+ *
+ * Stopping containers is best effort: a failure for one container is logged
+ * and the remaining containers and the application kill are still
+ * attempted, so a single unreachable node manager cannot leave the
+ * application behind in YARN.
+ *
+ * Returns FR_SUCCEEDED on success. Returns FR_FAILED if jobId does not match
+ * this client's job, if the heartbeat thread cannot be joined, or if killing
+ * the application throws; in every failure case the reason is recorded with
+ * setErrorMessage().
+ */
+int LibYarnClient::forceKillJob(string &jobId) {
+    // Validate the argument before touching any state, so that a call with
+    // a wrong jobId does not stop the heartbeat thread of a running job.
+    if (jobId != clientJobId) {
+        stringstream errorMsg;
+        errorMsg << "LibYarnClient::forceKillJob, catch the exception:"
+                 << "The jobId is wrong, please check the jobId argument";
+        setErrorMessage(errorMsg.str());
+        return FR_FAILED;
+    }
+
+#ifndef MOCKTEST
+    if ( keepRun ) {
+        keepRun = false;
+        void *thrc = NULL;
+        int rc = pthread_join(heartbeatThread, &thrc);
+        if ( rc != 0 ) {
+            LOG(INFO, "LibYarnClient::forceKillJob, fail to join heart-beat thread. "
+                "error code %d", rc);
+            // Record the reason so callers reading getErrorMessage() do not
+            // see a stale or empty message.
+            stringstream errorMsg;
+            errorMsg << "LibYarnClient::forceKillJob, fail to join heart-beat thread. "
+                     << "error code " << rc;
+            setErrorMessage(errorMsg.str());
+            return FR_FAILED;
+        }
+    }
+#endif
+
+    try {
+        for (map<int, Container*>::iterator it = jobIdContainers.begin();
+             it != jobIdContainers.end(); ++it) {
+            Container *container = it->second;
+            ostringstream key;
+            key << container->getNodeId().getHost() << ":" << container->getNodeId().getPort();
+            Token nmToken = nmTokenCache[key.str()];
+            try {
+                ((ContainerManagement*) nmClient)->stopContainer((*container), nmToken);
+                LOG(INFO, "LibYarnClient::forceKillJob, container:%d is stopped",
+                    container->getId().getId());
+            } catch (std::exception &e) {
+                // Keep going: the application kill below must still be issued.
+                LOG(INFO, "LibYarnClient::forceKillJob, fail to stop container:%d, %s",
+                    container->getId().getId(), e.what());
+            }
+        }
+
+        ((ApplicationClient*) appClient)->forceKillApplication(clientAppId);
+        LOG(INFO, "LibYarnClient::forceKillJob, forceKillApplication");
+
+        for (map<int, Container*>::iterator it = jobIdContainers.begin();
+             it != jobIdContainers.end(); ++it) {
+            LOG(INFO, "LibYarnClient::forceKillJob, container:%d in jobIdContainers is deleted",
+                it->second->getId().getId());
+            delete it->second;
+            it->second = NULL;
+        }
+        jobIdContainers.clear();
+        activeFailContainerIds.clear();
+        return FR_SUCCEEDED;
+    } catch (std::exception &e) {
+        stringstream errorMsg;
+        errorMsg << "LibYarnClient::forceKillJob, catch the exception:" << e.what();
+        setErrorMessage(errorMsg.str());
+        return FR_FAILED;
+    } catch (...) {
+        stringstream errorMsg;
+        errorMsg << "LibYarnClient::forceKillJob, catch unexpected exception.";
+        setErrorMessage(errorMsg.str());
+        return FR_FAILED;
+    }
+}
+
+
void LibYarnClient::dummyAllocate() {
pthread_mutex_lock(&heartbeatLock);
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8239aec7/depends/libyarn/src/libyarnclient/LibYarnClient.h
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnclient/LibYarnClient.h b/depends/libyarn/src/libyarnclient/LibYarnClient.h
index ce6a213..ea384fb 100644
--- a/depends/libyarn/src/libyarnclient/LibYarnClient.h
+++ b/depends/libyarn/src/libyarnclient/LibYarnClient.h
@@ -39,6 +39,8 @@ namespace libyarn {
virtual int createJob(string &jobName, string &queue, string &jobId);
+ virtual int forceKillJob(string &jobId);
+
virtual void addResourceRequest(Resource capability, int32_t num_containers,
string host, int32_t priority, bool relax_locality);
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8239aec7/depends/libyarn/src/libyarnclient/LibYarnClientC.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnclient/LibYarnClientC.cpp b/depends/libyarn/src/libyarnclient/LibYarnClientC.cpp
index a9aa29e..5db3685 100644
--- a/depends/libyarn/src/libyarnclient/LibYarnClientC.cpp
+++ b/depends/libyarn/src/libyarnclient/LibYarnClientC.cpp
@@ -42,6 +42,10 @@ extern "C" {
return client->createJob(jobName, queue, jobId);
}
+ int forceKillJob(string &jobId) {
+     // Forward to the underlying client's forceKillJob() and hand back its
+     // status code unchanged.
+     int rc = client->forceKillJob(jobId);
+     return rc;
+ }
+
int addContainerRequests(string &jobId,
Resource &capability,
int num_containers,
@@ -165,6 +169,17 @@ extern "C" {
}
}
+ int forceKillJob(LibYarnClient_t *client, char* jobId) {
+     /*
+      * C entry point: kill the job through the client. On failure, copy the
+      * client's error text into the C-level error message before returning
+      * FUNCTION_FAILED.
+      */
+     string jobIdStr(jobId);
+     if (client->forceKillJob(jobIdStr) != FUNCTION_SUCCEEDED) {
+         setErrorMessage(client->getErrorMessage());
+         return FUNCTION_FAILED;
+     }
+     return FUNCTION_SUCCEEDED;
+ }
+
int allocateResources(LibYarnClient_t *client, char *jobId,
int32_t priority, int32_t vCores, int32_t memory, int32_t num_containers,
char *blackListAdditions[], int blacklistAddsSize,
@@ -375,6 +390,7 @@ exit_err:
(*applicationReport)->status = applicationReportCpp.getYarnApplicationState();
(*applicationReport)->diagnostics = strdup(applicationReportCpp.getDiagnostics().c_str());
(*applicationReport)->startTime = applicationReportCpp.getStartTime();
+ (*applicationReport)->progress = applicationReportCpp.getProgress();
return FUNCTION_SUCCEEDED;
} else{
setErrorMessage(client->getErrorMessage());
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8239aec7/depends/libyarn/src/libyarnclient/LibYarnClientC.h
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnclient/LibYarnClientC.h b/depends/libyarn/src/libyarnclient/LibYarnClientC.h
index 6b02ec7..aac3b29 100644
--- a/depends/libyarn/src/libyarnclient/LibYarnClientC.h
+++ b/depends/libyarn/src/libyarnclient/LibYarnClientC.h
@@ -50,6 +50,7 @@ typedef struct LibYarnApplicationReport_t {
enum YarnApplicationState status;
char *diagnostics;
int64_t startTime;
+ float progress;
}LibYarnApplicationReport_t;
typedef struct LibYarnContainerReport_t {
@@ -138,6 +139,8 @@ void deleteLibYarnClient(LibYarnClient_t *client);
int createJob(LibYarnClient_t *client, char *jobName, char *queue,char **jobId);
+int forceKillJob(LibYarnClient_t *client, char *jobId);
+
int addContainerRequest(LibYarnNodeInfo_t preferredHosts[], int preferredHostsSize,
int32_t priority, bool relax_locality);
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8239aec7/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c b/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c
index 1a44503..4c2cc64 100644
--- a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c
+++ b/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c
@@ -1261,6 +1261,8 @@ int RB2YARN_initializeConnection(void)
return FUNCTION_SUCCEEDED;
}
+/*
+ * HAWQ AM heartbeat interval, in seconds. This file scales it by 1000 for
+ * the heartbeat argument given when connecting to YARN, and by 1000*1000
+ * for usleep() between checks that the AM has registered.
+ */
+#define HAWQ_YARN_AM_HEARTBEAT_INTERVAL 5
+
/* Connect and disconnect to the global resource manager. */
int RB2YARN_connectToYARN(void)
{
@@ -1276,35 +1278,82 @@ int RB2YARN_connectToYARN(void)
YARNAMPort,
YARNTRKUrl,
&LIBYARNClient,
- 5*1000 /* Hard code 5 sec */);
+ HAWQ_YARN_AM_HEARTBEAT_INTERVAL*1000 /* Hard code 5 sec */);
return yarnres;
}
+/*
+ * Create the HAWQ application in YARN, then confirm that the HAWQ AM has
+ * registered by polling the application report.
+ *
+ * Returns the createJob() result if creating the job fails. Returns
+ * FUNCTION_FAILED if, after all polling attempts, the report still cannot be
+ * fetched or still shows progress below 0.5; in that case the application is
+ * force-killed in YARN first. Otherwise records ResBrokerStartTime and
+ * returns FUNCTION_SUCCEEDED.
+ */
int RB2YARN_registerYARNApplication(void)
{
- int yarnres = FUNCTION_SUCCEEDED;
+    int retry = 5;
+    int yarnres = FUNCTION_SUCCEEDED, result = FUNCTION_SUCCEEDED;
+    LibYarnApplicationReport_t *applicationReport = NULL;
+
yarnres = createJob(LIBYARNClient,
YARNAppName.Str,
YARNQueueName.Str,
&YARNJobID);
- if ( yarnres != FUNCTION_SUCCEEDED ) {
+    if ( yarnres != FUNCTION_SUCCEEDED )
+    {
elog(WARNING, "YARN mode resource broker failed to create application "
"in YARN resource manager. %s",
getErrorMessage());
+        return yarnres;
}
- else {
- elog(LOG, "YARN mode resource broker created job in YARN resource "
- "manager %s as new application %s assigned to queue %s.",
- YARNJobID,
- YARNAppName.Str,
- YARNQueueName.Str);
- ResBrokerStartTime = gettime_microsec();
- elog(LOG, "YARN mode resource broker registered new "
- "YARN application. Start time stamp "UINT64_FORMAT,
- ResBrokerStartTime);
+    elog(LOG, "YARN mode resource broker created job in YARN resource "
+              "manager %s as new application %s assigned to queue %s.",
+              YARNJobID,
+              YARNAppName.Str,
+              YARNQueueName.Str);
+
+    /*
+     * Check whether HAWQ registered successfully in Hadoop Yarn; if not,
+     * kill the application from Hadoop Yarn.
+     *
+     * retry is decremented at the top of every attempt, before the outcome
+     * is examined, so the attempt that brings it to 0 takes the kill path
+     * below instead of leaving the loop with an unregistered (or NULL)
+     * report.
+     *
+     * A progress value below 0.5 is treated as "AM not registered yet".
+     * NOTE(review): assumes the HAWQ AM reports progress >= 0.5 once it has
+     * registered -- confirm against libyarn's heartbeat.
+     * NOTE(review): getApplicationReport() appears to allocate a fresh report
+     * (diagnostics is strdup'd) on each successful call; earlier reports are
+     * not released here -- confirm ownership and free them.
+     */
+    while (retry > 0)
+    {
+        retry--;
+        result = getApplicationReport(LIBYARNClient, YARNJobID, &applicationReport);
+        if (result == FUNCTION_SUCCEEDED &&
+            applicationReport != NULL &&
+            applicationReport->progress >= 0.5)
+        {
+            break;    /* AM is registered */
+        }
+
+        if (retry > 0)
+        {
+            /* Give the AM another heartbeat interval to register. */
+            usleep(HAWQ_YARN_AM_HEARTBEAT_INTERVAL*1000*1000L);
+            continue;
+        }
+
+        /* Final attempt failed: remove the application from Hadoop Yarn. */
+        if (result != FUNCTION_SUCCEEDED || applicationReport == NULL)
+        {
+            elog(WARNING, "YARN mode resource broker failed to get application report, "
+                          "so kill it from Hadoop Yarn.");
+        }
+        else
+        {
+            elog(WARNING, "YARN mode resource broker failed to register itself in Hadoop Yarn. "
+                          "Got progress:%f, and try to kill application from Hadoop Yarn",
+                          applicationReport->progress);
+        }
+        result = forceKillJob(LIBYARNClient, YARNJobID);
+        if (result != FUNCTION_SUCCEEDED)
+            elog(WARNING, "YARN mode resource broker kill job failed.");
+        return FUNCTION_FAILED;
}
+
+    /* Only reached through the "registered" break, so applicationReport is valid. */
+    ResBrokerStartTime = gettime_microsec();
+
+    elog(LOG, "YARN mode resource broker registered new "
+              "YARN application. Progress:%f, Start time stamp "UINT64_FORMAT,
+              applicationReport->progress, ResBrokerStartTime);
+
return yarnres;
}