You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by yj...@apache.org on 2016/01/04 01:32:22 UTC
incubator-hawq git commit: HAWQ-286. Queued resource requests are not adjusted along with the changing of cluster capacity
Repository: incubator-hawq
Updated Branches:
refs/heads/master d8c1a691f -> af6cfe880
HAWQ-286. Queued resource requests are not adjusted along with the changing of cluster capacity
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/af6cfe88
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/af6cfe88
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/af6cfe88
Branch: refs/heads/master
Commit: af6cfe8805f865e9437464b4b1934ae02aff0f77
Parents: d8c1a69
Author: YI JIN <yj...@pivotal.io>
Authored: Mon Jan 4 11:32:07 2016 +1100
Committer: YI JIN <yj...@pivotal.io>
Committed: Mon Jan 4 11:32:07 2016 +1100
----------------------------------------------------------------------
.../communication/rmcomm_RM2RMSEG.c | 11 +--
src/backend/resourcemanager/include/dynrm.h | 8 +-
.../resourcemanager/include/resourcepool.h | 2 +-
.../resourcemanager/include/resqueuemanager.h | 9 ++-
src/backend/resourcemanager/requesthandler.c | 22 +++---
.../resourcemanager/requesthandler_ddl.c | 8 +-
.../resourcebroker/resourcebroker_LIBYARN.c | 2 +-
src/backend/resourcemanager/resourcemanager.c | 20 +++--
src/backend/resourcemanager/resourcepool.c | 14 +++-
src/backend/resourcemanager/resqueuedeadlock.c | 5 +-
src/backend/resourcemanager/resqueuemanager.c | 83 +++++++++++++++++---
.../regress/expected/parquet_compression.out | 2 +-
src/test/regress/sql/parquet_compression.sql | 2 +-
13 files changed, 135 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/af6cfe88/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c b/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c
index ecbc2b7..3591ac1 100644
--- a/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c
+++ b/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c
@@ -235,14 +235,15 @@ void receivedRUAliveResponse(AsyncCommMessageHandlerContext context,
/* Set the host down in gp_segment_configuration table */
if (Gp_role != GP_ROLE_UTILITY)
{
- update_segment_status(segres->Stat->ID + REGISTRATION_ORDER_OFFSET, SEGMENT_STATUS_DOWN);
+ update_segment_status(segres->Stat->ID + REGISTRATION_ORDER_OFFSET,
+ SEGMENT_STATUS_DOWN);
}
/* Set the host down. */
- elog(LOG, "Resource manager sets host %s from up to down "
- "due to not getting valid RUAlive response.",
+ elog(WARNING, "Resource manager sets host %s from up to down "
+ "due to not getting valid RUAlive response.",
GET_SEGRESOURCE_HOSTNAME(segres));
- refreshResourceQueuePercentageCapacity();
+ refreshResourceQueueCapacity(false);
}
else {
elog(DEBUG3, "Resource manager find host %s is down already.",
@@ -291,7 +292,7 @@ void sentRUAliveError(AsyncCommMessageHandlerContext context)
"due to communication error.",
GET_SEGRESOURCE_HOSTNAME(segres));
- refreshResourceQueuePercentageCapacity();
+ refreshResourceQueueCapacity(false);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/af6cfe88/src/backend/resourcemanager/include/dynrm.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/dynrm.h b/src/backend/resourcemanager/include/dynrm.h
index 250cb75..92ba8b8 100644
--- a/src/backend/resourcemanager/include/dynrm.h
+++ b/src/backend/resourcemanager/include/dynrm.h
@@ -293,9 +293,9 @@ int refreshGlobalRMClusterInformation(void);
#define HAWQDRM_CONFFILE_YARN_QUEUE "hawq_rm_yarn_queue_name"
#define HAWQDRM_CONFFILE_YARN_APP_NAME "hawq_rm_yarn_app_name"
-#define HAWQDRM_CONFFILE_SVRTYPE_VAL_YARN "yarn"
-#define HAWQDRM_CONFFILE_SVRTYPE_VAL_MESOS "mesos"
-#define HAWQDRM_CONFFILE_SVRTYPE_VAL_NONE "none"
+#define HAWQDRM_CONFFILE_SVRTYPE_VAL_YARN "yarn"
+#define HAWQDRM_CONFFILE_SVRTYPE_VAL_MESOS "mesos"
+#define HAWQDRM_CONFFILE_SVRTYPE_VAL_NONE "none"
int createDRMMemoryContext(void);
@@ -322,7 +322,7 @@ int addResourceQueueAndUserFromProperties(List *queueprops, List *userprops);
void moveResourceBrokerLogToSysLog(void);
int MainHandlerLoop(void);
-void sendResponseToClients(void);
+void sendResponseToClients(void);
void updateStatusOfAllNodes(void);
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/af6cfe88/src/backend/resourcemanager/include/resourcepool.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/resourcepool.h b/src/backend/resourcemanager/include/resourcepool.h
index a0f808a..5bc612f 100644
--- a/src/backend/resourcemanager/include/resourcepool.h
+++ b/src/backend/resourcemanager/include/resourcepool.h
@@ -518,7 +518,7 @@ void initializeResourcePoolManager(void);
* Add new host into the cluster. New segment will be registered, existing
* segment maybe updated based on latest information passed in.
*/
-int addHAWQSegWithSegStat(SegStat segstat);
+int addHAWQSegWithSegStat(SegStat segstat, bool *capstatchanged);
/* Update existing host's grm capacity information. */
int updateHAWQSegWithGRMSegStat( SegStat segstat);
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/af6cfe88/src/backend/resourcemanager/include/resqueuemanager.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/resqueuemanager.h b/src/backend/resourcemanager/include/resqueuemanager.h
index fc22a37..86fb69a 100644
--- a/src/backend/resourcemanager/include/resqueuemanager.h
+++ b/src/backend/resourcemanager/include/resqueuemanager.h
@@ -344,8 +344,8 @@ typedef struct DynResourceQueueManagerData DynResourceQueueManagerData;
void initializeResourceQueueManager(void);
/* collect resource queues' resource usage status from bottom up. */
void refreshMemoryCoreRatioLevelUsage(uint64_t curmicrosec);
-/* Refresh resource queue resource capacity based on updated cluster info. */
-void refreshResourceQueuePercentageCapacity(void);
+/* Refresh resource queue resource capacity and adjust all queued requests. */
+void refreshResourceQueueCapacity(bool queuechanged);
/* Dispatch resource to the queuing queries. */
void dispatchResourceToQueries(void);
/* Time out the resource allocated whose QD owner does not have chance to return. */
@@ -512,14 +512,17 @@ void buildQueueTrackShadow(DynResourceQueueTrack toaltertrack);
void cleanupQueueTrackShadows(List **qhavingshadow);
int rebuildAllResourceQueueTrackDynamicStatusInShadow(List *quehavingshadow,
+ bool queuechanged,
char *errorbuf,
int errorbufsize);
int rebuildResourceQueueTrackDynamicStatusInShadow(DynResourceQueueTrack quetrack,
+ bool queuechanged,
char *errorbuf,
int errorbufsize);
-int detectAndDealWithDeadLockInShadow(DynResourceQueueTrack quetrack);
+int detectAndDealWithDeadLockInShadow(DynResourceQueueTrack quetrack,
+ bool queuechanged);
void applyResourceQueueTrackChangesFromShadows(List *quehavingshadow);
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/af6cfe88/src/backend/resourcemanager/requesthandler.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/requesthandler.c b/src/backend/resourcemanager/requesthandler.c
index 9702e5b..2da4e4f 100644
--- a/src/backend/resourcemanager/requesthandler.c
+++ b/src/backend/resourcemanager/requesthandler.c
@@ -731,18 +731,22 @@ bool handleRMSEGRequestIMAlive(void **arg)
newsegstat->GRMAvailable = RESOURCE_SEG_STATUS_UNSET;
newsegstat->FTSAvailable = RESOURCE_SEG_STATUS_AVAILABLE;
- if ( addHAWQSegWithSegStat(newsegstat) != FUNC_RETURN_OK )
+ bool capstatchanged = false;
+ if ( addHAWQSegWithSegStat(newsegstat, &capstatchanged) != FUNC_RETURN_OK )
{
- /* Should be a duplciate host. */
+ /* Should be a duplicate host. */
rm_pfree(PCONTEXT, newsegstat);
}
- /* Refresh resource queue capacities. */
- refreshResourceQueuePercentageCapacity();
- /* Recalculate all memory/core ratio instances' limits. */
- refreshMemoryCoreRatioLimits();
- /* Refresh memory/core ratio level water mark. */
- refreshMemoryCoreRatioWaterMark();
+ if ( capstatchanged )
+ {
+ /* Refresh resource queue capacities. */
+ refreshResourceQueueCapacity(false);
+ /* Recalculate all memory/core ratio instances' limits. */
+ refreshMemoryCoreRatioLimits();
+ /* Refresh memory/core ratio level water mark. */
+ refreshMemoryCoreRatioWaterMark();
+ }
/* Send the response. */
RPCResponseIMAliveData response;
@@ -1003,7 +1007,7 @@ bool handleRMRequestSegmentIsDown(void **arg)
hostname = hostname + strlen(hostname) + 1; /* Try next */
}
- refreshResourceQueuePercentageCapacity();
+ refreshResourceQueueCapacity(false);
RPCResponseSegmentIsDownData response;
response.Result = res;
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/af6cfe88/src/backend/resourcemanager/requesthandler_ddl.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/requesthandler_ddl.c b/src/backend/resourcemanager/requesthandler_ddl.c
index 46c05ab..d30765d 100644
--- a/src/backend/resourcemanager/requesthandler_ddl.c
+++ b/src/backend/resourcemanager/requesthandler_ddl.c
@@ -284,6 +284,9 @@ bool handleRMDDLRequestManipulateResourceQueue(void **arg)
goto senderr;
}
+ /* Refresh resource queue capacity. */
+ refreshResourceQueueCapacity(true);
+
break;
case MANIPULATE_RESQUEUE_ALTER:
@@ -390,7 +393,7 @@ bool handleRMDDLRequestManipulateResourceQueue(void **arg)
* Refresh actual capacity of the resource queue, the change is
* expected to be updated in the shadow instances.
*/
- refreshResourceQueuePercentageCapacity();
+ refreshResourceQueuePercentageCapacity(true);
/*------------------------------------------------------------------
* Till now, we expect the input for altering a resource queue is
@@ -410,6 +413,7 @@ bool handleRMDDLRequestManipulateResourceQueue(void **arg)
res = rebuildAllResourceQueueTrackDynamicStatusInShadow(qhavingshadow,
errorbuf,
+ true,
sizeof(errorbuf));
if ( res != FUNC_RETURN_OK )
{
@@ -534,8 +538,6 @@ bool handleRMDDLRequestManipulateResourceQueue(void **arg)
Assert(false);
}
- /* Refresh resource queue capacity. */
- refreshResourceQueuePercentageCapacity();
/* Recalculate all memory/core ratio instances' limits. */
refreshMemoryCoreRatioLimits();
/* Refresh memory/core ratio level water mark. */
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/af6cfe88/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN.c b/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN.c
index 0e1e414..980202c 100644
--- a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN.c
+++ b/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN.c
@@ -692,7 +692,7 @@ int handleRB2RM_ClusterReport(void)
PQUEMGR->GRMQueueMaxCapacity;
PQUEMGR->GRMQueueResourceTight = response.ResourceTight > 0 ? true : false;
- refreshResourceQueuePercentageCapacity();
+ refreshResourceQueueCapacity(false);
PRESPOOL->LastUpdateTime = gettime_microsec();
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/af6cfe88/src/backend/resourcemanager/resourcemanager.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcemanager.c b/src/backend/resourcemanager/resourcemanager.c
index d36eb0e..e1b1623 100644
--- a/src/backend/resourcemanager/resourcemanager.c
+++ b/src/backend/resourcemanager/resourcemanager.c
@@ -2612,6 +2612,7 @@ void updateStatusOfAllNodes() {
SegResource node = NULL;
uint64_t curtime = 0;
+ bool changedstatus = false;
curtime = gettime_microsec();
for(uint32_t idx = 0; idx < PRESPOOL->SegmentIDCounter; idx++) {
node = getSegResource(idx);
@@ -2632,13 +2633,18 @@ void updateStatusOfAllNodes() {
update_segment_status(idx + REGISTRATION_ORDER_OFFSET, SEGMENT_STATUS_DOWN);
}
- elog(LOG, "Resource manager sets host %s from up to down.",
- GET_SEGRESOURCE_HOSTNAME(node));
+ elog(WARNING, "Resource manager sets host %s from up to down.",
+ GET_SEGRESOURCE_HOSTNAME(node));
- refreshResourceQueuePercentageCapacity();
+ changedstatus = true;
}
}
+ if ( changedstatus )
+ {
+ refreshResourceQueueCapacity(false);
+ }
+
validateResourcePoolStatus(true);
}
@@ -2774,8 +2780,10 @@ int loadHostInformationIntoResourcePool(void)
segreport.Buffer);
destroySelfMaintainBuffer(&segreport);
- res = addHAWQSegWithSegStat(segstat);
- if ( res != FUNC_RETURN_OK ) {
+ bool capstatchanged = false;
+ res = addHAWQSegWithSegStat(segstat, &capstatchanged);
+ if ( res != FUNC_RETURN_OK )
+ {
elog(WARNING, "Resource manager failed to add machine from file.");
rm_pfree(PCONTEXT, segstat);
}
@@ -2798,7 +2806,7 @@ int loadHostInformationIntoResourcePool(void)
}
/* Refresh resource queue capacities. */
- refreshResourceQueuePercentageCapacity();
+ refreshResourceQueueCapacity(false);
/* Recalculate all memory/core ratio instances' limits. */
refreshMemoryCoreRatioLimits();
/* Refresh memory/core ratio level water mark. */
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/af6cfe88/src/backend/resourcemanager/resourcepool.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcepool.c b/src/backend/resourcemanager/resourcepool.c
index 18bba56..cf8272b 100644
--- a/src/backend/resourcemanager/resourcepool.c
+++ b/src/backend/resourcemanager/resourcepool.c
@@ -592,7 +592,7 @@ cleanup:
* one segment in resource pool. If the host exists, update based on the latest
* information.
*/
-int addHAWQSegWithSegStat(SegStat segstat)
+int addHAWQSegWithSegStat(SegStat segstat, bool *capstatchanged)
{
Assert(segstat != NULL);
@@ -644,6 +644,7 @@ int addHAWQSegWithSegStat(SegStat segstat)
/* CASE 1. It is a new host. */
if ( res != FUNC_RETURN_OK )
{
+ *capstatchanged = true;
/* Create machine information and corresponding resource information. */
segresource = createSegResource(segstat);
@@ -747,12 +748,16 @@ int addHAWQSegWithSegStat(SegStat segstat)
setSegResHAWQAvailability(segresource, RESOURCE_SEG_STATUS_AVAILABLE);
if (Gp_role != GP_ROLE_UTILITY)
{
- update_segment_status(segresource->Stat->ID + REGISTRATION_ORDER_OFFSET, SEGMENT_STATUS_UP);
+ update_segment_status(segresource->Stat->ID + REGISTRATION_ORDER_OFFSET,
+ SEGMENT_STATUS_UP);
}
elog(LOG, "Resource manager sets segment %s(%d) up from down.",
GET_SEGRESOURCE_HOSTNAME(segresource),
segid);
+
+ /* The segment is up again, its capacity should be considered again. */
+ *capstatchanged = true;
}
/* The machine should be up. Update port number. */
@@ -792,6 +797,8 @@ int addHAWQSegWithSegStat(SegStat segstat)
segcapchanged =
oldftsmem != segresource->Stat->FTSTotalMemoryMB ||
oldftscore != segresource->Stat->FTSTotalCore;
+
+ *capstatchanged = segcapchanged;
}
/* update the status of this node */
@@ -804,7 +811,8 @@ int addHAWQSegWithSegStat(SegStat segstat)
* The expectation is that more than 50% cluster nodes has the same memory/
* core ratio which is selected as the cluster memory/core ratio.
*/
- if ( segcapchanged ) {
+ if ( segcapchanged )
+ {
uint32_t curratio = 0;
if ( DRMGlobalInstance->ImpType == NONE_HAWQ2 )
{
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/af6cfe88/src/backend/resourcemanager/resqueuedeadlock.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resqueuedeadlock.c b/src/backend/resourcemanager/resqueuedeadlock.c
index 69b149a..90404f2 100644
--- a/src/backend/resourcemanager/resqueuedeadlock.c
+++ b/src/backend/resourcemanager/resqueuedeadlock.c
@@ -216,9 +216,8 @@ void copyResourceDeadLockDetectorWithoutLocking(ResqueueDeadLockDetector source,
SessionTrack newstrack = rm_palloc0(PCONTEXT, sizeof(SessionTrackData));
newstrack->SessionID = strack->SessionID;
newstrack->Locked = false;
- resetResourceBundleData(&(newstrack->InUseTotal), 0, 0.0, 0);
- addResourceBundleDataByBundle(&(newstrack->InUseTotal),
- &(strack->InUseTotal));
+ resetResourceBundleDataByBundle(&(newstrack->InUseTotal),
+ &(strack->InUseTotal));
/* Add to the detector. */
SimpArray key;
setSimpleArrayRef(&key, (char *)&(newstrack->SessionID), sizeof(int64_t));
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/af6cfe88/src/backend/resourcemanager/resqueuemanager.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resqueuemanager.c b/src/backend/resourcemanager/resqueuemanager.c
index 7c7fcc2..1ee2770 100644
--- a/src/backend/resourcemanager/resqueuemanager.c
+++ b/src/backend/resourcemanager/resqueuemanager.c
@@ -135,8 +135,12 @@ void returnAllocatedResourceToLeafQueue(DynResourceQueueTrack track,
int32_t memorymb,
double core);
+/* Refresh resource queue resource capacity based on updated cluster info. */
+void refreshResourceQueuePercentageCapacity(bool queuechanged);
+
void refreshResourceQueuePercentageCapacityInternal(uint32_t clustermemmb,
- uint32_t clustercore);
+ uint32_t clustercore,
+ bool queuechanged);
/* Internal APIs for maintaining memory/core ratio trackers. */
int32_t addResourceQueueRatio(DynResourceQueueTrack track);
@@ -2457,8 +2461,32 @@ int returnResourceToResQueMgr(ConnectionTrack conntrack)
return res;
}
+void refreshResourceQueueCapacity(bool queuechanged)
+{
+ static char errorbuf[ERRORMESSAGE_SIZE];
+ List *qhavingshadow = NULL;
+
+ /* STEP 1. Build all necessary shadow resource queue track instances. */
+ buildQueueTrackShadows(PQUEMGR->RootTrack, &qhavingshadow);
+
+ /* STEP 2. Refresh resource queue capacities. */
+ refreshResourceQueuePercentageCapacity(queuechanged);
+
+ /* STEP 3. Rebuild queued resource requests. */
+ rebuildAllResourceQueueTrackDynamicStatusInShadow(qhavingshadow,
+ queuechanged,
+ errorbuf,
+ sizeof(errorbuf));
+
+ /* STEP 4. Apply changes from resource queue shadows. */
+ applyResourceQueueTrackChangesFromShadows(qhavingshadow);
+
+ /* STEP 5. Clean up. */
+ cleanupQueueTrackShadows(&qhavingshadow);
+}
+
/* Refresh actual resource queue capacity. */
-void refreshResourceQueuePercentageCapacity(void)
+void refreshResourceQueuePercentageCapacity(bool queuechanged)
{
/*
* Decide The actual capacity. This is necessary because there maybe some
@@ -2507,7 +2535,7 @@ void refreshResourceQueuePercentageCapacity(void)
elog(DEBUG3, "HAWQ RM :: Use cluster (%d MB, %d CORE) resources as whole.",
mem, core);
- refreshResourceQueuePercentageCapacityInternal(mem, core);
+ refreshResourceQueuePercentageCapacityInternal(mem, core, queuechanged);
/*
* After freshing resource queue capacity, it is necessary to try to dispatch
@@ -3646,16 +3674,29 @@ int addQueryResourceRequestToQueue(DynResourceQueueTrack queuetrack,
* Update the overall resource queue percentage capacity.
*/
void refreshResourceQueuePercentageCapacityInternal(uint32_t clustermemmb,
- uint32_t clustercore)
+ uint32_t clustercore,
+ bool queuechanged)
{
+ static uint32_t prevclustermemmb = 0;
+ static uint32_t prevclustercore = 0;
+
+ if ( (!queuechanged) &&
+ (prevclustermemmb == clustermemmb && prevclustercore == clustercore) )
+ {
+ elog(DEBUG3, "Resource manager skips updating resource queue capacities "
+ "because the total resource quota does not change.");
+ return;
+ }
+
/*
- * STEP 1. Decide the limit ranges of memory and core, decide the memory/core
- * ratio.
+ * STEP 1. Decide the limit ranges of memory and core, decide the
+ * memory/core ratio.
*/
ListCell *cell = NULL;
foreach(cell, PQUEMGR->Queues)
{
- DynResourceQueueTrack track = lfirst(cell);
+ DynResourceQueueTrack track = lfirst(cell);
+ DynResourceQueueTrack origtrack = track;
/* If this resource queue track has a shadow, the shadow is updated. */
track = track->ShadowQueueTrack == NULL ? track : track->ShadowQueueTrack;
@@ -3763,7 +3804,7 @@ void refreshResourceQueuePercentageCapacityInternal(uint32_t clustermemmb,
if ( !track->trackedMemCoreRatio )
{
track->MemCoreRatio = tmpratio;
- addResourceQueueRatio(track);
+ addResourceQueueRatio(origtrack);
}
}
}
@@ -4085,11 +4126,19 @@ int dispatchResourceToQueries_EVEN(DynResourceQueueTrack track)
int segcounter = 0;
int segmincounter = 0;
+ elog(DEBUG3, "Resource queue %s expects full parallel count %d, "
+ "current running count %d.",
+ track->QueueInfo->Name,
+ track->QueueInfo->ParallelCount,
+ track->NumOfRunningQueries);
+
DQUEUE_LOOP_BEGIN(&(track->QueryResRequests), iter, ConnectionTrack, conntrack)
/* Consider concurrency no more than defined parallel count. */
/* TODO: Consider more here... */
if ( counter + track->NumOfRunningQueries >= track->QueueInfo->ParallelCount )
{
+ elog(RMLOG, "Parallel count limit is encountered, to run %d more",
+ counter);
break;
}
@@ -4100,6 +4149,10 @@ int dispatchResourceToQueries_EVEN(DynResourceQueueTrack track)
/* Check if the minimum segment requirement is met. */
if ( segmincounter + equalsegnummin > availsegnum )
{
+ elog(RMLOG, "Resource allocated is up, available vseg num %d, "
+ "to run %d more",
+ availsegnum,
+ counter);
break;
}
@@ -5067,6 +5120,7 @@ void cleanupQueueTrackShadows(List **qhavingshadow)
}
int rebuildAllResourceQueueTrackDynamicStatusInShadow(List *quehavingshadow,
+ bool queuechanged,
char *errorbuf,
int errorbufsize)
{
@@ -5077,6 +5131,7 @@ int rebuildAllResourceQueueTrackDynamicStatusInShadow(List *quehavingshadow,
{
DynResourceQueueTrack quetrack = (DynResourceQueueTrack)lfirst(cell);
res = rebuildResourceQueueTrackDynamicStatusInShadow(quetrack,
+ queuechanged,
errorbuf,
errorbufsize);
if ( res != FUNC_RETURN_OK )
@@ -5094,7 +5149,7 @@ int rebuildAllResourceQueueTrackDynamicStatusInShadow(List *quehavingshadow,
quetrack->QueueInfo->Name);
}
- res = detectAndDealWithDeadLockInShadow(quetrack);
+ res = detectAndDealWithDeadLockInShadow(quetrack, queuechanged);
if ( res != FUNC_RETURN_OK )
{
elog(WARNING, "Resource manager failed to rebuild resource queue %s "
@@ -5117,6 +5172,7 @@ int rebuildAllResourceQueueTrackDynamicStatusInShadow(List *quehavingshadow,
}
int rebuildResourceQueueTrackDynamicStatusInShadow(DynResourceQueueTrack quetrack,
+ bool queuechanged,
char *errorbuf,
int errorbufsize)
{
@@ -5169,7 +5225,7 @@ int rebuildResourceQueueTrackDynamicStatusInShadow(DynResourceQueueTrack quetra
}
/* Add request to the resource queue and return. */
- addQueryResourceRequestToQueue(quetrack, newconn);
+ addQueryResourceRequestToQueue(shadowtrack, newconn);
}
else
{
@@ -5184,7 +5240,7 @@ int rebuildResourceQueueTrackDynamicStatusInShadow(DynResourceQueueTrack quetra
*/
elog(WARNING, "ConnID %d. %s", newconn->ConnID, errorbuf);
- if ( rm_force_alterqueue_cancel_queued_request )
+ if ( !queuechanged || rm_force_alterqueue_cancel_queued_request )
{
buildAcquireResourceErrorResponse(newconn, res, errorbuf);
transformConnectionTrackProgress(newconn,
@@ -5213,7 +5269,8 @@ int rebuildResourceQueueTrackDynamicStatusInShadow(DynResourceQueueTrack quetra
return FUNC_RETURN_OK;
}
-int detectAndDealWithDeadLockInShadow(DynResourceQueueTrack quetrack)
+int detectAndDealWithDeadLockInShadow(DynResourceQueueTrack quetrack,
+ bool queuechanged)
{
Assert(quetrack != NULL);
Assert(quetrack->ShadowQueueTrack != NULL);
@@ -5254,7 +5311,7 @@ int detectAndDealWithDeadLockInShadow(DynResourceQueueTrack quetrack)
if ( expmemorymb > availmemorymb )
{
/* We encounter a deadlock issue. */
- if ( rm_force_alterqueue_cancel_queued_request )
+ if ( !queuechanged || rm_force_alterqueue_cancel_queued_request )
{
cancelQueryRequestToBreakDeadLockInShadow(shadowtrack,
iter,
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/af6cfe88/src/test/regress/expected/parquet_compression.out
----------------------------------------------------------------------
diff --git a/src/test/regress/expected/parquet_compression.out b/src/test/regress/expected/parquet_compression.out
index bcb7cd8..5c01573 100644
--- a/src/test/regress/expected/parquet_compression.out
+++ b/src/test/regress/expected/parquet_compression.out
@@ -145,4 +145,4 @@ Select count(*) from parquet_gzip_2;
1
(1 row)
-alter resource queue pg_default with ( vseg_resource_quota='mem:128mb');
+alter resource queue pg_default with ( vseg_resource_quota='mem:256mb');
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/af6cfe88/src/test/regress/sql/parquet_compression.sql
----------------------------------------------------------------------
diff --git a/src/test/regress/sql/parquet_compression.sql b/src/test/regress/sql/parquet_compression.sql
index 92a6f5f..7a61d33 100644
--- a/src/test/regress/sql/parquet_compression.sql
+++ b/src/test/regress/sql/parquet_compression.sql
@@ -122,4 +122,4 @@ insert into parquet_gzip_2 values(12,array_to_string(ARRAY(SELECT chr((65 + roun
Select count(*) from parquet_gzip_2;
-alter resource queue pg_default with ( vseg_resource_quota='mem:128mb');
+alter resource queue pg_default with ( vseg_resource_quota='mem:256mb');