You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by yj...@apache.org on 2015/11/26 04:06:32 UTC

incubator-hawq git commit: HAWQ-194. Test facility for pausing global resource manager container lifecycle in resource pool

Repository: incubator-hawq
Updated Branches:
  refs/heads/master cd60c9fec -> 897da318d


HAWQ-194. Test facility for pausing global resource manager container lifecycle in resource pool


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/897da318
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/897da318
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/897da318

Branch: refs/heads/master
Commit: 897da318d50ed8c74b55caa79ac540b7d72be874
Parents: cd60c9f
Author: Yi Jin <yj...@pivotal.io>
Authored: Thu Nov 26 11:06:21 2015 +0800
Committer: Yi Jin <yj...@pivotal.io>
Committed: Thu Nov 26 11:06:21 2015 +0800

----------------------------------------------------------------------
 .../communication/rmcomm_MessageHandler.c       |  2 +
 .../communication/rmcomm_QD2RM.c                | 86 ++++++++++++++++++++
 .../communication/rmcomm_MessageHandler.h       |  2 +
 .../communication/rmcomm_QD_RM_Protocol.h       | 12 +++
 src/backend/resourcemanager/include/dynrm.h     |  1 +
 .../resourcemanager/include/resourcepool.h      | 16 ++++
 src/backend/resourcemanager/requesthandler.c    | 33 ++++++++
 src/backend/resourcemanager/resourcemanager.c   | 23 ++++--
 src/backend/resourcemanager/resourcepool.c      | 23 ++++++
 9 files changed, 190 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/897da318/src/backend/resourcemanager/communication/rmcomm_MessageHandler.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/communication/rmcomm_MessageHandler.c b/src/backend/resourcemanager/communication/rmcomm_MessageHandler.c
index 2dc6aa8..34e2744 100644
--- a/src/backend/resourcemanager/communication/rmcomm_MessageHandler.c
+++ b/src/backend/resourcemanager/communication/rmcomm_MessageHandler.c
@@ -49,6 +49,7 @@ MessageIDPairData MessageIDPairs[] = {
     {REQUEST_QD_DUMP_STATUS,			"REQUEST_QD_DUMP_STATUS"},
     {REQUEST_QD_DUMP_RESQUEUE_STATUS,	"REQUEST_QD_DUMP_RESQUEUE_STATUS"},
 	{REQUEST_DUMMY,						"REQUEST_DUMMY"},
+	{REQUEST_QD_QUOTA_CONTROL,			"REQUEST_QD_QUOTA_CONTROL"},
 
 	{REQUEST_RM_RUALIVE,				"REQUEST_RM_RUALIVE"},
 	{REQUEST_RM_IMALIVE,				"REQUEST_RM_IMALIVE"},
@@ -75,6 +76,7 @@ MessageIDPairData MessageIDPairs[] = {
     {RESPONSE_QD_DUMP_STATUS,			"RESPONSE_QD_DUMP_STATUS"},
     {RESPONSE_QD_DUMP_RESQUEUE_STATUS,	"RESPONSE_QD_DUMP_RESQUEUE_STATUS"},
 	{RESPONSE_DUMMY,					"RESPONSE_DUMMY"},
+	{RESPONSE_QD_QUOTA_CONTROL,			"RESPONSE_QD_QUOTA_CONTROL"},
 
 	{RESPONSE_RM_RUALIVE,				"RESPONSE_RM_RUALIVE"},
 	{RESPONSE_RM_IMALIVE,				"RESPONSE_RM_IMALIVE"},

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/897da318/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/communication/rmcomm_QD2RM.c b/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
index 8e81c89..47cac37 100644
--- a/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
+++ b/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
@@ -133,6 +133,7 @@ typedef struct TestActionPlayData *TestActionPlay;
 
 #define RESOURCE_ACTION_RPC_FAULT				"rpcfault"
 #define RESOURCE_ACTION_RPC_FAULT_RM			"rpcrmfault"
+#define RESOURCE_ACTION_QUOTA_PAUSE				"quotapause"
 
 #define PG_PLAY_RESOURCE_ACTION_COLUMNS 5
 #define PG_PLAY_RESOURCE_ACTION_BUFSIZE 1024
@@ -144,6 +145,11 @@ int runTestActionScript(List *actions, const char *filename);
 int findFile(const char *filename);
 int createFile(const char *filename);
 int removeFile(const char *filename);
+int setResourceManagerQuotaControl(bool 	pause,
+								   int 		phase,
+								   char    *errorbuf,
+								   int 		errorbufsize,
+								   int	   *errorcode);
 
 void outputAllcatedResourceToFile(const char *filename, int resourceid);
 void *buildResourceActionPlayRowData(MCTYPE context, List *actions);
@@ -2154,6 +2160,67 @@ int removeFile(const char *filename)
 	return res == 0 ? FUNC_RETURN_OK : FUNC_RETURN_FAIL;
 }
 
+/*
+ * Ask the resource manager to pause or resume one GRM container life cycle
+ * phase (test facility) through a synchronous REQUEST_QD_QUOTA_CONTROL RPC.
+ * Returns FUNC_RETURN_OK on success; on failure returns the failing code, also
+ * stored in *errorcode, and writes an explanation into errorbuf.
+ */
+int setResourceManagerQuotaControl(bool 	pause,
+								   int 		phase,
+								   char    *errorbuf,
+								   int 		errorbufsize,
+								   int	   *errorcode)
+{
+	initializeQD2RMComm();
+
+	int 				   res 		   = FUNC_RETURN_OK;
+	const char			  *state	   = pause ? "paused" : "resumed";
+	SelfMaintainBufferData sendBuffer;
+	SelfMaintainBufferData recvBuffer;
+	initializeSelfMaintainBuffer(&sendBuffer, QD2RM_CommContext);
+	initializeSelfMaintainBuffer(&recvBuffer, QD2RM_CommContext);
+	RPCRequestQuotaControlData request;
+	request.Pause = pause;
+	request.Phase = phase;
+	appendSMBVar(&sendBuffer, request);
+
+	elog(LOG, "Request GRM container life cycle phase %d %s", phase, state);
+	res = callSyncRPCToRM(sendBuffer.Buffer,
+						  sendBuffer.Cursor + 1,
+						  REQUEST_QD_QUOTA_CONTROL,
+						  RESPONSE_QD_QUOTA_CONTROL,
+						  &recvBuffer);
+	if ( res != FUNC_RETURN_OK )
+	{
+		snprintf(errorbuf, errorbufsize,
+				 "failed to get response from resource manager RPC.");
+		*errorcode = res;
+		goto exit;
+	}
+	RPCResponseQuotaControl response = (RPCResponseQuotaControl)(recvBuffer.Buffer);
+	if ( response->Result == FUNC_RETURN_OK )
+	{
+		elog(LOG, "succeeded in setting container life cycle phase %d %s",
+				  phase, state);
+	}
+	else
+	{
+		/* res only covers the RPC transport here; report the remote result. */
+		res = response->Result;
+		*errorcode = res;
+		elog(WARNING, "failed to set container life cycle phase %d %s",
+					  phase, state);
+		snprintf(errorbuf, errorbufsize,
+				 "failed to get resource quota due to remote error %s.",
+				 getErrorCodeExplain(res));
+	}
+exit:
+	destroySelfMaintainBuffer(&sendBuffer);
+	destroySelfMaintainBuffer(&recvBuffer);
+	return res;
+}
+
 int loadTestActionScript(const char *filename, List **actions)
 {
 	Assert(actions != NULL && *actions == NULL);
@@ -2527,6 +2594,25 @@ int runTestActionScript(List *actions, const char *filename)
 			else if ( strcmp(actitem->ActionName, RESOURCE_ACTION_RPC_FAULT_RM) == 0 )
 			{
 			}
+			else if ( strcmp(actitem->ActionName, RESOURCE_ACTION_QUOTA_PAUSE) == 0 )
+			{
+				/*
+				 * The first argument is "pause" to pause quota changes; any other
+				 * value resumes them. The second argument is the number of the
+				 * phase to pause or resume.
+				 */
+				ListCell *cell = list_head(actitem->Arguments);
+				const char *action = (const char *)lfirst(cell);
+				cell = lnext(cell);
+				int phase = atoi((const char *)lfirst(cell));
+				ret = setResourceManagerQuotaControl(strcmp(action, "pause") == 0 ?
+											   	   	     true:
+														 false,
+													 phase,
+													 errorbuf,
+													 sizeof(errorbuf),
+													 &errorcode);
+			}
 
 			actitem->ResultCode = ret;
 			actitem->ResultCode = actitem->ResultCode == FUNC_RETURN_OK ?

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/897da318/src/backend/resourcemanager/include/communication/rmcomm_MessageHandler.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/communication/rmcomm_MessageHandler.h b/src/backend/resourcemanager/include/communication/rmcomm_MessageHandler.h
index b95a4f7..c7e0da0 100644
--- a/src/backend/resourcemanager/include/communication/rmcomm_MessageHandler.h
+++ b/src/backend/resourcemanager/include/communication/rmcomm_MessageHandler.h
@@ -45,6 +45,7 @@ enum RM_MESSAGE_ID {
     REQUEST_QD_DUMP_STATUS,
     REQUEST_QD_DUMP_RESQUEUE_STATUS,
 	REQUEST_DUMMY,
+	REQUEST_QD_QUOTA_CONTROL,
 
 	/* Request between RM and RMSEG */
 	REQUEST_RM_BEGINTAG										= 512,
@@ -77,6 +78,7 @@ enum RM_MESSAGE_ID {
     RESPONSE_QD_DUMP_STATUS,
     RESPONSE_QD_DUMP_RESQUEUE_STATUS,
 	RESPONSE_DUMMY,
+	RESPONSE_QD_QUOTA_CONTROL,
 
 	/* Response between RM and RMSEG */
 	RESPONSE_RM_BEGINTAG									= 2560,

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/897da318/src/backend/resourcemanager/include/communication/rmcomm_QD_RM_Protocol.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/communication/rmcomm_QD_RM_Protocol.h b/src/backend/resourcemanager/include/communication/rmcomm_QD_RM_Protocol.h
index db65527..58260b7 100644
--- a/src/backend/resourcemanager/include/communication/rmcomm_QD_RM_Protocol.h
+++ b/src/backend/resourcemanager/include/communication/rmcomm_QD_RM_Protocol.h
@@ -422,6 +422,18 @@ RPC_PROTOCOL_STRUCT_BEGIN(RPCResponseResQueueStatus)
 RPC_PROTOCOL_STRUCT_END(RPCResponseResQueueStatus)
 
 /*******************************************************************************
+ * Protocol of Segment Resource Quota Control Request.
+ ******************************************************************************/
+RPC_PROTOCOL_STRUCT_BEGIN(RPCRequestQuotaControl)
+	uint32_t	Pause;	/* pause flag, filled from a bool by the sender: true pauses, false resumes */
+	uint32_t	Phase;	/* phase number; the handler uses it to index pausePhase[], so it must be < QUOTA_PHASE_COUNT */
+RPC_PROTOCOL_STRUCT_END(RPCRequestQuotaControl)
+
+RPC_PROTOCOL_STRUCT_BEGIN(RPCResponseQuotaControl)
+	uint32_t	Result;	/* FUNC_RETURN_OK when the requested setting was applied */
+	uint32_t	Reserved;	/* unused; the handler sets it to 0 */
+RPC_PROTOCOL_STRUCT_END(RPCResponseQuotaControl)
+/*******************************************************************************
  * Protocol of Dummy Request.
  ******************************************************************************/
 RPC_PROTOCOL_STRUCT_BEGIN(RPCResponseDummy)

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/897da318/src/backend/resourcemanager/include/dynrm.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/dynrm.h b/src/backend/resourcemanager/include/dynrm.h
index 7de5573..677addf 100644
--- a/src/backend/resourcemanager/include/dynrm.h
+++ b/src/backend/resourcemanager/include/dynrm.h
@@ -116,6 +116,7 @@ bool handleRMRequestDumpStatus(void **arg);
 bool handleRMRequestDumpResQueueStatus(void **arg);
 
 bool handleRMRequestDummy(void **arg);
+bool handleRMRequestQuotaControl(void **arg);
 
 int refreshLocalHostInstance(void);
 void checkLocalPostmasterStatus(void);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/897da318/src/backend/resourcemanager/include/resourcepool.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/resourcepool.h b/src/backend/resourcemanager/include/resourcepool.h
index e81e108..c2cc166 100644
--- a/src/backend/resourcemanager/include/resourcepool.h
+++ b/src/backend/resourcemanager/include/resourcepool.h
@@ -311,6 +311,16 @@ uint32_t getSegResourceCapacityMemory(SegResource segres);
 uint32_t getSegResourceCapacityCore(SegResource segres);
 
 int getSegmentGRMContainerSize(SegResource segres);
+
+enum ResourcePoolQuotaControlFlags	/* phase ids used to index ResourcePoolData.pausePhase[] */
+{
+	QUOTA_PHASE_TOACC_TO_ACCED = 0,	/* when paused, notifyToBeAcceptedGRMContainersToRMSEG() returns early */
+	QUOTA_PHASE_ACCED_TO_RESPOOL,	/* when paused, moveAllAcceptedGRMContainersToResPool() returns early */
+	QUOTA_PHASE_TOKICK_TO_KICKED,	/* when paused, notifyToBeKickedGRMContainersToRMSEG() returns early */
+	QUOTA_PHASE_KICKED_TO_RETURN,	/* when paused, processResourceBrokerTasks() skips RB_returnResource() */
+	QUOTA_PHASE_COUNT	/* number of phases; sizes the pausePhase[] array */
+};
+
 /*
  *------------------------------------------------------------------------------
  * RESOURCE POOL
@@ -474,6 +484,12 @@ struct ResourcePoolData {
 	int				RetPendingContainerCount;
 
 	/*
+	 * The flags for testing the life cycle of GRM containers by pausing specified
+	 * phases.
+	 */
+	bool			pausePhase[QUOTA_PHASE_COUNT];
+
+	/*
 	 * The function array for extending multiple policies of allocating virtual
 	 * segments from the segment pool.
 	 */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/897da318/src/backend/resourcemanager/requesthandler.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/requesthandler.c b/src/backend/resourcemanager/requesthandler.c
index bca0a38..c0eae8d 100644
--- a/src/backend/resourcemanager/requesthandler.c
+++ b/src/backend/resourcemanager/requesthandler.c
@@ -1170,3 +1170,36 @@ bool handleRMRequestDummy(void **arg)
 
     return true;
 }
+
+/* Handle REQUEST_QD_QUOTA_CONTROL: update one pausePhase flag of PRESPOOL and
+ * queue the reply; an out-of-range phase is answered with FUNC_RETURN_FAIL. */
+bool handleRMRequestQuotaControl(void **arg)
+{
+	ConnectionTrack conntrack = (ConnectionTrack)(*arg);
+	RPCRequestQuotaControl request = (RPCRequestQuotaControl)(conntrack->MessageBuff.Buffer);
+	RPCResponseQuotaControlData response;
+	response.Result 	= FUNC_RETURN_OK;
+	response.Reserved	= 0;
+	/* Phase indexes pausePhase[]; Assert is a no-op unless assertions are enabled. */
+	if ( request->Phase >= QUOTA_PHASE_COUNT )
+	{
+		elog(WARNING, "Invalid resource quota life cycle phase %u.", request->Phase);
+		response.Result = FUNC_RETURN_FAIL;
+	}
+	else if ( PRESPOOL->pausePhase[request->Phase] != (request->Pause != 0) )
+	{
+		PRESPOOL->pausePhase[request->Phase] = (request->Pause != 0);
+		elog(LOG, "Resource manager resource quota life cycle pause setting %d "
+				  "changes to %s", request->Phase,
+				  PRESPOOL->pausePhase[request->Phase]?"paused":"resumed");
+	}
+
+    buildResponseIntoConnTrack(conntrack, (char *)&response, sizeof(response),
+                               conntrack->MessageMark1, conntrack->MessageMark2,
+							   RESPONSE_QD_QUOTA_CONTROL);
+    conntrack->ResponseSent = false;
+	MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
+	PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);
+	MEMORY_CONTEXT_SWITCH_BACK
+	return true;
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/897da318/src/backend/resourcemanager/resourcemanager.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcemanager.c b/src/backend/resourcemanager/resourcemanager.c
index 9bc5f4f..ecdae5a 100644
--- a/src/backend/resourcemanager/resourcemanager.c
+++ b/src/backend/resourcemanager/resourcemanager.c
@@ -459,6 +459,7 @@ int ResManagerMainServer2ndPhase(void)
 	registerMessageHandler(REQUEST_QD_DUMP_STATUS           , handleRMRequestDumpStatus);
     registerMessageHandler(REQUEST_QD_DUMP_RESQUEUE_STATUS  , handleRMRequestDumpResQueueStatus);
 	registerMessageHandler(REQUEST_DUMMY                    , handleRMRequestDummy);
+	registerMessageHandler(REQUEST_QD_QUOTA_CONTROL 		, handleRMRequestQuotaControl);
 	/* New socket facility poll based server.*/
 	res = initializeSocketServer();
 	if ( res != FUNC_RETURN_OK ) {
@@ -2885,14 +2886,20 @@ void processResourceBrokerTasks(void)
         /* STEP 4. Return kicked GRM containers. */
         curtime = gettime_microsec();
 
-        res = RB_returnResource(&(PRESPOOL->KickedContainers));
-        if ( res != FUNC_RETURN_OK )
-        {
-        	elog(WARNING, "Resource manager failed to return kicked container to "
-        				  "global resource manager.");
-        	goto exit;
-        }
-
+    	if ( !PRESPOOL->pausePhase[QUOTA_PHASE_KICKED_TO_RETURN] )
+    	{
+			res = RB_returnResource(&(PRESPOOL->KickedContainers));
+			if ( res != FUNC_RETURN_OK )
+			{
+				elog(WARNING, "Resource manager failed to return kicked container to "
+							  "global resource manager.");
+				goto exit;
+			}
+    	}
+    	else
+		{
+			elog(LOG, "Paused returning GRM containers kicked to GRM.");
+    	}
 	    /*
 	     * STEP 5. Handle resource broker input as new allocated resource or
 		 * 		   cluster report, container report etc. The allocated resource

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/897da318/src/backend/resourcemanager/resourcepool.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcepool.c b/src/backend/resourcemanager/resourcepool.c
index 721cfef..4a7448b 100644
--- a/src/backend/resourcemanager/resourcepool.c
+++ b/src/backend/resourcemanager/resourcepool.c
@@ -357,6 +357,11 @@ void initializeResourcePoolManager(void)
 
 	PRESPOOL->SlavesFileTimestamp = 0;
 	PRESPOOL->SlavesHostCount	  = 0;
+
+	for ( int i = 0 ; i < QUOTA_PHASE_COUNT ; ++i )
+	{
+		PRESPOOL->pausePhase[i] = false;
+	}
 }
 
 #define CONNECT_TIMEOUT 60
@@ -3041,6 +3046,12 @@ int notifyToBeAcceptedGRMContainersToRMSEG(void)
 	List	 *ctnss = NULL;
 	ListCell *cell  = NULL;
 
+	if ( PRESPOOL->pausePhase[QUOTA_PHASE_TOACC_TO_ACCED] )
+	{
+		elog(LOG, "Paused notifying GRM containers to be accepted to segments.");
+		return FUNC_RETURN_OK;
+	}
+
 	getAllPAIRRefIntoList(&(PRESPOOL->ToAcceptContainers), &ctnss);
 
 	foreach(cell, ctnss)
@@ -3086,6 +3097,12 @@ int notifyToBeKickedGRMContainersToRMSEG(void)
 	List	 *ctnss = NULL;
 	ListCell *cell  = NULL;
 
+	if ( PRESPOOL->pausePhase[QUOTA_PHASE_TOKICK_TO_KICKED] )
+	{
+		elog(LOG, "Paused notifying GRM containers to be kicked to segments.");
+		return FUNC_RETURN_OK;
+	}
+
 	getAllPAIRRefIntoList(&(PRESPOOL->ToKickContainers), &ctnss);
 
 	foreach(cell, ctnss)
@@ -3126,6 +3143,12 @@ int notifyToBeKickedGRMContainersToRMSEG(void)
 
 void moveAllAcceptedGRMContainersToResPool(void)
 {
+	if ( PRESPOOL->pausePhase[QUOTA_PHASE_ACCED_TO_RESPOOL] )
+	{
+		elog(LOG, "Paused adding GRM containers accepted to resource pool.");
+		return;
+	}
+
 	while( PRESPOOL->AcceptedContainers != NULL )
 	{
 		GRMContainer ctn = (GRMContainer)