You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by yj...@apache.org on 2015/10/26 12:30:10 UTC

incubator-hawq git commit: HAWQ-47. Make user able to set statement level resource usage. This closes #34

Repository: incubator-hawq
Updated Branches:
  refs/heads/master 133ae317a -> 3a8dcd219


HAWQ-47. Make user able to set statement level resource usage. This closes #34


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/3a8dcd21
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/3a8dcd21
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/3a8dcd21

Branch: refs/heads/master
Commit: 3a8dcd219f99890da42db8a8d427a5ebd828454a
Parents: 133ae31
Author: Yi Jin <yj...@pivotal.io>
Authored: Mon Oct 26 19:29:02 2015 +0800
Committer: Yi Jin <yj...@pivotal.io>
Committed: Mon Oct 26 19:29:02 2015 +0800

----------------------------------------------------------------------
 src/backend/cdb/cdbvars.c                       |   2 +
 .../communication/rmcomm_QD2RM.c                |  97 +++--
 src/backend/resourcemanager/conntrack.c         |  13 +-
 .../communication/rmcomm_QD_RM_Protocol.h       |   4 +
 src/backend/resourcemanager/include/conntrack.h |   3 +
 src/backend/resourcemanager/requesthandler.c    |  13 +
 src/backend/resourcemanager/resqueuecommand.c   |  16 +-
 src/backend/resourcemanager/resqueuemanager.c   | 394 ++++++++++---------
 src/backend/utils/misc/guc.c                    |  55 +++
 src/include/cdb/cdbvars.h                       |   3 +
 10 files changed, 378 insertions(+), 222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3a8dcd21/src/backend/cdb/cdbvars.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbvars.c b/src/backend/cdb/cdbvars.c
index 921e87f..6b468e9 100644
--- a/src/backend/cdb/cdbvars.c
+++ b/src/backend/cdb/cdbvars.c
@@ -321,6 +321,8 @@ char   *rm_grm_yarn_queue;
 char   *rm_grm_yarn_app_name;
 int		rm_grm_breath_return_percentage;
 
+char   *rm_stmt_vseg_mem_str;
+int		rm_stmt_nvseg;
 
 int		rm_seg_container_default_waterlevel;
 bool	rm_force_fifo_queue;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3a8dcd21/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/communication/rmcomm_QD2RM.c b/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
index 10189e0..09d6da9 100644
--- a/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
+++ b/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
@@ -32,7 +32,7 @@
 		 QD2RM_ResourceSets[(index)] == NULL ) 								   \
 	{					   	   	   											   \
 		snprintf((errorbuf), (errorbufsize), 							   	   \
-				 "Wrong resource set index %d", (index)); 	   		   		   \
+				 "wrong resource set index %d", (index)); 	   		   		   \
 		return COMM2RM_CLIENT_WRONG_INPUT;								   	   \
 	}
 
@@ -374,20 +374,24 @@ int cleanupQD2RMComm(void)
         {
             if ( QD2RM_ResourceSets[i]->QD_ResourceList != NULL )
             {
-                elog( LOG, "Un-returned resource is probed, will be returned. "
-                        "(%d MB, %lf CORE) x %d. Conn ID=%d",
-                        QD2RM_ResourceSets[i]->QD_SegMemoryMB,
-                        QD2RM_ResourceSets[i]->QD_SegCore,
-                        QD2RM_ResourceSets[i]->QD_SegCount,
-                        QD2RM_ResourceSets[i]->QD_Conn_ID);
+            	elog(LOG, "Un-returned resource is probed, will be returned. "
+                          "(%d MB, %lf CORE) x %d. Conn ID=%d",
+                          QD2RM_ResourceSets[i]->QD_SegMemoryMB,
+                          QD2RM_ResourceSets[i]->QD_SegCore,
+                          QD2RM_ResourceSets[i]->QD_SegCount,
+                          QD2RM_ResourceSets[i]->QD_Conn_ID);
 
                 res = returnResource(i, errorbuf, sizeof(errorbuf));
-                if ( res != FUNC_RETURN_OK ) {
-                	elog(LOG, "Fail to return resource when cleaning up resource context.");
+                if ( res != FUNC_RETURN_OK )
+                {
+                	elog(WARNING, "Failed to return resource when cleaning up "
+                				  "resource context.");
             	}
                 res = unregisterConnectionInRM(i, errorbuf, sizeof(errorbuf));
-                if ( res != FUNC_RETURN_OK ) {
-                	elog(LOG, "Fail to unregister when cleaning up resource context.");
+                if ( res != FUNC_RETURN_OK )
+                {
+                	elog(WARNING, "Failed to unregister when cleaning up "
+                				  "resource context.");
                 }
             }
         }
@@ -422,7 +426,7 @@ int registerConnectionInRMByStr(int 		   index,
     if ( res != FUNC_RETURN_OK )
     {
     	snprintf(errorbuf, errorbufsize,
-    			 "Fail to register in HAWQ resource manager because of "
+    			 "failed to register in HAWQ resource manager because of "
     			 "RPC error %s.",
 				 getErrorCodeExplain(res));
     	return res;
@@ -436,7 +440,7 @@ int registerConnectionInRMByStr(int 		   index,
     if ( response->Result != FUNC_RETURN_OK )
     {
     	snprintf(errorbuf, errorbufsize,
-    			 "Fail to register in HAWQ resource manager because of remote"
+    			 "failed to register in HAWQ resource manager because of remote"
     			 "error %s.",
 				 getErrorCodeExplain(response->Result));
     	return response->Result;
@@ -472,7 +476,7 @@ int registerConnectionInRMByOID(int 		   index,
     if ( res != FUNC_RETURN_OK )
     {
     	snprintf(errorbuf, errorbufsize,
-    			 "Fail to register in HAWQ resource manager because of "
+    			 "failed to register in HAWQ resource manager because of "
     			 "RPC error %s.",
 				 getErrorCodeExplain(res));
     	return res;
@@ -486,7 +490,7 @@ int registerConnectionInRMByOID(int 		   index,
     if ( response->Result != FUNC_RETURN_OK )
     {
     	snprintf(errorbuf, errorbufsize,
-    			 "Fail to register in HAWQ resource manager because of remote"
+    			 "failed to register in HAWQ resource manager because of remote"
     			 "error %s.",
 				 getErrorCodeExplain(response->Result));
     	return response->Result;
@@ -522,7 +526,7 @@ int	unregisterConnectionInRM(int 			   index,
     if ( res != FUNC_RETURN_OK )
     {
     	snprintf(errorbuf, errorbufsize,
-    			 "Fail to unregister in HAWQ resource manager because of "
+    			 "failed to unregister in HAWQ resource manager because of "
     			 "RPC error %s.",
 				 getErrorCodeExplain(res));
     	return res;
@@ -535,7 +539,7 @@ int	unregisterConnectionInRM(int 			   index,
     {
     	res = response->Result;
     	snprintf(errorbuf, errorbufsize,
-    			 "Fail to unregister in HAWQ resource manager because of "
+    			 "failed to unregister in HAWQ resource manager because of "
     			 "remote error %s.",
 				 getErrorCodeExplain(response->Result));
     }
@@ -606,6 +610,15 @@ int acquireResourceFromRM(int 		  		  index,
     requesthead.VSegLimit		 = rm_nvseg_perquery_limit;
     requesthead.Reserved		 = 0;
     requesthead.IOBytes		 	 = iobytes;
+    requesthead.StatNVSeg		 = rm_stmt_nvseg;
+
+    requesthead.StatVSegMemoryMB = 0;
+    int parseres = FUNC_RETURN_OK;
+    SimpString valuestr;
+    setSimpleStringRef(&valuestr, rm_stmt_vseg_mem_str, strlen(rm_stmt_vseg_mem_str));
+    parseres = SimpleStringToStorageSizeMB(&valuestr,
+    									   &(requesthead.StatVSegMemoryMB));
+    Assert(parseres == FUNC_RETURN_OK);
 
     appendSMBVar(sendbuffer,requesthead);
 
@@ -636,8 +649,8 @@ int acquireResourceFromRM(int 		  		  index,
     if ( res != FUNC_RETURN_OK )
     {
     	snprintf(errorbuf, errorbufsize,
-    			 "Fail to acquire resource from HAWQ resource manager because of "
-    			 "RPC error %s.",
+    			 "failed to acquire resource from HAWQ resource manager because "
+    			 "of RPC error %s.",
 				 getErrorCodeExplain(res));
     	pgstat_report_waiting_resource(false);
     	return res;
@@ -649,7 +662,7 @@ int acquireResourceFromRM(int 		  		  index,
     if ( errres->Result != FUNC_RETURN_OK )
     {
     	snprintf(errorbuf, errorbufsize,
-    			 "Fail to acquire resource because of remote error %s.",
+    			 "failed to acquire resource because of remote error %s.",
     			 getErrorCodeExplain(errres->Result));
     	return errres->Result;
     }
@@ -776,7 +789,7 @@ int returnResource(int 		index,
     if ( res != FUNC_RETURN_OK )
     {
     	snprintf(errorbuf, errorbufsize,
-    			 "Fail to return resource to HAWQ resource manager because of "
+    			 "failed to return resource to HAWQ resource manager because of "
     			 "RPC error %s.",
 				 getErrorCodeExplain(res));
     	return res;
@@ -865,7 +878,7 @@ int manipulateResourceQueue(int 	 index,
 		RPCResponseHeadManipulateResQueueERROR error =
 			(RPCResponseHeadManipulateResQueueERROR)(recvbuffer->Buffer);
 
-		elog(WARNING, "Fail to manipulate resource queue because %s",
+		elog(LOG, "Fail to manipulate resource queue because %s",
 					  error->ErrorText);
 		snprintf(errorbuf, errorbufsize, "%s", error->ErrorText);
 	}
@@ -1147,11 +1160,21 @@ int acquireResourceQuotaFromRM(int64_t		user_oid,
 	initializeSelfMaintainBuffer(&recvBuffer, QD2RM_CommContext);
 
 	RPCRequestHeadAcquireResourceQuotaFromRMByOIDData request;
-	request.UseridOid     	 = user_oid;
-	request.MaxSegCountFix 	 = max_seg_count_fix;
-	request.MinSegCountFix   = min_seg_count_fix;
-    request.VSegLimitPerSeg	 = rm_nvseg_perquery_perseg_limit;
-    request.VSegLimit		 = rm_nvseg_perquery_limit;
+	request.UseridOid		 = user_oid;
+	request.MaxSegCountFix	 = max_seg_count_fix;
+	request.MinSegCountFix	 = min_seg_count_fix;
+	request.VSegLimitPerSeg	 = rm_nvseg_perquery_perseg_limit;
+	request.VSegLimit		 = rm_nvseg_perquery_limit;
+	request.StatNVSeg		 = rm_stmt_nvseg;
+
+	request.StatVSegMemoryMB = 0;
+	int parseres = FUNC_RETURN_OK;
+	SimpString valuestr;
+	setSimpleStringRef(&valuestr, rm_stmt_vseg_mem_str, strlen(rm_stmt_vseg_mem_str));
+	parseres = SimpleStringToStorageSizeMB(&valuestr,
+										   &(request.StatVSegMemoryMB));
+	Assert(parseres == FUNC_RETURN_OK);
+
 	appendSMBVar(&sendBuffer, request);
 
 	elog(DEBUG3, "HAWQ RM :: Acquire resource quota for query with %d splits, "
@@ -1168,24 +1191,27 @@ int acquireResourceQuotaFromRM(int64_t		user_oid,
     if ( res != FUNC_RETURN_OK )
     {
     	snprintf(errorbuf, errorbufsize,
-    			 "Fail to get response from resource manager RPC.");
+    			 "failed to get response from resource manager RPC.");
     	*errorcode = res;
     	goto exit;
     }
 
     RPCResponseHeadAcquireResourceQuotaFromRMByOID response =
     	(RPCResponseHeadAcquireResourceQuotaFromRMByOID)(recvBuffer.Buffer);
-    if ( response->Result == FUNC_RETURN_OK ) {
+    if ( response->Result == FUNC_RETURN_OK )
+    {
     	*seg_num 		= response->SegNum;
     	*seg_num_min 	= response->SegNumMin;
     	*seg_memory_mb  = response->SegMemoryMB;
     	*seg_core		= response->SegCore;
     }
-    else {
+    else
+    {
     	res = response->Result;
     	*errorcode = res;
-    	snprintf(errorbuf, errorbufsize, "Fail to get resource quota due to "
-    									 "remote error %d.", res);
+    	snprintf(errorbuf, errorbufsize,
+    			 "failed to get resource quota due to remote error %s.",
+				 getErrorCodeExplain(res));
     }
 
 exit:
@@ -1610,15 +1636,16 @@ extern Datum pg_explain_resource_distribution(PG_FUNCTION_ARGS)
 		ret = returnResource(resourceId, errorbuf, sizeof(errorbuf));
 		if ( ret != FUNC_RETURN_OK )
 		{
-			elog(ERROR, "Fail to return resource back to resource manager "
-					    "because %s", errorbuf);
+			elog(ERROR, "failed to return resource back to resource manager "
+					    "because %s",
+						errorbuf);
 		}
 
 		/* STEP 5. Unregister. */
 		ret = unregisterConnectionInRM(resourceId, errorbuf, sizeof(errorbuf));
 		if ( ret != FUNC_RETURN_OK )
 		{
-			elog(ERROR, "Fail to unregister connection in RM because %s",
+			elog(ERROR, "failed to unregister connection in RM because %s",
 						errorbuf);
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3a8dcd21/src/backend/resourcemanager/conntrack.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/conntrack.c b/src/backend/resourcemanager/conntrack.c
index 12d59d7..62dea4c 100644
--- a/src/backend/resourcemanager/conntrack.c
+++ b/src/backend/resourcemanager/conntrack.c
@@ -109,6 +109,9 @@ void createEmptyConnectionTrack(ConnectionTrack *track)
 	(*track)->MinSegCountFixed			= 0;
 	(*track)->VSegLimitPerSeg			= -1;
 	(*track)->VSegLimit					= -1;
+	(*track)->StatVSegMemoryMB			= 0;
+	(*track)->StatNVSeg					= 0;
+	(*track)->SegNumEqual				= 0;
 	(*track)->SliceSize					= 0;
 	(*track)->IOBytes					= 0;
 	(*track)->QueueID			 		= 0;
@@ -259,6 +262,9 @@ int retrieveConnectionTrack(ConnectionTrack track, int32_t connid)
 	track->MinSegCountFixed			= oldct->MinSegCountFixed;
 	track->VSegLimitPerSeg			= oldct->VSegLimitPerSeg;
 	track->VSegLimit				= oldct->VSegLimit;
+	track->StatNVSeg				= oldct->StatNVSeg;
+	track->StatVSegMemoryMB			= oldct->StatVSegMemoryMB;
+	track->SegNumEqual				= oldct->SegNumEqual;
 	track->SliceSize				= oldct->SliceSize;
 	track->SegIOBytes				= oldct->SegIOBytes;
 	track->IOBytes					= oldct->IOBytes;
@@ -595,7 +601,8 @@ void dumpConnectionTracks(const char *filename)
 						"vseg limit per query=%d:"
 						"fixsegsize=%d:"
 						"reqtime=%s:"
-						"alloctime=%s),",
+						"alloctime=%s:"
+						"stmt=%d MB x %d),",
 						conn->SessionID,
 						conn->SegMemoryMB, conn->SegCore,
 						conn->SegNum, conn->SegNumMin, conn->SegNumActual,
@@ -605,7 +612,9 @@ void dumpConnectionTracks(const char *filename)
 						conn->VSegLimit,
 						conn->MinSegCountFixed,
 						format_time_microsec(conn->ResRequestTime),
-						format_time_microsec(conn->ResAllocTime));
+						format_time_microsec(conn->ResAllocTime),
+						conn->StatVSegMemoryMB,
+						conn->StatNVSeg);
 
 			fprintf(fp, "LOC(size=%d", conn->SegPreferredHostCount);
 			if ( conn->SegPreferredHostCount <= 0 )

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3a8dcd21/src/backend/resourcemanager/include/communication/rmcomm_QD_RM_Protocol.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/communication/rmcomm_QD_RM_Protocol.h b/src/backend/resourcemanager/include/communication/rmcomm_QD_RM_Protocol.h
index 17d397b..a275d0e 100644
--- a/src/backend/resourcemanager/include/communication/rmcomm_QD_RM_Protocol.h
+++ b/src/backend/resourcemanager/include/communication/rmcomm_QD_RM_Protocol.h
@@ -196,6 +196,8 @@ RPC_PROTOCOL_STRUCT_BEGIN(RPCRequestHeadAcquireResourceFromRM)
 	int32_t			SliceSize;
 	uint32_t		VSegLimitPerSeg;
 	uint32_t		VSegLimit;
+	uint32_t		StatVSegMemoryMB;
+	uint32_t		StatNVSeg;
 	uint32_t		Reserved;
 	int64_t			IOBytes;
 RPC_PROTOCOL_STRUCT_END(RPCRequestHeadAcquireResourceFromRM)
@@ -246,6 +248,8 @@ RPC_PROTOCOL_STRUCT_BEGIN(RPCRequestHeadAcquireResourceQuotaFromRMByOID)
 	uint32_t		MinSegCountFix;
 	uint32_t		VSegLimitPerSeg;
 	uint32_t		VSegLimit;
+	uint32_t		StatVSegMemoryMB;
+	uint32_t		StatNVSeg;
 RPC_PROTOCOL_STRUCT_END(RPCRequestHeadAcquireResourceQuotaFromRMByOID)
 
 /*

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3a8dcd21/src/backend/resourcemanager/include/conntrack.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/conntrack.h b/src/backend/resourcemanager/include/conntrack.h
index f809102..76e8c7c 100644
--- a/src/backend/resourcemanager/include/conntrack.h
+++ b/src/backend/resourcemanager/include/conntrack.h
@@ -95,6 +95,7 @@ struct ConnectionTrackData
 	int32_t					SegNum;
 	int32_t					SegNumMin;
 	int32_t					SegNumActual;
+	int32_t					SegNumEqual;
 	int32_t					SegPreferredHostCount;
 	char 			  	  **SegPreferredHostNames;
 	int64_t			   	   *SegPreferredScanSizeMB;
@@ -104,6 +105,8 @@ struct ConnectionTrackData
 	int32_t			    	MinSegCountFixed;
 	int32_t					VSegLimitPerSeg;
 	int32_t					VSegLimit;
+	uint32_t				StatVSegMemoryMB;
+	uint32_t				StatNVSeg;
 	List				   *Resource;		/* Allocated resource. 	   		  */
 
 	void				   *QueueTrack;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3a8dcd21/src/backend/resourcemanager/requesthandler.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/requesthandler.c b/src/backend/resourcemanager/requesthandler.c
index de8131f..55a3dcd 100644
--- a/src/backend/resourcemanager/requesthandler.c
+++ b/src/backend/resourcemanager/requesthandler.c
@@ -368,6 +368,8 @@ bool handleRMRequestAcquireResource(void **arg)
 	(*conntrack)->SessionID				= request->SessionID;
 	(*conntrack)->VSegLimitPerSeg		= request->VSegLimitPerSeg;
 	(*conntrack)->VSegLimit				= request->VSegLimit;
+	(*conntrack)->StatVSegMemoryMB		= request->StatVSegMemoryMB;
+	(*conntrack)->StatNVSeg				= request->StatNVSeg;
 
 	/* Get preferred nodes. */
 	buildSegPreferredHostInfo((*conntrack));
@@ -381,6 +383,17 @@ bool handleRMRequestAcquireResource(void **arg)
 				 (*conntrack)->SegPreferredHostCount,
 				 (*conntrack)->VSegLimitPerSeg,
 				 (*conntrack)->VSegLimit);
+
+	if ( (*conntrack)->StatNVSeg > 0 )
+	{
+		elog(LOG, "Statement level resource quota is active. "
+				  "ConnID=%d, Expect resource. "
+				  "Total %d vsegs, each vseg has %d MB memory quota.",
+				  (*conntrack)->ConnID,
+				  (*conntrack)->StatNVSeg,
+				  (*conntrack)->StatVSegMemoryMB);
+	}
+
 	res = acquireResourceFromResQueMgr((*conntrack));
 	if ( res != FUNC_RETURN_OK )
 	{

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3a8dcd21/src/backend/resourcemanager/resqueuecommand.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resqueuecommand.c b/src/backend/resourcemanager/resqueuecommand.c
index 0247fef..19b255a 100644
--- a/src/backend/resourcemanager/resqueuecommand.c
+++ b/src/backend/resourcemanager/resqueuecommand.c
@@ -91,8 +91,8 @@ void createResourceQueue(CreateQueueStmt *stmt)
 		Assert( res == COMM2RM_CLIENT_FULL_RESOURCECONTEXT );
 		ereport(ERROR,
 				(errcode(ERRCODE_INTERNAL_ERROR),
-						 errmsg("Can not apply CREATE RESOURCE QUEUE. "
-								"Because too many resource contexts were created.")));
+						 errmsg("can not apply CREATE RESOURCE QUEUE, "
+								"because too many resource contexts were created.")));
 	}
 
 	/* Here, using user oid is more convenient. */
@@ -125,7 +125,7 @@ void createResourceQueue(CreateQueueStmt *stmt)
 				(errcode(IS_TO_RM_RPC_ERROR(res) ?
 						 ERRCODE_INTERNAL_ERROR :
 						 ERRCODE_INVALID_OBJECT_DEFINITION),
-				 errmsg("Can not apply CREATE RESOURCE QUEUE because %s", errorbuf)));
+				 errmsg("can not apply CREATE RESOURCE QUEUE because %s", errorbuf)));
 	}
 	elog(LOG, "Complete applying CREATE RESOURCE QUEUE statement.");
 }
@@ -222,8 +222,8 @@ void dropResourceQueue(DropQueueStmt *stmt)
 		Assert( res == COMM2RM_CLIENT_FULL_RESOURCECONTEXT );
 		ereport(ERROR,
 				(errcode(ERRCODE_INTERNAL_ERROR),
-						 errmsg("Can not apply DROP RESOURCE QUEUE. "
-								"Because too many resource contexts were created.")));
+						 errmsg("cannot apply DROP RESOURCE QUEUE, "
+								"because too many resource contexts were created.")));
 	}
 
 	/* Here, using user oid is more convenient. */
@@ -255,7 +255,7 @@ void dropResourceQueue(DropQueueStmt *stmt)
 				(errcode(IS_TO_RM_RPC_ERROR(res) ?
 						 ERRCODE_INTERNAL_ERROR :
 						 ERRCODE_INVALID_OBJECT_DEFINITION),
-				 errmsg("Can not apply DROP RESOURCE QUEUE because %s", errorbuf)));
+				 errmsg("can not apply DROP RESOURCE QUEUE because %s", errorbuf)));
 	}
 
 	elog(LOG, "Completed applying DROP RESOURCE QUEUE statement.");
@@ -334,7 +334,7 @@ void alterResourceQueue(AlterQueueStmt *stmt)
 		Assert(res == COMM2RM_CLIENT_FULL_RESOURCECONTEXT);
 		ereport(ERROR,
 				(errcode(ERRCODE_INTERNAL_ERROR),
-						errmsg("Too many existing resource context.")));
+						errmsg("too many existing resource context.")));
 	}
 
 	/* Here, using user oid is more convenient. */
@@ -365,7 +365,7 @@ void alterResourceQueue(AlterQueueStmt *stmt)
 				(errcode(IS_TO_RM_RPC_ERROR(res) ?
 						 ERRCODE_INTERNAL_ERROR :
 						 ERRCODE_INVALID_OBJECT_DEFINITION),
-				 errmsg("Can not apply ALTER RESOURCE QUEUE because %s", errorbuf)));
+				 errmsg("cannot apply ALTER RESOURCE QUEUE because %s", errorbuf)));
 	}
 
 	elog(LOG, "Completed applying ALTER RESOURCE QUEUE statement.");

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3a8dcd21/src/backend/resourcemanager/resqueuemanager.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resqueuemanager.c b/src/backend/resourcemanager/resqueuemanager.c
index ef055c3..862eed9 100644
--- a/src/backend/resourcemanager/resqueuemanager.c
+++ b/src/backend/resourcemanager/resqueuemanager.c
@@ -97,14 +97,7 @@ computeQueryQuotaByPolicy AllocationPolicy[RSQ_ALLOCATION_POLICY_COUNT] = {
 	computeQueryQuota_EVEN
 };
 
-int computeQueryQuota( DynResourceQueueTrack  track,
-					   int32_t			 	 *max_segcountfix,
-					   int32_t			 	 *min_segcountfix,
-		   	   	   	   int32_t		       	 *segmemmb,
-					   double		       	 *segcore,
-					   int32_t		       	 *segnum,
-					   int32_t				 *segnummin,
-					   int32_t				  segnumlimit);
+int computeQueryQuota( ConnectionTrack conn);
 
 /*------------------------------------------
  * The resource distribution functions.
@@ -2116,78 +2109,74 @@ int acquireResourceFromResQueMgr(ConnectionTrack conntrack)
 	}
 
 	/* Call quota logic to make decision of resource for current query. */
-	res = computeQueryQuota(queuetrack,
-							&conntrack->MaxSegCountFixed,
-							&conntrack->MinSegCountFixed,
-							&(conntrack->SegMemoryMB),
-							&(conntrack->SegCore),
-							&(conntrack->SegNum),
-							&(conntrack->SegNumMin),
-							conntrack->VSegLimit);
+	res = computeQueryQuota(conntrack);
 
 	if ( res == FUNC_RETURN_OK )
 	{
-
-		int32_t Rmax  = conntrack->SegNum;
-		int32_t RmaxL = conntrack->VSegLimitPerSeg * PRESPOOL->AvailNodeCount;
-		int32_t Rmin  = conntrack->SegNumMin;
-		elog(LOG, "HAWQ RM :: original quota min seg num:%d, max seg num:%d",
-				  conntrack->SegNumMin,
-				  conntrack->SegNum);
-
-		/* Ensure quota [min,max] is between request [min,max] */
-		int32_t Gmax= conntrack->MaxSegCountFixed;
-		int32_t Gmin= conntrack->MinSegCountFixed;
-
-		if(Gmin==1)
+		if ( conntrack->StatNVSeg == 0 )
 		{
-			/* case 1 */
-			conntrack->SegNumMin = min(min(Gmax,Rmin),RmaxL);
-			conntrack->SegNum = min(Gmax,RmaxL);
-			if(conntrack->SegNumMin > conntrack->SegNum)
+			int32_t Rmax  = conntrack->SegNum;
+			int32_t RmaxL = conntrack->VSegLimitPerSeg * PRESPOOL->AvailNodeCount;
+			int32_t Rmin  = conntrack->SegNumMin;
+			elog(LOG, "Original quota min seg num:%d, max seg num:%d",
+					  conntrack->SegNumMin,
+					  conntrack->SegNum);
+
+			/* Ensure quota [min,max] is between request [min,max] */
+			int32_t Gmax= conntrack->MaxSegCountFixed;
+			int32_t Gmin= conntrack->MinSegCountFixed;
+
+			if(Gmin==1)
 			{
-				return RESQUEMGR_NO_RESOURCE;
+				/* case 1 */
+				conntrack->SegNumMin = min(min(Gmax,Rmin),RmaxL);
+				conntrack->SegNum = min(Gmax,RmaxL);
+				if(conntrack->SegNumMin > conntrack->SegNum)
+				{
+					return RESQUEMGR_NO_RESOURCE;
+				}
 			}
-		}
-		else if(Gmax == Gmin)
-		{
-			/* case 2 */
-			conntrack->SegNumMin = Gmax;
-			conntrack->SegNum = Gmax;
-			if(Rmax < Gmax)
+			else if(Gmax == Gmin)
 			{
-				return RESQUEMGR_NO_RESOURCE;
+				/* case 2 */
+				conntrack->SegNumMin = Gmax;
+				conntrack->SegNum = Gmax;
+				if(Rmax < Gmax)
+				{
+					return RESQUEMGR_NO_RESOURCE;
+				}
 			}
-		}
-		else
-		{
-			/* case 3 */
-			conntrack->SegNumMin = min(max(Gmin,Rmin),Gmax);
-			conntrack->SegNum = min(max(min(RmaxL,Gmax),Gmin),Rmax);
-			if(conntrack->SegNumMin > conntrack->SegNum)
+			else
 			{
-				return RESQUEMGR_NO_RESOURCE;
+				/* case 3 */
+				conntrack->SegNumMin = min(max(Gmin,Rmin),Gmax);
+				conntrack->SegNum = min(max(min(RmaxL,Gmax),Gmin),Rmax);
+				if(conntrack->SegNumMin > conntrack->SegNum)
+				{
+					return RESQUEMGR_NO_RESOURCE;
+				}
 			}
-		}
 
-		elog(LOG, "Query resource expects (%d MB, %lf CORE) x %d ( min %d ) resource.",
-				   conntrack->SegMemoryMB,
-				   conntrack->SegCore,
-				   conntrack->SegNum,
-				   conntrack->SegNumMin);
+			elog(LOG, "Query resource expects (%d MB, %lf CORE) x %d ( min %d ) resource.",
+					   conntrack->SegMemoryMB,
+					   conntrack->SegCore,
+					   conntrack->SegNum,
+					   conntrack->SegNumMin);
 
-		adjustResourceExpectsByQueueNVSegLimits(conntrack);
+			adjustResourceExpectsByQueueNVSegLimits(conntrack);
 
-		elog(LOG, "Query resource expects (%d MB, %lf CORE) x %d ( min %d ) "
-				  "resource after adjusting based on queue NVSEG limits.",
-				   conntrack->SegMemoryMB,
-				   conntrack->SegCore,
-				   conntrack->SegNum,
-				   conntrack->SegNumMin);
+			elog(LOG, "Query resource expects (%d MB, %lf CORE) x %d ( min %d ) "
+					  "resource after adjusting based on queue NVSEG limits.",
+					   conntrack->SegMemoryMB,
+					   conntrack->SegCore,
+					   conntrack->SegNum,
+					   conntrack->SegNumMin);
+		}
 
 		/* Add request to the resource queue and return. */
 		res = addQueryResourceRequestToQueue(queuetrack, conntrack);
-		if ( res == FUNC_RETURN_OK ) {
+		if ( res == FUNC_RETURN_OK )
+		{
 			transformConnectionTrackProgress(conntrack,
 											 CONN_PP_RESOURCE_QUEUE_ALLOC_WAIT);
 			return res;
@@ -2230,75 +2219,73 @@ int acquireResourceQuotaFromResQueMgr(ConnectionTrack conntrack)
 		goto exit;
 	}
 
+	conntrack->QueueTrack = queuetrack;
+	conntrack->QueueID	  = queuetrack->QueueInfo->OID;
+
 	/* Compute query quota */
-	res = computeQueryQuota(queuetrack,
-							&conntrack->MaxSegCountFixed,
-							&conntrack->MinSegCountFixed,
-							&(conntrack->SegMemoryMB),
-							&(conntrack->SegCore),
-							&(conntrack->SegNum),
-							&(conntrack->SegNumMin),
-							conntrack->VSegLimit);
+	res = computeQueryQuota(conntrack);
 
 	if ( res == FUNC_RETURN_OK )
 	{
-
-		int32_t Rmax = conntrack->SegNum;
-		int32_t RmaxL =conntrack->VSegLimitPerSeg *	PRESPOOL->AvailNodeCount;
-		int32_t Rmin = conntrack->SegNumMin;
-		elog(LOG, "HAWQ RM :: original quota min seg num:%d, max seg num:%d",
-					conntrack->SegNumMin,
-					conntrack->SegNum);
-
-		/* Ensure quota [min,max] is between request [min,max] */
-		int32_t Gmax= conntrack->MaxSegCountFixed;
-		int32_t Gmin= conntrack->MinSegCountFixed;
-
-		if(Gmin==1)
+		if ( conntrack->StatNVSeg == 0 )
 		{
-			/* case 1 */
-			conntrack->SegNumMin = min(min(Gmax,Rmin),RmaxL);
-			conntrack->SegNum = min(Gmax,RmaxL);
-			if(conntrack->SegNumMin > conntrack->SegNum)
+			int32_t Rmax = conntrack->SegNum;
+			int32_t RmaxL =conntrack->VSegLimitPerSeg *	PRESPOOL->AvailNodeCount;
+			int32_t Rmin = conntrack->SegNumMin;
+			elog(LOG, "Original quota min seg num:%d, max seg num:%d",
+						conntrack->SegNumMin,
+						conntrack->SegNum);
+
+			/* Ensure quota [min,max] is between request [min,max] */
+			int32_t Gmax= conntrack->MaxSegCountFixed;
+			int32_t Gmin= conntrack->MinSegCountFixed;
+
+			if(Gmin==1)
 			{
-				return RESQUEMGR_NO_RESOURCE;
+				/* case 1 */
+				conntrack->SegNumMin = min(min(Gmax,Rmin),RmaxL);
+				conntrack->SegNum = min(Gmax,RmaxL);
+				if(conntrack->SegNumMin > conntrack->SegNum)
+				{
+					return RESQUEMGR_NO_RESOURCE;
+				}
 			}
-		}
-		else if(Gmax == Gmin)
-		{
-			/* case 2 */
-			conntrack->SegNumMin = Gmax;
-			conntrack->SegNum = Gmax;
-			if(Rmax < Gmax)
+			else if(Gmax == Gmin)
 			{
-				return RESQUEMGR_NO_RESOURCE;
+				/* case 2 */
+				conntrack->SegNumMin = Gmax;
+				conntrack->SegNum = Gmax;
+				if(Rmax < Gmax)
+				{
+					return RESQUEMGR_NO_RESOURCE;
+				}
 			}
-		}
-		else
-		{
-			/* case 3 */
-			conntrack->SegNumMin = min(max(Gmin,Rmin),Gmax);
-			conntrack->SegNum = min(max(min(RmaxL,Gmax),Gmin),Rmax);
-			if(conntrack->SegNumMin > conntrack->SegNum)
+			else
 			{
-				return RESQUEMGR_NO_RESOURCE;
+				/* case 3 */
+				conntrack->SegNumMin = min(max(Gmin,Rmin),Gmax);
+				conntrack->SegNum = min(max(min(RmaxL,Gmax),Gmin),Rmax);
+				if(conntrack->SegNumMin > conntrack->SegNum)
+				{
+					return RESQUEMGR_NO_RESOURCE;
+				}
 			}
-		}
 
-		elog(LOG, "Expect (%d MB, %lf CORE) x %d ( min %d ) resource quota.",
-				   conntrack->SegMemoryMB,
-				   conntrack->SegCore,
-				   conntrack->SegNum,
-				   conntrack->SegNumMin);
+			elog(LOG, "Expect (%d MB, %lf CORE) x %d ( min %d ) resource quota.",
+					   conntrack->SegMemoryMB,
+					   conntrack->SegCore,
+					   conntrack->SegNum,
+					   conntrack->SegNumMin);
 
-		adjustResourceExpectsByQueueNVSegLimits(conntrack);
+			adjustResourceExpectsByQueueNVSegLimits(conntrack);
 
-		elog(LOG, "Query resource expects (%d MB, %lf CORE) x %d ( min %d ) "
-				  "resource after adjusting based on queue NVSEG limits.",
-				   conntrack->SegMemoryMB,
-				   conntrack->SegCore,
-				   conntrack->SegNum,
-				   conntrack->SegNumMin);
+			elog(LOG, "Query resource expects (%d MB, %lf CORE) x %d ( min %d ) "
+					  "resource after adjusting based on queue NVSEG limits.",
+					   conntrack->SegMemoryMB,
+					   conntrack->SegCore,
+					   conntrack->SegNum,
+					   conntrack->SegNumMin);
+		}
 	}
 	else
 	{
@@ -2498,17 +2485,6 @@ void refreshMemoryCoreRatioLevelUsage(uint64_t curmicrosec)
 			mctrack->TotalRequest.MemoryMB = mctrack->ClusterMemoryMaxMB;
 		}
 
-		/*
-		elog(DEBUG5, "HAWQ RM :: Memory/ratio[%d] %d MBPCORE has "
-					 "(%d MB, %lf CORE) in use, (%d MB, %lf CORE) requested.",
-					 i,
-					 mctrack->MemCoreRatio,
-					 mctrack->TotalUsed.MemoryMB,
-					 mctrack->TotalUsed.Core,
-					 mctrack->TotalRequest.MemoryMB,
-					 mctrack->TotalRequest.Core);
-		*/
-
 		markMemoryCoreRatioWaterMark(&(PQUEMGR->RatioWaterMarks[i]),
 									 curmicrosec,
 									 mctrack->TotalUsed.MemoryMB,
@@ -3301,66 +3277,107 @@ void minusResourceBundleDataByBundle(ResourceBundle detail, ResourceBundle sourc
 /**
  * Compute the query quota.
  */
-int computeQueryQuota( DynResourceQueueTrack	 track,
-		   	   	   	   int32_t			 		*max_segcountfix,
-		   	   	   	   int32_t			 		*min_segcountfix,
-		   	   	   	   int32_t		       		*segmemmb,
-					   double		       		*segcore,
-					   int32_t		       		*segnum,
-					   int32_t					*segnummin,
-					   int32_t					 segnumlimit)
+int computeQueryQuota( ConnectionTrack conn)
 {
-	int			res				= FUNC_RETURN_OK;
-	int			policy			= 0;
+	Assert( conn != NULL );
+	Assert( conn->QueueTrack != NULL );
 
-	Assert( track != NULL );
+	int					  res		= FUNC_RETURN_OK;
+	int					  policy	= 0;
+	DynResourceQueueTrack track		= (DynResourceQueueTrack)(conn->QueueTrack);
 
 	policy = track->QueueInfo->AllocatePolicy;
 	Assert( policy >= 0 && policy < RSQ_ALLOCATION_POLICY_COUNT );
 
-	/* Get one segment resource quota. */
-	*segmemmb = track->QueueInfo->SegResourceQuotaMemoryMB;
-	*segcore  = track->QueueInfo->SegResourceQuotaVCore;
-
-	/* Decide segment number and minimum runnable segment number. */
-
-	if (*min_segcountfix > segnumlimit)
+	/*
+	 *--------------------------------------------------------------------------
+	 * Get one segment resource quota. If statement level resource quota is not
+	 * specified, the queue vseg resource quota is derived, otherwise, statement
+	 * level resource quota. The resource memory/core ratio is not changed, thus
+	 * code has to calculate the adjusted vcore quota for each vseg in case
+	 * statement level resource quota is active.
+	 *--------------------------------------------------------------------------
+	 */
+	if ( conn->StatNVSeg > 0 )
 	{
-		res = RESQUEMGR_TOO_MANY_FIXED_SEGNUM;
-		elog(LOG, " Expect too many virtual segments %d, can not be more "
-		"than %d",
-		*min_segcountfix,
-		segnumlimit);
-		return res;
+		conn->SegMemoryMB = conn->StatVSegMemoryMB;
+		conn->SegCore	  = track->QueueInfo->SegResourceQuotaVCore *
+							conn->StatVSegMemoryMB /
+							track->QueueInfo->SegResourceQuotaMemoryMB;
+		conn->SegNum	  = conn->StatNVSeg;
+		conn->SegNumMin	  = conn->StatNVSeg;
+
+		/* Check if the resource capacity is more than the capacity of queue. */
+		conn->SegNumEqual = ceil(1.0 * conn->SegMemoryMB * conn->SegNumMin /
+						  	     track->QueueInfo->SegResourceQuotaMemoryMB);
+		Assert( conn->SegNumEqual > 0 );
+		if ( conn->SegNumEqual > track->ClusterSegNumberMax )
+		{
+			res = RESQUEMGR_TOO_MANY_FIXED_SEGNUM;
+			elog(WARNING, "ConnID %d expects too many virtual segments %d that is "
+						  "set by hawq_rm_stmt_nvseg.",
+						  conn->ConnID,
+						  conn->SegNum);
+			return res;
+		}
 	}
-	if(*max_segcountfix > segnumlimit)
+	else
 	{
-		*max_segcountfix = segnumlimit;
+		conn->SegMemoryMB = track->QueueInfo->SegResourceQuotaMemoryMB;
+		conn->SegCore 	  = track->QueueInfo->SegResourceQuotaVCore;
 	}
 
-	/* Compute total resource quota. */
-	res = AllocationPolicy[policy] (track, segnum, segnummin, segnumlimit);
-
-	if ( *segnum < *min_segcountfix )
+	/* Decide vseg number and minimum runnable vseg number. */
+	if ( conn->SegNumMin > conn->VSegLimit )
 	{
 		res = RESQUEMGR_TOO_MANY_FIXED_SEGNUM;
-		elog(LOG, " Expect too many virtual segments %d, can not be more "
-		"than %d",
-		*min_segcountfix,
-		*segnum);
+		elog(WARNING, "ConnID %d expects too many virtual segments %d, "
+					  "cannot be more than %d",
+					  conn->ConnID,
+					  conn->SegNumMin,
+					  conn->VSegLimit);
 		return res;
+	}
 
+	if ( conn->SegNum > conn->VSegLimit )
+	{
+		conn->SegNum = conn->VSegLimit;
 	}
 
-	/* Always respect the expected minimum vseg num. */
-	*segnummin = *min_segcountfix;
+	if ( conn->StatNVSeg <= 0 )
+	{
+		/* Compute total resource quota. */
+		res = AllocationPolicy[policy] (track,
+										&(conn->SegNum),
+										&(conn->SegNumMin),
+										conn->VSegLimit);
+
+		/*
+		 * If fixed vseg count range is lower than estimated vseg count range
+		 * based on one allocation policy, we always respect the fixed range.
+		 */
+		if ( conn->SegNum < conn->MinSegCountFixed )
+		{
+			res = RESQUEMGR_TOO_MANY_FIXED_SEGNUM;
+			elog(WARNING, "Expect too many virtual segments %d, cannot be more "
+						  "than %d",
+						  conn->MinSegCountFixed,
+						  conn->SegNum);
+			return res;
+		}
+
+		conn->SegNumMin = conn->MinSegCountFixed;
+		conn->SegNum = conn->SegNum < conn->MaxSegCountFixed ?
+					   conn->SegNum :
+					   conn->MaxSegCountFixed;
+	}
 
 	elog(DEBUG3, "Expect cluster resource (%d MB, %lf CORE) x %d "
 				 "minimum runnable %d segment(s).",
-			     *segmemmb,
-			     *segcore,
-				 *segnum,
-				 *segnummin);
+			     conn->SegMemoryMB,
+			     conn->SegCore,
+				 conn->SegNum,
+				 conn->SegNumMin);
 
 	return FUNC_RETURN_OK;
 }
@@ -3417,7 +3434,7 @@ int addQueryResourceRequestToQueue(DynResourceQueueTrack queuetrack,
 {
 	insertDQueueTailNode(&(queuetrack->QueryResRequests), conntrack);
 
-	/* add resource request counter. */
+	/* Add resource request counter. */
 	addResourceBundleData(&(queuetrack->TotalRequest),
 						  conntrack->SegMemoryMB * conntrack->SegNum,
 						  conntrack->SegCore * conntrack->SegNum);
@@ -3854,14 +3871,27 @@ int dispatchResourceToQueries_EVEN(DynResourceQueueTrack track)
 		/* Consider concurrency no more than defined parallel count. */
 		/* TODO: Consider more here... */
 		if ( counter + track->NumOfRunningQueries >= track->QueueInfo->ParallelCount )
+		{
 			break;
+		}
+
+		int equalsegnummin = conntrack->StatNVSeg <= 0 ?
+							 conntrack->SegNumMin :
+							 conntrack->SegNumEqual;
+
 		/* Check if the minimum segment requirement is met. */
-		if ( segmincounter + conntrack->SegNumMin > availsegnum )
+		if ( segmincounter + equalsegnummin > availsegnum )
 		{
 			break;
 		}
-		segcounter    += conntrack->SegNum;
-		segmincounter += conntrack->SegNumMin;
+
+		segcounter += conntrack->StatNVSeg <= 0 ?
+					  conntrack->SegNum :
+					  conntrack->SegNumEqual;
+
+		segmincounter += conntrack->StatNVSeg <= 0 ?
+						 conntrack->SegNumMin :
+						 conntrack->SegNumEqual;
 		counter++;
 	DQUEUE_LOOP_END
 
@@ -3880,7 +3910,7 @@ int dispatchResourceToQueries_EVEN(DynResourceQueueTrack track)
 		ConnectionTrack conn = removeDQueueHeadNode(&(track->QueryResRequests));
 		conn->SegNumActual = conn->SegNumMin;
 		insertDQueueTailNode(&todisp, conn);
-		availsegnum -= conn->SegNumMin;
+		availsegnum -= conn->StatNVSeg <= 0 ? conn->SegNumMin : conn->SegNumEqual;
 	}
 
 	DQueueNode pnode = getDQueueContainerHead(&todisp);
@@ -3888,7 +3918,7 @@ int dispatchResourceToQueries_EVEN(DynResourceQueueTrack track)
 	while(availsegnum > 0)
 	{
 		ConnectionTrack conn = (ConnectionTrack)(pnode->Data);
-		if ( conn->SegNum > conn->SegNumActual )
+		if ( conn->StatNVSeg == 0 && conn->SegNum > conn->SegNumActual )
 		{
 			conn->SegNumActual++;
 			availsegnum--;
@@ -3912,7 +3942,7 @@ int dispatchResourceToQueries_EVEN(DynResourceQueueTrack track)
 		elog(DEBUG3, "Resource manager tries to dispatch resource to connection %d. "
 		   		  	 "Expect (%d MB, %lf CORE) x %d(max %d min %d) segment(s). "
 		   		  	 "Original vseg %d(min %d). "
-		   		  	 "VSeg limit per segment %d VSeg limit per query %d",
+		   		  	 "VSeg limit per segment %d VSeg limit per query %d.",
 					 conn->ConnID,
 				     conn->SegMemoryMB,
 					 conn->SegCore,
@@ -3924,6 +3954,16 @@ int dispatchResourceToQueries_EVEN(DynResourceQueueTrack track)
 					 conn->VSegLimitPerSeg,
 					 conn->VSegLimit);
 
+		if ( conn->StatNVSeg > 0 )
+		{
+			elog(LOG, "Resource manager tries to dispatch resource to connection %d. "
+					  "Statement level resource quota is active. "
+					  "Total %d vsegs, each vseg has %d MB memory quota.",
+					  conn->ConnID,
+					  conn->StatNVSeg,
+					  conn->StatVSegMemoryMB);
+		}
+
 		/* Build resource. */
 		int32_t segnumact = 0;
 		allocateResourceFromResourcePool(conn->SegNumActual,

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3a8dcd21/src/backend/utils/misc/guc.c
----------------------------------------------------------------------
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 9c8b6ef..30f8ee0 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -85,6 +85,8 @@
 #include "cdb/cdbquerycontextdispatching.h"
 #include "cdb/memquota.h"
 #include "utils/vmem_tracker.h"
+#include "resourcemanager/errorcode.h"
+#include "resourcemanager/utils/simplestring.h"
 
 #ifndef PG_KRB_SRVTAB
 #define PG_KRB_SRVTAB ""
@@ -250,6 +252,8 @@ static const char *assign_password_hash_algorithm(const char *newval,
 												  bool doit, GucSource source);
 static bool	assign_gp_temporary_directory_mark_error(int newval, bool doit, GucSource source);
 
+static const char * assign_hawq_rm_stmt_vseg_memory(const char *newval, bool doit, GucSource source);
+
 /*
  * GUC option variables that are exported from this module
  */
@@ -6422,6 +6426,15 @@ static struct config_int ConfigureNamesInt[] =
     },
 
     {
+            {"hawq_rm_stmt_nvseg", PGC_USERSET, RESOURCES_MGM,
+                    gettext_noop("the size quota of virtual segments for one statement."),
+                    NULL
+            },
+            &rm_stmt_nvseg,
+            0, 0, 65535, NULL, NULL
+    },
+
+    {
             {"hawq_rm_nslice_perseg_limit", PGC_POSTMASTER, RESOURCES_MGM,
                     gettext_noop("the limit of the number of slice number in one segment "
                                              "for one query."),
@@ -8089,6 +8102,15 @@ static struct config_string ConfigureNamesString[] =
 		"", NULL, NULL
 	},
 
+    {
+        {"hawq_rm_stmt_vseg_memory", PGC_USERSET, RESOURCES_MGM,
+            gettext_noop("the memory quota of one virtual segment for one statement."),
+            NULL
+        },
+        &rm_stmt_vseg_mem_str,
+        "128mb", assign_hawq_rm_stmt_vseg_memory, NULL
+    },
+
 	{
 		{"hawq_resourceenforcer_cgroup_mount_point", PGC_POSTMASTER, RESOURCES_MGM,
 			gettext_noop("set cgroup mount point for resource enforcement"),
@@ -13531,4 +13553,37 @@ assign_gp_temporary_directory_mark_error(int newval, bool doit, GucSource source
 	return true;
 }
 
+/* GUC assign hook: returns newval when it parses as a storage size equal to a
+ * power of two from 64 MB to 16384 MB, otherwise NULL to reject it. The check
+ * also runs when doit is false, which GUC uses for validate-only calls. */
+static const char *
+assign_hawq_rm_stmt_vseg_memory(const char *newval, bool doit, GucSource source)
+{
+	int32_t		newvalmb = 0;
+	SimpString	valuestr;
+
+	setSimpleStringRef(&valuestr, newval, strlen(newval));
+	if ( SimpleStringToStorageSizeMB(&valuestr, &newvalmb) != FUNC_RETURN_OK )
+	{
+		return NULL; /* Not valid format of memory quota. */
+	}
+
+	switch ( newvalmb )
+	{
+	case 64:
+	case 128:
+	case 256:
+	case 512:
+	case 1024:
+	case 2048:
+	case 4096:
+	case 8192:
+	case 16384:
+		return newval;
+	default:
+		break;
+	}
+	return NULL; /* Not valid quota value. */
+}
+
 #include "guc-file.c"

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3a8dcd21/src/include/cdb/cdbvars.h
----------------------------------------------------------------------
diff --git a/src/include/cdb/cdbvars.h b/src/include/cdb/cdbvars.h
index a9e0eef..1d4f7c7 100644
--- a/src/include/cdb/cdbvars.h
+++ b/src/include/cdb/cdbvars.h
@@ -1171,6 +1171,9 @@ extern char   *rm_grm_yarn_queue;
 extern char   *rm_grm_yarn_app_name;
 extern int	   rm_grm_breath_return_percentage;
 
+extern char   *rm_stmt_vseg_mem_str;
+extern int	   rm_stmt_nvseg;
+
 extern int     rm_nvseg_perquery_limit;
 extern int	   rm_nvseg_perquery_perseg_limit;
 extern int	   rm_nslice_perseg_limit;