You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by yj...@apache.org on 2015/10/29 04:10:37 UTC

incubator-hawq git commit: HAWQ-104. HAWQ RM resource request timeout mechanism does not work

Repository: incubator-hawq
Updated Branches:
  refs/heads/master d3242b458 -> 77a0f47a5


HAWQ-104. HAWQ RM resource request timeout mechanism does not work


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/77a0f47a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/77a0f47a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/77a0f47a

Branch: refs/heads/master
Commit: 77a0f47a5beca62a450cc67992eaa0acde270a37
Parents: d3242b4
Author: Yi Jin <yj...@pivotal.io>
Authored: Thu Oct 29 11:09:55 2015 +0800
Committer: Yi Jin <yj...@pivotal.io>
Committed: Thu Oct 29 11:09:55 2015 +0800

----------------------------------------------------------------------
 .../communication/rmcomm_QD2RM.c                | 24 ++++------
 .../communication/rmcomm_RM2RMSEG.c             |  2 +-
 .../resourcemanager/include/resqueuemanager.h   |  4 +-
 .../resourcebroker/resourcebroker_LIBYARN.c     |  5 +-
 .../resourcebroker_LIBYARN_proc.c               | 13 +++--
 .../resourcebroker/resourcebroker_NONE.c        |  5 +-
 src/backend/resourcemanager/resourcemanager.c   |  2 +
 src/backend/resourcemanager/resourcepool.c      |  3 +-
 src/backend/resourcemanager/resqueuemanager.c   | 50 +++++++++++++++-----
 9 files changed, 69 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/77a0f47a/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/communication/rmcomm_QD2RM.c b/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
index 0cab819..7881e9f 100644
--- a/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
+++ b/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
@@ -426,8 +426,7 @@ int registerConnectionInRMByStr(int 		   index,
     if ( res != FUNC_RETURN_OK )
     {
     	snprintf(errorbuf, errorbufsize,
-    			 "failed to register in HAWQ resource manager because of "
-    			 "RPC error %s.",
+    			 "failed to register in HAWQ resource manager because of %s.",
 				 getErrorCodeExplain(res));
     	return res;
     }
@@ -440,8 +439,7 @@ int registerConnectionInRMByStr(int 		   index,
     if ( response->Result != FUNC_RETURN_OK )
     {
     	snprintf(errorbuf, errorbufsize,
-    			 "failed to register in HAWQ resource manager because of remote"
-    			 "error %s.",
+    			 "failed to register in HAWQ resource manager because of %s.",
 				 getErrorCodeExplain(response->Result));
     	return response->Result;
     }
@@ -476,8 +474,7 @@ int registerConnectionInRMByOID(int 		   index,
     if ( res != FUNC_RETURN_OK )
     {
     	snprintf(errorbuf, errorbufsize,
-    			 "failed to register in HAWQ resource manager because of "
-    			 "RPC error %s.",
+    			 "failed to register in HAWQ resource manager because of %s.",
 				 getErrorCodeExplain(res));
     	return res;
     }
@@ -490,8 +487,7 @@ int registerConnectionInRMByOID(int 		   index,
     if ( response->Result != FUNC_RETURN_OK )
     {
     	snprintf(errorbuf, errorbufsize,
-    			 "failed to register in HAWQ resource manager because of remote"
-    			 "error %s.",
+    			 "failed to register in HAWQ resource manager because of %s.",
 				 getErrorCodeExplain(response->Result));
     	return response->Result;
     }
@@ -526,8 +522,7 @@ int	unregisterConnectionInRM(int 			   index,
     if ( res != FUNC_RETURN_OK )
     {
     	snprintf(errorbuf, errorbufsize,
-    			 "failed to unregister in HAWQ resource manager because of "
-    			 "RPC error %s.",
+    			 "failed to unregister in HAWQ resource manager because of %s.",
 				 getErrorCodeExplain(res));
     	return res;
     }
@@ -539,8 +534,7 @@ int	unregisterConnectionInRM(int 			   index,
     {
     	res = response->Result;
     	snprintf(errorbuf, errorbufsize,
-    			 "failed to unregister in HAWQ resource manager because of "
-    			 "remote error %s.",
+    			 "failed to unregister in HAWQ resource manager because of %s.",
 				 getErrorCodeExplain(response->Result));
     }
 
@@ -788,8 +782,7 @@ int returnResource(int 		index,
     if ( res != FUNC_RETURN_OK )
     {
     	snprintf(errorbuf, errorbufsize,
-    			 "failed to return resource to HAWQ resource manager because of "
-    			 "RPC error %s.",
+    			 "failed to return resource to HAWQ resource manager because of %s.",
 				 getErrorCodeExplain(res));
     	return res;
     }
@@ -801,8 +794,7 @@ int returnResource(int 		index,
     {
     	res = response->Result;
         snprintf(errorbuf, errorbufsize,
-        		 "Fail to return resource to HAWQ resource manager because of "
-        		 "remote error %s.",
+        		 "failed to return resource to HAWQ resource manager because of %s.",
 				 getErrorCodeExplain(res));
         return res;
     }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/77a0f47a/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c b/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c
index c87694a..d3b1d61 100644
--- a/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c
+++ b/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c
@@ -650,7 +650,7 @@ void processContainersAfterIncreaseMemoryQuota(GRMContainerSet ctns, bool accept
     		/* Add container to KickedContainers if lifetime is long enough */
     		else
     		{
-    			removePendingResourceRequestInRootQueue(ctn->MemoryMB, ctn->Core);
+    			removePendingResourceRequestInRootQueue(ctn->MemoryMB, ctn->Core, false);
     			addGRMContainerToKicked(ctn);
     		}
     	}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/77a0f47a/src/backend/resourcemanager/include/resqueuemanager.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/resqueuemanager.h b/src/backend/resourcemanager/include/resqueuemanager.h
index 38de157..5a238a1 100644
--- a/src/backend/resourcemanager/include/resqueuemanager.h
+++ b/src/backend/resourcemanager/include/resqueuemanager.h
@@ -393,7 +393,9 @@ int minusResourceFromReourceManager(int32_t memorymb, double core);
 int addNewResourceToResourceManagerByBundle(ResourceBundle bundle);
 int minusResourceFromResourceManagerByBundle(ResourceBundle bundle);
 
-void removePendingResourceRequestInRootQueue(int32_t memorymb, uint32_t core);
+void removePendingResourceRequestInRootQueue(int32_t 	memorymb,
+											 uint32_t 	core,
+											 bool 		updatependingtime);
 void clearPendingResourceRequestInRootQueue(void);
 void buildAcquireResourceResponseMessage(ConnectionTrack conn);
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/77a0f47a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN.c b/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN.c
index 64c3488..98ae6a6 100644
--- a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN.c
+++ b/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN.c
@@ -321,6 +321,8 @@ int RB_LIBYARN_acquireResource(uint32_t memorymb, uint32_t core, List *preferred
 		res = RESBROK_PIPE_ERROR;
 	}
 
+	elog(DEBUG3, "LIBYARN mode resource broker wrote %d bytes out.", sendBuffer.Cursor+1);
+
 	destroySelfMaintainBuffer(&sendBuffer);
 	elog(LOG, "YARN mode resource broker wrote resource allocation request to "
 			  "resource broker process.");
@@ -830,7 +832,8 @@ int handleRB2RM_AllocatedResource(void)
 	 */
 	removePendingResourceRequestInRootQueue(
 		response.MemoryMB * (response.ExpectedContainerCount - acceptedcount),
-		response.Core     * (response.ExpectedContainerCount - acceptedcount));
+		response.Core     * (response.ExpectedContainerCount - acceptedcount),
+		response.Result == FUNC_RETURN_OK);
 
 	elog(LOG, "Accepted (%d MB, %d CORE) x %d from resource broker, "
 			  "Expected %d containers, skipped %d containers.",

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/77a0f47a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c b/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c
index 8f757d1..d4a5748 100644
--- a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c
+++ b/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c
@@ -770,11 +770,6 @@ int handleRM2RB_AllocateResource(void)
 				 request.Core,
 				 request.ContainerCount);
 
-	if ( YARNJobID == NULL )
-	{
-		return sendRBAllocateResourceErrorData(RESBROK_ERROR_GRM, &request);
-	}
-
 	/* build preferred host list */
 	if (request.MsgLength > 0 && request.PreferredSize > 0) {
 
@@ -808,6 +803,14 @@ int handleRM2RB_AllocateResource(void)
 		}
 	}
 
+	elog(DEBUG3, "LIBYARN mode resource broker process read %d bytes in.",
+				 request.MsgLength + sizeof(RPCRequestRBAllocateResourceContainersData));
+
+	if ( YARNJobID == NULL )
+	{
+		return sendRBAllocateResourceErrorData(RESBROK_ERROR_GRM, &request);
+	}
+
 	/*
 	 * Mark latest used YARN container resource quota for testing if YARN
 	 * resource queue is busy.

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/77a0f47a/src/backend/resourcemanager/resourcebroker/resourcebroker_NONE.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcebroker/resourcebroker_NONE.c b/src/backend/resourcemanager/resourcebroker/resourcebroker_NONE.c
index 941472e..8eaa7ba 100644
--- a/src/backend/resourcemanager/resourcebroker/resourcebroker_NONE.c
+++ b/src/backend/resourcemanager/resourcebroker/resourcebroker_NONE.c
@@ -202,9 +202,10 @@ int RB_NONE_acquireResource(uint32_t memorymb, uint32_t core, List *preferred)
 
 	/* Clean up pending resource quantity. */
 	removePendingResourceRequestInRootQueue( contmemorymb * (contcount - contactcount),
-											 1            * (contcount - contactcount));
+											 1            * (contcount - contactcount),
+											 res == FUNC_RETURN_OK);
 
-	return FUNC_RETURN_OK;
+	return res;
 }
 
 int RB_NONE_returnResource(List **ctnl)

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/77a0f47a/src/backend/resourcemanager/resourcemanager.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcemanager.c b/src/backend/resourcemanager/resourcemanager.c
index 5577e83..742c665 100644
--- a/src/backend/resourcemanager/resourcemanager.c
+++ b/src/backend/resourcemanager/resourcemanager.c
@@ -2137,6 +2137,8 @@ int generateAllocRequestToBroker(void)
 		if ( mctrack->TotalPendingStartTime == 0 )
 		{
 			mctrack->TotalPendingStartTime = gettime_microsec();
+			elog(DEBUG3, "Global resource total pending start time is updated to "UINT64_FORMAT,
+						 mctrack->TotalPendingStartTime);
 		}
 		res = RB_acquireResource(reqmem, reqcore, preferred);
 		if ( res != FUNC_RETURN_OK && res != RESBROK_PIPE_BUSY )

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/77a0f47a/src/backend/resourcemanager/resourcepool.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcepool.c b/src/backend/resourcemanager/resourcepool.c
index 3fd4dce..f3d891f 100644
--- a/src/backend/resourcemanager/resourcepool.c
+++ b/src/backend/resourcemanager/resourcepool.c
@@ -2746,7 +2746,7 @@ void moveAllAcceptedGRMContainersToResPool(void)
 		elog(LOG, "AddPendingContainerCount minused 1, current value %d",
 				  PRESPOOL->AddPendingContainerCount);
 		addNewResourceToResourceManager(ctn->MemoryMB, ctn->Core);
-		removePendingResourceRequestInRootQueue(ctn->MemoryMB, ctn->Core);
+		removePendingResourceRequestInRootQueue(ctn->MemoryMB, ctn->Core, true);
 	}
 	validateResourcePoolStatus(true);
 }
@@ -3237,7 +3237,6 @@ void dropAllResPoolGRMContainersToToBeKicked(void)
 	for ( int i = 0 ; i < PQUEMGR->RatioCount ; ++i )
 	{
 		resetResourceBundleData(&(PQUEMGR->RatioTrackers[i]->TotalPending), 0, 0, -1);
-		PQUEMGR->RatioTrackers[i]->TotalPendingStartTime = 0;
 	}
 
 	refreshMemoryCoreRatioLevelUsage(gettime_microsec());

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/77a0f47a/src/backend/resourcemanager/resqueuemanager.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resqueuemanager.c b/src/backend/resourcemanager/resqueuemanager.c
index 6b7a3ad..75dcece 100644
--- a/src/backend/resourcemanager/resqueuemanager.c
+++ b/src/backend/resourcemanager/resqueuemanager.c
@@ -3801,7 +3801,9 @@ void returnAllocatedResourceToLeafQueue(DynResourceQueueTrack track,
 			  memorymb, core);
 }
 
-void removePendingResourceRequestInRootQueue(int32_t memorymb, uint32_t core)
+void removePendingResourceRequestInRootQueue(int32_t 	memorymb,
+											 uint32_t 	core,
+											 bool 		updatependingtime)
 {
 	if ( memorymb ==0 && core == 0 )
 		return;
@@ -3823,13 +3825,21 @@ void removePendingResourceRequestInRootQueue(int32_t memorymb, uint32_t core)
 	Assert(PQUEMGR->RatioTrackers[ratioindex]->TotalPending.MemoryMB >= 0 &&
 		   PQUEMGR->RatioTrackers[ratioindex]->TotalPending.Core >= 0);
 
-	if ( PQUEMGR->RatioTrackers[ratioindex]->TotalPending.MemoryMB == 0 &&
-		 PQUEMGR->RatioTrackers[ratioindex]->TotalPending.Core == 0 )
+	if ( updatependingtime )
 	{
-		PQUEMGR->RatioTrackers[ratioindex]->TotalPendingStartTime = 0;
-	}
-	else if ( memorymb > 0 && core > 0 ){
-		PQUEMGR->RatioTrackers[ratioindex]->TotalPendingStartTime = gettime_microsec();
+		if ( PQUEMGR->RatioTrackers[ratioindex]->TotalPending.MemoryMB == 0 &&
+			 PQUEMGR->RatioTrackers[ratioindex]->TotalPending.Core == 0 )
+		{
+			PQUEMGR->RatioTrackers[ratioindex]->TotalPendingStartTime = 0;
+			elog(DEBUG3, "Global resource total pending start time is updated to "UINT64_FORMAT,
+						 PQUEMGR->RatioTrackers[ratioindex]->TotalPendingStartTime);
+		}
+		else if ( memorymb > 0 && core > 0 )
+		{
+			PQUEMGR->RatioTrackers[ratioindex]->TotalPendingStartTime = gettime_microsec();
+			elog(DEBUG3, "Global resource total pending start time is updated to "UINT64_FORMAT,
+						 PQUEMGR->RatioTrackers[ratioindex]->TotalPendingStartTime);
+		}
 	}
 
 	elog(LOG, "Removed pending GRM request from root resource queue by "
@@ -3849,7 +3859,8 @@ void clearPendingResourceRequestInRootQueue(void)
 		{
 			removePendingResourceRequestInRootQueue(
 					PQUEMGR->RatioTrackers[i]->TotalPending.MemoryMB,
-					PQUEMGR->RatioTrackers[i]->TotalPending.Core);
+					PQUEMGR->RatioTrackers[i]->TotalPending.Core,
+					true);
 		}
 	}
 }
@@ -4421,6 +4432,7 @@ void timeoutQueuedRequest(void)
 						  "available cluster.");
 			/* Build timeout response. */
 			buildTimeoutResponseForQueuedRequest(ct, RESQUEMGR_NOCLUSTER_TIMEOUT);
+			transformConnectionTrackProgress(ct, CONN_PP_TIMEOUT_FAIL);
 		}
 		else
 		{
@@ -4449,7 +4461,6 @@ void timeoutQueuedRequest(void)
 
 		if ( curcon->Progress == CONN_PP_RESOURCE_QUEUE_ALLOC_WAIT )
 		{
-			elog(DEBUG3, "Check waiting connection track now.");
 			/*
 			 * Check if corresponding mem core ratio tracker has long enough
 			 * time to waiting for GRM containers.
@@ -4463,6 +4474,24 @@ void timeoutQueuedRequest(void)
 			 */
 			Assert(PQUEMGR->RootTrack != NULL);
 
+			/* Check if this is a head request in the queue. */
+			if ( queuetrack->QueryResRequests.NodeCount > 0 )
+			{
+				ConnectionTrack topwaiter = getDQueueHeadNodeData(&(queuetrack->QueryResRequests));
+				if ( topwaiter == curcon && topwaiter->HeadQueueTime == 0 )
+				{
+					topwaiter->HeadQueueTime = gettime_microsec();
+					elog(DEBUG3, "Set timestamp of waiting at head of queue.");
+				}
+			}
+
+			elog(DEBUG3, "Check waiting connection track: ConnID %d "
+						 "Head time "UINT64_FORMAT " "
+						 "Global resource pending time "UINT64_FORMAT " ",
+						 curcon->ConnID,
+						 curmsec - curcon->HeadQueueTime,
+						 curmsec - PQUEMGR->RatioTrackers[index]->TotalPendingStartTime);
+
 			bool tocancel = false;
 
 			if ( ( (PQUEMGR->RootTrack->ClusterSegNumberMax == 0) &&
@@ -4514,7 +4543,7 @@ void timeoutQueuedRequest(void)
 			if ( tocancel )
 			{
 				cancelResourceAllocRequest(curcon);
-				returnConnectionToQueue(curcon, false);
+				returnConnectionToQueue(curcon, true);
 			}
 		}
 	}
@@ -4534,7 +4563,6 @@ void buildTimeoutResponseForQueuedRequest(ConnectionTrack conntrack, uint32_t re
 								conntrack->MessageMark1,
 								conntrack->MessageMark2,
 								RESPONSE_QD_ACQUIRE_RESOURCE);
-	transformConnectionTrackProgress(conntrack, CONN_PP_TIMEOUT_FAIL);
 	conntrack->ResponseSent = false;
 	MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
 	PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);