You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by yj...@apache.org on 2016/01/05 10:32:27 UTC
incubator-hawq git commit: HAWQ-301. Use combined value to measure
segment workload to improve the resource allocation strategy
Repository: incubator-hawq
Updated Branches:
refs/heads/master 8995b07ff -> 33c72cd1e
HAWQ-301. Use combined value to measure segment workload to improve the resource allocation strategy
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/33c72cd1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/33c72cd1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/33c72cd1
Branch: refs/heads/master
Commit: 33c72cd1ef61eeb7ffc85da42f47ea7abecbad68
Parents: 8995b07
Author: YI JIN <yj...@pivotal.io>
Authored: Tue Jan 5 20:13:07 2016 +1100
Committer: YI JIN <yj...@pivotal.io>
Committed: Tue Jan 5 20:13:07 2016 +1100
----------------------------------------------------------------------
src/backend/cdb/cdbvars.c | 10 +-
.../resourcemanager/include/resourcepool.h | 18 +-
src/backend/resourcemanager/resourcepool.c | 574 +++++--------------
src/backend/utils/misc/guc.c | 58 +-
src/include/cdb/cdbvars.h | 8 +
5 files changed, 229 insertions(+), 439 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/33c72cd1/src/backend/cdb/cdbvars.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbvars.c b/src/backend/cdb/cdbvars.c
index 3b3c4be..98c1cf4 100644
--- a/src/backend/cdb/cdbvars.c
+++ b/src/backend/cdb/cdbvars.c
@@ -360,7 +360,7 @@ char *rm_enforce_cgrp_mnt_pnt;
char *rm_enforce_cgrp_hier_name;
double rm_enforce_cpu_weight;
double rm_enforce_core_vpratio;
-int rm_enforce_cleanup_period;
+int rm_enforce_cleanup_period;
int rm_allocation_policy;
@@ -370,6 +370,14 @@ char *rm_seg_tmp_dirs;
int rm_log_level;
int rm_nresqueue_limit;
+double rm_regularize_io_max; /* io bytes workload at which the io term saturates to 1.0 */
+double rm_regularize_nvseg_max; /* vseg count at which the nvseg term saturates to 1.0 */
+double rm_regularize_io_factor; /* weight of the io term in the combined workload */
+double rm_regularize_usage_factor; /* weight of the memory usage term in the combined workload */
+double rm_regularize_nvseg_factor; /* weight of the vseg count term in the combined workload */
+
+int rm_nvseg_variance_among_seg_respool_limit; /* max tolerated (max - min) per-segment vseg count */
+
/* Greenplum Database Experimental Feature GUCs */
int gp_distinct_grouping_sets_threshold = 32;
bool gp_enable_explain_allstat = FALSE;
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/33c72cd1/src/backend/resourcemanager/include/resourcepool.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/resourcepool.h b/src/backend/resourcemanager/include/resourcepool.h
index 5bc612f..f9b5081 100644
--- a/src/backend/resourcemanager/include/resourcepool.h
+++ b/src/backend/resourcemanager/include/resourcepool.h
@@ -263,6 +263,7 @@ struct SegResourceData {
int64_t IOBytesWorkload; /* Accumulated io bytes number. */
int SliceWorkload; /* Accumulated slice number. */
+ int NVSeg; /* Accumulated vseg number. */
uint64_t LastUpdateTime; /* Update it when master receives IMAlive
message from segment, */
@@ -410,7 +411,7 @@ struct ResourcePoolData {
/*
- * The index to help finding the nodes having fewest io bytes number accumulated.
+ * The index to help finding the nodes having the fewest combined workload.
*/
- BBSTData OrderedIOBytesWorkload;
+ BBSTData OrderedCombinedWorkload;
/*
* This is for caching all resolved hdfs hostnames which are mapped to one
@@ -562,21 +563,6 @@ typedef struct VSegmentCounterInternalData *VSegmentCounterInternal;
void freeVSegmentConterList(List **list);
-int allocateResourceFromResourcePoolIOBytes(int32_t nodecount,
- int32_t minnodecount,
- uint32_t memory,
- double core,
- int64_t iobytes,
- int32_t slicesize,
- int32_t vseglimitpseg,
- int preferredcount,
- char **preferredhostname,
- int64_t *preferredscansize,
- bool fixnodecount,
- List **vsegcounters,
- int32_t *totalvsegcount,
- int64_t *vsegiobytes);
-
int allocateResourceFromResourcePoolIOBytes2(int32_t nodecount,
int32_t minnodecount,
uint32_t memory,
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/33c72cd1/src/backend/resourcemanager/resourcepool.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcepool.c b/src/backend/resourcemanager/resourcepool.c
index cf8272b..8f1d47e 100644
--- a/src/backend/resourcemanager/resourcepool.c
+++ b/src/backend/resourcemanager/resourcepool.c
@@ -29,10 +29,10 @@
void addSegResourceAvailIndex(SegResource segres);
void addSegResourceAllocIndex(SegResource segres);
-void addSegResourceIOBytesWorkloadIndex(SegResource segres);
+void addSegResourceCombinedWorkloadIndex(SegResource segres);
int reorderSegResourceAvailIndex(SegResource segres, uint32_t ratio);
int reorderSegResourceAllocIndex(SegResource segres, uint32_t ratio);
-int reorderSegResourceIOBytesWorkloadIndex(SegResource segres);
+int reorderSegResourceCombinedWorkloadIndex(SegResource segres);
int allocateResourceFromSegment(SegResource segres,
GRMContainerSet ctns,
@@ -45,7 +45,8 @@ int recycleResourceToSegment(SegResource segres,
int32_t memory,
double core,
int64_t iobytes,
- int32_t slicesize);
+ int32_t slicesize,
+ int32_t nvseg); /* number of vsegs subtracted from segres->NVSeg */
int getSegIDByHostNameInternal(HASHTABLE hashtable,
const char *hostname,
@@ -71,7 +72,7 @@ void refreshSlavesFileHostSize(FILE *fp);
/* Functions for BBST indices. */
int __DRM_NODERESPOOL_comp_ratioFree(void *arg, void *val1, void *val2);
int __DRM_NODERESPOOL_comp_ratioAlloc(void *arg, void *val1, void *val2);
-int __DRM_NODERESPOOL_comp_iobytes(void *arg, void *val1, void *val2);
+int __DRM_NODERESPOOL_comp_combine(void *arg, void *val1, void *val2); /* orders PRESPOOL->OrderedCombinedWorkload */
/*
* The balanced BST index comparing function. The segment containing most
@@ -139,21 +140,73 @@ int __DRM_NODERESPOOL_comp_ratioAlloc(void *arg, void *val1, void *val2)
+/*
+ * Scale one workload measure into [0, 1] relative to its configured maximum;
+ * values above the maximum saturate to 1.0. A non-positive maximum would make
+ * the division produce NaN or infinity, which breaks the ordering of the
+ * BBST, so in that case any positive value is treated as saturated.
+ */
+static double regularizeWorkload(double value, double maxvalue)
+{
+ if ( maxvalue <= 0.0 )
+ {
+ return value > 0.0 ? 1.0 : 0.0;
+ }
+ return value > maxvalue ? 1.0 : value / maxvalue;
+}
+
+/*
+ * Combined workload of one segment: the sum of its regularized io bytes
+ * workload, memory usage ratio and vseg count, each weighted by the matching
+ * rm_regularize_*_factor GUC. A segment that is not usable returns -1.0 so
+ * that it always sorts as the minimum.
+ */
+static double getSegResourceCombinedWorkload(SegResource segres)
+{
+ double iofact = 0.0;
+ double usagefact = 0.0;
+ double nvsegfact = 0.0;
+
+ if ( !IS_SEGRESOURCE_USABLE(segres) )
+ {
+ return -1.0;
+ }
+
+ iofact = regularizeWorkload(1.0 * segres->IOBytesWorkload,
+ rm_regularize_io_max);
+ nvsegfact = regularizeWorkload(1.0 * segres->NVSeg,
+ rm_regularize_nvseg_max);
+
+ /*
+ * A segment without allocated memory has nothing in use; dividing by its
+ * zero allocation would yield NaN or -inf and corrupt the ordering.
+ */
+ if ( segres->Allocated.MemoryMB > 0 )
+ {
+ usagefact = 1.0 -
+ 1.0 * segres->Available.MemoryMB / segres->Allocated.MemoryMB;
+ }
+
+ return iofact * rm_regularize_io_factor +
+ usagefact * rm_regularize_usage_factor +
+ nvsegfact * rm_regularize_nvseg_factor;
+}
+
/*
* The balanced BST index comparing function. The segment containing fewest
- * io bytes workload is ordered at the left most, the segment not in available
+ * combined workload is ordered at the left most, the segment not in available
* status is treated always the minimum.
*/
-int __DRM_NODERESPOOL_comp_iobytes(void *arg, void *val1, void *val2)
+int __DRM_NODERESPOOL_comp_combine(void *arg, void *val1, void *val2)
{
SegResource node1 = (SegResource) val1;
SegResource node2 = (SegResource) val2;
- int64_t v1 = IS_SEGRESOURCE_USABLE(node1) ? node1->IOBytesWorkload : INT32_MIN;
- int64_t v2 = IS_SEGRESOURCE_USABLE(node2) ? node2->IOBytesWorkload : INT32_MIN;
+ double v1 = getSegResourceCombinedWorkload(node1);
+ double v2 = getSegResourceCombinedWorkload(node2);
/* Expect the minimum one is at the left most. */
return v1>v2 ? 1 : ( v1==v2 ? 0 : -1);
}
int getSegInfoHostAddrStr (SegInfo seginfo, int addrindex, AddressString *addr)
{
Assert(seginfo != NULL);
@@ -301,10 +336,10 @@ void initializeResourcePoolManager(void)
PRESPOOL->OrderedSegResAllocByRatio[i] = NULL;
}
- initializeBBST(&(PRESPOOL->OrderedIOBytesWorkload),
+ initializeBBST(&(PRESPOOL->OrderedCombinedWorkload),
PCONTEXT,
NULL,
- __DRM_NODERESPOOL_comp_iobytes);
+ __DRM_NODERESPOOL_comp_combine);
initializeHASHTABLE(&(PRESPOOL->HDFSHostNameIndexed),
PCONTEXT,
@@ -351,8 +386,7 @@ void initializeResourcePoolManager(void)
{
PRESPOOL->allocateResFuncs[i] = NULL;
}
- PRESPOOL->allocateResFuncs[0] = allocateResourceFromResourcePoolIOBytes;
- PRESPOOL->allocateResFuncs[1] = allocateResourceFromResourcePoolIOBytes2;
+ PRESPOOL->allocateResFuncs[0] = allocateResourceFromResourcePoolIOBytes2; /* The only allocation policy left. */
PRESPOOL->SlavesFileTimestamp = 0;
@@ -727,7 +761,7 @@ int addHAWQSegWithSegStat(SegStat segstat, bool *capstatchanged)
}
- /* Add this node into the io bytes workload BBST structure. */
+ /* Add this node into the combined workload BBST structure. */
- addSegResourceIOBytesWorkloadIndex(segresource);
+ addSegResourceCombinedWorkloadIndex(segresource);
/* Add this node into the alloc/avail resource ordered indices. */
addSegResourceAvailIndex(segresource);
addSegResourceAllocIndex(segresource);
@@ -1079,6 +1113,7 @@ SegResource createSegResource(SegStat segstat)
res->IOBytesWorkload = 0;
res->SliceWorkload = 0;
+ res->NVSeg = 0;
res->Stat = segstat;
res->LastUpdateTime = gettime_microsec();
res->Stat->FTSAvailable = RESOURCE_SEG_STATUS_UNSET;
@@ -1775,401 +1810,6 @@ int allocateResourceFromResourcePool(int32_t nodecount,
vsegiobytes);
}
-int allocateResourceFromResourcePoolIOBytes(int32_t nodecount,
- int32_t minnodecount,
- uint32_t memory,
- double core,
- int64_t iobytes,
- int32_t slicesize,
- int32_t vseglimitpseg,
- int preferredcount,
- char **preferredhostname,
- int64_t *preferredscansize,
- bool fixnodecount,
- List **vsegcounters,
- int32_t *totalvsegcount,
- int64_t *vsegiobytes)
-{
- int res = FUNC_RETURN_OK;
- uint32_t ratio = memory/core;
- BBST nodetree = &(PRESPOOL->OrderedIOBytesWorkload);
- BBSTNode leftnode = NULL;
- SegResource segresource = NULL;
- List *tmplist = NULL;
- int32_t segid = SEGSTAT_ID_INVALID;
- GRMContainerSet containerset = NULL;
- int nodecountleft = nodecount;
- int impossiblecount = 0;
- bool skipchosenmachine = true;
- int fullcount = nodetree->NodeIndex->NodeCount;
-
- /* This hash saves all selected hosts containing at least one segment. */
- HASHTABLEData vsegcnttbl;
-
- initializeHASHTABLE(&vsegcnttbl,
- PCONTEXT,
- HASHTABLE_SLOT_VOLUME_DEFAULT,
- HASHTABLE_SLOT_VOLUME_DEFAULT_MAX,
- HASHTABLE_KEYTYPE_UINT32,
- NULL);
- /*
- *--------------------------------------------------------------------------
- * stage 1 allocate based on locality, only 1 segment allocated in one host.
- *--------------------------------------------------------------------------
- */
- int clustersize = PRESPOOL->AvailNodeCount;
- if ( nodecount < clustersize )
- {
- elog(DEBUG5, "Resource manager tries to find host based on locality data.");
-
- for ( uint32_t i = 0 ; i < preferredcount ; ++i )
- {
- /*
- * Get machine identified by HDFS host name. The HDFS host names does
- * not have to be a YARN or HAWQ FTS recognized host name. Therefore,
- * getNodeIDByHDFSHostName() is responsible to find one mapped HAWQ
- * FTS unified host.
- */
- res = getSegIDByHDFSHostName(preferredhostname[i],
- strlen(preferredhostname[i]),
- &segid);
- if ( res != FUNC_RETURN_OK )
- {
- /* Can not find the machine, skip this machine. */
- elog(LOG, "Resource manager failed to resolve HDFS host identified "
- "by %s. This host is skipped temporarily.",
- preferredhostname[i]);
- continue;
- }
-
- /* Get the resource counter of this host. */
- segresource = getSegResource(segid);
-
- if (!IS_SEGRESOURCE_USABLE(segresource))
- {
- elog(DEBUG3, "Segment %s has unavailable status:"
- "RUAlivePending: %d, Available :%d.",
- preferredhostname[i],
- segresource->RUAlivePending,
- segresource->Stat->FTSAvailable);
- continue;
- }
-
- /* Get the allocated resource of this host with specified ratio */
- res = getGRMContainerSet(segresource, ratio, &containerset);
- if ( res != FUNC_RETURN_OK )
- {
- /* This machine does not have the resource with matching ratio.*/
- elog(DEBUG3, "Segment %s does not contain expected "
- "resource of %d MB per core. This host is skipped.",
- preferredhostname[i],
- ratio);
- continue;
- }
-
- /* Decide how many segments can be allocated based on locality data.*/
- int segcountact = containerset == NULL ?
- 0 :
- containerset->Available.MemoryMB / memory;
- if ( segcountact == 0 )
- {
- elog(DEBUG3, "Segment %s does not have more resource to allocate. "
- "This segment is skipped.",
- preferredhostname[i]);
- continue;
- }
-
- /*
- * We expect only 1 segment working in this preferred host. Therefore,
- * we check one virtual segment containing slicesize slices.
- *
- * NOTE: In this loop we always try to find segments not breaking
- * slice limit. Because we will gothrough all segments later
- * if not enough segments are found in this loop.
- */
- if ( segresource->SliceWorkload + slicesize > rm_nslice_perseg_limit )
- {
- elog(DEBUG3, "Segment %s contains %d slices working now, it can "
- "not afford %d more slices.",
- preferredhostname[i],
- segresource->SliceWorkload,
- slicesize);
- continue;
- }
-
- elog(DEBUG3, "Resource manager chooses segment %s to allocate vseg.",
- GET_SEGRESOURCE_HOSTNAME(segresource));
-
- /* Allocate resource from selected host. */
- allocateResourceFromSegment(segresource,
- containerset,
- memory,
- core,
- slicesize);
-
- /* Reorder the changed host. */
- reorderSegResourceAvailIndex(segresource, ratio);
-
- /* Track the mapping from host information to hdfs host name index.*/
- VSegmentCounterInternal vsegcnt = createVSegmentCounter(i, segresource);
-
- setHASHTABLENode(&vsegcnttbl,
- TYPCONVERT(void *, segresource->Stat->ID),
- TYPCONVERT(void *, vsegcnt),
- false);
-
- /* Check if we have gotten expected number of segments. */
- nodecountleft--;
- if ( nodecountleft == 0 )
- {
- break;
- }
- }
- }
-
- elog(DEBUG3, "After choosing vseg based on locality, %d vsegs allocated, "
- "expect %d vsegs.",
- nodecount-nodecountleft,
- nodecount);
-
- /*
- *--------------------------------------------------------------------------
- * stage 2 allocate based on io workload.
- *--------------------------------------------------------------------------
- */
- while( nodecountleft > 0 &&
- PRESPOOL->OrderedIOBytesWorkload.Root != NULL &&
- impossiblecount < fullcount )
- {
- VSegmentCounterInternal curhost = NULL;
- bool skipcurrent = false;
-
- /* Get and remove the host having largest available resource. */
- leftnode = getLeftMostNode(nodetree);
- removeBBSTNode(nodetree, &leftnode);
- MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
- tmplist= lappend(tmplist, leftnode);
- MEMORY_CONTEXT_SWITCH_BACK
-
- /* If in current loop we should skip chosen machines, check and skip. */
- SegResource currresinfo = (SegResource)(leftnode->Data);
-
- elog(DEBUG5, "Try segment %s to allocate resource by round-robin.",
- GET_SEGRESOURCE_HOSTNAME(currresinfo));
-
- if ( !IS_SEGRESOURCE_USABLE(currresinfo) )
- {
- impossiblecount++;
- skipcurrent = true;
- elog(DEBUG5, "Segment %s is not resource usable, status %d pending %d",
- GET_SEGRESOURCE_HOSTNAME(currresinfo),
- currresinfo->Stat->FTSAvailable,
- currresinfo->RUAlivePending?1:0);
- }
- else
- {
- PAIR pair = getHASHTABLENode(&vsegcnttbl,
- TYPCONVERT(void *,
- currresinfo->Stat->ID));
- if ( pair != NULL )
- {
- Assert(!currresinfo->RUAlivePending);
- Assert(IS_SEGSTAT_FTSAVAILABLE(currresinfo->Stat));
-
- curhost = (VSegmentCounterInternal)(pair->Value);
- /* Host should not break vseg num limit. */
- if ( !fixnodecount && curhost->VSegmentCount >= vseglimitpseg )
- {
- impossiblecount++;
- skipcurrent = true;
- elog(DEBUG5, "Segment %s can not container more vsegs for "
- "current statement, allocated %d vsegs.",
- GET_SEGRESOURCE_HOSTNAME(curhost->Resource),
- curhost->VSegmentCount);
- }
-
- if ( !skipcurrent && skipchosenmachine )
- {
- impossiblecount++;
- skipcurrent = true;
- elog(DEBUG5, "Segment %s is skipped temporarily.",
- GET_SEGRESOURCE_HOSTNAME(curhost->Resource));
- }
- }
- }
-
- if ( !skipcurrent )
- {
- /* Try to allocate resource in the selected host. */
- SegResource curres = (SegResource)(leftnode->Data);
-
- res = getGRMContainerSet(curres, ratio, &containerset);
-
- if ( res != FUNC_RETURN_OK )
- {
- /* This machine does not have the resource with matching ratio.
- * In fact should never occur. */
- impossiblecount++;
- elog(DEBUG5, "Segment %s does not contain resource of %d MBPCORE",
- GET_SEGRESOURCE_HOSTNAME(curres),
- ratio);
- }
- else
- {
-
- if ( curres->SliceWorkload + slicesize > rm_nslice_perseg_limit )
- {
- elog(LOG, "Segment %s contains %d slices working now, "
- "it can not afford %d more slices.",
- GET_SEGRESOURCE_HOSTNAME(curres),
- curres->SliceWorkload,
- slicesize);
- impossiblecount++;
- }
-
- else if ( containerset != NULL &&
- containerset->Available.MemoryMB >= memory &&
- containerset->Available.Core >= core )
- {
- elog(DEBUG3, "Resource manager chooses host %s to allocate vseg.",
- GET_SEGRESOURCE_HOSTNAME(curres));
-
- /* Allocate resource. */
- allocateResourceFromSegment(curres,
- containerset,
- memory,
- core,
- slicesize);
- /* Reorder the changed host. */
- reorderSegResourceAvailIndex(curres, ratio);
-
- /*
- * Check if the selected host has hdfs host name passed in.
- * If true we just simply add the counter, otherwise, we
- * create a new segment counter instance.
- */
- if ( curhost != NULL )
- {
- curhost->VSegmentCount++;
- }
- else
- {
- uint32_t hdfsnameindex = preferredcount;
- int32_t syncid = SEGSTAT_ID_INVALID;
-
- for ( uint32_t k = 0 ; k < preferredcount ; ++k )
- {
- res=getSegIDByHDFSHostName(preferredhostname[k],
- strlen(preferredhostname[k]),
- &syncid);
- if(syncid == curres->Stat->ID)
- {
- hdfsnameindex = k;
- break;
- }
- }
-
- VSegmentCounterInternal vsegcnt =
- createVSegmentCounter(hdfsnameindex, curres);
-
- if (hdfsnameindex == preferredcount)
- {
- if (debug_print_split_alloc_result)
- {
- elog(LOG, "Segment %s mismatched HDFS host name.",
- GET_SEGRESOURCE_HOSTNAME(vsegcnt->Resource));
- }
- }
-
- setHASHTABLENode(&vsegcnttbl,
- TYPCONVERT(void *, curres->Stat->ID),
- TYPCONVERT(void *, vsegcnt),
- false);
- }
- nodecountleft--;
- impossiblecount = 0;
- }
- else
- {
- elog(DEBUG5, "Segment %s does not contain enough resource of "
- "%d MBPCORE",
- GET_SEGRESOURCE_HOSTNAME(curres),
- ratio);
- impossiblecount++;
- }
- }
- }
-
- if ( impossiblecount >= fullcount )
- {
- if ( skipchosenmachine )
- {
- impossiblecount = 0;
- }
- skipchosenmachine = false;
- }
-
- /*
- * If the tree goes to 0 nodes, we have to insert all nodes saved in
- * tmplist back to the tree to make them ordered naturally again.
- */
- if ( nodetree->Root == NULL )
- {
- while( list_length(tmplist) > 0 )
- {
- MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
- insertBBSTNode(nodetree, (BBSTNode)(lfirst(list_head(tmplist))));
- tmplist = list_delete_first(tmplist);
- MEMORY_CONTEXT_SWITCH_BACK
- }
- }
- }
-
- /*
- * Insert all nodes saved in tmplist back to the tree to restore the resource
- * tree for the next time.
- */
- while( list_length(tmplist) > 0 )
- {
- MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
- insertBBSTNode(nodetree, (BBSTNode)(lfirst(list_head(tmplist))));
- tmplist = list_delete_first(tmplist);
- MEMORY_CONTEXT_SWITCH_BACK
- }
-
- /* STEP 3. Refresh io bytes workload. */
- *vsegiobytes = (nodecount - nodecountleft) > 0 ?
- iobytes / (nodecount - nodecountleft) :
- 0;
-
- List *vsegcntlist = NULL;
- ListCell *cell = NULL;
- getAllPAIRRefIntoList(&vsegcnttbl, &vsegcntlist);
- foreach(cell, vsegcntlist)
- {
- VSegmentCounterInternal vsegcounter = (VSegmentCounterInternal)
- ((PAIR)(lfirst(cell)))->Value;
- vsegcounter->Resource->IOBytesWorkload +=
- (*vsegiobytes) * vsegcounter->VSegmentCount;
- reorderSegResourceIOBytesWorkloadIndex(vsegcounter->Resource);
- }
-
- /* STEP 4. Build result. */
- foreach(cell, vsegcntlist)
- {
- MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
- (*vsegcounters) = lappend((*vsegcounters),
- ((PAIR)(lfirst(cell)))->Value);
- MEMORY_CONTEXT_SWITCH_BACK
- }
- freePAIRRefList(&vsegcnttbl, &vsegcntlist);
- cleanHASHTABLE(&vsegcnttbl);
- *totalvsegcount = nodecount - nodecountleft;
-
- validateResourcePoolStatus(false);
- return FUNC_RETURN_OK;
-}
-
int allocateResourceFromResourcePoolIOBytes2(int32_t nodecount,
int32_t minnodecount,
uint32_t memory,
@@ -2187,7 +1827,7 @@ int allocateResourceFromResourcePoolIOBytes2(int32_t nodecount,
{
int res = FUNC_RETURN_OK;
uint32_t ratio = memory/core;
- BBST nodetree = &(PRESPOOL->OrderedIOBytesWorkload);
+ BBST nodetree = &(PRESPOOL->OrderedCombinedWorkload);
BBSTNode leftnode = NULL;
SegResource segresource = NULL;
List *tmplist = NULL;
@@ -2318,6 +1958,111 @@ int allocateResourceFromResourcePoolIOBytes2(int32_t nodecount,
}
}
+ /*--------------------------------------------------------------------------
+ * Check if the nvseg variance limit is broken. We check this only when there
+ * are some virtual segments allocated based on passed in data locality
+ * reference, if the limit is broken, the virtual segments already allocated
+ * are returned.
+ *--------------------------------------------------------------------------
+ */
+ if ( nodecountleft != nodecount )
+ {
+ int minnvseg = INT32_MAX;
+ int maxnvseg = 0;
+
+ /* Go through all usable segments. */
+ List *ressegl = NULL;
+ ListCell *cell = NULL;
+ getAllPAIRRefIntoList(&(PRESPOOL->Segments), &ressegl);
+
+ foreach(cell, ressegl)
+ {
+ PAIR pair = (PAIR)lfirst(cell);
+ SegResource segres = (SegResource)(pair->Value);
+
+ /*
+ * A segment that is not usable can not receive any vseg; counting it
+ * would let one unavailable segment with few vsegs force the
+ * rejection of locality based allocations.
+ */
+ if ( !IS_SEGRESOURCE_USABLE(segres) )
+ {
+ continue;
+ }
+
+ /*
+ * NVSeg already includes the vsegs tentatively allocated above:
+ * allocateResourceFromSegment() increments it and the rollback below
+ * subtracts them again, so hosts in vsegcnttbl get no extra +1.
+ */
+ int nvseg = segres->NVSeg;
+ minnvseg = minnvseg < nvseg ? minnvseg : nvseg;
+ maxnvseg = maxnvseg > nvseg ? maxnvseg : nvseg;
+ }
+
+ freePAIRRefList(&(PRESPOOL->Segments), &ressegl);
+ Assert(minnvseg <= maxnvseg);
+
+ if ( maxnvseg - minnvseg > rm_nvseg_variance_among_seg_respool_limit )
+ {
+ elog(LOG, "Reject virtual segment allocation based on data "
+ "locality information. After tentative allocation "
+ "maximum number of virtual segments in one segment is "
+ "%d minimum number of virtual segments in one segment "
+ "is %d, tolerated difference limit is %d.",
+ maxnvseg,
+ minnvseg,
+ rm_nvseg_variance_among_seg_respool_limit);
+
+ /* Return the allocated resource. */
+ List *vsegcntlist = NULL;
+ ListCell *cntcell = NULL;
+ getAllPAIRRefIntoList(&vsegcnttbl, &vsegcntlist);
+ foreach(cntcell, vsegcntlist)
+ {
+ VSegmentCounterInternal vsegcounter = (VSegmentCounterInternal)
+ ((PAIR)(lfirst(cntcell)))->Value;
+ SegResource cntres = vsegcounter->Resource;
+ int32_t cntvseg = vsegcounter->VSegmentCount;
+ GRMContainerSet ctns = NULL;
+ int res2 = getGRMContainerSet(cntres, ratio, &ctns);
+ Assert(res2 == FUNC_RETURN_OK);
+
+ /*
+ * Give back everything this counter holds, scaled by its vseg
+ * count as returnResourceToResourcePool() does. No io bytes are
+ * returned because IOBytesWorkload is only charged after stage 2.
+ */
+ res2 = recycleResourceToSegment(cntres,
+ ctns,
+ memory * cntvseg,
+ core * cntvseg,
+ 0,
+ slicesize * cntvseg,
+ cntvseg);
+ Assert(res2 == FUNC_RETURN_OK);
+
+ /*
+ * Reorder the host that was actually changed (not the stale
+ * segresource left over from stage 1) in both indices whose keys
+ * depend on its available resource and vseg count.
+ */
+ reorderSegResourceAvailIndex(cntres, ratio);
+ reorderSegResourceCombinedWorkloadIndex(cntres);
+
+ /* Free the counter instance only after its last use. */
+ rm_pfree(PCONTEXT, vsegcounter);
+ }
+ freePAIRRefList(&vsegcnttbl, &vsegcntlist);
+
+ /* Clear the content in the virtual segment counter hashtable. */
+ clearHASHTABLE(&vsegcnttbl);
+
+ /* Restore nodecount. */
+ nodecountleft = nodecount;
+ }
+ }
+
elog(RMLOG, "After choosing vseg based on locality, %d vsegs allocated, "
"expect %d vsegs.",
nodecount-nodecountleft,
@@ -2325,7 +2053,7 @@ int allocateResourceFromResourcePoolIOBytes2(int32_t nodecount,
/*
*--------------------------------------------------------------------------
- * stage 2 allocate based on io workload.
+ * stage 2 allocate based on combined workload.
*--------------------------------------------------------------------------
*/
@@ -2519,7 +2247,7 @@ int allocateResourceFromResourcePoolIOBytes2(int32_t nodecount,
((PAIR)(lfirst(cell)))->Value;
vsegcounter->Resource->IOBytesWorkload +=
(*vsegiobytes) * vsegcounter->VSegmentCount;
- reorderSegResourceIOBytesWorkloadIndex(vsegcounter->Resource);
+ reorderSegResourceCombinedWorkloadIndex(vsegcounter->Resource); /* IOBytesWorkload is part of the combined key. */
}
/* STEP 4. Build result. */
@@ -2574,11 +2302,12 @@ int returnResourceToResourcePool(int memory,
memory * vsegcnt->VSegmentCount,
core * vsegcnt->VSegmentCount,
vsegiobytes * vsegcnt->VSegmentCount,
- slicesize * vsegcnt->VSegmentCount);
+ slicesize * vsegcnt->VSegmentCount,
+ vsegcnt->VSegmentCount); /* nvseg: one per returned vseg */
res = reorderSegResourceAvailIndex(segres, ratio);
Assert(res == FUNC_RETURN_OK);
- res = reorderSegResourceIOBytesWorkloadIndex(segres);
+ res = reorderSegResourceCombinedWorkloadIndex(segres);
Assert(res == FUNC_RETURN_OK);
}
else
@@ -2636,6 +2365,7 @@ int allocateResourceFromSegment(SegResource segres,
minusResourceBundleData(&(segres->Available), memory, core);
segres->SliceWorkload += slicesize;
+ segres->NVSeg += 1; /* NVSeg and Available feed the combined workload BBST key; NOTE(review): confirm every caller reorders that index. */
elog(DEBUG3, "HAWQ RM :: allocated resource from machine %s by "
"(%d MB, %lf CORE) for %d slices. "
@@ -2658,10 +2388,12 @@ int recycleResourceToSegment(SegResource segres,
int32_t memory,
double core,
int64_t iobytes,
- int32_t slicesize)
+ int32_t slicesize,
+ int32_t nvseg)
{
segres->IOBytesWorkload -= iobytes;
segres->SliceWorkload -= slicesize;
+ segres->NVSeg -= nvseg; /* Reverses the per-vseg increment of allocateResourceFromSegment(). */
if ( ctns != NULL )
{
@@ -2753,11 +2485,11 @@ void addSegResourceAllocIndex(SegResource segres)
}
}
-void addSegResourceIOBytesWorkloadIndex(SegResource segres)
+void addSegResourceCombinedWorkloadIndex(SegResource segres)
{
/* Add the node */
- int res = insertBBSTNode(&(PRESPOOL->OrderedIOBytesWorkload),
- createBBSTNode(&(PRESPOOL->OrderedIOBytesWorkload),
+ int res = insertBBSTNode(&(PRESPOOL->OrderedCombinedWorkload),
+ createBBSTNode(&(PRESPOOL->OrderedCombinedWorkload),
segres));
if (res != FUNC_RETURN_OK)
{
@@ -2796,24 +2528,24 @@ int reorderSegResourceAllocIndex(SegResource segres, uint32_t ratio)
return reorderBBSTNodeData(tree, segres);
}
-int reorderSegResourceIOBytesWorkloadIndex(SegResource segres)
+int reorderSegResourceCombinedWorkloadIndex(SegResource segres)
{
int res = FUNC_RETURN_OK;
BBSTNode node = NULL;
/* Reorder the node */
- node = getBBSTNode(&(PRESPOOL->OrderedIOBytesWorkload), segres);
+ node = getBBSTNode(&(PRESPOOL->OrderedCombinedWorkload), segres);
if ( node == NULL )
{
return RESOURCEPOOL_INTERNAL_NO_HOST_INDEX;
}
- res = removeBBSTNode(&(PRESPOOL->OrderedIOBytesWorkload), &node);
+ res = removeBBSTNode(&(PRESPOOL->OrderedCombinedWorkload), &node); /* Remove, then re-insert below at its current combined key. */
if ( res != FUNC_RETURN_OK )
{
return RESOURCEPOOL_INTERNAL_NO_HOST_INDEX;
}
- res = insertBBSTNode(&(PRESPOOL->OrderedIOBytesWorkload), node);
+ res = insertBBSTNode(&(PRESPOOL->OrderedCombinedWorkload), node);
if ( res == UTIL_BBST_DUPLICATE_VALUE )
{
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/33c72cd1/src/backend/utils/misc/guc.c
----------------------------------------------------------------------
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 918920b..becc9c9 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -6339,7 +6339,7 @@ static struct config_int ConfigureNamesInt[] =
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE
},
&rm_allocation_policy,
- 1, 0, 10, NULL, NULL
+ 0, 0, 0, NULL, NULL /* only allocateResFuncs[0] is registered now */
},
{
@@ -6479,6 +6479,17 @@ static struct config_int ConfigureNamesInt[] =
},
{
+ {"hawq_rm_nvseg_variance_among_seg_respool_limit", PGC_POSTMASTER, RESOURCES_MGM,
+ gettext_noop("the variance of vseg number in each segment that resource manager "
+ "should tolerate at most in resource pool when choosing segments "
+ "based on data locality reference."),
+ NULL
+ },
+ &rm_nvseg_variance_among_seg_respool_limit,
+ 2, 0, 65535, NULL, NULL
+ },
+
+ {
{"hawq_rm_container_batch_limit", PGC_POSTMASTER, RESOURCES_MGM,
gettext_noop("the batch process limit for global resource manager containers."),
NULL
@@ -6872,6 +6883,51 @@ static struct config_real ConfigureNamesReal[] =
},
{
+ {"hawq_rm_regularize_io_max",PGC_POSTMASTER, RESOURCES_MGM,
+ gettext_noop("Set the maximum io workload limit for regularize the workload of one segment."),
+ NULL
+ },
+ &rm_regularize_io_max,
+ 137438953472.0 /* 128gb */, 1.0 /* used as a divisor */, DBL_MAX, NULL, NULL
+ },
+
+ {
+ {"hawq_rm_regularize_nvseg_max",PGC_POSTMASTER, RESOURCES_MGM,
+ gettext_noop("Set the maximum number of virtual segments for regularize the workload of one segment."),
+ NULL
+ },
+ &rm_regularize_nvseg_max,
+ 300.0, 1.0 /* used as a divisor */, DBL_MAX, NULL, NULL
+ },
+
+ {
+ {"hawq_rm_regularize_io_factor",PGC_POSTMASTER, RESOURCES_MGM,
+ gettext_noop("Set the factor of io workload in combined workload of a segment."),
+ NULL
+ },
+ &rm_regularize_io_factor,
+ 1.0, 0.0, DBL_MAX, NULL, NULL
+ },
+
+ {
+ {"hawq_rm_regularize_usage_factor",PGC_POSTMASTER, RESOURCES_MGM,
+ gettext_noop("Set the factor of resource usage in combined workload of a segment."),
+ NULL
+ },
+ &rm_regularize_usage_factor,
+ 1.0, 0.0, DBL_MAX, NULL, NULL
+ },
+
+ {
+ {"hawq_rm_regularize_nvseg_factor",PGC_POSTMASTER, RESOURCES_MGM,
+ gettext_noop("Set the factor of number of virtual segments in combined workload of a segment."),
+ NULL
+ },
+ &rm_regularize_nvseg_factor,
+ 1.0, 0.0, DBL_MAX, NULL, NULL
+ },
+
+ {
{"optimizer_nestloop_factor",PGC_USERSET, DEVELOPER_OPTIONS,
gettext_noop("Sets the nestloop join cost factor in the optimizer"),
NULL,
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/33c72cd1/src/include/cdb/cdbvars.h
----------------------------------------------------------------------
diff --git a/src/include/cdb/cdbvars.h b/src/include/cdb/cdbvars.h
index 8456337..bb59d76 100644
--- a/src/include/cdb/cdbvars.h
+++ b/src/include/cdb/cdbvars.h
@@ -1217,6 +1217,14 @@ extern char *rm_seg_tmp_dirs;
extern int rm_log_level;
extern int rm_nresqueue_limit;
+extern double rm_regularize_io_max; /* io bytes workload at which the io term saturates to 1.0 */
+extern double rm_regularize_nvseg_max; /* vseg count at which the nvseg term saturates to 1.0 */
+extern double rm_regularize_io_factor; /* weight of the io term in the combined workload */
+extern double rm_regularize_usage_factor; /* weight of the memory usage term in the combined workload */
+extern double rm_regularize_nvseg_factor; /* weight of the vseg count term in the combined workload */
+
+extern int rm_nvseg_variance_among_seg_respool_limit; /* max tolerated (max - min) per-segment vseg count */
+
extern int max_filecount_notto_split_segment;
extern int min_datasize_to_combine_segment;
extern int datalocality_algorithm_version;