You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original archive page and is not preserved in this copy.
Posted to commits@hawq.apache.org by yj...@apache.org on 2015/10/29 04:10:37 UTC
incubator-hawq git commit: HAWQ-104. HAWQ RM resource request timeout mechanism does not work
Repository: incubator-hawq
Updated Branches:
refs/heads/master d3242b458 -> 77a0f47a5
HAWQ-104. HAWQ RM resource request timeout mechanism does not work
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/77a0f47a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/77a0f47a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/77a0f47a
Branch: refs/heads/master
Commit: 77a0f47a5beca62a450cc67992eaa0acde270a37
Parents: d3242b4
Author: Yi Jin <yj...@pivotal.io>
Authored: Thu Oct 29 11:09:55 2015 +0800
Committer: Yi Jin <yj...@pivotal.io>
Committed: Thu Oct 29 11:09:55 2015 +0800
----------------------------------------------------------------------
.../communication/rmcomm_QD2RM.c | 24 ++++------
.../communication/rmcomm_RM2RMSEG.c | 2 +-
.../resourcemanager/include/resqueuemanager.h | 4 +-
.../resourcebroker/resourcebroker_LIBYARN.c | 5 +-
.../resourcebroker_LIBYARN_proc.c | 13 +++--
.../resourcebroker/resourcebroker_NONE.c | 5 +-
src/backend/resourcemanager/resourcemanager.c | 2 +
src/backend/resourcemanager/resourcepool.c | 3 +-
src/backend/resourcemanager/resqueuemanager.c | 50 +++++++++++++++-----
9 files changed, 69 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/77a0f47a/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/communication/rmcomm_QD2RM.c b/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
index 0cab819..7881e9f 100644
--- a/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
+++ b/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
@@ -426,8 +426,7 @@ int registerConnectionInRMByStr(int index,
if ( res != FUNC_RETURN_OK )
{
snprintf(errorbuf, errorbufsize,
- "failed to register in HAWQ resource manager because of "
- "RPC error %s.",
+ "failed to register in HAWQ resource manager because of %s.",
getErrorCodeExplain(res));
return res;
}
@@ -440,8 +439,7 @@ int registerConnectionInRMByStr(int index,
if ( response->Result != FUNC_RETURN_OK )
{
snprintf(errorbuf, errorbufsize,
- "failed to register in HAWQ resource manager because of remote"
- "error %s.",
+ "failed to register in HAWQ resource manager because of %s.",
getErrorCodeExplain(response->Result));
return response->Result;
}
@@ -476,8 +474,7 @@ int registerConnectionInRMByOID(int index,
if ( res != FUNC_RETURN_OK )
{
snprintf(errorbuf, errorbufsize,
- "failed to register in HAWQ resource manager because of "
- "RPC error %s.",
+ "failed to register in HAWQ resource manager because of %s.",
getErrorCodeExplain(res));
return res;
}
@@ -490,8 +487,7 @@ int registerConnectionInRMByOID(int index,
if ( response->Result != FUNC_RETURN_OK )
{
snprintf(errorbuf, errorbufsize,
- "failed to register in HAWQ resource manager because of remote"
- "error %s.",
+ "failed to register in HAWQ resource manager because of %s.",
getErrorCodeExplain(response->Result));
return response->Result;
}
@@ -526,8 +522,7 @@ int unregisterConnectionInRM(int index,
if ( res != FUNC_RETURN_OK )
{
snprintf(errorbuf, errorbufsize,
- "failed to unregister in HAWQ resource manager because of "
- "RPC error %s.",
+ "failed to unregister in HAWQ resource manager because of %s.",
getErrorCodeExplain(res));
return res;
}
@@ -539,8 +534,7 @@ int unregisterConnectionInRM(int index,
{
res = response->Result;
snprintf(errorbuf, errorbufsize,
- "failed to unregister in HAWQ resource manager because of "
- "remote error %s.",
+ "failed to unregister in HAWQ resource manager because of %s.",
getErrorCodeExplain(response->Result));
}
@@ -788,8 +782,7 @@ int returnResource(int index,
if ( res != FUNC_RETURN_OK )
{
snprintf(errorbuf, errorbufsize,
- "failed to return resource to HAWQ resource manager because of "
- "RPC error %s.",
+ "failed to return resource to HAWQ resource manager because of %s.",
getErrorCodeExplain(res));
return res;
}
@@ -801,8 +794,7 @@ int returnResource(int index,
{
res = response->Result;
snprintf(errorbuf, errorbufsize,
- "Fail to return resource to HAWQ resource manager because of "
- "remote error %s.",
+ "failed to return resource to HAWQ resource manager because of %s.",
getErrorCodeExplain(res));
return res;
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/77a0f47a/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c b/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c
index c87694a..d3b1d61 100644
--- a/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c
+++ b/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c
@@ -650,7 +650,7 @@ void processContainersAfterIncreaseMemoryQuota(GRMContainerSet ctns, bool accept
/* Add container to KickedContainers if lifetime is long enough */
else
{
- removePendingResourceRequestInRootQueue(ctn->MemoryMB, ctn->Core);
+ removePendingResourceRequestInRootQueue(ctn->MemoryMB, ctn->Core, false);
addGRMContainerToKicked(ctn);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/77a0f47a/src/backend/resourcemanager/include/resqueuemanager.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/resqueuemanager.h b/src/backend/resourcemanager/include/resqueuemanager.h
index 38de157..5a238a1 100644
--- a/src/backend/resourcemanager/include/resqueuemanager.h
+++ b/src/backend/resourcemanager/include/resqueuemanager.h
@@ -393,7 +393,9 @@ int minusResourceFromReourceManager(int32_t memorymb, double core);
int addNewResourceToResourceManagerByBundle(ResourceBundle bundle);
int minusResourceFromResourceManagerByBundle(ResourceBundle bundle);
-void removePendingResourceRequestInRootQueue(int32_t memorymb, uint32_t core);
+void removePendingResourceRequestInRootQueue(int32_t memorymb,
+ uint32_t core,
+ bool updatependingtime);
void clearPendingResourceRequestInRootQueue(void);
void buildAcquireResourceResponseMessage(ConnectionTrack conn);
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/77a0f47a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN.c b/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN.c
index 64c3488..98ae6a6 100644
--- a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN.c
+++ b/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN.c
@@ -321,6 +321,8 @@ int RB_LIBYARN_acquireResource(uint32_t memorymb, uint32_t core, List *preferred
res = RESBROK_PIPE_ERROR;
}
+ elog(DEBUG3, "LIBYARN mode resource broker wrote %d bytes out.", sendBuffer.Cursor+1);
+
destroySelfMaintainBuffer(&sendBuffer);
elog(LOG, "YARN mode resource broker wrote resource allocation request to "
"resource broker process.");
@@ -830,7 +832,8 @@ int handleRB2RM_AllocatedResource(void)
*/
removePendingResourceRequestInRootQueue(
response.MemoryMB * (response.ExpectedContainerCount - acceptedcount),
- response.Core * (response.ExpectedContainerCount - acceptedcount));
+ response.Core * (response.ExpectedContainerCount - acceptedcount),
+ response.Result == FUNC_RETURN_OK);
elog(LOG, "Accepted (%d MB, %d CORE) x %d from resource broker, "
"Expected %d containers, skipped %d containers.",
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/77a0f47a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c b/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c
index 8f757d1..d4a5748 100644
--- a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c
+++ b/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c
@@ -770,11 +770,6 @@ int handleRM2RB_AllocateResource(void)
request.Core,
request.ContainerCount);
- if ( YARNJobID == NULL )
- {
- return sendRBAllocateResourceErrorData(RESBROK_ERROR_GRM, &request);
- }
-
/* build preferred host list */
if (request.MsgLength > 0 && request.PreferredSize > 0) {
@@ -808,6 +803,14 @@ int handleRM2RB_AllocateResource(void)
}
}
+ elog(DEBUG3, "LIBYARN mode resource broker process read %d bytes in.",
+ request.MsgLength + sizeof(RPCRequestRBAllocateResourceContainersData));
+
+ if ( YARNJobID == NULL )
+ {
+ return sendRBAllocateResourceErrorData(RESBROK_ERROR_GRM, &request);
+ }
+
/*
* Mark latest used YARN container resource quota for testing if YARN
* resource queue is busy.
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/77a0f47a/src/backend/resourcemanager/resourcebroker/resourcebroker_NONE.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcebroker/resourcebroker_NONE.c b/src/backend/resourcemanager/resourcebroker/resourcebroker_NONE.c
index 941472e..8eaa7ba 100644
--- a/src/backend/resourcemanager/resourcebroker/resourcebroker_NONE.c
+++ b/src/backend/resourcemanager/resourcebroker/resourcebroker_NONE.c
@@ -202,9 +202,10 @@ int RB_NONE_acquireResource(uint32_t memorymb, uint32_t core, List *preferred)
/* Clean up pending resource quantity. */
removePendingResourceRequestInRootQueue( contmemorymb * (contcount - contactcount),
- 1 * (contcount - contactcount));
+ 1 * (contcount - contactcount),
+ res == FUNC_RETURN_OK);
- return FUNC_RETURN_OK;
+ return res;
}
int RB_NONE_returnResource(List **ctnl)
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/77a0f47a/src/backend/resourcemanager/resourcemanager.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcemanager.c b/src/backend/resourcemanager/resourcemanager.c
index 5577e83..742c665 100644
--- a/src/backend/resourcemanager/resourcemanager.c
+++ b/src/backend/resourcemanager/resourcemanager.c
@@ -2137,6 +2137,8 @@ int generateAllocRequestToBroker(void)
if ( mctrack->TotalPendingStartTime == 0 )
{
mctrack->TotalPendingStartTime = gettime_microsec();
+ elog(DEBUG3, "Global resource total pending start time is updated to "UINT64_FORMAT,
+ mctrack->TotalPendingStartTime);
}
res = RB_acquireResource(reqmem, reqcore, preferred);
if ( res != FUNC_RETURN_OK && res != RESBROK_PIPE_BUSY )
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/77a0f47a/src/backend/resourcemanager/resourcepool.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcepool.c b/src/backend/resourcemanager/resourcepool.c
index 3fd4dce..f3d891f 100644
--- a/src/backend/resourcemanager/resourcepool.c
+++ b/src/backend/resourcemanager/resourcepool.c
@@ -2746,7 +2746,7 @@ void moveAllAcceptedGRMContainersToResPool(void)
elog(LOG, "AddPendingContainerCount minused 1, current value %d",
PRESPOOL->AddPendingContainerCount);
addNewResourceToResourceManager(ctn->MemoryMB, ctn->Core);
- removePendingResourceRequestInRootQueue(ctn->MemoryMB, ctn->Core);
+ removePendingResourceRequestInRootQueue(ctn->MemoryMB, ctn->Core, true);
}
validateResourcePoolStatus(true);
}
@@ -3237,7 +3237,6 @@ void dropAllResPoolGRMContainersToToBeKicked(void)
for ( int i = 0 ; i < PQUEMGR->RatioCount ; ++i )
{
resetResourceBundleData(&(PQUEMGR->RatioTrackers[i]->TotalPending), 0, 0, -1);
- PQUEMGR->RatioTrackers[i]->TotalPendingStartTime = 0;
}
refreshMemoryCoreRatioLevelUsage(gettime_microsec());
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/77a0f47a/src/backend/resourcemanager/resqueuemanager.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resqueuemanager.c b/src/backend/resourcemanager/resqueuemanager.c
index 6b7a3ad..75dcece 100644
--- a/src/backend/resourcemanager/resqueuemanager.c
+++ b/src/backend/resourcemanager/resqueuemanager.c
@@ -3801,7 +3801,9 @@ void returnAllocatedResourceToLeafQueue(DynResourceQueueTrack track,
memorymb, core);
}
-void removePendingResourceRequestInRootQueue(int32_t memorymb, uint32_t core)
+void removePendingResourceRequestInRootQueue(int32_t memorymb,
+ uint32_t core,
+ bool updatependingtime)
{
if ( memorymb ==0 && core == 0 )
return;
@@ -3823,13 +3825,21 @@ void removePendingResourceRequestInRootQueue(int32_t memorymb, uint32_t core)
Assert(PQUEMGR->RatioTrackers[ratioindex]->TotalPending.MemoryMB >= 0 &&
PQUEMGR->RatioTrackers[ratioindex]->TotalPending.Core >= 0);
- if ( PQUEMGR->RatioTrackers[ratioindex]->TotalPending.MemoryMB == 0 &&
- PQUEMGR->RatioTrackers[ratioindex]->TotalPending.Core == 0 )
+ if ( updatependingtime )
{
- PQUEMGR->RatioTrackers[ratioindex]->TotalPendingStartTime = 0;
- }
- else if ( memorymb > 0 && core > 0 ){
- PQUEMGR->RatioTrackers[ratioindex]->TotalPendingStartTime = gettime_microsec();
+ if ( PQUEMGR->RatioTrackers[ratioindex]->TotalPending.MemoryMB == 0 &&
+ PQUEMGR->RatioTrackers[ratioindex]->TotalPending.Core == 0 )
+ {
+ PQUEMGR->RatioTrackers[ratioindex]->TotalPendingStartTime = 0;
+ elog(DEBUG3, "Global resource total pending start time is updated to "UINT64_FORMAT,
+ PQUEMGR->RatioTrackers[ratioindex]->TotalPendingStartTime);
+ }
+ else if ( memorymb > 0 && core > 0 )
+ {
+ PQUEMGR->RatioTrackers[ratioindex]->TotalPendingStartTime = gettime_microsec();
+ elog(DEBUG3, "Global resource total pending start time is updated to "UINT64_FORMAT,
+ PQUEMGR->RatioTrackers[ratioindex]->TotalPendingStartTime);
+ }
}
elog(LOG, "Removed pending GRM request from root resource queue by "
@@ -3849,7 +3859,8 @@ void clearPendingResourceRequestInRootQueue(void)
{
removePendingResourceRequestInRootQueue(
PQUEMGR->RatioTrackers[i]->TotalPending.MemoryMB,
- PQUEMGR->RatioTrackers[i]->TotalPending.Core);
+ PQUEMGR->RatioTrackers[i]->TotalPending.Core,
+ true);
}
}
}
@@ -4421,6 +4432,7 @@ void timeoutQueuedRequest(void)
"available cluster.");
/* Build timeout response. */
buildTimeoutResponseForQueuedRequest(ct, RESQUEMGR_NOCLUSTER_TIMEOUT);
+ transformConnectionTrackProgress(ct, CONN_PP_TIMEOUT_FAIL);
}
else
{
@@ -4449,7 +4461,6 @@ void timeoutQueuedRequest(void)
if ( curcon->Progress == CONN_PP_RESOURCE_QUEUE_ALLOC_WAIT )
{
- elog(DEBUG3, "Check waiting connection track now.");
/*
* Check if corresponding mem core ratio tracker has long enough
* time to waiting for GRM containers.
@@ -4463,6 +4474,24 @@ void timeoutQueuedRequest(void)
*/
Assert(PQUEMGR->RootTrack != NULL);
+ /* Check if this is a head request in the queue. */
+ if ( queuetrack->QueryResRequests.NodeCount > 0 )
+ {
+ ConnectionTrack topwaiter = getDQueueHeadNodeData(&(queuetrack->QueryResRequests));
+ if ( topwaiter == curcon && topwaiter->HeadQueueTime == 0 )
+ {
+ topwaiter->HeadQueueTime = gettime_microsec();
+ elog(DEBUG3, "Set timestamp of waiting at head of queue.");
+ }
+ }
+
+ elog(DEBUG3, "Check waiting connection track: ConnID %d "
+ "Head time "UINT64_FORMAT " "
+ "Global resource pending time "UINT64_FORMAT " ",
+ curcon->ConnID,
+ curmsec - curcon->HeadQueueTime,
+ curmsec - PQUEMGR->RatioTrackers[index]->TotalPendingStartTime);
+
bool tocancel = false;
if ( ( (PQUEMGR->RootTrack->ClusterSegNumberMax == 0) &&
@@ -4514,7 +4543,7 @@ void timeoutQueuedRequest(void)
if ( tocancel )
{
cancelResourceAllocRequest(curcon);
- returnConnectionToQueue(curcon, false);
+ returnConnectionToQueue(curcon, true);
}
}
}
@@ -4534,7 +4563,6 @@ void buildTimeoutResponseForQueuedRequest(ConnectionTrack conntrack, uint32_t re
conntrack->MessageMark1,
conntrack->MessageMark2,
RESPONSE_QD_ACQUIRE_RESOURCE);
- transformConnectionTrackProgress(conntrack, CONN_PP_TIMEOUT_FAIL);
conntrack->ResponseSent = false;
MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);