You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by yj...@apache.org on 2016/01/05 10:32:27 UTC

incubator-hawq git commit: HAWQ-301. Use combined value to measure segment workload to improve the resource allocation strategy

Repository: incubator-hawq
Updated Branches:
  refs/heads/master 8995b07ff -> 33c72cd1e


HAWQ-301. Use combined value to measure segment workload to improve the resource allocation strategy


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/33c72cd1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/33c72cd1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/33c72cd1

Branch: refs/heads/master
Commit: 33c72cd1ef61eeb7ffc85da42f47ea7abecbad68
Parents: 8995b07
Author: YI JIN <yj...@pivotal.io>
Authored: Tue Jan 5 20:13:07 2016 +1100
Committer: YI JIN <yj...@pivotal.io>
Committed: Tue Jan 5 20:13:07 2016 +1100

----------------------------------------------------------------------
 src/backend/cdb/cdbvars.c                       |  10 +-
 .../resourcemanager/include/resourcepool.h      |  18 +-
 src/backend/resourcemanager/resourcepool.c      | 574 +++++--------------
 src/backend/utils/misc/guc.c                    |  58 +-
 src/include/cdb/cdbvars.h                       |   8 +
 5 files changed, 229 insertions(+), 439 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/33c72cd1/src/backend/cdb/cdbvars.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbvars.c b/src/backend/cdb/cdbvars.c
index 3b3c4be..98c1cf4 100644
--- a/src/backend/cdb/cdbvars.c
+++ b/src/backend/cdb/cdbvars.c
@@ -360,7 +360,7 @@ char	*rm_enforce_cgrp_mnt_pnt;
 char	*rm_enforce_cgrp_hier_name;
 double	rm_enforce_cpu_weight;
 double	rm_enforce_core_vpratio;
-int	rm_enforce_cleanup_period;
+int		rm_enforce_cleanup_period;
 
 int	rm_allocation_policy;
 
@@ -370,6 +370,14 @@ char   *rm_seg_tmp_dirs;
 int     rm_log_level;
 int     rm_nresqueue_limit;
 
+double	rm_regularize_io_max;
+double	rm_regularize_nvseg_max;
+double	rm_regularize_io_factor;
+double	rm_regularize_usage_factor;
+double	rm_regularize_nvseg_factor;
+
+int		rm_nvseg_variance_among_seg_respool_limit;
+
 /* Greenplum Database Experimental Feature GUCs */
 int         gp_distinct_grouping_sets_threshold = 32;
 bool		gp_enable_explain_allstat = FALSE;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/33c72cd1/src/backend/resourcemanager/include/resourcepool.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/resourcepool.h b/src/backend/resourcemanager/include/resourcepool.h
index 5bc612f..f9b5081 100644
--- a/src/backend/resourcemanager/include/resourcepool.h
+++ b/src/backend/resourcemanager/include/resourcepool.h
@@ -263,6 +263,7 @@ struct SegResourceData {
 
 	int64_t			IOBytesWorkload; /* Accumulated io bytes number.      	  */
 	int				SliceWorkload;	 /* Accumulated slice number.             */
+	int				NVSeg;			 /* Accumulated vseg number.			  */
 
 	uint64_t        LastUpdateTime;  /* Update it when master receives IMAlive
 										message from segment,                 */
@@ -410,7 +411,7 @@ struct ResourcePoolData {
 	/*
 	 * The index to help finding the nodes having fewest io bytes number accumulated.
 	 */
-	BBSTData		OrderedIOBytesWorkload;
+	BBSTData		OrderedCombinedWorkload;
 
 	/*
 	 * This is for caching all resolved hdfs hostnames which are mapped to one
@@ -562,21 +563,6 @@ typedef struct VSegmentCounterInternalData *VSegmentCounterInternal;
 
 void freeVSegmentConterList(List **list);
 
-int allocateResourceFromResourcePoolIOBytes(int32_t 	nodecount,
-										    int32_t		minnodecount,
-										    uint32_t 	memory,
-										    double 		core,
-										    int64_t		iobytes,
-										    int32_t   	slicesize,
-											int32_t		vseglimitpseg,
-										    int 		preferredcount,
-										    char 	  **preferredhostname,
-										    int64_t    *preferredscansize,
-										    bool		fixnodecount,
-										    List 	  **vsegcounters,
-										    int32_t    *totalvsegcount,
-										    int64_t    *vsegiobytes);
-
 int allocateResourceFromResourcePoolIOBytes2(int32_t 	 nodecount,
 										     int32_t	 minnodecount,
 										     uint32_t 	 memory,

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/33c72cd1/src/backend/resourcemanager/resourcepool.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcepool.c b/src/backend/resourcemanager/resourcepool.c
index cf8272b..8f1d47e 100644
--- a/src/backend/resourcemanager/resourcepool.c
+++ b/src/backend/resourcemanager/resourcepool.c
@@ -29,10 +29,10 @@
 
 void addSegResourceAvailIndex(SegResource segres);
 void addSegResourceAllocIndex(SegResource segres);
-void addSegResourceIOBytesWorkloadIndex(SegResource segres);
+void addSegResourceCombinedWorkloadIndex(SegResource segres);
 int  reorderSegResourceAvailIndex(SegResource segres, uint32_t ratio);
 int  reorderSegResourceAllocIndex(SegResource segres, uint32_t ratio);
-int  reorderSegResourceIOBytesWorkloadIndex(SegResource segres);
+int  reorderSegResourceCombinedWorkloadIndex(SegResource segres);
 
 int allocateResourceFromSegment(SegResource 	segres,
 							 	GRMContainerSet ctns,
@@ -45,7 +45,8 @@ int recycleResourceToSegment(SegResource	 segres,
 							 int32_t		 memory,
 							 double			 core,
 							 int64_t		 iobytes,
-							 int32_t		 slicesize);
+							 int32_t		 slicesize,
+							 int32_t		 nvseg);
 
 int getSegIDByHostNameInternal(HASHTABLE   hashtable,
 							   const char *hostname,
@@ -71,7 +72,7 @@ void refreshSlavesFileHostSize(FILE *fp);
 /* Functions for BBST indices. */
 int __DRM_NODERESPOOL_comp_ratioFree(void *arg, void *val1, void *val2);
 int __DRM_NODERESPOOL_comp_ratioAlloc(void *arg, void *val1, void *val2);
-int __DRM_NODERESPOOL_comp_iobytes(void *arg, void *val1, void *val2);
+int __DRM_NODERESPOOL_comp_combine(void *arg, void *val1, void *val2);
 
 /*
  * The balanced BST index comparing function. The segment containing most
@@ -139,21 +140,55 @@ int __DRM_NODERESPOOL_comp_ratioAlloc(void *arg, void *val1, void *val2)
 
 /*
  * The balanced BST index comparing function. The segment containing fewest
- * io bytes workload is ordered at the left most, the segment not in available
+ * combined workload is ordered at the left most, the segment not in available
  * status is treated always the minimum.
  */
-int __DRM_NODERESPOOL_comp_iobytes(void *arg, void *val1, void *val2)
+int __DRM_NODERESPOOL_comp_combine(void *arg, void *val1, void *val2)
 {
 	SegResource 	node1 	= (SegResource) val1;
 	SegResource 	node2 	= (SegResource) val2;
 
-	int64_t v1 = IS_SEGRESOURCE_USABLE(node1) ? node1->IOBytesWorkload : INT32_MIN;
-	int64_t v2 = IS_SEGRESOURCE_USABLE(node2) ? node2->IOBytesWorkload : INT32_MIN;
+	double v1 = IS_SEGRESOURCE_USABLE(node1) ? 1 : -1;
+	double v2 = IS_SEGRESOURCE_USABLE(node2) ? 1 : -1;
+
+	if ( v1 > 0 )
+	{
+		double fact1 = node1->IOBytesWorkload > rm_regularize_io_max ?
+					   1.0 :
+					   (1.0 * node1->IOBytesWorkload / rm_regularize_io_max);
+		double fact2 = 1.0 -
+					   1.0 * node1->Available.MemoryMB / node1->Allocated.MemoryMB;
+		double fact3 = node1->NVSeg > rm_regularize_nvseg_max ?
+					   1.0 :
+					   1.0 * node1->NVSeg / rm_regularize_nvseg_max;
+
+		v1 = fact1 * rm_regularize_io_factor +
+			 fact2 * rm_regularize_usage_factor +
+			 fact3 * rm_regularize_nvseg_factor;
+	}
+
+	if ( v2 > 0 )
+	{
+		double fact1 = node2->IOBytesWorkload > rm_regularize_io_max ?
+					   1.0 :
+					   (1.0 * node2->IOBytesWorkload / rm_regularize_io_max);
+		double fact2 = 1.0 -
+					   1.0 * node2->Available.MemoryMB / node2->Allocated.MemoryMB;
+		double fact3 = node2->NVSeg > rm_regularize_nvseg_max ?
+					   1.0 :
+					   1.0 * node2->NVSeg / rm_regularize_nvseg_max;
+
+		v2 = fact1 * rm_regularize_io_factor +
+			 fact2 * rm_regularize_usage_factor +
+			 fact3 * rm_regularize_nvseg_factor;
+	}
 
 	/* Expect the minimum one is at the left most. */
 	return v1>v2 ? 1 : ( v1==v2 ? 0 : -1);
 }
 
+
+
 int  getSegInfoHostAddrStr (SegInfo seginfo, int addrindex, AddressString *addr)
 {
 	Assert(seginfo != NULL);
@@ -301,10 +336,10 @@ void initializeResourcePoolManager(void)
     	PRESPOOL->OrderedSegResAllocByRatio[i] = NULL;
     }
 
-	initializeBBST(&(PRESPOOL->OrderedIOBytesWorkload),
+	initializeBBST(&(PRESPOOL->OrderedCombinedWorkload),
 				   PCONTEXT,
 				   NULL,
-				   __DRM_NODERESPOOL_comp_iobytes);
+				   __DRM_NODERESPOOL_comp_combine);
 
 	initializeHASHTABLE(&(PRESPOOL->HDFSHostNameIndexed),
 						PCONTEXT,
@@ -351,8 +386,7 @@ void initializeResourcePoolManager(void)
 	{
 		PRESPOOL->allocateResFuncs[i] = NULL;
 	}
-	PRESPOOL->allocateResFuncs[0] = allocateResourceFromResourcePoolIOBytes;
-	PRESPOOL->allocateResFuncs[1] = allocateResourceFromResourcePoolIOBytes2;
+	PRESPOOL->allocateResFuncs[0] = allocateResourceFromResourcePoolIOBytes2;
 
 
 	PRESPOOL->SlavesFileTimestamp = 0;
@@ -727,7 +761,7 @@ int addHAWQSegWithSegStat(SegStat segstat, bool *capstatchanged)
 		}
 
 		/* Add this node into the io bytes workload BBST structure. */
-		addSegResourceIOBytesWorkloadIndex(segresource);
+		addSegResourceCombinedWorkloadIndex(segresource);
 		/* Add this node into the alloc/avail resource ordered indices. */
 		addSegResourceAvailIndex(segresource);
 		addSegResourceAllocIndex(segresource);
@@ -1079,6 +1113,7 @@ SegResource createSegResource(SegStat segstat)
 
 	res->IOBytesWorkload = 0;
 	res->SliceWorkload   = 0;
+	res->NVSeg			 = 0;
 	res->Stat     		 = segstat;
 	res->LastUpdateTime  = gettime_microsec();
 	res->Stat->FTSAvailable = RESOURCE_SEG_STATUS_UNSET;
@@ -1775,401 +1810,6 @@ int allocateResourceFromResourcePool(int32_t 	nodecount,
 															   vsegiobytes);
 }
 
-int allocateResourceFromResourcePoolIOBytes(int32_t 	nodecount,
-										    int32_t		minnodecount,
-										    uint32_t 	memory,
-										    double 		core,
-										    int64_t		iobytes,
-										    int32_t   	slicesize,
-											int32_t		vseglimitpseg,
-										    int 		preferredcount,
-										    char 	  **preferredhostname,
-										    int64_t    *preferredscansize,
-										    bool		fixnodecount,
-										    List 	  **vsegcounters,
-										    int32_t    *totalvsegcount,
-										    int64_t    *vsegiobytes)
-{
-	int 			res			  		= FUNC_RETURN_OK;
-	uint32_t 		ratio 		  		= memory/core;
-	BBST			nodetree	  		= &(PRESPOOL->OrderedIOBytesWorkload);
-	BBSTNode		leftnode	  		= NULL;
-	SegResource		segresource   		= NULL;
-	List 		   *tmplist				= NULL;
-	int32_t			segid		  		= SEGSTAT_ID_INVALID;
-	GRMContainerSet containerset  		= NULL;
-	int				nodecountleft 		= nodecount;
-	int				impossiblecount   	= 0;
-	bool			skipchosenmachine 	= true;
-	int 			fullcount 			= nodetree->NodeIndex->NodeCount;
-
-	/* This hash saves all selected hosts containing at least one segment.    */
-	HASHTABLEData	vsegcnttbl;
-
-	initializeHASHTABLE(&vsegcnttbl,
-						PCONTEXT,
-						HASHTABLE_SLOT_VOLUME_DEFAULT,
-						HASHTABLE_SLOT_VOLUME_DEFAULT_MAX,
-						HASHTABLE_KEYTYPE_UINT32,
-						NULL);
-	/*
-	 *--------------------------------------------------------------------------
-	 * stage 1 allocate based on locality, only 1 segment allocated in one host.
-	 *--------------------------------------------------------------------------
-	 */
-	int clustersize = PRESPOOL->AvailNodeCount;
-	if ( nodecount < clustersize )
-	{
-		elog(DEBUG5, "Resource manager tries to find host based on locality data.");
-
-		for ( uint32_t i = 0 ; i < preferredcount ; ++i )
-		{
-			/*
-			 * Get machine identified by HDFS host name. The HDFS host names does
-			 * not have to be a YARN or HAWQ FTS recognized host name. Therefore,
-			 * getNodeIDByHDFSHostName() is responsible to find one mapped HAWQ
-			 * FTS unified host.
-			 */
-			res = getSegIDByHDFSHostName(preferredhostname[i],
-										 strlen(preferredhostname[i]),
-										 &segid);
-			if ( res != FUNC_RETURN_OK )
-			{
-				/* Can not find the machine, skip this machine. */
-				elog(LOG, "Resource manager failed to resolve HDFS host identified "
-						  "by %s. This host is skipped temporarily.",
-						  preferredhostname[i]);
-				continue;
-			}
-
-			/* Get the resource counter of this host. */
-			segresource = getSegResource(segid);
-
-			if (!IS_SEGRESOURCE_USABLE(segresource))
-			{
-				elog(DEBUG3, "Segment %s has unavailable status:"
-						  "RUAlivePending: %d, Available :%d.",
-						  preferredhostname[i],
-						  segresource->RUAlivePending,
-						  segresource->Stat->FTSAvailable);
-				continue;
-			}
-
-			/* Get the allocated resource of this host with specified ratio */
-			res = getGRMContainerSet(segresource, ratio, &containerset);
-			if ( res != FUNC_RETURN_OK )
-			{
-				/* This machine does not have the resource with matching ratio.*/
-				elog(DEBUG3, "Segment %s does not contain expected "
-						  "resource of %d MB per core. This host is skipped.",
-						  preferredhostname[i],
-						  ratio);
-				continue;
-			}
-
-			/* Decide how many segments can be allocated based on locality data.*/
-			int segcountact = containerset == NULL ?
-							  0 :
-							  containerset->Available.MemoryMB / memory;
-			if ( segcountact == 0 )
-			{
-				elog(DEBUG3, "Segment %s does not have more resource to allocate. "
-							 "This segment is skipped.",
-							 preferredhostname[i]);
-				continue;
-			}
-
-			/*
-			 * We expect only 1 segment working in this preferred host. Therefore,
-			 * we check one virtual segment containing slicesize slices.
-			 *
-			 * NOTE: In this loop we always try to find segments not breaking
-			 * 		 slice limit. Because we will gothrough all segments later
-			 * 		 if not enough segments are found in this loop.
-			 */
-			if ( segresource->SliceWorkload + slicesize > rm_nslice_perseg_limit )
-			{
-				elog(DEBUG3, "Segment %s contains %d slices working now, it can "
-							 "not afford %d more slices.",
-							 preferredhostname[i],
-							 segresource->SliceWorkload,
-							 slicesize);
-				continue;
-			}
-
-			elog(DEBUG3, "Resource manager chooses segment %s to allocate vseg.",
-						 GET_SEGRESOURCE_HOSTNAME(segresource));
-
-			/* Allocate resource from selected host. */
-			allocateResourceFromSegment(segresource,
-									 	containerset,
-										memory,
-										core,
-										slicesize);
-
-			/* Reorder the changed host. */
-			reorderSegResourceAvailIndex(segresource, ratio);
-
-			/* Track the mapping from host information to hdfs host name index.*/
-			VSegmentCounterInternal vsegcnt = createVSegmentCounter(i, segresource);
-
-			setHASHTABLENode(&vsegcnttbl,
-							 TYPCONVERT(void *, segresource->Stat->ID),
-							 TYPCONVERT(void *, vsegcnt),
-							 false);
-
-			/* Check if we have gotten expected number of segments. */
-			nodecountleft--;
-			if ( nodecountleft == 0 )
-			{
-				break;
-			}
-		}
-	}
-
-	elog(DEBUG3, "After choosing vseg based on locality, %d vsegs allocated, "
-				 "expect %d vsegs.",
-				 nodecount-nodecountleft,
-				 nodecount);
-
-	/*
-	 *--------------------------------------------------------------------------
-	 * stage 2 allocate based on io workload.
-	 *--------------------------------------------------------------------------
-	 */
-	while( nodecountleft > 0 &&
-		   PRESPOOL->OrderedIOBytesWorkload.Root != NULL &&
-		   impossiblecount < fullcount )
-	{
-		VSegmentCounterInternal curhost     = NULL;
-		bool 				    skipcurrent = false;
-
-		/* Get and remove the host having largest available resource. */
-		leftnode = getLeftMostNode(nodetree);
-		removeBBSTNode(nodetree, &leftnode);
-		MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
-		tmplist= lappend(tmplist, leftnode);
-		MEMORY_CONTEXT_SWITCH_BACK
-
-		/* If in current loop we should skip chosen machines, check and skip. */
-		SegResource currresinfo = (SegResource)(leftnode->Data);
-
-		elog(DEBUG5, "Try segment %s to allocate resource by round-robin.",
-					 GET_SEGRESOURCE_HOSTNAME(currresinfo));
-
-		if ( !IS_SEGRESOURCE_USABLE(currresinfo) )
-		{
-			impossiblecount++;
-			skipcurrent = true;
-			elog(DEBUG5, "Segment %s is not resource usable, status %d pending %d",
-						 GET_SEGRESOURCE_HOSTNAME(currresinfo),
-						 currresinfo->Stat->FTSAvailable,
-						 currresinfo->RUAlivePending?1:0);
-		}
-		else
-		{
-			PAIR pair = getHASHTABLENode(&vsegcnttbl,
-										 TYPCONVERT(void *,
-													currresinfo->Stat->ID));
-			if ( pair != NULL )
-			{
-				Assert(!currresinfo->RUAlivePending);
-				Assert(IS_SEGSTAT_FTSAVAILABLE(currresinfo->Stat));
-
-				curhost = (VSegmentCounterInternal)(pair->Value);
-				/* Host should not break vseg num limit. */
-				if ( !fixnodecount && curhost->VSegmentCount >= vseglimitpseg )
-				{
-					impossiblecount++;
-					skipcurrent = true;
-					elog(DEBUG5, "Segment %s can not container more vsegs for "
-								 "current statement, allocated %d vsegs.",
-								 GET_SEGRESOURCE_HOSTNAME(curhost->Resource),
-								 curhost->VSegmentCount);
-				}
-
-				if ( !skipcurrent && skipchosenmachine )
-				{
-					impossiblecount++;
-					skipcurrent = true;
-					elog(DEBUG5, "Segment %s is skipped temporarily.",
-								 GET_SEGRESOURCE_HOSTNAME(curhost->Resource));
-				}
-			}
-		}
-
-		if ( !skipcurrent )
-		{
-			/* Try to allocate resource in the selected host. */
-			SegResource curres = (SegResource)(leftnode->Data);
-
-			res = getGRMContainerSet(curres, ratio, &containerset);
-
-			if ( res != FUNC_RETURN_OK )
-			{
-				/* This machine does not have the resource with matching ratio.
-				 * In fact should never occur. */
-				impossiblecount++;
-				elog(DEBUG5, "Segment %s does not contain resource of %d MBPCORE",
-							 GET_SEGRESOURCE_HOSTNAME(curres),
-							 ratio);
-			}
-			else
-			{
-
-				if ( curres->SliceWorkload + slicesize > rm_nslice_perseg_limit )
-				{
-					elog(LOG, "Segment %s contains %d slices working now, "
-							  "it can not afford %d more slices.",
-							  GET_SEGRESOURCE_HOSTNAME(curres),
-							  curres->SliceWorkload,
-							  slicesize);
-					impossiblecount++;
-				}
-
-				else if ( containerset != NULL &&
-						  containerset->Available.MemoryMB >= memory &&
-					      containerset->Available.Core     >= core )
-				{
-					elog(DEBUG3, "Resource manager chooses host %s to allocate vseg.",
-								 GET_SEGRESOURCE_HOSTNAME(curres));
-
-					/* Allocate resource. */
-					allocateResourceFromSegment(curres,
-											 	containerset,
-												memory,
-												core,
-												slicesize);
-					/* Reorder the changed host. */
-					reorderSegResourceAvailIndex(curres, ratio);
-
-					/*
-					 * Check if the selected host has hdfs host name passed in.
-					 * If true we just simply add the counter, otherwise, we
-					 * create a new segment counter instance.
-					 */
-					if ( curhost != NULL )
-					{
-						curhost->VSegmentCount++;
-					}
-					else
-					{
-						uint32_t hdfsnameindex = preferredcount;
-						int32_t  syncid		   = SEGSTAT_ID_INVALID;
-
-						for ( uint32_t k = 0 ; k < preferredcount ; ++k )
-						{
-							res=getSegIDByHDFSHostName(preferredhostname[k],
-									  	  	  	  	   strlen(preferredhostname[k]),
-													   &syncid);
-							if(syncid == curres->Stat->ID)
-							{
-								hdfsnameindex = k;
-								break;
-							}
-						}
-
-						VSegmentCounterInternal vsegcnt =
-							createVSegmentCounter(hdfsnameindex, curres);
-
-						if (hdfsnameindex == preferredcount)
-						{
-							if (debug_print_split_alloc_result)
-							{
-								elog(LOG, "Segment %s mismatched HDFS host name.",
-										  GET_SEGRESOURCE_HOSTNAME(vsegcnt->Resource));
-							}
-						}
-
-						setHASHTABLENode(&vsegcnttbl,
-										 TYPCONVERT(void *, curres->Stat->ID),
-										 TYPCONVERT(void *, vsegcnt),
-										 false);
-					}
-					nodecountleft--;
-					impossiblecount = 0;
-				}
-				else
-				{
-					elog(DEBUG5, "Segment %s does not contain enough resource of "
-								 "%d MBPCORE",
-								 GET_SEGRESOURCE_HOSTNAME(curres),
-								 ratio);
-					impossiblecount++;
-				}
-			}
-		}
-
-		if ( impossiblecount >= fullcount )
-		{
-			if ( skipchosenmachine )
-			{
-				impossiblecount = 0;
-			}
-			skipchosenmachine = false;
-		}
-
- 		/*
- 		 * If the tree goes to 0 nodes, we have to insert all nodes saved in
- 		 * tmplist back to the tree to make them ordered naturally again.
- 		 */
-		if ( nodetree->Root == NULL )
-		{
-			while( list_length(tmplist) > 0 )
-			{
-				MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
-				insertBBSTNode(nodetree, (BBSTNode)(lfirst(list_head(tmplist))));
-				tmplist = list_delete_first(tmplist);
-				MEMORY_CONTEXT_SWITCH_BACK
-			}
-		}
-	}
-
-	/*
-	 * Insert all nodes saved in tmplist back to the tree to restore the resource
-	 * tree for the next time.
-	 */
-	while( list_length(tmplist) > 0 )
-	{
-		MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
-		insertBBSTNode(nodetree, (BBSTNode)(lfirst(list_head(tmplist))));
-		tmplist = list_delete_first(tmplist);
-		MEMORY_CONTEXT_SWITCH_BACK
-	}
-
-	/* STEP 3. Refresh io bytes workload. */
-	*vsegiobytes = (nodecount - nodecountleft) > 0 ?
-					iobytes / (nodecount - nodecountleft) :
-					0;
-
-	List 	 *vsegcntlist = NULL;
-	ListCell *cell		  = NULL;
-	getAllPAIRRefIntoList(&vsegcnttbl, &vsegcntlist);
-	foreach(cell, vsegcntlist)
-	{
-		VSegmentCounterInternal vsegcounter = (VSegmentCounterInternal)
-											  ((PAIR)(lfirst(cell)))->Value;
-		vsegcounter->Resource->IOBytesWorkload +=
-									(*vsegiobytes) * vsegcounter->VSegmentCount;
-		reorderSegResourceIOBytesWorkloadIndex(vsegcounter->Resource);
-	}
-
-    /* STEP 4. Build result. */
-	foreach(cell, vsegcntlist)
-	{
-		MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
-			(*vsegcounters) = lappend((*vsegcounters),
-									  ((PAIR)(lfirst(cell)))->Value);
-		MEMORY_CONTEXT_SWITCH_BACK
-	}
-	freePAIRRefList(&vsegcnttbl, &vsegcntlist);
-	cleanHASHTABLE(&vsegcnttbl);
-	*totalvsegcount = nodecount - nodecountleft;
-
-	validateResourcePoolStatus(false);
-	return FUNC_RETURN_OK;
-}
-
 int allocateResourceFromResourcePoolIOBytes2(int32_t 	 nodecount,
 										     int32_t	 minnodecount,
 										     uint32_t 	 memory,
@@ -2187,7 +1827,7 @@ int allocateResourceFromResourcePoolIOBytes2(int32_t 	 nodecount,
 {
 	int 			res			  		= FUNC_RETURN_OK;
 	uint32_t 		ratio 		  		= memory/core;
-	BBST			nodetree	  		= &(PRESPOOL->OrderedIOBytesWorkload);
+	BBST			nodetree	  		= &(PRESPOOL->OrderedCombinedWorkload);
 	BBSTNode		leftnode	  		= NULL;
 	SegResource		segresource   		= NULL;
 	List 		   *tmplist				= NULL;
@@ -2318,6 +1958,94 @@ int allocateResourceFromResourcePoolIOBytes2(int32_t 	 nodecount,
 		}
 	}
 
+	/*--------------------------------------------------------------------------
+	 * Check if the nvseg variance limit is broken. We check this only when there
+	 * are some virtual segments allocated based on passed in data locality
+	 * reference, if the limit is broken, the virtual segments already allocated
+	 * are returned.
+	 *--------------------------------------------------------------------------
+	 */
+	if ( nodecountleft != nodecount )
+	{
+		int minnvseg = INT32_MAX;
+		int maxnvseg = 0;
+
+		/* Go through all segments. */
+		List 	 *ressegl	 = NULL;
+		ListCell *cell		 = NULL;
+		getAllPAIRRefIntoList(&(PRESPOOL->Segments), &ressegl);
+
+		foreach(cell, ressegl)
+		{
+			PAIR pair = (PAIR)lfirst(cell);
+			SegResource segres = (SegResource)(pair->Value);
+			int nvseg = segres->NVSeg;
+
+			/*
+			 * If current nvseg counter list has this host referenced, we should
+			 * add the additional 1.
+			 */
+			PAIR pair2 = getHASHTABLENode(&vsegcnttbl,
+										  TYPCONVERT(void *,
+													 segres->Stat->ID));
+			nvseg = pair2 == NULL ? nvseg : nvseg + 1;
+
+			minnvseg = minnvseg < nvseg ? minnvseg : nvseg;
+			maxnvseg = maxnvseg > nvseg ? maxnvseg : nvseg;
+		}
+
+		freePAIRRefList(&(PRESPOOL->Segments), &ressegl);
+		Assert(minnvseg <= maxnvseg);
+
+		if ( maxnvseg - minnvseg > rm_nvseg_variance_among_seg_respool_limit )
+		{
+			elog(LOG, "Reject virtual segment allocation based on data "
+					  "locality information. After tentative allocation "
+					  "maximum number of virtual segments in one segment is "
+					  "%d minimum number of virtual segments in one segment "
+					  "is %d, tolerated difference limit is %d.",
+					  maxnvseg,
+					  minnvseg,
+					  rm_nvseg_variance_among_seg_respool_limit);
+
+			/* Return the allocated resource. */
+			List 	 *vsegcntlist = NULL;
+			ListCell *cell		  = NULL;
+			getAllPAIRRefIntoList(&vsegcnttbl, &vsegcntlist);
+			foreach(cell, vsegcntlist)
+			{
+				VSegmentCounterInternal vsegcounter = (VSegmentCounterInternal)
+													  ((PAIR)(lfirst(cell)))->Value;
+				GRMContainerSet ctns = NULL;
+				int res2 = getGRMContainerSet(vsegcounter->Resource, ratio, &ctns);
+				Assert(res2 == FUNC_RETURN_OK);
+
+				res2 = recycleResourceToSegment(vsegcounter->Resource,
+										 	 	ctns,
+												memory,
+												core,
+												0,
+												slicesize,
+												1);
+				Assert(res2 == FUNC_RETURN_OK);
+
+				/* Free the counter instance. */
+				rm_pfree(PCONTEXT, vsegcounter);
+
+				/* Reorder the changed host. */
+				reorderSegResourceAvailIndex(segresource, ratio);
+			}
+			freePAIRRefList(&vsegcnttbl, &vsegcntlist);
+
+			/* Clear the content in the virtual segment counter hashtable. */
+			clearHASHTABLE(&vsegcnttbl);
+
+			/* Restore nodecount. */
+			nodecountleft = nodecount;
+		}
+	}
+
+
 	elog(RMLOG, "After choosing vseg based on locality, %d vsegs allocated, "
 				"expect %d vsegs.",
 				nodecount-nodecountleft,
@@ -2325,7 +2053,7 @@ int allocateResourceFromResourcePoolIOBytes2(int32_t 	 nodecount,
 
 	/*
 	 *--------------------------------------------------------------------------
-	 * stage 2 allocate based on io workload.
+	 * stage 2 allocate based on combined workload.
 	 *--------------------------------------------------------------------------
 	 */
 
@@ -2519,7 +2247,7 @@ int allocateResourceFromResourcePoolIOBytes2(int32_t 	 nodecount,
 											  ((PAIR)(lfirst(cell)))->Value;
 		vsegcounter->Resource->IOBytesWorkload +=
 									(*vsegiobytes) * vsegcounter->VSegmentCount;
-		reorderSegResourceIOBytesWorkloadIndex(vsegcounter->Resource);
+		reorderSegResourceCombinedWorkloadIndex(vsegcounter->Resource);
 	}
 
     /* STEP 4. Build result. */
@@ -2574,11 +2302,12 @@ int returnResourceToResourcePool(int 		memory,
 										   memory      * vsegcnt->VSegmentCount,
 										   core        * vsegcnt->VSegmentCount,
 										   vsegiobytes * vsegcnt->VSegmentCount,
-										   slicesize   * vsegcnt->VSegmentCount);
+										   slicesize   * vsegcnt->VSegmentCount,
+										   vsegcnt->VSegmentCount);
 
 			res = reorderSegResourceAvailIndex(segres, ratio);
 			Assert(res == FUNC_RETURN_OK);
-			res = reorderSegResourceIOBytesWorkloadIndex(segres);
+			res = reorderSegResourceCombinedWorkloadIndex(segres);
 			Assert(res == FUNC_RETURN_OK);
 		}
 		else
@@ -2636,6 +2365,7 @@ int allocateResourceFromSegment(SegResource 	segres,
 	minusResourceBundleData(&(segres->Available), memory, core);
 
 	segres->SliceWorkload	+= slicesize;
+	segres->NVSeg			+= 1;
 
 	elog(DEBUG3, "HAWQ RM :: allocated resource from machine %s by "
 				 "(%d MB, %lf CORE) for %d slices. "
@@ -2658,10 +2388,12 @@ int recycleResourceToSegment(SegResource	 segres,
 							 int32_t		 memory,
 							 double			 core,
 							 int64_t		 iobytes,
-							 int32_t		 slicesize)
+							 int32_t		 slicesize,
+							 int32_t		 nvseg)
 {
 	segres->IOBytesWorkload -= iobytes;
 	segres->SliceWorkload   -= slicesize;
+	segres->NVSeg			-= nvseg;
 
 	if ( ctns != NULL )
 	{
@@ -2753,11 +2485,11 @@ void addSegResourceAllocIndex(SegResource segres)
 	}
 }
 
-void addSegResourceIOBytesWorkloadIndex(SegResource segres)
+void addSegResourceCombinedWorkloadIndex(SegResource segres)
 {
 	/* Add the node */
-	int res = insertBBSTNode(&(PRESPOOL->OrderedIOBytesWorkload),
-						 	 createBBSTNode(&(PRESPOOL->OrderedIOBytesWorkload),
+	int res = insertBBSTNode(&(PRESPOOL->OrderedCombinedWorkload),
+						 	 createBBSTNode(&(PRESPOOL->OrderedCombinedWorkload),
 						 			 	 	segres));
 	if (res != FUNC_RETURN_OK)
 	{
@@ -2796,24 +2528,24 @@ int reorderSegResourceAllocIndex(SegResource segres, uint32_t ratio)
 	return reorderBBSTNodeData(tree, segres);
 }
 
-int reorderSegResourceIOBytesWorkloadIndex(SegResource segres)
+int reorderSegResourceCombinedWorkloadIndex(SegResource segres)
 {
 	int 	 res 	= FUNC_RETURN_OK;
 	BBSTNode node	= NULL;
 
 	/* Reorder the node */
-	node = getBBSTNode(&(PRESPOOL->OrderedIOBytesWorkload), segres);
+	node = getBBSTNode(&(PRESPOOL->OrderedCombinedWorkload), segres);
 	if ( node == NULL )
 	{
 		return RESOURCEPOOL_INTERNAL_NO_HOST_INDEX;
 	}
 
-	res = removeBBSTNode(&(PRESPOOL->OrderedIOBytesWorkload), &node);
+	res = removeBBSTNode(&(PRESPOOL->OrderedCombinedWorkload), &node);
 	if ( res != FUNC_RETURN_OK )
 	{
 		return RESOURCEPOOL_INTERNAL_NO_HOST_INDEX;
 	}
-	res = insertBBSTNode(&(PRESPOOL->OrderedIOBytesWorkload), node);
+	res = insertBBSTNode(&(PRESPOOL->OrderedCombinedWorkload), node);
 
 	if ( res == UTIL_BBST_DUPLICATE_VALUE )
 	{

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/33c72cd1/src/backend/utils/misc/guc.c
----------------------------------------------------------------------
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 918920b..becc9c9 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -6339,7 +6339,7 @@ static struct config_int ConfigureNamesInt[] =
 			GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE
 		},
 		&rm_allocation_policy,
-		1, 0, 10, NULL, NULL
+		0, 0, 0, NULL, NULL
 	},
 
     {
@@ -6479,6 +6479,17 @@ static struct config_int ConfigureNamesInt[] =
 	},
 
 	{
+		{"hawq_rm_nvseg_variance_amon_seg_respool_limit", PGC_POSTMASTER, RESOURCES_MGM,
+			gettext_noop("the variance of vseg number in each segment that resource manager "
+						 "should tolerate at most in resource pool when choosing segments "
+						 "based on data locality reference."),
+			NULL
+		},
+		&rm_nvseg_variance_among_seg_respool_limit,
+		2, 0, 65535, NULL, NULL
+	},
+
+	{
 		{"hawq_rm_container_batch_limit", PGC_POSTMASTER, RESOURCES_MGM,
 			gettext_noop("the batch process limit for global resource manager containers."),
 			NULL
@@ -6872,6 +6883,51 @@ static struct config_real ConfigureNamesReal[] =
 	},
 
 	{
+		{"hawq_rm_regularize_io_max",PGC_POSTMASTER, RESOURCES_MGM,
+			gettext_noop("Set the maximum io workload limit for regularize the workload of one segment."),
+			NULL
+		},
+		&rm_regularize_io_max,
+		137438953472.0 /* 128gb */, 0.0, DBL_MAX, NULL, NULL
+	},
+
+	{
+		{"hawq_rm_regularize_nvseg_max",PGC_POSTMASTER, RESOURCES_MGM,
+			gettext_noop("Set the maximum number of virtual segments for regularize the workload of one segment."),
+			NULL
+		},
+		&rm_regularize_nvseg_max,
+		300.0, 0.0, DBL_MAX, NULL, NULL
+	},
+
+	{
+		{"hawq_rm_regularize_io_factor",PGC_POSTMASTER, RESOURCES_MGM,
+			gettext_noop("Set the factor of io workload in combined workload of a segment."),
+			NULL
+		},
+		&rm_regularize_io_factor,
+		1.0, 0.0, DBL_MAX, NULL, NULL
+	},
+
+	{
+		{"hawq_rm_regularize_usage_factor",PGC_POSTMASTER, RESOURCES_MGM,
+			gettext_noop("Set the factor of resource usage in combined workload of a segment."),
+			NULL
+		},
+		&rm_regularize_usage_factor,
+		1.0, 0.0, DBL_MAX, NULL, NULL
+	},
+
+	{
+		{"hawq_rm_regularize_nvseg_factor",PGC_POSTMASTER, RESOURCES_MGM,
+			gettext_noop("Set the factor of number of virtual segments in combined workload of a segment."),
+			NULL
+		},
+		&rm_regularize_nvseg_factor,
+		1.0, 0.0, DBL_MAX, NULL, NULL
+	},
+
+	{
 		{"optimizer_nestloop_factor",PGC_USERSET, DEVELOPER_OPTIONS,
 			gettext_noop("Sets the nestloop join cost factor in the optimizer"),
 			NULL,

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/33c72cd1/src/include/cdb/cdbvars.h
----------------------------------------------------------------------
diff --git a/src/include/cdb/cdbvars.h b/src/include/cdb/cdbvars.h
index 8456337..bb59d76 100644
--- a/src/include/cdb/cdbvars.h
+++ b/src/include/cdb/cdbvars.h
@@ -1217,6 +1217,14 @@ extern char   *rm_seg_tmp_dirs;
 extern int     rm_log_level;
 extern int     rm_nresqueue_limit;
 
+extern double  rm_regularize_io_max;
+extern double  rm_regularize_nvseg_max;
+extern double  rm_regularize_io_factor;
+extern double  rm_regularize_usage_factor;
+extern double  rm_regularize_nvseg_factor;
+
+extern int	   rm_nvseg_variance_among_seg_respool_limit;
+
 extern int max_filecount_notto_split_segment;
 extern int min_datasize_to_combine_segment;
 extern int datalocality_algorithm_version;