You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by yj...@apache.org on 2015/10/26 12:30:10 UTC
incubator-hawq git commit: HAWQ-47. Make user able to set statement
level resource usage. This closes #34
Repository: incubator-hawq
Updated Branches:
refs/heads/master 133ae317a -> 3a8dcd219
HAWQ-47. Make user able to set statement level resource usage. This closes #34
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/3a8dcd21
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/3a8dcd21
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/3a8dcd21
Branch: refs/heads/master
Commit: 3a8dcd219f99890da42db8a8d427a5ebd828454a
Parents: 133ae31
Author: Yi Jin <yj...@pivotal.io>
Authored: Mon Oct 26 19:29:02 2015 +0800
Committer: Yi Jin <yj...@pivotal.io>
Committed: Mon Oct 26 19:29:02 2015 +0800
----------------------------------------------------------------------
src/backend/cdb/cdbvars.c | 2 +
.../communication/rmcomm_QD2RM.c | 97 +++--
src/backend/resourcemanager/conntrack.c | 13 +-
.../communication/rmcomm_QD_RM_Protocol.h | 4 +
src/backend/resourcemanager/include/conntrack.h | 3 +
src/backend/resourcemanager/requesthandler.c | 13 +
src/backend/resourcemanager/resqueuecommand.c | 16 +-
src/backend/resourcemanager/resqueuemanager.c | 394 ++++++++++---------
src/backend/utils/misc/guc.c | 55 +++
src/include/cdb/cdbvars.h | 3 +
10 files changed, 378 insertions(+), 222 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3a8dcd21/src/backend/cdb/cdbvars.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbvars.c b/src/backend/cdb/cdbvars.c
index 921e87f..6b468e9 100644
--- a/src/backend/cdb/cdbvars.c
+++ b/src/backend/cdb/cdbvars.c
@@ -321,6 +321,8 @@ char *rm_grm_yarn_queue;
char *rm_grm_yarn_app_name;
int rm_grm_breath_return_percentage;
+char *rm_stmt_vseg_mem_str;
+int rm_stmt_nvseg;
int rm_seg_container_default_waterlevel;
bool rm_force_fifo_queue;
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3a8dcd21/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/communication/rmcomm_QD2RM.c b/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
index 10189e0..09d6da9 100644
--- a/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
+++ b/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
@@ -32,7 +32,7 @@
QD2RM_ResourceSets[(index)] == NULL ) \
{ \
snprintf((errorbuf), (errorbufsize), \
- "Wrong resource set index %d", (index)); \
+ "wrong resource set index %d", (index)); \
return COMM2RM_CLIENT_WRONG_INPUT; \
}
@@ -374,20 +374,24 @@ int cleanupQD2RMComm(void)
{
if ( QD2RM_ResourceSets[i]->QD_ResourceList != NULL )
{
- elog( LOG, "Un-returned resource is probed, will be returned. "
- "(%d MB, %lf CORE) x %d. Conn ID=%d",
- QD2RM_ResourceSets[i]->QD_SegMemoryMB,
- QD2RM_ResourceSets[i]->QD_SegCore,
- QD2RM_ResourceSets[i]->QD_SegCount,
- QD2RM_ResourceSets[i]->QD_Conn_ID);
+ elog(LOG, "Un-returned resource is probed, will be returned. "
+ "(%d MB, %lf CORE) x %d. Conn ID=%d",
+ QD2RM_ResourceSets[i]->QD_SegMemoryMB,
+ QD2RM_ResourceSets[i]->QD_SegCore,
+ QD2RM_ResourceSets[i]->QD_SegCount,
+ QD2RM_ResourceSets[i]->QD_Conn_ID);
res = returnResource(i, errorbuf, sizeof(errorbuf));
- if ( res != FUNC_RETURN_OK ) {
- elog(LOG, "Fail to return resource when cleaning up resource context.");
+ if ( res != FUNC_RETURN_OK )
+ {
+ elog(WARNING, "Failed to return resource when cleaning up "
+ "resource context.");
}
res = unregisterConnectionInRM(i, errorbuf, sizeof(errorbuf));
- if ( res != FUNC_RETURN_OK ) {
- elog(LOG, "Fail to unregister when cleaning up resource context.");
+ if ( res != FUNC_RETURN_OK )
+ {
+ elog(WARNING, "Failed to unregister when cleaning up "
+ "resource context.");
}
}
}
@@ -422,7 +426,7 @@ int registerConnectionInRMByStr(int index,
if ( res != FUNC_RETURN_OK )
{
snprintf(errorbuf, errorbufsize,
- "Fail to register in HAWQ resource manager because of "
+ "failed to register in HAWQ resource manager because of "
"RPC error %s.",
getErrorCodeExplain(res));
return res;
@@ -436,7 +440,7 @@ int registerConnectionInRMByStr(int index,
if ( response->Result != FUNC_RETURN_OK )
{
snprintf(errorbuf, errorbufsize,
- "Fail to register in HAWQ resource manager because of remote"
+ "failed to register in HAWQ resource manager because of remote"
"error %s.",
getErrorCodeExplain(response->Result));
return response->Result;
@@ -472,7 +476,7 @@ int registerConnectionInRMByOID(int index,
if ( res != FUNC_RETURN_OK )
{
snprintf(errorbuf, errorbufsize,
- "Fail to register in HAWQ resource manager because of "
+ "failed to register in HAWQ resource manager because of "
"RPC error %s.",
getErrorCodeExplain(res));
return res;
@@ -486,7 +490,7 @@ int registerConnectionInRMByOID(int index,
if ( response->Result != FUNC_RETURN_OK )
{
snprintf(errorbuf, errorbufsize,
- "Fail to register in HAWQ resource manager because of remote"
+ "failed to register in HAWQ resource manager because of remote"
"error %s.",
getErrorCodeExplain(response->Result));
return response->Result;
@@ -522,7 +526,7 @@ int unregisterConnectionInRM(int index,
if ( res != FUNC_RETURN_OK )
{
snprintf(errorbuf, errorbufsize,
- "Fail to unregister in HAWQ resource manager because of "
+ "failed to unregister in HAWQ resource manager because of "
"RPC error %s.",
getErrorCodeExplain(res));
return res;
@@ -535,7 +539,7 @@ int unregisterConnectionInRM(int index,
{
res = response->Result;
snprintf(errorbuf, errorbufsize,
- "Fail to unregister in HAWQ resource manager because of "
+ "failed to unregister in HAWQ resource manager because of "
"remote error %s.",
getErrorCodeExplain(response->Result));
}
@@ -606,6 +610,15 @@ int acquireResourceFromRM(int index,
requesthead.VSegLimit = rm_nvseg_perquery_limit;
requesthead.Reserved = 0;
requesthead.IOBytes = iobytes;
+ requesthead.StatNVSeg = rm_stmt_nvseg;
+
+ requesthead.StatVSegMemoryMB = 0;
+ int parseres = FUNC_RETURN_OK;
+ SimpString valuestr;
+ setSimpleStringRef(&valuestr, rm_stmt_vseg_mem_str, strlen(rm_stmt_vseg_mem_str));
+ parseres = SimpleStringToStorageSizeMB(&valuestr,
+ &(requesthead.StatVSegMemoryMB));
+ Assert(parseres == FUNC_RETURN_OK);
appendSMBVar(sendbuffer,requesthead);
@@ -636,8 +649,8 @@ int acquireResourceFromRM(int index,
if ( res != FUNC_RETURN_OK )
{
snprintf(errorbuf, errorbufsize,
- "Fail to acquire resource from HAWQ resource manager because of "
- "RPC error %s.",
+ "failed to acquire resource from HAWQ resource manager because "
+ "of RPC error %s.",
getErrorCodeExplain(res));
pgstat_report_waiting_resource(false);
return res;
@@ -649,7 +662,7 @@ int acquireResourceFromRM(int index,
if ( errres->Result != FUNC_RETURN_OK )
{
snprintf(errorbuf, errorbufsize,
- "Fail to acquire resource because of remote error %s.",
+ "failed to acquire resource because of remote error %s.",
getErrorCodeExplain(errres->Result));
return errres->Result;
}
@@ -776,7 +789,7 @@ int returnResource(int index,
if ( res != FUNC_RETURN_OK )
{
snprintf(errorbuf, errorbufsize,
- "Fail to return resource to HAWQ resource manager because of "
+ "failed to return resource to HAWQ resource manager because of "
"RPC error %s.",
getErrorCodeExplain(res));
return res;
@@ -865,7 +878,7 @@ int manipulateResourceQueue(int index,
RPCResponseHeadManipulateResQueueERROR error =
(RPCResponseHeadManipulateResQueueERROR)(recvbuffer->Buffer);
- elog(WARNING, "Fail to manipulate resource queue because %s",
+ elog(LOG, "Fail to manipulate resource queue because %s",
error->ErrorText);
snprintf(errorbuf, errorbufsize, "%s", error->ErrorText);
}
@@ -1147,11 +1160,21 @@ int acquireResourceQuotaFromRM(int64_t user_oid,
initializeSelfMaintainBuffer(&recvBuffer, QD2RM_CommContext);
RPCRequestHeadAcquireResourceQuotaFromRMByOIDData request;
- request.UseridOid = user_oid;
- request.MaxSegCountFix = max_seg_count_fix;
- request.MinSegCountFix = min_seg_count_fix;
- request.VSegLimitPerSeg = rm_nvseg_perquery_perseg_limit;
- request.VSegLimit = rm_nvseg_perquery_limit;
+ request.UseridOid = user_oid;
+ request.MaxSegCountFix = max_seg_count_fix;
+ request.MinSegCountFix = min_seg_count_fix;
+ request.VSegLimitPerSeg = rm_nvseg_perquery_perseg_limit;
+ request.VSegLimit = rm_nvseg_perquery_limit;
+ request.StatNVSeg = rm_stmt_nvseg;
+
+ request.StatVSegMemoryMB = 0;
+ int parseres = FUNC_RETURN_OK;
+ SimpString valuestr;
+ setSimpleStringRef(&valuestr, rm_stmt_vseg_mem_str, strlen(rm_stmt_vseg_mem_str));
+ parseres = SimpleStringToStorageSizeMB(&valuestr,
+ &(request.StatVSegMemoryMB));
+ Assert(parseres == FUNC_RETURN_OK);
+
appendSMBVar(&sendBuffer, request);
elog(DEBUG3, "HAWQ RM :: Acquire resource quota for query with %d splits, "
@@ -1168,24 +1191,27 @@ int acquireResourceQuotaFromRM(int64_t user_oid,
if ( res != FUNC_RETURN_OK )
{
snprintf(errorbuf, errorbufsize,
- "Fail to get response from resource manager RPC.");
+ "failed to get response from resource manager RPC.");
*errorcode = res;
goto exit;
}
RPCResponseHeadAcquireResourceQuotaFromRMByOID response =
(RPCResponseHeadAcquireResourceQuotaFromRMByOID)(recvBuffer.Buffer);
- if ( response->Result == FUNC_RETURN_OK ) {
+ if ( response->Result == FUNC_RETURN_OK )
+ {
*seg_num = response->SegNum;
*seg_num_min = response->SegNumMin;
*seg_memory_mb = response->SegMemoryMB;
*seg_core = response->SegCore;
}
- else {
+ else
+ {
res = response->Result;
*errorcode = res;
- snprintf(errorbuf, errorbufsize, "Fail to get resource quota due to "
- "remote error %d.", res);
+ snprintf(errorbuf, errorbufsize,
+ "failed to get resource quota due to remote error %s.",
+ getErrorCodeExplain(res));
}
exit:
@@ -1610,15 +1636,16 @@ extern Datum pg_explain_resource_distribution(PG_FUNCTION_ARGS)
ret = returnResource(resourceId, errorbuf, sizeof(errorbuf));
if ( ret != FUNC_RETURN_OK )
{
- elog(ERROR, "Fail to return resource back to resource manager "
- "because %s", errorbuf);
+ elog(ERROR, "failed to return resource back to resource manager "
+ "because %s",
+ errorbuf);
}
/* STEP 5. Unregister. */
ret = unregisterConnectionInRM(resourceId, errorbuf, sizeof(errorbuf));
if ( ret != FUNC_RETURN_OK )
{
- elog(ERROR, "Fail to unregister connection in RM because %s",
+ elog(ERROR, "failed to unregister connection in RM because %s",
errorbuf);
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3a8dcd21/src/backend/resourcemanager/conntrack.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/conntrack.c b/src/backend/resourcemanager/conntrack.c
index 12d59d7..62dea4c 100644
--- a/src/backend/resourcemanager/conntrack.c
+++ b/src/backend/resourcemanager/conntrack.c
@@ -109,6 +109,9 @@ void createEmptyConnectionTrack(ConnectionTrack *track)
(*track)->MinSegCountFixed = 0;
(*track)->VSegLimitPerSeg = -1;
(*track)->VSegLimit = -1;
+ (*track)->StatVSegMemoryMB = 0;
+ (*track)->StatNVSeg = 0;
+ (*track)->SegNumEqual = 0;
(*track)->SliceSize = 0;
(*track)->IOBytes = 0;
(*track)->QueueID = 0;
@@ -259,6 +262,9 @@ int retrieveConnectionTrack(ConnectionTrack track, int32_t connid)
track->MinSegCountFixed = oldct->MinSegCountFixed;
track->VSegLimitPerSeg = oldct->VSegLimitPerSeg;
track->VSegLimit = oldct->VSegLimit;
+ track->StatNVSeg = oldct->StatNVSeg;
+ track->StatVSegMemoryMB = oldct->StatVSegMemoryMB;
+ track->SegNumEqual = oldct->SegNumEqual;
track->SliceSize = oldct->SliceSize;
track->SegIOBytes = oldct->SegIOBytes;
track->IOBytes = oldct->IOBytes;
@@ -595,7 +601,8 @@ void dumpConnectionTracks(const char *filename)
"vseg limit per query=%d:"
"fixsegsize=%d:"
"reqtime=%s:"
- "alloctime=%s),",
+ "alloctime=%s:"
+ "stmt=%d MB x %d),",
conn->SessionID,
conn->SegMemoryMB, conn->SegCore,
conn->SegNum, conn->SegNumMin, conn->SegNumActual,
@@ -605,7 +612,9 @@ void dumpConnectionTracks(const char *filename)
conn->VSegLimit,
conn->MinSegCountFixed,
format_time_microsec(conn->ResRequestTime),
- format_time_microsec(conn->ResAllocTime));
+ format_time_microsec(conn->ResAllocTime),
+ conn->StatVSegMemoryMB,
+ conn->StatNVSeg);
fprintf(fp, "LOC(size=%d", conn->SegPreferredHostCount);
if ( conn->SegPreferredHostCount <= 0 )
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3a8dcd21/src/backend/resourcemanager/include/communication/rmcomm_QD_RM_Protocol.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/communication/rmcomm_QD_RM_Protocol.h b/src/backend/resourcemanager/include/communication/rmcomm_QD_RM_Protocol.h
index 17d397b..a275d0e 100644
--- a/src/backend/resourcemanager/include/communication/rmcomm_QD_RM_Protocol.h
+++ b/src/backend/resourcemanager/include/communication/rmcomm_QD_RM_Protocol.h
@@ -196,6 +196,8 @@ RPC_PROTOCOL_STRUCT_BEGIN(RPCRequestHeadAcquireResourceFromRM)
int32_t SliceSize;
uint32_t VSegLimitPerSeg;
uint32_t VSegLimit;
+ uint32_t StatVSegMemoryMB;
+ uint32_t StatNVSeg;
uint32_t Reserved;
int64_t IOBytes;
RPC_PROTOCOL_STRUCT_END(RPCRequestHeadAcquireResourceFromRM)
@@ -246,6 +248,8 @@ RPC_PROTOCOL_STRUCT_BEGIN(RPCRequestHeadAcquireResourceQuotaFromRMByOID)
uint32_t MinSegCountFix;
uint32_t VSegLimitPerSeg;
uint32_t VSegLimit;
+ uint32_t StatVSegMemoryMB;
+ uint32_t StatNVSeg;
RPC_PROTOCOL_STRUCT_END(RPCRequestHeadAcquireResourceQuotaFromRMByOID)
/*
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3a8dcd21/src/backend/resourcemanager/include/conntrack.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/conntrack.h b/src/backend/resourcemanager/include/conntrack.h
index f809102..76e8c7c 100644
--- a/src/backend/resourcemanager/include/conntrack.h
+++ b/src/backend/resourcemanager/include/conntrack.h
@@ -95,6 +95,7 @@ struct ConnectionTrackData
int32_t SegNum;
int32_t SegNumMin;
int32_t SegNumActual;
+ int32_t SegNumEqual;
int32_t SegPreferredHostCount;
char **SegPreferredHostNames;
int64_t *SegPreferredScanSizeMB;
@@ -104,6 +105,8 @@ struct ConnectionTrackData
int32_t MinSegCountFixed;
int32_t VSegLimitPerSeg;
int32_t VSegLimit;
+ uint32_t StatVSegMemoryMB;
+ uint32_t StatNVSeg;
List *Resource; /* Allocated resource. */
void *QueueTrack;
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3a8dcd21/src/backend/resourcemanager/requesthandler.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/requesthandler.c b/src/backend/resourcemanager/requesthandler.c
index de8131f..55a3dcd 100644
--- a/src/backend/resourcemanager/requesthandler.c
+++ b/src/backend/resourcemanager/requesthandler.c
@@ -368,6 +368,8 @@ bool handleRMRequestAcquireResource(void **arg)
(*conntrack)->SessionID = request->SessionID;
(*conntrack)->VSegLimitPerSeg = request->VSegLimitPerSeg;
(*conntrack)->VSegLimit = request->VSegLimit;
+ (*conntrack)->StatVSegMemoryMB = request->StatVSegMemoryMB;
+ (*conntrack)->StatNVSeg = request->StatNVSeg;
/* Get preferred nodes. */
buildSegPreferredHostInfo((*conntrack));
@@ -381,6 +383,17 @@ bool handleRMRequestAcquireResource(void **arg)
(*conntrack)->SegPreferredHostCount,
(*conntrack)->VSegLimitPerSeg,
(*conntrack)->VSegLimit);
+
+ if ( (*conntrack)->StatNVSeg > 0 )
+ {
+ elog(LOG, "Statement level resource quota is active. "
+ "ConnID=%d, Expect resource. "
+ "Total %d vsegs, each vseg has %d MB memory quota.",
+ (*conntrack)->ConnID,
+ (*conntrack)->StatNVSeg,
+ (*conntrack)->StatVSegMemoryMB);
+ }
+
res = acquireResourceFromResQueMgr((*conntrack));
if ( res != FUNC_RETURN_OK )
{
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3a8dcd21/src/backend/resourcemanager/resqueuecommand.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resqueuecommand.c b/src/backend/resourcemanager/resqueuecommand.c
index 0247fef..19b255a 100644
--- a/src/backend/resourcemanager/resqueuecommand.c
+++ b/src/backend/resourcemanager/resqueuecommand.c
@@ -91,8 +91,8 @@ void createResourceQueue(CreateQueueStmt *stmt)
Assert( res == COMM2RM_CLIENT_FULL_RESOURCECONTEXT );
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Can not apply CREATE RESOURCE QUEUE. "
- "Because too many resource contexts were created.")));
+ errmsg("can not apply CREATE RESOURCE QUEUE, "
+ "because too many resource contexts were created.")));
}
/* Here, using user oid is more convenient. */
@@ -125,7 +125,7 @@ void createResourceQueue(CreateQueueStmt *stmt)
(errcode(IS_TO_RM_RPC_ERROR(res) ?
ERRCODE_INTERNAL_ERROR :
ERRCODE_INVALID_OBJECT_DEFINITION),
- errmsg("Can not apply CREATE RESOURCE QUEUE because %s", errorbuf)));
+ errmsg("can not apply CREATE RESOURCE QUEUE because %s", errorbuf)));
}
elog(LOG, "Complete applying CREATE RESOURCE QUEUE statement.");
}
@@ -222,8 +222,8 @@ void dropResourceQueue(DropQueueStmt *stmt)
Assert( res == COMM2RM_CLIENT_FULL_RESOURCECONTEXT );
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Can not apply DROP RESOURCE QUEUE. "
- "Because too many resource contexts were created.")));
+ errmsg("cannot apply DROP RESOURCE QUEUE, "
+ "because too many resource contexts were created.")));
}
/* Here, using user oid is more convenient. */
@@ -255,7 +255,7 @@ void dropResourceQueue(DropQueueStmt *stmt)
(errcode(IS_TO_RM_RPC_ERROR(res) ?
ERRCODE_INTERNAL_ERROR :
ERRCODE_INVALID_OBJECT_DEFINITION),
- errmsg("Can not apply DROP RESOURCE QUEUE because %s", errorbuf)));
+ errmsg("can not apply DROP RESOURCE QUEUE because %s", errorbuf)));
}
elog(LOG, "Completed applying DROP RESOURCE QUEUE statement.");
@@ -334,7 +334,7 @@ void alterResourceQueue(AlterQueueStmt *stmt)
Assert(res == COMM2RM_CLIENT_FULL_RESOURCECONTEXT);
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Too many existing resource context.")));
+ errmsg("too many existing resource context.")));
}
/* Here, using user oid is more convenient. */
@@ -365,7 +365,7 @@ void alterResourceQueue(AlterQueueStmt *stmt)
(errcode(IS_TO_RM_RPC_ERROR(res) ?
ERRCODE_INTERNAL_ERROR :
ERRCODE_INVALID_OBJECT_DEFINITION),
- errmsg("Can not apply ALTER RESOURCE QUEUE because %s", errorbuf)));
+ errmsg("cannot apply ALTER RESOURCE QUEUE because %s", errorbuf)));
}
elog(LOG, "Completed applying ALTER RESOURCE QUEUE statement.");
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3a8dcd21/src/backend/resourcemanager/resqueuemanager.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resqueuemanager.c b/src/backend/resourcemanager/resqueuemanager.c
index ef055c3..862eed9 100644
--- a/src/backend/resourcemanager/resqueuemanager.c
+++ b/src/backend/resourcemanager/resqueuemanager.c
@@ -97,14 +97,7 @@ computeQueryQuotaByPolicy AllocationPolicy[RSQ_ALLOCATION_POLICY_COUNT] = {
computeQueryQuota_EVEN
};
-int computeQueryQuota( DynResourceQueueTrack track,
- int32_t *max_segcountfix,
- int32_t *min_segcountfix,
- int32_t *segmemmb,
- double *segcore,
- int32_t *segnum,
- int32_t *segnummin,
- int32_t segnumlimit);
+int computeQueryQuota( ConnectionTrack conn);
/*------------------------------------------
* The resource distribution functions.
@@ -2116,78 +2109,74 @@ int acquireResourceFromResQueMgr(ConnectionTrack conntrack)
}
/* Call quota logic to make decision of resource for current query. */
- res = computeQueryQuota(queuetrack,
- &conntrack->MaxSegCountFixed,
- &conntrack->MinSegCountFixed,
- &(conntrack->SegMemoryMB),
- &(conntrack->SegCore),
- &(conntrack->SegNum),
- &(conntrack->SegNumMin),
- conntrack->VSegLimit);
+ res = computeQueryQuota(conntrack);
if ( res == FUNC_RETURN_OK )
{
-
- int32_t Rmax = conntrack->SegNum;
- int32_t RmaxL = conntrack->VSegLimitPerSeg * PRESPOOL->AvailNodeCount;
- int32_t Rmin = conntrack->SegNumMin;
- elog(LOG, "HAWQ RM :: original quota min seg num:%d, max seg num:%d",
- conntrack->SegNumMin,
- conntrack->SegNum);
-
- /* Ensure quota [min,max] is between request [min,max] */
- int32_t Gmax= conntrack->MaxSegCountFixed;
- int32_t Gmin= conntrack->MinSegCountFixed;
-
- if(Gmin==1)
+ if ( conntrack->StatNVSeg == 0 )
{
- /* case 1 */
- conntrack->SegNumMin = min(min(Gmax,Rmin),RmaxL);
- conntrack->SegNum = min(Gmax,RmaxL);
- if(conntrack->SegNumMin > conntrack->SegNum)
+ int32_t Rmax = conntrack->SegNum;
+ int32_t RmaxL = conntrack->VSegLimitPerSeg * PRESPOOL->AvailNodeCount;
+ int32_t Rmin = conntrack->SegNumMin;
+ elog(LOG, "Original quota min seg num:%d, max seg num:%d",
+ conntrack->SegNumMin,
+ conntrack->SegNum);
+
+ /* Ensure quota [min,max] is between request [min,max] */
+ int32_t Gmax= conntrack->MaxSegCountFixed;
+ int32_t Gmin= conntrack->MinSegCountFixed;
+
+ if(Gmin==1)
{
- return RESQUEMGR_NO_RESOURCE;
+ /* case 1 */
+ conntrack->SegNumMin = min(min(Gmax,Rmin),RmaxL);
+ conntrack->SegNum = min(Gmax,RmaxL);
+ if(conntrack->SegNumMin > conntrack->SegNum)
+ {
+ return RESQUEMGR_NO_RESOURCE;
+ }
}
- }
- else if(Gmax == Gmin)
- {
- /* case 2 */
- conntrack->SegNumMin = Gmax;
- conntrack->SegNum = Gmax;
- if(Rmax < Gmax)
+ else if(Gmax == Gmin)
{
- return RESQUEMGR_NO_RESOURCE;
+ /* case 2 */
+ conntrack->SegNumMin = Gmax;
+ conntrack->SegNum = Gmax;
+ if(Rmax < Gmax)
+ {
+ return RESQUEMGR_NO_RESOURCE;
+ }
}
- }
- else
- {
- /* case 3 */
- conntrack->SegNumMin = min(max(Gmin,Rmin),Gmax);
- conntrack->SegNum = min(max(min(RmaxL,Gmax),Gmin),Rmax);
- if(conntrack->SegNumMin > conntrack->SegNum)
+ else
{
- return RESQUEMGR_NO_RESOURCE;
+ /* case 3 */
+ conntrack->SegNumMin = min(max(Gmin,Rmin),Gmax);
+ conntrack->SegNum = min(max(min(RmaxL,Gmax),Gmin),Rmax);
+ if(conntrack->SegNumMin > conntrack->SegNum)
+ {
+ return RESQUEMGR_NO_RESOURCE;
+ }
}
- }
- elog(LOG, "Query resource expects (%d MB, %lf CORE) x %d ( min %d ) resource.",
- conntrack->SegMemoryMB,
- conntrack->SegCore,
- conntrack->SegNum,
- conntrack->SegNumMin);
+ elog(LOG, "Query resource expects (%d MB, %lf CORE) x %d ( min %d ) resource.",
+ conntrack->SegMemoryMB,
+ conntrack->SegCore,
+ conntrack->SegNum,
+ conntrack->SegNumMin);
- adjustResourceExpectsByQueueNVSegLimits(conntrack);
+ adjustResourceExpectsByQueueNVSegLimits(conntrack);
- elog(LOG, "Query resource expects (%d MB, %lf CORE) x %d ( min %d ) "
- "resource after adjusting based on queue NVSEG limits.",
- conntrack->SegMemoryMB,
- conntrack->SegCore,
- conntrack->SegNum,
- conntrack->SegNumMin);
+ elog(LOG, "Query resource expects (%d MB, %lf CORE) x %d ( min %d ) "
+ "resource after adjusting based on queue NVSEG limits.",
+ conntrack->SegMemoryMB,
+ conntrack->SegCore,
+ conntrack->SegNum,
+ conntrack->SegNumMin);
+ }
/* Add request to the resource queue and return. */
res = addQueryResourceRequestToQueue(queuetrack, conntrack);
- if ( res == FUNC_RETURN_OK ) {
+ if ( res == FUNC_RETURN_OK )
+ {
transformConnectionTrackProgress(conntrack,
CONN_PP_RESOURCE_QUEUE_ALLOC_WAIT);
return res;
@@ -2230,75 +2219,73 @@ int acquireResourceQuotaFromResQueMgr(ConnectionTrack conntrack)
goto exit;
}
+ conntrack->QueueTrack = queuetrack;
+ conntrack->QueueID = queuetrack->QueueInfo->OID;
+
/* Compute query quota */
- res = computeQueryQuota(queuetrack,
- &conntrack->MaxSegCountFixed,
- &conntrack->MinSegCountFixed,
- &(conntrack->SegMemoryMB),
- &(conntrack->SegCore),
- &(conntrack->SegNum),
- &(conntrack->SegNumMin),
- conntrack->VSegLimit);
+ res = computeQueryQuota(conntrack);
if ( res == FUNC_RETURN_OK )
{
-
- int32_t Rmax = conntrack->SegNum;
- int32_t RmaxL =conntrack->VSegLimitPerSeg * PRESPOOL->AvailNodeCount;
- int32_t Rmin = conntrack->SegNumMin;
- elog(LOG, "HAWQ RM :: original quota min seg num:%d, max seg num:%d",
- conntrack->SegNumMin,
- conntrack->SegNum);
-
- /* Ensure quota [min,max] is between request [min,max] */
- int32_t Gmax= conntrack->MaxSegCountFixed;
- int32_t Gmin= conntrack->MinSegCountFixed;
-
- if(Gmin==1)
+ if ( conntrack->StatNVSeg == 0 )
{
- /* case 1 */
- conntrack->SegNumMin = min(min(Gmax,Rmin),RmaxL);
- conntrack->SegNum = min(Gmax,RmaxL);
- if(conntrack->SegNumMin > conntrack->SegNum)
+ int32_t Rmax = conntrack->SegNum;
+ int32_t RmaxL =conntrack->VSegLimitPerSeg * PRESPOOL->AvailNodeCount;
+ int32_t Rmin = conntrack->SegNumMin;
+ elog(LOG, "Original quota min seg num:%d, max seg num:%d",
+ conntrack->SegNumMin,
+ conntrack->SegNum);
+
+ /* Ensure quota [min,max] is between request [min,max] */
+ int32_t Gmax= conntrack->MaxSegCountFixed;
+ int32_t Gmin= conntrack->MinSegCountFixed;
+
+ if(Gmin==1)
{
- return RESQUEMGR_NO_RESOURCE;
+ /* case 1 */
+ conntrack->SegNumMin = min(min(Gmax,Rmin),RmaxL);
+ conntrack->SegNum = min(Gmax,RmaxL);
+ if(conntrack->SegNumMin > conntrack->SegNum)
+ {
+ return RESQUEMGR_NO_RESOURCE;
+ }
}
- }
- else if(Gmax == Gmin)
- {
- /* case 2 */
- conntrack->SegNumMin = Gmax;
- conntrack->SegNum = Gmax;
- if(Rmax < Gmax)
+ else if(Gmax == Gmin)
{
- return RESQUEMGR_NO_RESOURCE;
+ /* case 2 */
+ conntrack->SegNumMin = Gmax;
+ conntrack->SegNum = Gmax;
+ if(Rmax < Gmax)
+ {
+ return RESQUEMGR_NO_RESOURCE;
+ }
}
- }
- else
- {
- /* case 3 */
- conntrack->SegNumMin = min(max(Gmin,Rmin),Gmax);
- conntrack->SegNum = min(max(min(RmaxL,Gmax),Gmin),Rmax);
- if(conntrack->SegNumMin > conntrack->SegNum)
+ else
{
- return RESQUEMGR_NO_RESOURCE;
+ /* case 3 */
+ conntrack->SegNumMin = min(max(Gmin,Rmin),Gmax);
+ conntrack->SegNum = min(max(min(RmaxL,Gmax),Gmin),Rmax);
+ if(conntrack->SegNumMin > conntrack->SegNum)
+ {
+ return RESQUEMGR_NO_RESOURCE;
+ }
}
- }
- elog(LOG, "Expect (%d MB, %lf CORE) x %d ( min %d ) resource quota.",
- conntrack->SegMemoryMB,
- conntrack->SegCore,
- conntrack->SegNum,
- conntrack->SegNumMin);
+ elog(LOG, "Expect (%d MB, %lf CORE) x %d ( min %d ) resource quota.",
+ conntrack->SegMemoryMB,
+ conntrack->SegCore,
+ conntrack->SegNum,
+ conntrack->SegNumMin);
- adjustResourceExpectsByQueueNVSegLimits(conntrack);
+ adjustResourceExpectsByQueueNVSegLimits(conntrack);
- elog(LOG, "Query resource expects (%d MB, %lf CORE) x %d ( min %d ) "
- "resource after adjusting based on queue NVSEG limits.",
- conntrack->SegMemoryMB,
- conntrack->SegCore,
- conntrack->SegNum,
- conntrack->SegNumMin);
+ elog(LOG, "Query resource expects (%d MB, %lf CORE) x %d ( min %d ) "
+ "resource after adjusting based on queue NVSEG limits.",
+ conntrack->SegMemoryMB,
+ conntrack->SegCore,
+ conntrack->SegNum,
+ conntrack->SegNumMin);
+ }
}
else
{
@@ -2498,17 +2485,6 @@ void refreshMemoryCoreRatioLevelUsage(uint64_t curmicrosec)
mctrack->TotalRequest.MemoryMB = mctrack->ClusterMemoryMaxMB;
}
- /*
- elog(DEBUG5, "HAWQ RM :: Memory/ratio[%d] %d MBPCORE has "
- "(%d MB, %lf CORE) in use, (%d MB, %lf CORE) requested.",
- i,
- mctrack->MemCoreRatio,
- mctrack->TotalUsed.MemoryMB,
- mctrack->TotalUsed.Core,
- mctrack->TotalRequest.MemoryMB,
- mctrack->TotalRequest.Core);
- */
-
markMemoryCoreRatioWaterMark(&(PQUEMGR->RatioWaterMarks[i]),
curmicrosec,
mctrack->TotalUsed.MemoryMB,
@@ -3301,66 +3277,107 @@ void minusResourceBundleDataByBundle(ResourceBundle detail, ResourceBundle sourc
/**
* Compute the query quota.
*/
-int computeQueryQuota( DynResourceQueueTrack track,
- int32_t *max_segcountfix,
- int32_t *min_segcountfix,
- int32_t *segmemmb,
- double *segcore,
- int32_t *segnum,
- int32_t *segnummin,
- int32_t segnumlimit)
+int computeQueryQuota( ConnectionTrack conn)
{
- int res = FUNC_RETURN_OK;
- int policy = 0;
+ Assert( conn != NULL );
+ Assert( conn->QueueTrack != NULL );
- Assert( track != NULL );
+ int res = FUNC_RETURN_OK;
+ int policy = 0;
+ DynResourceQueueTrack track = (DynResourceQueueTrack)(conn->QueueTrack);
policy = track->QueueInfo->AllocatePolicy;
Assert( policy >= 0 && policy < RSQ_ALLOCATION_POLICY_COUNT );
- /* Get one segment resource quota. */
- *segmemmb = track->QueueInfo->SegResourceQuotaMemoryMB;
- *segcore = track->QueueInfo->SegResourceQuotaVCore;
-
- /* Decide segment number and minimum runnable segment number. */
-
- if (*min_segcountfix > segnumlimit)
+ /*
+ *--------------------------------------------------------------------------
+ * Get one segment resource quota. If statement level resource quota is not
+ * specified, the queue vseg resource quota is derived, otherwise, statement
+ * level resource quota. The resource memory/core ratio is not changed, thus
+ * code has to calculate the adjusted vcore quota for each vseg in case
+ * statement level resource quota is active.
+ *--------------------------------------------------------------------------
+ */
+ if ( conn->StatNVSeg > 0 )
{
- res = RESQUEMGR_TOO_MANY_FIXED_SEGNUM;
- elog(LOG, " Expect too many virtual segments %d, can not be more "
- "than %d",
- *min_segcountfix,
- segnumlimit);
- return res;
+ conn->SegMemoryMB = conn->StatVSegMemoryMB;
+ conn->SegCore = track->QueueInfo->SegResourceQuotaVCore *
+ conn->StatVSegMemoryMB /
+ track->QueueInfo->SegResourceQuotaMemoryMB;
+ conn->SegNum = conn->StatNVSeg;
+ conn->SegNumMin = conn->StatNVSeg;
+
+ /* Check if the resource capacity is more than the capacity of queue. */
+ conn->SegNumEqual = ceil(1.0 * conn->SegMemoryMB * conn->SegNumMin /
+ track->QueueInfo->SegResourceQuotaMemoryMB);
+ Assert( conn->SegNumEqual > 0 );
+ if ( conn->SegNumEqual > track->ClusterSegNumberMax )
+ {
+ res = RESQUEMGR_TOO_MANY_FIXED_SEGNUM;
+ elog(WARNING, "ConnID %d expects too many virtual segments %d that is"
+ "set by hawq_rm_stmt_nvseg.",
+ conn->ConnID,
+ conn->SegNum);
+ return res;
+ }
}
- if(*max_segcountfix > segnumlimit)
+ else
{
- *max_segcountfix = segnumlimit;
+ conn->SegMemoryMB = track->QueueInfo->SegResourceQuotaMemoryMB;
+ conn->SegCore = track->QueueInfo->SegResourceQuotaVCore;
}
- /* Compute total resource quota. */
- res = AllocationPolicy[policy] (track, segnum, segnummin, segnumlimit);
-
- if ( *segnum < *min_segcountfix )
+ /* Decide vseg number and minimum runnable vseg number. */
+ if ( conn->SegNumMin > conn->VSegLimit )
{
res = RESQUEMGR_TOO_MANY_FIXED_SEGNUM;
- elog(LOG, " Expect too many virtual segments %d, can not be more "
- "than %d",
- *min_segcountfix,
- *segnum);
+ elog(WARNING, "ConnID %d expects too many virtual segments %d, "
+ "cannot be more than %d",
+ conn->ConnID,
+ conn->SegNumMin,
+ conn->VSegLimit);
return res;
+ }
+ if ( conn->SegNum > conn->VSegLimit )
+ {
+ conn->SegNum = conn->VSegLimit;
}
- /* Always respect the expected minimum vseg num. */
- *segnummin = *min_segcountfix;
+ if ( conn->StatNVSeg <= 0 )
+ {
+ /* Compute total resource quota. */
+ res = AllocationPolicy[policy] (track,
+ &(conn->SegNum),
+ &(conn->SegNumMin),
+ conn->VSegLimit);
+
+ /*
+ * If fixed vseg count range is lower than estimated vseg count range
+ * based on one allocation policy, we always respect the fixed range.
+ */
+ if ( conn->SegNum < conn->MinSegCountFixed )
+ {
+ res = RESQUEMGR_TOO_MANY_FIXED_SEGNUM;
+ elog(WARNING, "Expect too many virtual segments %d, cannot be more "
+ "than %d",
+ conn->MinSegCountFixed,
+ conn->SegNum);
+ return res;
+ }
+
+ conn->SegNumMin = conn->MinSegCountFixed;
+ conn->SegNum = conn->SegNum < conn->MaxSegCountFixed ?
+ conn->SegNum :
+ conn->MaxSegCountFixed;
+ }
elog(DEBUG3, "Expect cluster resource (%d MB, %lf CORE) x %d "
"minimum runnable %d segment(s).",
- *segmemmb,
- *segcore,
- *segnum,
- *segnummin);
+ conn->SegMemoryMB,
+ conn->SegCore,
+ conn->SegNum,
+ conn->SegNumMin);
return FUNC_RETURN_OK;
}
@@ -3417,7 +3434,7 @@ int addQueryResourceRequestToQueue(DynResourceQueueTrack queuetrack,
{
insertDQueueTailNode(&(queuetrack->QueryResRequests), conntrack);
- /* add resource request counter. */
+ /* Add resource request counter. */
addResourceBundleData(&(queuetrack->TotalRequest),
conntrack->SegMemoryMB * conntrack->SegNum,
conntrack->SegCore * conntrack->SegNum);
@@ -3854,14 +3871,27 @@ int dispatchResourceToQueries_EVEN(DynResourceQueueTrack track)
/* Consider concurrency no more than defined parallel count. */
/* TODO: Consider more here... */
if ( counter + track->NumOfRunningQueries >= track->QueueInfo->ParallelCount )
+ {
break;
+ }
+
+ int equalsegnummin = conntrack->StatNVSeg <= 0 ?
+ conntrack->SegNumMin :
+ conntrack->SegNumEqual;
+
/* Check if the minimum segment requirement is met. */
- if ( segmincounter + conntrack->SegNumMin > availsegnum )
+ if ( segmincounter + equalsegnummin > availsegnum )
{
break;
}
- segcounter += conntrack->SegNum;
- segmincounter += conntrack->SegNumMin;
+
+ segcounter += conntrack->StatNVSeg <= 0 ?
+ conntrack->SegNum :
+ conntrack->SegNumEqual;
+
+ segmincounter += conntrack->StatNVSeg <= 0 ?
+ conntrack->SegNumMin :
+ conntrack->SegNumEqual;
counter++;
DQUEUE_LOOP_END
@@ -3880,7 +3910,7 @@ int dispatchResourceToQueries_EVEN(DynResourceQueueTrack track)
ConnectionTrack conn = removeDQueueHeadNode(&(track->QueryResRequests));
conn->SegNumActual = conn->SegNumMin;
insertDQueueTailNode(&todisp, conn);
- availsegnum -= conn->SegNumMin;
+ availsegnum -= conn->StatNVSeg <= 0 ? conn->SegNumMin : conn->SegNumEqual;
}
DQueueNode pnode = getDQueueContainerHead(&todisp);
@@ -3888,7 +3918,7 @@ int dispatchResourceToQueries_EVEN(DynResourceQueueTrack track)
while(availsegnum > 0)
{
ConnectionTrack conn = (ConnectionTrack)(pnode->Data);
- if ( conn->SegNum > conn->SegNumActual )
+ if ( conn->StatNVSeg == 0 && conn->SegNum > conn->SegNumActual )
{
conn->SegNumActual++;
availsegnum--;
@@ -3912,7 +3942,7 @@ int dispatchResourceToQueries_EVEN(DynResourceQueueTrack track)
elog(DEBUG3, "Resource manager tries to dispatch resource to connection %d. "
"Expect (%d MB, %lf CORE) x %d(max %d min %d) segment(s). "
"Original vseg %d(min %d). "
- "VSeg limit per segment %d VSeg limit per query %d",
+ "VSeg limit per segment %d VSeg limit per query %d.",
conn->ConnID,
conn->SegMemoryMB,
conn->SegCore,
@@ -3924,6 +3954,16 @@ int dispatchResourceToQueries_EVEN(DynResourceQueueTrack track)
conn->VSegLimitPerSeg,
conn->VSegLimit);
+ if ( conn->StatNVSeg > 0 )
+ {
+ elog(LOG, "Resource manager tries to dispatch resource to connection %d. "
+ "Statement level resource quota is active. "
+ "Total %d vsegs, each vseg has %d MB memory quota.",
+ conn->ConnID,
+ conn->StatNVSeg,
+ conn->StatVSegMemoryMB);
+ }
+
/* Build resource. */
int32_t segnumact = 0;
allocateResourceFromResourcePool(conn->SegNumActual,
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3a8dcd21/src/backend/utils/misc/guc.c
----------------------------------------------------------------------
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 9c8b6ef..30f8ee0 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -85,6 +85,8 @@
#include "cdb/cdbquerycontextdispatching.h"
#include "cdb/memquota.h"
#include "utils/vmem_tracker.h"
+#include "resourcemanager/errorcode.h"
+#include "resourcemanager/utils/simplestring.h"
#ifndef PG_KRB_SRVTAB
#define PG_KRB_SRVTAB ""
@@ -250,6 +252,8 @@ static const char *assign_password_hash_algorithm(const char *newval,
bool doit, GucSource source);
static bool assign_gp_temporary_directory_mark_error(int newval, bool doit, GucSource source);
+static const char * assign_hawq_rm_stmt_vseg_memory(const char *newval, bool doit, GucSource source);
+
/*
* GUC option variables that are exported from this module
*/
@@ -6422,6 +6426,15 @@ static struct config_int ConfigureNamesInt[] =
},
{
+ {"hawq_rm_stmt_nvseg", PGC_USERSET, RESOURCES_MGM,
+ gettext_noop("the size quota of virtual segments for one statement."),
+ NULL
+ },
+ &rm_stmt_nvseg,
+ 0, 0, 65535, NULL, NULL
+ },
+
+ {
{"hawq_rm_nslice_perseg_limit", PGC_POSTMASTER, RESOURCES_MGM,
gettext_noop("the limit of the number of slice number in one segment "
"for one query."),
@@ -8089,6 +8102,15 @@ static struct config_string ConfigureNamesString[] =
"", NULL, NULL
},
+ {
+ {"hawq_rm_stmt_vseg_memory", PGC_USERSET, RESOURCES_MGM,
+ gettext_noop("the memory quota of one virtual segment for one statement."),
+ NULL
+ },
+ &rm_stmt_vseg_mem_str,
+ "128mb", assign_hawq_rm_stmt_vseg_memory, NULL
+ },
+
{
{"hawq_resourceenforcer_cgroup_mount_point", PGC_POSTMASTER, RESOURCES_MGM,
gettext_noop("set cgroup mount point for resource enforcement"),
@@ -13531,4 +13553,37 @@ assign_gp_temporary_directory_mark_error(int newval, bool doit, GucSource source
return true;
}
+static const char *
+assign_hawq_rm_stmt_vseg_memory(const char *newval, bool doit, GucSource source)
+{
+ if (doit)
+ {
+ int32_t newvalmb = 0;
+ int parseres = FUNC_RETURN_OK;
+ SimpString valuestr;
+ setSimpleStringRef(&valuestr, newval, strlen(newval));
+ parseres = SimpleStringToStorageSizeMB(&valuestr, &newvalmb);
+
+ if ( parseres != FUNC_RETURN_OK )
+ {
+ return NULL; /* Not valid format of memory quota. */
+ }
+
+ if ( newvalmb == 64 ||
+ newvalmb == 128 ||
+ newvalmb == 256 ||
+ newvalmb == 512 ||
+ newvalmb == 1024 ||
+ newvalmb == 2048 ||
+ newvalmb == 4096 ||
+ newvalmb == 8192 ||
+ newvalmb == 16384 )
+ {
+ return newval;
+ }
+ return NULL; /* Not valid quota value. */
+ }
+ return newval;
+}
+
#include "guc-file.c"
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3a8dcd21/src/include/cdb/cdbvars.h
----------------------------------------------------------------------
diff --git a/src/include/cdb/cdbvars.h b/src/include/cdb/cdbvars.h
index a9e0eef..1d4f7c7 100644
--- a/src/include/cdb/cdbvars.h
+++ b/src/include/cdb/cdbvars.h
@@ -1171,6 +1171,9 @@ extern char *rm_grm_yarn_queue;
extern char *rm_grm_yarn_app_name;
extern int rm_grm_breath_return_percentage;
+extern char *rm_stmt_vseg_mem_str;
+extern int rm_stmt_nvseg;
+
extern int rm_nvseg_perquery_limit;
extern int rm_nvseg_perquery_perseg_limit;
extern int rm_nslice_perseg_limit;