You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by yj...@apache.org on 2015/10/16 03:08:36 UTC
[2/4] incubator-hawq git commit: HAWQ-25. Add resource queue new ddl
statement implementation, refine partial GUC variable names,
use libyarn supporting kerberos.
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/resqueuemanager.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resqueuemanager.c b/src/backend/resourcemanager/resqueuemanager.c
index 44e5504..ef055c3 100644
--- a/src/backend/resourcemanager/resqueuemanager.c
+++ b/src/backend/resourcemanager/resqueuemanager.c
@@ -11,35 +11,41 @@
* The DDL statement attribute name strings.
*/
char RSQDDLAttrNames[RSQ_DDL_ATTR_COUNT]
- [RESOURCE_QUEUE_DDL_ATTR_LENGTH_MAX] = {
+ [RESOURCE_QUEUE_DDL_ATTR_LENGTH_MAX+1] = {
"parent",
"active_statements",
"memory_limit_cluster",
"core_limit_cluster",
- "vsegment_resource_quota",
+ "vseg_resource_quota",
"allocation_policy",
- "resource_upper_factor",
- "vsegment_upper_limit"
+ "resource_overcommit_factor",
+ "nvseg_upper_limit",
+ "nvseg_lower_limit",
+ "nvseg_upper_limit_perseg",
+ "nvseg_lower_limit_perseg"
};
/*
* The attribute names for expressing one complete resource queue definition.
*/
static char RSQTBLAttrNames[RSQ_TBL_ATTR_COUNT]
- [RESOURCE_QUEUE_DDL_ATTR_LENGTH_MAX] = {
- "parent",
- "active_statements",
- "memory_limit_cluster",
- "core_limit_cluster",
- "vsegment_resource_quota",
- "allocation_policy",
- "resource_upper_factor",
- "vsegment_upper_limit",
+ [RESOURCE_QUEUE_TBL_COLNAME_LENGTH_MAX+1] = {
+ "parentoid",
+ "activestats",
+ "memorylimit",
+ "corelimit",
+ "vsegresourcequota",
+ "allocpolicy",
+ "resovercommit",
+ "nvsegupperlimit",
+ "nvseglowerlimit",
+ "nvsegupperlimitperseg",
+ "nvseglowerlimitperseg",
"oid",
"name",
- "creation_time",
- "update_time",
+ "creationtime",
+ "updatetime",
"status"
};
@@ -48,8 +54,7 @@ static char RSQTBLAttrNames[RSQ_TBL_ATTR_COUNT]
*/
static char RSQDDLValueAllocationPolicy[RSQ_ALLOCATION_POLICY_COUNT]
[RESOURCE_QUEUE_DDL_POLICY_LENGTH_MAX] = {
- "even",
- "fifo"
+ "even"
};
/*
@@ -89,8 +94,7 @@ int computeQueryQuota_FIFO( DynResourceQueueTrack track,
int32_t min(int32_t a, int32_t b);
int32_t max(int32_t a, int32_t b);
computeQueryQuotaByPolicy AllocationPolicy[RSQ_ALLOCATION_POLICY_COUNT] = {
- computeQueryQuota_EVEN,
- computeQueryQuota_FIFO
+ computeQueryQuota_EVEN
};
int computeQueryQuota( DynResourceQueueTrack track,
@@ -111,8 +115,7 @@ int dispatchResourceToQueries_EVEN(DynResourceQueueTrack track);
int dispatchResourceToQueries_FIFO(DynResourceQueueTrack track);
dispatchResourceToQueriesByPolicy DispatchPolicy[RSQ_ALLOCATION_POLICY_COUNT] = {
- dispatchResourceToQueries_EVEN,
- dispatchResourceToQueries_FIFO
+ dispatchResourceToQueries_EVEN
};
void dispatchResourceToQueriesInOneQueue(DynResourceQueueTrack track);
@@ -151,6 +154,9 @@ void markMemoryCoreRatioWaterMark(DQueue marks,
void buildTimeoutResponseForQueuedRequest(ConnectionTrack conntrack,
uint32_t reason);
+bool isResourceAcceptable(ConnectionTrack conn, int segnumact);
+
+void adjustResourceExpectsByQueueNVSegLimits(ConnectionTrack conntrack);
/*----------------------------------------------------------------------------*/
/* RESOURCE QUEUE MANAGER EXTERNAL APIs */
/*----------------------------------------------------------------------------*/
@@ -269,7 +275,7 @@ int shallowparseResourceQueueWithAttributes(List *rawattr,
if ( attrindex == -1 )
{
snprintf(errorbuf, errorbufsize,
- "Not defined DDL attribute name [%s]",
+ "not defined DDL attribute name %s",
property->Key.Str);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return RMDDL_WRONG_ATTRNAME;
@@ -281,14 +287,14 @@ int shallowparseResourceQueueWithAttributes(List *rawattr,
{
/* Find oid of the parent resource queue. */
bool exist = false;
- DynResourceQueueTrack parentque = getQueueTrackByQueueName(
- property->Val.Str,
- property->Val.Len,
- &exist);
+ DynResourceQueueTrack parentque =
+ getQueueTrackByQueueName(property->Val.Str,
+ property->Val.Len,
+ &exist);
if ( !exist )
{
snprintf(errorbuf, errorbufsize,
- "Can not recognize parent resource queue name %s.",
+ "can not recognize parent resource queue name %s.",
property->Val.Str);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return RMDDL_WRONG_ATTRVALUE;
@@ -311,10 +317,13 @@ int shallowparseResourceQueueWithAttributes(List *rawattr,
case RSQ_DDL_ATTR_ACTIVE_STATMENTS:
case RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER:
case RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER:
- case RSQ_DDL_ATTR_VSEGMENT_RESOURCE_QUOTA:
+ case RSQ_DDL_ATTR_VSEG_RESOURCE_QUOTA:
case RSQ_DDL_ATTR_ALLOCATION_POLICY:
- case RSQ_DDL_ATTR_RESOURCE_UPPER_FACTOR:
- case RSQ_DDL_ATTR_VSEGMENT_UPPER_LIMIT:
+ case RSQ_DDL_ATTR_RESOURCE_OVERCOMMIT_FACTOR:
+ case RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT:
+ case RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT:
+ case RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT_PERSEG:
+ case RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT_PERSEG:
{
/*
* Build property.
@@ -353,6 +362,7 @@ int shallowparseResourceQueueWithAttributes(List *rawattr,
*/
int parseResourceQueueAttributes( List *attributes,
DynResourceQueue queue,
+ bool checkformatonly,
char *errorbuf,
int errorbufsize)
{
@@ -372,8 +382,8 @@ int parseResourceQueueAttributes( List *attributes,
Assert( queue != NULL );
/* Initialize attributes. */
- queue->OID = -1;
- queue->ParentOID = -1;
+ queue->OID = InvalidOid;
+ queue->ParentOID = InvalidOid;
queue->ParallelCount = -1;
queue->ClusterMemoryMB = -1;
queue->Status = RESOURCE_QUEUE_STATUS_VALID_LEAF;
@@ -382,8 +392,11 @@ int parseResourceQueueAttributes( List *attributes,
queue->SegResourceQuotaVCore = -1.0;
queue->SegResourceQuotaMemoryMB = -1;
- queue->ResourceUpperFactor = -1;
- queue->VSegUpperLimit = DEFAULT_RESQUEUE_VSEG_UPPER_LIMIT_N;
+ queue->ResourceOvercommit = DEFAULT_RESQUEUE_OVERCOMMIT_N;
+ queue->NVSegUpperLimit = DEFAULT_RESQUEUE_NVSEG_UPPER_LIMIT_N;
+ queue->NVSegLowerLimit = DEFAULT_RESQUEUE_NVSEG_LOWER_LIMIT_N;
+ queue->NVSegUpperLimitPerSeg = DEFAULT_RESQUEUE_NVSEG_UPPER_PERSEG_LIMIT_N;
+ queue->NVSegLowerLimitPerSeg = DEFAULT_RESQUEUE_NVSEG_LOWER_PERSEG_LIMIT_N;
queue->AllocatePolicy = -1;
queue->QueuingPolicy = -1;
@@ -394,7 +407,8 @@ int parseResourceQueueAttributes( List *attributes,
memset(queue->Name, '\0', sizeof(queue->Name));
- /* Go through each property content. */
+ /* Go through each attribute content. */
+ errorbuf[0] = '\0';
ListCell *cell = NULL;
foreach(cell, attributes)
{
@@ -414,7 +428,7 @@ int parseResourceQueueAttributes( List *attributes,
{
res = RESQUEMGR_WRONG_ATTRNAME;
snprintf(errorbuf, errorbufsize,
- "Can not recognize resource queue attribute %s",
+ "of not recognized resource queue attribute %s",
attrname->Str);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
@@ -437,17 +451,6 @@ int parseResourceQueueAttributes( List *attributes,
case RSQ_TBL_ATTR_ACTIVE_STATMENTS:
res = SimpleStringToInt32(attrvalue, &(queue->ParallelCount));
- if ( res != FUNC_RETURN_OK )
- {
- snprintf(errorbuf, errorbufsize,
- "Active statements %s is not valid.",
- attrvalue->Str);
- ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
- return res;
- }
- elog(DEBUG3, "Resource manager parseResourceQueueAttributes() parsed "
- "active statements %d",
- queue->ParallelCount);
break;
case RSQ_TBL_ATTR_MEMORY_LIMIT_CLUSTER:
@@ -483,98 +486,87 @@ int parseResourceQueueAttributes( List *attributes,
}
break;
- case RSQ_TBL_ATTR_VSEGMENT_RESOURCE_QUOTA:
+ case RSQ_TBL_ATTR_VSEG_RESOURCE_QUOTA:
/* Decide it is a memory quota or core quota. */
- if ( SimpleStringStartWith(
- attrvalue,
- RESOURCE_QUEUE_SEG_RES_QUOTA_MEM) == FUNC_RETURN_OK )
+ if ( SimpleStringStartWith(attrvalue,
+ RESOURCE_QUEUE_SEG_RES_QUOTA_MEM) == FUNC_RETURN_OK )
{
SimpString valuestr;
- setSimpleStringRef(
- &valuestr,
- attrvalue->Str+sizeof(RESOURCE_QUEUE_SEG_RES_QUOTA_MEM)-1,
- attrvalue->Len-sizeof(RESOURCE_QUEUE_SEG_RES_QUOTA_MEM)+1);
-
- res = SimpleStringToStorageSizeMB(
- &valuestr,
- &(queue->SegResourceQuotaMemoryMB));
+ setSimpleStringRef(&valuestr,
+ attrvalue->Str+sizeof(RESOURCE_QUEUE_SEG_RES_QUOTA_MEM)-1,
+ attrvalue->Len-sizeof(RESOURCE_QUEUE_SEG_RES_QUOTA_MEM)+1);
- elog(DEBUG3, "Resource manager parseResourceQueueAttributes() "
- "parsed segment resource quota %d MB",
- queue->SegResourceQuotaMemoryMB);
-
- }
- else if ( SimpleStringStartWith(
- attrvalue,
- RESOURCE_QUEUE_SEG_RES_QUOTA_CORE) == FUNC_RETURN_OK )
- {
- SimpString valuestr;
- setSimpleStringRef(
- &valuestr,
- attrvalue->Str+sizeof(RESOURCE_QUEUE_SEG_RES_QUOTA_CORE)-1,
- attrvalue->Len-sizeof(RESOURCE_QUEUE_SEG_RES_QUOTA_CORE)+1);
+ res = SimpleStringToStorageSizeMB(&valuestr,
+ &(queue->SegResourceQuotaMemoryMB));
- res = SimpleStringToDouble(&valuestr,
- &(queue->SegResourceQuotaVCore));
-
- elog(DEBUG3, "Resource manager parseResourceQueueAttributes() "
- "parsed segment resource quota %lf CORE",
- queue->SegResourceQuotaVCore);
+ /*
+ *--------------------------------------------------------------
+ * Check the value. We accept only :
+ * 64mb, 128mb, 256mb, 512mb, 1gb, 2gb, 4gb, 8gb, 16gb
+ *--------------------------------------------------------------
+ */
+ if ( res == FUNC_RETURN_OK )
+ {
+ elog(DEBUG3, "Resource manager parseResourceQueueAttributes() "
+ "parsed segment resource quota %d MB",
+ queue->SegResourceQuotaMemoryMB);
+
+ if ( !(queue->SegResourceQuotaMemoryMB == (2<<6)) &&
+ !(queue->SegResourceQuotaMemoryMB == (2<<7)) &&
+ !(queue->SegResourceQuotaMemoryMB == (2<<8)) &&
+ !(queue->SegResourceQuotaMemoryMB == (2<<9)) &&
+ !(queue->SegResourceQuotaMemoryMB == (2<<10)) &&
+ !(queue->SegResourceQuotaMemoryMB == (2<<11)) &&
+ !(queue->SegResourceQuotaMemoryMB == (2<<12)) &&
+ !(queue->SegResourceQuotaMemoryMB == (2<<13)) &&
+ !(queue->SegResourceQuotaMemoryMB == (2<<14)) )
+ {
+ res = RESQUEMGR_WRONG_RES_QUOTA_EXP;
+ snprintf(errorbuf, errorbufsize,
+ "%s value %s is not valid, only 64mb, 128mb, "
+ "256mb, 512mb, 1gb, 2gb, 4gb, 8gb, 16gb are "
+ "valid.",
+ RSQTBLAttrNames[RSQ_TBL_ATTR_VSEG_RESOURCE_QUOTA],
+ attrvalue->Str);
+ }
+ }
}
else
{
+ res = RESQUEMGR_WRONG_RES_QUOTA_EXP;
snprintf(errorbuf, errorbufsize,
- "Resource quota limit %s is not valid.",
+ "%s format %s is not valid.",
+ RSQTBLAttrNames[RSQ_TBL_ATTR_VSEG_RESOURCE_QUOTA],
attrvalue->Str);
- ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
- return res;
}
break;
- case RSQ_DDL_ATTR_RESOURCE_UPPER_FACTOR:
- res = SimpleStringToDouble(attrvalue, &(queue->ResourceUpperFactor));
- if ( res != FUNC_RETURN_OK ) {
- snprintf(errorbuf, errorbufsize,
- "Resource upper factor %s is not valid.",
- attrvalue->Str);
- ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
- return res;
- }
- elog(DEBUG3, "Resource manager parseResourceQueueAttributes() parsed "
- "resource upper factor %lf",
- queue->ResourceUpperFactor);
+ case RSQ_DDL_ATTR_RESOURCE_OVERCOMMIT_FACTOR:
+ res = SimpleStringToDouble(attrvalue, &(queue->ResourceOvercommit));
break;
- case RSQ_DDL_ATTR_VSEGMENT_UPPER_LIMIT:
- res = SimpleStringToInt32(attrvalue, &(queue->VSegUpperLimit));
- if ( res != FUNC_RETURN_OK )
- {
- snprintf(errorbuf, errorbufsize,
- "Virtual segment upper limit %s is not valid.",
- attrvalue->Str);
- ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
- return res;
- }
- elog(DEBUG3, "Resource manager parseResourceQueueAttributes() parsed "
- "virtual segment upper limit %d",
- queue->VSegUpperLimit);
+ case RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT:
+ res = SimpleStringToInt32(attrvalue, &(queue->NVSegUpperLimit));
+ break;
+
+ case RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT:
+ res = SimpleStringToInt32(attrvalue, &(queue->NVSegLowerLimit));
+ break;
+
+ case RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT_PERSEG:
+ res = SimpleStringToDouble(attrvalue, &(queue->NVSegUpperLimitPerSeg));
+ break;
+
+ case RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT_PERSEG:
+ res = SimpleStringToDouble(attrvalue, &(queue->NVSegLowerLimitPerSeg));
break;
case RSQ_TBL_ATTR_ALLOCATION_POLICY:
- res = SimpleStringToMapIndexInt8(
- attrvalue,
- (char *)RSQDDLValueAllocationPolicy,
- RSQ_ALLOCATION_POLICY_COUNT,
- sizeof(RSQDDLValueAllocationPolicy[0]),
- &(queue->AllocatePolicy));
- if ( res != FUNC_RETURN_OK )
- {
- snprintf(errorbuf, errorbufsize,
- "Allocation policy %s is not valid.",
- attrvalue->Str);
- ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
- return res;
- }
+ res = SimpleStringToMapIndexInt8(attrvalue,
+ (char *)RSQDDLValueAllocationPolicy,
+ RSQ_ALLOCATION_POLICY_COUNT,
+ sizeof(RSQDDLValueAllocationPolicy[0]),
+ &(queue->AllocatePolicy));
break;
case RSQ_TBL_ATTR_NAME:
@@ -629,14 +621,22 @@ int parseResourceQueueAttributes( List *attributes,
if ( res != FUNC_RETURN_OK )
{
res = RESQUEMGR_WRONG_ATTR;
- snprintf(errorbuf, errorbufsize,
- "Wrong resource queue attribute setting. %s=%s",
- attrname->Str, attrvalue->Str);
+ if ( errorbuf[0] == '\0' )
+ {
+ snprintf(errorbuf, errorbufsize,
+ "wrong resource queue attribute setting. %s=%s",
+ attrname->Str, attrvalue->Str);
+ }
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
}
+ if ( checkformatonly )
+ {
+ return res;
+ }
+
/*
* Memory and Core resource must be specified and they must use the same way
* to express the resource.
@@ -645,7 +645,8 @@ int parseResourceQueueAttributes( List *attributes,
{
res = RESQUEMGR_LACK_ATTR;
snprintf(errorbuf, errorbufsize,
- "MEMORY_LIMIT_CLUSTER must be specified.");
+ "%s must be specified.",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER]);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
@@ -654,7 +655,8 @@ int parseResourceQueueAttributes( List *attributes,
{
res = RESQUEMGR_LACK_ATTR;
snprintf(errorbuf, errorbufsize,
- "CORE_LIMIT_CLUSTER must be specified.");
+ "%s must be specified.",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER]);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
@@ -664,8 +666,9 @@ int parseResourceQueueAttributes( List *attributes,
{
res = RESQUEMGR_INCONSISTENT_RESOURCE_EXP;
snprintf(errorbuf, errorbufsize,
- "MEMORY_LIMIT_CLUSTER and CORE_LIMIT_CLUSTER "
- "must use the same way to express resource limit.");
+ "%s and %s must use the same way to express resource limit.",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER],
+ RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER]);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
@@ -703,24 +706,7 @@ int updateResourceQueueAttributes(List *attributes,
Assert(queue != NULL);
- /* We can not have duplicate property keys. */
ListCell *cell = NULL;
- foreach(cell, attributes)
- {
- KVProperty value1 = lfirst(cell);
-
- for ( ListCell *cell2 = lnext(cell) ; cell2 != NULL ; cell2 = lnext(cell2) )
- {
- KVProperty value2 = lfirst(cell2);
- if ( SimpleStringComp(&(value1->Key), value2->Key.Str) == 0 )
- {
- res = RESQUEMGR_DUPLICATE_ATTRNAME;
- snprintf(errorbuf, errorbufsize, "Duplicate attributes %s", value1->Key.Str);
- ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
- return res;
- }
- }
- }
/* Go through each property content. */
foreach(cell, attributes)
@@ -816,7 +802,7 @@ int updateResourceQueueAttributes(List *attributes,
}
break;
- case RSQ_TBL_ATTR_VSEGMENT_RESOURCE_QUOTA:
+ case RSQ_TBL_ATTR_VSEG_RESOURCE_QUOTA:
/* Decide it is a memory quota or core quota. */
if ( SimpleStringStartWith(
attrvalue,
@@ -866,23 +852,23 @@ int updateResourceQueueAttributes(List *attributes,
}
break;
- case RSQ_DDL_ATTR_RESOURCE_UPPER_FACTOR:
- res = SimpleStringToDouble(attrvalue, &(queue->ResourceUpperFactor));
+ case RSQ_DDL_ATTR_RESOURCE_OVERCOMMIT_FACTOR:
+ res = SimpleStringToDouble(attrvalue, &(queue->ResourceOvercommit));
if ( res != FUNC_RETURN_OK )
{
snprintf(errorbuf, errorbufsize,
- "Resource upper limit factor %s is not valid.",
+ "Resource overcommit limit factor %s is not valid.",
attrvalue->Str);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
elog(DEBUG3, "Resource manager updateResourceQueueAttributes() "
"updated Resource upper limit factor %lf",
- queue->ResourceUpperFactor);
+ queue->ResourceOvercommit);
break;
- case RSQ_DDL_ATTR_VSEGMENT_UPPER_LIMIT:
- res = SimpleStringToInt32(attrvalue, &(queue->VSegUpperLimit));
+ case RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT:
+ res = SimpleStringToInt32(attrvalue, &(queue->NVSegUpperLimit));
if ( res != FUNC_RETURN_OK )
{
snprintf(errorbuf, errorbufsize,
@@ -893,8 +879,8 @@ int updateResourceQueueAttributes(List *attributes,
}
elog(DEBUG3, "Resource manager updateResourceQueueAttributes() "
- "updated virtual segment upper limit %d",
- queue->VSegUpperLimit);
+ "updated virtual segment size upper limit %d",
+ queue->NVSegUpperLimit);
break;
case RSQ_TBL_ATTR_ALLOCATION_POLICY:
@@ -933,7 +919,7 @@ int updateResourceQueueAttributes(List *attributes,
{
res = RESQUEMGR_WRONG_ATTR;
snprintf(errorbuf, errorbufsize,
- "Wrong resource queue attribute setting. %s=%s",
+ "Wrong to parse resource queue attribute setting. %s=%s",
attrname->Str, attrvalue->Str);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
@@ -980,20 +966,11 @@ int updateResourceQueueAttributes(List *attributes,
}
-/**
+/*
* This is one API for checking if new resource queue definition is valid to be
* created.This functions does not generate logs higher than WARNING, the error
* is also saved in error buffer to make the caller able to pass the message to
* remote process.
- *
- * queue[in/out] The queue instance to be tested and completed.
- * errorbuf[out] The error string buffer.
- * errorbufsize[in] The maximum size of error string buffer.
- *
- * Return values:
- * FUNC_RETURN_OK : Succeed.
- * RESQUEMGR_LACK_ATTR : Necessary attributes are not specified.
- * RESQUEMGR_WRONG_ATTR : Unrecognized wrong attribute value.
*/
int checkAndCompleteNewResourceQueueAttributes(DynResourceQueue queue,
char *errorbuf,
@@ -1004,8 +981,8 @@ int checkAndCompleteNewResourceQueueAttributes(DynResourceQueue queue,
Assert( queue != NULL );
- if ( queue->Status & RESOURCE_QUEUE_STATUS_IS_VER1X ) {
-
+ if ( queue->Status & RESOURCE_QUEUE_STATUS_IS_VER1X )
+ {
/* TODO: Validate Version 1.x resource queue definition here. */
return res;
}
@@ -1015,25 +992,29 @@ int checkAndCompleteNewResourceQueueAttributes(DynResourceQueue queue,
/*
* STEP 1. Validate parent queue attribute.
*/
- if ( queue->ParentOID < 0 ) {
+ if ( !RESQUEUE_IS_ROOT(queue) && queue->ParentOID == InvalidOid )
+ {
res = RESQUEMGR_LACK_ATTR;
snprintf(errorbuf, errorbufsize,
- "Attribute %s must be specified.",
+ "attribute %s must be specified.",
RSQDDLAttrNames[RSQ_DDL_ATTR_PARENT]);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
- if ( queue->ParentOID != InvalidOid ) {
+ if ( !RESQUEUE_IS_ROOT(queue) && queue->ParentOID != InvalidOid )
+ {
bool exist = false;
parenttrack = getQueueTrackByQueueOID(queue->ParentOID, &exist);
- Assert((exist && parenttrack != NULL) || !exist);
+ Assert(exist && parenttrack != NULL);
/* pg_default can not be a parent queue. */
- if ( RESQUEUE_IS_DEFAULT(parenttrack->QueueInfo) ) {
+ if ( RESQUEUE_IS_DEFAULT(parenttrack->QueueInfo) )
+ {
res = RESQUEMGR_WRONG_ATTR;
snprintf(errorbuf, errorbufsize,
- "pg_default can not have children resource queues.");
+ "%s can not have children resource queues.",
+ RESOURCE_QUEUE_DEFAULT_QUEUE_NAME);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
@@ -1053,8 +1034,9 @@ int checkAndCompleteNewResourceQueueAttributes(DynResourceQueue queue,
* STEP 2. Validate active_statements attributes. For leaf queue only.
*/
- if ( queue->ParallelCount <= 0 ) {
- queue->ParallelCount = RESOURCE_QUEUE_PARALLEL_COUNT_DEF;
+ if ( queue->ParallelCount <= 0 )
+ {
+ queue->ParallelCount = DEFAULT_RESQUEUE_ACTIVESTATS_N;
}
/*
@@ -1067,22 +1049,22 @@ int checkAndCompleteNewResourceQueueAttributes(DynResourceQueue queue,
if ( RESQUEUE_IS_PERCENT(queue) )
{
/* MEMORY_LIMIT_CLUSTER and CORE_LIMIT_CLUSTER must be specified.*/
- if ( queue->ClusterMemoryPer == -1 ) {
+ if ( queue->ClusterMemoryPer == -1 )
+ {
res = RESQUEMGR_LACK_ATTR;
- snprintf(
- errorbuf, errorbufsize,
- "%s must be set.",
- RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER]);
+ snprintf(errorbuf, errorbufsize,
+ "%s must be set.",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER]);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
- if ( queue->ClusterVCorePer == -1 ) {
+ if ( queue->ClusterVCorePer == -1 )
+ {
res = RESQUEMGR_LACK_ATTR;
- snprintf(
- errorbuf, errorbufsize,
- "%s must be set.",
- RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER]);
+ snprintf(errorbuf, errorbufsize,
+ "%s must be set.",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER]);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
@@ -1092,26 +1074,26 @@ int checkAndCompleteNewResourceQueueAttributes(DynResourceQueue queue,
* than 0, less than 100. This is to guarantee the following automatic
* deduction of the limits.
*/
- if ( queue->ClusterVCorePer <= 0 || queue->ClusterVCorePer > 100 ) {
+ if ( queue->ClusterVCorePer <= 0 || queue->ClusterVCorePer > 100 )
+ {
res = RESQUEMGR_WRONG_ATTR;
- snprintf(
- errorbuf, errorbufsize,
- "The explicit value of %s must be between 1%% and 100%%. "
- "Wrong value = %lf%%",
- RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER],
- queue->ClusterVCorePer);
+ snprintf(errorbuf, errorbufsize,
+ "The explicit value of %s must be between 1%% and 100%%. "
+ "Wrong value = %lf%%",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER],
+ queue->ClusterVCorePer);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
- if ( queue->ClusterMemoryPer <= 0 || queue->ClusterMemoryPer > 100 ) {
+ if ( queue->ClusterMemoryPer <= 0 || queue->ClusterMemoryPer > 100 )
+ {
res = RESQUEMGR_WRONG_ATTR;
- snprintf(
- errorbuf, errorbufsize,
- "The explicit value of %s must be between 1%% and 100%%. "
- "Wrong value = %lf%%",
- RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER],
- queue->ClusterMemoryPer);
+ snprintf(errorbuf, errorbufsize,
+ "The explicit value of %s must be between 1%% and 100%%. "
+ "Wrong value = %lf%%",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER],
+ queue->ClusterMemoryPer);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
@@ -1123,17 +1105,16 @@ int checkAndCompleteNewResourceQueueAttributes(DynResourceQueue queue,
if ( queue->ClusterVCorePer != queue->ClusterMemoryPer )
{
res = RESQUEMGR_WRONG_ATTR;
- snprintf(
- errorbuf, errorbufsize,
- "The value of %s must be identical with the value of %s. "
- "Wrong value of %s = %lf%%. "
- "Wrong value of %s = %lf%%. ",
- RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER],
- RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER],
- RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER],
- queue->ClusterMemoryPer,
- RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER],
- queue->ClusterVCorePer);
+ snprintf(errorbuf, errorbufsize,
+ "The value of %s must be identical with the value of %s. "
+ "Wrong value of %s = %lf%%. "
+ "Wrong value of %s = %lf%%. ",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER],
+ RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER],
+ RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER],
+ queue->ClusterMemoryPer,
+ RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER],
+ queue->ClusterVCorePer);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
@@ -1161,13 +1142,12 @@ int checkAndCompleteNewResourceQueueAttributes(DynResourceQueue queue,
if ((current + queue->ClusterMemoryPer) > 100)
{
res = RESQUEMGR_WRONG_ATTR;
- snprintf(
- errorbuf, errorbufsize,
- "The value of %s and %s exceeds its parent's limit. "
- "Wrong value = %lf%%",
- RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER],
- RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER],
- queue->ClusterMemoryPer);
+ snprintf(errorbuf, errorbufsize,
+ "the value of %s and %s exceeds parent queue's limit. "
+ "Wrong value = %.0lf%%",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER],
+ RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER],
+ queue->ClusterMemoryPer);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
@@ -1180,22 +1160,22 @@ int checkAndCompleteNewResourceQueueAttributes(DynResourceQueue queue,
else {
/* MEMORY_LIMIT_CLUSTER and CORE_LIMIT_CLUSTER must be specified.*/
- if ( queue->ClusterMemoryMB == -1 ) {
+ if ( queue->ClusterMemoryMB == -1 )
+ {
res = RESQUEMGR_LACK_ATTR;
- snprintf(
- errorbuf, errorbufsize,
- "%s must be set.",
- RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER]);
+ snprintf(errorbuf, errorbufsize,
+ "%s must be set.",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER]);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
- if ( queue->ClusterVCore == -1 ) {
+ if ( queue->ClusterVCore == -1 )
+ {
res = RESQUEMGR_LACK_ATTR;
- snprintf(
- errorbuf, errorbufsize,
- "%s must be set.",
- RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER]);
+ snprintf(errorbuf, errorbufsize,
+ "%s must be set.",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER]);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
@@ -1205,26 +1185,26 @@ int checkAndCompleteNewResourceQueueAttributes(DynResourceQueue queue,
* than 0. This is to guarantee the following automatic deduction of the
* limits.
*/
- if ( queue->ClusterVCore <= 0 ) {
+ if ( queue->ClusterVCore <= 0 )
+ {
res = RESQUEMGR_WRONG_ATTR;
- snprintf(
- errorbuf, errorbufsize,
- "The explicit value of %s must be greater than 0. "
- "Wrong value = %f",
- RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER],
- queue->ClusterVCore);
+ snprintf(errorbuf, errorbufsize,
+ "The explicit value of %s must be greater than 0. "
+ "Wrong value = %f",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER],
+ queue->ClusterVCore);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
- if ( queue->ClusterMemoryMB <= 0 ) {
+ if ( queue->ClusterMemoryMB <= 0 )
+ {
res = RESQUEMGR_WRONG_ATTR;
- snprintf(
- errorbuf, errorbufsize,
- "The explicit value of %s must be greater than 0. "
- "Wrong value = %dMB",
- RSQTBLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER],
- queue->ClusterMemoryMB);
+ snprintf(errorbuf, errorbufsize,
+ "The explicit value of %s must be greater than 0. "
+ "Wrong value = %dMB",
+ RSQTBLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER],
+ queue->ClusterMemoryMB);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
@@ -1234,60 +1214,141 @@ int checkAndCompleteNewResourceQueueAttributes(DynResourceQueue queue,
* STEP 4: Check resource quota.
*/
if ( queue->SegResourceQuotaMemoryMB == -1 &&
- queue->SegResourceQuotaVCore == -1.0 ) {
- queue->SegResourceQuotaMemoryMB = RESOURCE_QUEUE_SEG_RES_QUOTA_DEF;
+ queue->SegResourceQuotaVCore == -1.0 )
+ {
+ queue->SegResourceQuotaMemoryMB = DEFAULT_RESQUEUE_VSEGRESOURCEQUOTA_N;
}
- if ( queue->SegResourceQuotaMemoryMB != -1 ) {
+ if ( queue->SegResourceQuotaMemoryMB != -1 )
+ {
/* The quota value must be greater than 0. */
- if ( queue->SegResourceQuotaMemoryMB <= 0 ) {
+ if ( queue->SegResourceQuotaMemoryMB <= 0 )
+ {
res = RESQUEMGR_WRONG_ATTR;
- snprintf(
- errorbuf, errorbufsize,
- "%s must be greater than 0.",
- RSQDDLAttrNames[RSQ_DDL_ATTR_VSEGMENT_RESOURCE_QUOTA]);
+ snprintf(errorbuf, errorbufsize,
+ "%s must be greater than 0.",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_VSEG_RESOURCE_QUOTA]);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
}
- else if ( queue->SegResourceQuotaVCore != -1.0 ) {
-
+ else if ( queue->SegResourceQuotaVCore != -1.0 )
+ {
/* The quota value must be greater than 0. */
- if ( queue->SegResourceQuotaVCore <= 0.0 ) {
+ if ( queue->SegResourceQuotaVCore <= 0.0 )
+ {
res = RESQUEMGR_WRONG_ATTR;
- snprintf(
- errorbuf, errorbufsize,
- "%s must be greater than 0.0.",
- RSQTBLAttrNames[RSQ_DDL_ATTR_VSEGMENT_RESOURCE_QUOTA]);
+ snprintf(errorbuf, errorbufsize,
+ "%s must be greater than 0.0.",
+ RSQTBLAttrNames[RSQ_DDL_ATTR_VSEG_RESOURCE_QUOTA]);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
}
- else {
+ else
+ {
Assert(0); /* Should never come here. */
}
/*
* STEP 5: Check policy and set default value.
*/
- if ( queue->AllocatePolicy == -1 ) {
+ if ( queue->AllocatePolicy == -1 )
+ {
queue->AllocatePolicy = RSQ_ALLOCATION_POLICY_EVEN;
}
/*
- * STEP 6: Check resource factors.
+ * STEP 6: Check resource over-commit factor.
+ */
+ if ( queue->ResourceOvercommit < MINIMUM_RESQUEUE_OVERCOMMIT_N )
+ {
+ res = RESQUEMGR_WRONG_ATTR;
+ snprintf(errorbuf, errorbufsize,
+ "%s must be no less than %lf. Wrong value %lf",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_RESOURCE_OVERCOMMIT_FACTOR],
+ MINIMUM_RESQUEUE_OVERCOMMIT_N,
+ queue->ResourceOvercommit);
+ ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
+ return res;
+ }
+
+ /*
+ * STEP 7. Check number of vseg limit.
+ */
+ if ( queue->NVSegUpperLimit < MINIMUM_RESQUEUE_NVSEG_UPPER_LIMIT_N )
+ {
+ res = RESQUEMGR_WRONG_ATTR;
+ snprintf(errorbuf, errorbufsize,
+ "%s must be no less than %d. Wrong value %d",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT],
+ MINIMUM_RESQUEUE_NVSEG_UPPER_LIMIT_N,
+ queue->NVSegUpperLimit);
+ ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
+ return res;
+ }
+
+ if ( queue->NVSegLowerLimit < MINIMUM_RESQUEUE_NVSEG_LOWER_LIMIT_N )
+ {
+ res = RESQUEMGR_WRONG_ATTR;
+ snprintf(errorbuf, errorbufsize,
+ "%s must be no less than %d. Wrong value %d",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT],
+ MINIMUM_RESQUEUE_NVSEG_LOWER_LIMIT_N,
+ queue->NVSegLowerLimit);
+ ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
+ return res;
+ }
+
+ if ( queue->NVSegUpperLimit > 0 &&
+ queue->NVSegLowerLimit > 0 &&
+ queue->NVSegUpperLimit < queue->NVSegLowerLimit )
+ {
+ res = RESQUEMGR_WRONG_ATTR;
+ snprintf(errorbuf, errorbufsize,
+ "%s must be no less than %s.",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT],
+ RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT]);
+ ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
+ return res;
+ }
+
+ /*
+ * STEP 8. Check number of vseg limit per segment.
*/
- if ( queue->ResourceUpperFactor == -1.0 ) {
- queue->ResourceUpperFactor = RESOURCE_QUEUE_RES_UPPER_FACTOR_DEF;
+ if ( queue->NVSegUpperLimitPerSeg < MINIMUM_RESQUEUE_NVSEG_UPPER_PERSEG_LIMIT_N )
+ {
+ res = RESQUEMGR_WRONG_ATTR;
+ snprintf(errorbuf, errorbufsize,
+ "%s must be no less than %lf. Wrong value %lf",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT_PERSEG],
+ MINIMUM_RESQUEUE_NVSEG_UPPER_PERSEG_LIMIT_N,
+ queue->NVSegUpperLimitPerSeg);
+ ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
+ return res;
+ }
+
+ if ( queue->NVSegLowerLimitPerSeg < MINIMUM_RESQUEUE_NVSEG_LOWER_PERSEG_LIMIT_N )
+ {
+ res = RESQUEMGR_WRONG_ATTR;
+ snprintf(errorbuf, errorbufsize,
+ "%s must be no less than %lf. Wrong value %lf",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT_PERSEG],
+ MINIMUM_RESQUEUE_NVSEG_LOWER_PERSEG_LIMIT_N,
+ queue->NVSegLowerLimitPerSeg);
+ ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
+ return res;
}
- if ( queue->ResourceUpperFactor < 1.0 ) {
+ if ( queue->NVSegUpperLimitPerSeg > 0 &&
+ queue->NVSegLowerLimitPerSeg > 0 &&
+ queue->NVSegUpperLimitPerSeg < queue->NVSegLowerLimitPerSeg )
+ {
res = RESQUEMGR_WRONG_ATTR;
- snprintf(
- errorbuf, errorbufsize,
- "%s must be no less than 1.0. Wrong value %lf",
- RSQDDLAttrNames[RSQ_DDL_ATTR_RESOURCE_UPPER_FACTOR],
- queue->ResourceUpperFactor);
+ snprintf(errorbuf, errorbufsize,
+ "%s must be no less than %s.",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT_PERSEG],
+ RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT_PERSEG]);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
@@ -1303,13 +1364,6 @@ int checkAndCompleteNewResourceQueueAttributes(DynResourceQueue queue,
* is a new created instance.
* errorbuf[out] The error message if something is wrong.
* errorbufsize[out] The limit of error message buffer.
- *
- * Return FUNC_RETURN_OK : Everything is ok.
- * RESQUEMGR_DUPLICATE_QUEID : Duplicate resource queue id.
- * RESQUEMGR_NO_QUENAME : No resource queue name specified.
- * RESQUEMGR_DUPLICATE_QUENAME : Duplicate queue name.
- * RESQUEMGR_WRONG_PARENT_QUEUE : The parent queue is wrongly specified.
- *
*/
int createQueueAndTrack( DynResourceQueue queue,
DynResourceQueueTrack *track,
@@ -1381,7 +1435,7 @@ int createQueueAndTrack( DynResourceQueue queue,
isDefaultQueue = RESQUEUE_IS_DEFAULT(queue);
isRootQueue = RESQUEUE_IS_ROOT(queue);
- elog(RMLOG, "HAWQ RM :: To create resource queue instance %s", queue->Name);
+ elog(DEBUG3, "HAWQ RM :: To create resource queue instance %s", queue->Name);
/*
* Check the queue parent-child relationship. No matter the queue is to be
@@ -1389,12 +1443,15 @@ int createQueueAndTrack( DynResourceQueue queue,
* queue has no parent is 'pg_root' say isRootQueue. The queue 'pg_default'
* must has 'pg_root' as the parent queue.
*/
- if ( !isRootQueue ) {
+ if ( !isRootQueue )
+ {
/* Check if the parent queue id exists. */
parenttrack = getQueueTrackByQueueOID(queue->ParentOID, &exist);
- if (exist) {
+ if (exist)
+ {
/* Can not set pg_default as parent queue. */
- if ( RESQUEUE_IS_DEFAULT(parenttrack->QueueInfo) ) {
+ if ( RESQUEUE_IS_DEFAULT(parenttrack->QueueInfo) )
+ {
res = RESQUEMGR_WRONG_PARENT_QUEUE;
snprintf( errorbuf, errorbufsize,
ERRORPOS_FORMAT
@@ -1405,8 +1462,8 @@ int createQueueAndTrack( DynResourceQueue queue,
}
/* 'pg_default' must has 'pg_root' as parent. */
- if ( isDefaultQueue &&
- !RESQUEUE_IS_ROOT(parenttrack->QueueInfo) ) {
+ if ( isDefaultQueue && !RESQUEUE_IS_ROOT(parenttrack->QueueInfo) )
+ {
res = RESQUEMGR_WRONG_PARENT_QUEUE;
snprintf( errorbuf, errorbufsize,
ERRORPOS_FORMAT
@@ -1417,7 +1474,8 @@ int createQueueAndTrack( DynResourceQueue queue,
}
/* The parent queue can not have connections. */
- if ( parenttrack->CurConnCounter > 0 ) {
+ if ( parenttrack->CurConnCounter > 0 )
+ {
res = RESQUEMGR_IN_USE;
snprintf( errorbuf, errorbufsize,
ERRORPOS_FORMAT
@@ -1428,11 +1486,11 @@ int createQueueAndTrack( DynResourceQueue queue,
goto exit;
}
}
- else {
+ else
+ {
res = RESQUEMGR_WRONG_PARENT_QUEUE;
snprintf(errorbuf, errorbufsize,
- ERRORPOS_FORMAT "No expected parent queue " INT64_FORMAT,
- ERRREPORTPOS,
+ "No expected parent queue " INT64_FORMAT,
queue->ParentOID);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
goto exit;
@@ -1443,8 +1501,8 @@ int createQueueAndTrack( DynResourceQueue queue,
* core ratio related information should also be updated.
*/
if ( RESQUEUE_IS_LEAF(parenttrack->QueueInfo) &&
- parenttrack->trackedMemCoreRatio ) {
-
+ parenttrack->trackedMemCoreRatio )
+ {
/* Remove parent track from memory core ratio track */
removeResourceQueueRatio(parenttrack);
@@ -1454,8 +1512,6 @@ int createQueueAndTrack( DynResourceQueue queue,
}
}
-
-
/* Set parent resource queue track reference. */
newqueuetrack->ParentTrack = parenttrack;
@@ -1481,16 +1537,17 @@ int createQueueAndTrack( DynResourceQueue queue,
MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
PQUEMGR->Queues = lappend(PQUEMGR->Queues, newqueuetrack);
MEMORY_CONTEXT_SWITCH_BACK
- if( newqueuetrack->QueueInfo->OID != InvalidOid ) {
+ if( newqueuetrack->QueueInfo->OID != InvalidOid )
+ {
setQueueTrackIndexedByQueueOID(newqueuetrack);
}
setQueueTrackIndexedByQueueName(newqueuetrack);
/* Update overall ratio index. */
- if ( !RESQUEUE_IS_PERCENT(newqueuetrack->QueueInfo) ) {
- newqueuetrack->MemCoreRatio =
- trunc(newqueuetrack->QueueInfo->ClusterMemoryMB /
- newqueuetrack->QueueInfo->ClusterVCore);
+ if ( !RESQUEUE_IS_PERCENT(newqueuetrack->QueueInfo) )
+ {
+ newqueuetrack->MemCoreRatio = trunc(newqueuetrack->QueueInfo->ClusterMemoryMB /
+ newqueuetrack->QueueInfo->ClusterVCore);
addResourceQueueRatio(newqueuetrack);
}
@@ -1498,7 +1555,8 @@ int createQueueAndTrack( DynResourceQueue queue,
*track = newqueuetrack;
exit:
- if ( res != FUNC_RETURN_OK ) {
+ if ( res != FUNC_RETURN_OK )
+ {
/* Free resource queue track instance. */
freeDynResourceQueueTrack(newqueuetrack);
*track = NULL;
@@ -1567,15 +1625,14 @@ int dropQueueAndTrack( DynResourceQueueTrack track,
}
-DynResourceQueueTrack getQueueTrackByQueueOID (int64_t queoid,
- bool *exist)
+DynResourceQueueTrack getQueueTrackByQueueOID (int64_t queoid, bool *exist)
{
PAIR pair = NULL;
SimpArray key;
setSimpleArrayRef(&key, (char *)&queoid, sizeof(int64_t));
- pair = getHASHTABLENode(&(PQUEMGR->QueuesIDIndex),
- (void *)&key);
- if ( pair == NULL ) {
+ pair = getHASHTABLENode(&(PQUEMGR->QueuesIDIndex), (void *)&key);
+ if ( pair == NULL )
+ {
*exist = false;
return NULL;
}
@@ -1997,15 +2054,21 @@ exit:
/**
* Return one connection to resource queue.
*/
-void returnConnectionToQueue(ConnectionTrack conntrack, bool normally)
+void returnConnectionToQueue(ConnectionTrack conntrack, bool istimeout)
{
DynResourceQueueTrack track = (DynResourceQueueTrack)(conntrack->QueueTrack);
- if ( normally )
+ if ( !istimeout )
{
transformConnectionTrackProgress(conntrack, CONN_PP_ESTABLISHED);
}
+ else
+ {
+ transformConnectionTrackProgress(conntrack, CONN_PP_TIMEOUT_FAIL);
+ }
+
track->CurConnCounter--;
- if ( track->CurConnCounter == 0 ) {
+ if ( track->CurConnCounter == 0 )
+ {
track->isBusy = false;
refreshMemoryCoreRatioLimits();
refreshMemoryCoreRatioWaterMark();
@@ -2045,7 +2108,8 @@ int acquireResourceFromResQueMgr(ConnectionTrack conntrack)
DynResourceQueueTrack queuetrack = conntrack->QueueTrack;
- if ( queuetrack->ClusterSegNumberMax == 0 ) {
+ if ( queuetrack->ClusterSegNumberMax == 0 )
+ {
elog(LOG, "The queue %s has no resource available to run queries.",
queuetrack->QueueInfo->Name);
return RESQUEMGR_NO_RESOURCE;
@@ -2061,7 +2125,8 @@ int acquireResourceFromResQueMgr(ConnectionTrack conntrack)
&(conntrack->SegNumMin),
conntrack->VSegLimit);
- if ( res == FUNC_RETURN_OK ) {
+ if ( res == FUNC_RETURN_OK )
+ {
int32_t Rmax = conntrack->SegNum;
int32_t RmaxL = conntrack->VSegLimitPerSeg * PRESPOOL->AvailNodeCount;
@@ -2074,14 +2139,6 @@ int acquireResourceFromResQueMgr(ConnectionTrack conntrack)
int32_t Gmax= conntrack->MaxSegCountFixed;
int32_t Gmin= conntrack->MinSegCountFixed;
- /* Apply upper vseg limit. */
- if ( conntrack->MaxSegCountFixed > queuetrack->QueueInfo->VSegUpperLimit &&
- conntrack->MinSegCountFixed <= queuetrack->QueueInfo->VSegUpperLimit )
- {
- Gmax = queuetrack->QueueInfo->VSegUpperLimit;
- elog(LOG, "Maximum vseg num is limited to %d", Gmax);
- }
-
if(Gmin==1)
{
/* case 1 */
@@ -2113,13 +2170,20 @@ int acquireResourceFromResQueMgr(ConnectionTrack conntrack)
}
}
- elog(LOG, "HAWQ RM :: Expect (%d MB, %lf CORE) x %d ( min %d ) resource.",
+ elog(LOG, "Query resource expects (%d MB, %lf CORE) x %d ( min %d ) resource.",
conntrack->SegMemoryMB,
conntrack->SegCore,
conntrack->SegNum,
conntrack->SegNumMin);
+ adjustResourceExpectsByQueueNVSegLimits(conntrack);
+ elog(LOG, "Query resource expects (%d MB, %lf CORE) x %d ( min %d ) "
+ "resource after adjusting based on queue NVSEG limits.",
+ conntrack->SegMemoryMB,
+ conntrack->SegCore,
+ conntrack->SegNum,
+ conntrack->SegNumMin);
/* Add request to the resource queue and return. */
res = addQueryResourceRequestToQueue(queuetrack, conntrack);
@@ -2144,11 +2208,13 @@ int acquireResourceQuotaFromResQueMgr(ConnectionTrack conntrack)
userinfo = getUserByUserName(conntrack->UserID,
strlen(conntrack->UserID),
&exist);
- if ( exist ) {
+ if ( exist )
+ {
/* Get the queue, and check if the parallel limit is achieved. */
queuetrack = getQueueTrackByQueueOID(userinfo->QueueOID, &exist);
}
- else {
+ else
+ {
elog(LOG, "No user %s defined for registering connection. Assign to "
"default queue.",
conntrack->UserID);
@@ -2156,7 +2222,8 @@ int acquireResourceQuotaFromResQueMgr(ConnectionTrack conntrack)
userinfo = NULL;
}
- if ( queuetrack == NULL ) {
+ if ( queuetrack == NULL )
+ {
elog(LOG, "Resource manager fails to find target resource queue for user %s.",
conntrack->UserID);
res = RESQUEMGR_NO_ASSIGNEDQUEUE;
@@ -2173,7 +2240,8 @@ int acquireResourceQuotaFromResQueMgr(ConnectionTrack conntrack)
&(conntrack->SegNumMin),
conntrack->VSegLimit);
- if ( res == FUNC_RETURN_OK ) {
+ if ( res == FUNC_RETURN_OK )
+ {
int32_t Rmax = conntrack->SegNum;
int32_t RmaxL =conntrack->VSegLimitPerSeg * PRESPOOL->AvailNodeCount;
@@ -2186,14 +2254,6 @@ int acquireResourceQuotaFromResQueMgr(ConnectionTrack conntrack)
int32_t Gmax= conntrack->MaxSegCountFixed;
int32_t Gmin= conntrack->MinSegCountFixed;
- /* Apply upper vseg limit. */
- if ( conntrack->MaxSegCountFixed > queuetrack->QueueInfo->VSegUpperLimit &&
- conntrack->MinSegCountFixed <= queuetrack->QueueInfo->VSegUpperLimit )
- {
- Gmax = queuetrack->QueueInfo->VSegUpperLimit;
- elog(LOG, "Maximum vseg num is limited to %d", Gmax);
- }
-
if(Gmin==1)
{
/* case 1 */
@@ -2230,14 +2290,79 @@ int acquireResourceQuotaFromResQueMgr(ConnectionTrack conntrack)
conntrack->SegCore,
conntrack->SegNum,
conntrack->SegNumMin);
+
+ adjustResourceExpectsByQueueNVSegLimits(conntrack);
+
+ elog(LOG, "Query resource expects (%d MB, %lf CORE) x %d ( min %d ) "
+ "resource after adjusting based on queue NVSEG limits.",
+ conntrack->SegMemoryMB,
+ conntrack->SegCore,
+ conntrack->SegNum,
+ conntrack->SegNumMin);
}
- else {
+ else
+ {
elog(LOG, "Not accepted resource acquiring request.");
}
exit:
return res;
}
+void adjustResourceExpectsByQueueNVSegLimits(ConnectionTrack conntrack)
+{
+ DynResourceQueueTrack queuetrack = conntrack->QueueTrack;
+
+ if ( queuetrack == NULL )
+ {
+ elog(WARNING, "Detected connection track without assigned queue. ConnID %d",
+ conntrack->ConnID);
+ return;
+ }
+
+ if ( queuetrack->QueueInfo->NVSegLowerLimit > MINIMUM_RESQUEUE_NVSEG_LOWER_LIMIT_N ||
+ queuetrack->QueueInfo->NVSegUpperLimit > MINIMUM_RESQUEUE_NVSEG_UPPER_LIMIT_N )
+ {
+ if ( queuetrack->QueueInfo->NVSegLowerLimit > MINIMUM_RESQUEUE_NVSEG_LOWER_LIMIT_N &&
+ queuetrack->QueueInfo->NVSegLowerLimit > conntrack->SegNumMin &&
+ queuetrack->QueueInfo->NVSegLowerLimit <= conntrack->SegNum )
+ {
+ conntrack->SegNumMin = queuetrack->QueueInfo->NVSegLowerLimit;
+ }
+
+ if ( queuetrack->QueueInfo->NVSegUpperLimit > MINIMUM_RESQUEUE_NVSEG_UPPER_LIMIT_N &&
+ queuetrack->QueueInfo->NVSegUpperLimit >= conntrack->SegNumMin &&
+ queuetrack->QueueInfo->NVSegUpperLimit < conntrack->SegNum )
+ {
+ conntrack->SegNum = queuetrack->QueueInfo->NVSegUpperLimit;
+ }
+ }
+ else if ( queuetrack->QueueInfo->NVSegLowerLimitPerSeg > 0 ||
+ queuetrack->QueueInfo->NVSegUpperLimitPerSeg > 0 )
+ {
+ if ( queuetrack->QueueInfo->NVSegLowerLimitPerSeg >
+ MINIMUM_RESQUEUE_NVSEG_LOWER_PERSEG_LIMIT_N )
+ {
+ int minnvseg = ceil(queuetrack->QueueInfo->NVSegLowerLimitPerSeg *
+ PRESPOOL->AvailNodeCount);
+ if ( minnvseg > conntrack->SegNumMin && minnvseg <= conntrack->SegNum)
+ {
+ conntrack->SegNumMin = minnvseg;
+ }
+ }
+
+ if ( queuetrack->QueueInfo->NVSegUpperLimitPerSeg >
+ MINIMUM_RESQUEUE_NVSEG_UPPER_PERSEG_LIMIT_N )
+ {
+ int maxnvseg = ceil(queuetrack->QueueInfo->NVSegUpperLimitPerSeg *
+ PRESPOOL->AvailNodeCount);
+ if ( maxnvseg >= conntrack->SegNumMin && maxnvseg < conntrack->SegNum)
+ {
+ conntrack->SegNum = maxnvseg;
+ }
+ }
+ }
+}
+
/* Resource is returned from query to resource queue. */
int returnResourceToResQueMgr(ConnectionTrack conntrack)
{
@@ -2650,11 +2775,9 @@ int checkUserAttributes( UserInfo user, char *errorbuf, int errorbufsize)
return FUNC_RETURN_OK;
}
-/* Create one user */
-int createUser(UserInfo userinfo, char *errorbuf, int errorbufsize)
+/* Create one user. Expect always no error. */
+void createUser(UserInfo userinfo)
{
- int res = FUNC_RETURN_OK;
-
MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
PQUEMGR->Users = lappend(PQUEMGR->Users, userinfo);
MEMORY_CONTEXT_SWITCH_BACK
@@ -2664,8 +2787,6 @@ int createUser(UserInfo userinfo, char *errorbuf, int errorbufsize)
setUserIndexedByUserOID(userinfo);
}
setUserIndexedByUserName(userinfo);
-
- return res;
}
void setUserIndexedByUserOID(UserInfo userinfo)
@@ -3114,8 +3235,10 @@ int getRSQTBLAttributeNameIndex(SimpStringPtr attrname)
int getRSQDDLAttributeNameIndex(SimpStringPtr attrname)
{
- for ( int i = 0 ; i < RSQ_DDL_ATTR_COUNT ; ++i ) {
- if ( SimpleStringComp(attrname, RSQDDLAttrNames[i]) == 0 ) {
+ for ( int i = 0 ; i < RSQ_DDL_ATTR_COUNT ; ++i )
+ {
+ if ( SimpleStringComp(attrname, RSQDDLAttrNames[i]) == 0 )
+ {
return i;
}
}
@@ -3354,12 +3477,12 @@ void refreshResourceQueuePercentageCapacityInternal(uint32_t clustermemmb,
}
track->ClusterMemoryMaxPer = track->ClusterMemoryActPer *
- track->QueueInfo->ResourceUpperFactor;
+ track->QueueInfo->ResourceOvercommit;
track->ClusterMemoryMaxPer = track->ClusterMemoryMaxPer > 100 ?
100.0 :
track->ClusterMemoryMaxPer;
track->ClusterVCoreMaxPer = track->ClusterVCoreActPer *
- track->QueueInfo->ResourceUpperFactor;
+ track->QueueInfo->ResourceOvercommit;
track->ClusterVCoreMaxPer = track->ClusterVCoreMaxPer > 100 ?
100.0 :
track->ClusterVCoreMaxPer;
@@ -3475,7 +3598,7 @@ void refreshResourceQueuePercentageCapacityInternal(uint32_t clustermemmb,
track->QueueInfo->SegResourceQuotaVCore,
track->ClusterSegNumber,
track->ClusterSegNumberMax,
- track->QueueInfo->ResourceUpperFactor);
+ track->QueueInfo->ResourceOvercommit);
}
}
@@ -3818,7 +3941,8 @@ int dispatchResourceToQueries_EVEN(DynResourceQueueTrack track)
&(conn->Resource),
&segnumact,
&(conn->SegIOBytes));
- if ( segnumact >= conn->SegNumMin )
+
+ if ( isResourceAcceptable(conn, segnumact) )
{
elog(DEBUG3, "Resource manager dispatched %d segment(s) to connection %d",
segnumact,
@@ -3905,6 +4029,65 @@ int dispatchResourceToQueries_EVEN(DynResourceQueueTrack track)
return FUNC_RETURN_OK;
}
+
+bool isResourceAcceptable(ConnectionTrack conn, int segnumact)
+{
+ /* Enough number of vsegments. */
+ if ( segnumact < conn->SegNumMin )
+ {
+ return false;
+ }
+
+ /*
+ *--------------------------------------------------------------------------
+ * Spread wide enough. If there is at least one segment containing 2 or more
+ * vsegments, the number of segments should not be too small, i.e. the
+ * vsegments should not be assigned in a few segments.
+ *--------------------------------------------------------------------------
+ */
+ if ( segnumact > list_length(conn->Resource) )
+ {
+ if ( PRESPOOL->SlavesHostCount - rm_tolerate_nseg_limit >
+ list_length(conn->Resource) )
+ {
+ elog(WARNING, "Find virtual segments are dispatched to %d segments, "
+ "less than %d",
+ segnumact,
+ list_length(conn->Resource));
+ return false;
+ }
+ }
+
+ /*
+ *--------------------------------------------------------------------------
+ * Spread even enough. If the size of vsegments in each segment varies too
+ * much, the allocation result is not accepted.
+ *--------------------------------------------------------------------------
+ */
+ if ( segnumact > list_length(conn->Resource) )
+ {
+ int minval = segnumact;
+ int maxval = 0;
+ ListCell *cell = NULL;
+ foreach(cell, conn->Resource)
+ {
+ VSegmentCounterInternal vsegcnt = lfirst(cell);
+ minval = minval < vsegcnt->VSegmentCount ? minval : vsegcnt->VSegmentCount;
+ maxval = maxval > vsegcnt->VSegmentCount ? maxval : vsegcnt->VSegmentCount;
+ }
+ if ( rm_nvseg_variance_among_seg_limit < maxval - minval )
+ {
+ elog(WARNING, "Find virtual segments are not evenly dispatched to segments, "
+ "maximum virtual segment size is %d, "
+ "minimum virtual segment size is %d.",
+ maxval,
+ minval);
+ return false;
+ }
+ }
+ return true;
+}
+
int dispatchResourceToQueries_FIFO(DynResourceQueueTrack track)
{
return FUNC_RETURN_OK;
@@ -4100,13 +4283,13 @@ void timeoutDeadResourceAllocation(void)
curcon->ConnID);
if ( curmsec - curcon->LastActTime >
- 1000000L * rm_resource_noaction_timeout )
+ 1000000L * rm_session_lease_timeout )
{
elog(LOG, "The allocated resource timeout is detected. "
"ConnID %d",
curcon->ConnID);
returnResourceToResQueMgr(curcon);
- returnConnectionToQueue(curcon, false);
+ returnConnectionToQueue(curcon, true);
if ( curcon->CommBuffer != NULL )
{
curcon->CommBuffer->toClose = true;
@@ -4119,13 +4302,13 @@ void timeoutDeadResourceAllocation(void)
case CONN_PP_RESOURCE_QUEUE_ALLOC_WAIT:
{
if ( curmsec - curcon->LastActTime >
- 1000000L * rm_resource_noaction_timeout )
+ 1000000L * rm_session_lease_timeout )
{
elog(LOG, "The queued resource request timeout is detected. "
"ConnID %d",
curcon->ConnID);
cancelResourceAllocRequest(curcon);
- returnConnectionToQueue(curcon, false);
+ returnConnectionToQueue(curcon, true);
if ( curcon->CommBuffer != NULL )
{
curcon->CommBuffer->toClose = true;
@@ -4138,12 +4321,12 @@ void timeoutDeadResourceAllocation(void)
case CONN_PP_REGISTER_DONE:
{
if ( curmsec - curcon->LastActTime >
- 1000000L * rm_resource_noaction_timeout )
+ 1000000L * rm_session_lease_timeout )
{
- elog(LOG, "The registered connection timeout is detected. "
- "ConnID %d",
- curcon->ConnID);
- returnConnectionToQueue(curcon, false);
+ elog(WARNING, "The registered connection timeout is detected. "
+ "ConnID %d",
+ curcon->ConnID);
+ returnConnectionToQueue(curcon, true);
if ( curcon->CommBuffer != NULL )
{
curcon->CommBuffer->toClose = true;
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/utils/network_utils.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/utils/network_utils.c b/src/backend/resourcemanager/utils/network_utils.c
index 51f1294..462e085 100644
--- a/src/backend/resourcemanager/utils/network_utils.c
+++ b/src/backend/resourcemanager/utils/network_utils.c
@@ -1,10 +1,7 @@
#include "utils/network_utils.h"
#include "utils/memutilities.h"
#include "miscadmin.h"
-
-#ifdef GETIFADDRS_USING_SIGAR
#include "sigar.h"
-#endif
#include <arpa/inet.h>
#include <sys/socket.h>
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/tcop/utility.c
----------------------------------------------------------------------
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 19fd47a..986afb3 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -1646,18 +1646,10 @@ ProcessUtility(Node *parsetree,
case T_AlterQueueStmt:
/* if guc variable not set, or bootstrap mode, or utility mode connection, throw exception*/
- /*if (!(IsBootstrapProcessingMode() || (Gp_role == GP_ROLE_UTILITY)
- || gp_called_by_pgdump))
- {
- ereport(ERROR,
- (errcode(ERRCODE_CDB_FEATURE_NOT_YET), errmsg("Cannot support alter resource queue statement yet") ));
- }*/
alterResourceQueue((AlterQueueStmt *) parsetree);
- //AlterQueue((AlterQueueStmt *) parsetree);
break;
case T_DropQueueStmt:
- //DropQueue((DropQueueStmt *) parsetree);
dropResourceQueue((DropQueueStmt *) parsetree);
break;
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/utils/misc/etc/slaves.exclude
----------------------------------------------------------------------
diff --git a/src/backend/utils/misc/etc/slaves.exclude b/src/backend/utils/misc/etc/slaves.exclude
deleted file mode 100644
index 7338e8b..0000000
--- a/src/backend/utils/misc/etc/slaves.exclude
+++ /dev/null
@@ -1 +0,0 @@
-jiny2
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/utils/misc/etc/yarn-client.xml
----------------------------------------------------------------------
diff --git a/src/backend/utils/misc/etc/yarn-client.xml b/src/backend/utils/misc/etc/yarn-client.xml
new file mode 100644
index 0000000..b401bdf
--- /dev/null
+++ b/src/backend/utils/misc/etc/yarn-client.xml
@@ -0,0 +1,288 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<configuration>
+
+<!-- KDC
+ <property>
+ <name>hadoop.security.authentication</name>
+ <value>kerberos</value>
+ </property>
+KDC -->
+
+<!-- HA
+ <property>
+ <name>dfs.nameservices</name>
+ <value>phdcluster</value>
+ </property>
+
+ <property>
+ <name>dfs.ha.namenodes.phdcluster</name>
+ <value>nn1,nn2</value>
+ </property>
+
+ <property>
+ <name>dfs.namenode.rpc-address.phdcluster.nn1</name>
+ <value>mdw:9000</value>
+ </property>
+
+ <property>
+ <name>dfs.namenode.rpc-address.phdcluster.nn2</name>
+ <value>smdw:9000</value>
+ </property>
+
+ <property>
+ <name>dfs.namenode.http-address.phdcluster.nn1</name>
+ <value>mdw:50070</value>
+ </property>
+
+ <property>
+ <name>dfs.namenode.http-address.phdcluster.nn2</name>
+ <value>smdw:50070</value>
+ </property>
+
+HA -->
+
+ <!-- RPC client configuration -->
+ <property>
+ <name>rpc.client.timeout</name>
+ <value>3600000</value>
+ <description>
+ timeout interval of a RPC invocation in millisecond. default is 3600000.
+ </description>
+ </property>
+ <property>
+ <name>rpc.client.connect.tcpnodelay</name>
+ <value>true</value>
+ <description>
+ whether to set socket TCP_NODELAY to true when connecting to the RPC server. default is true.
+ </description>
+ </property>
+
+ <property>
+ <name>rpc.client.max.idle</name>
+ <value>10000</value>
+ <description>
+ the max idle time of a RPC connection in millisecond. default is 10000.
+ </description>
+ </property>
+
+ <property>
+ <name>rpc.client.ping.interval</name>
+ <value>10000</value>
+ <description>
+ the interval at which the RPC client sends a heartbeat to the server. 0 means disabled, default is 10000.
+ </description>
+ </property>
+
+ <property>
+ <name>rpc.client.connect.timeout</name>
+ <value>600000</value>
+ <description>
+ the timeout interval in millisecond when the RPC client is trying to setup the connection. default is 600000.
+ </description>
+ </property>
+
+ <property>
+ <name>rpc.client.connect.retry</name>
+ <value>10</value>
+ <description>
+ the maximum number of retries if the RPC client fails to set up the connection to the server. default is 10.
+ </description>
+ </property>
+
+ <property>
+ <name>rpc.client.read.timeout</name>
+ <value>3600000</value>
+ <description>
+ the timeout interval in millisecond when the RPC client is trying to read from server. default is 3600000.
+ </description>
+ </property>
+
+ <property>
+ <name>rpc.client.write.timeout</name>
+ <value>3600000</value>
+ <description>
+ the timeout interval in millisecond when the RPC client is trying to write to server. default is 3600000.
+ </description>
+ </property>
+
+ <property>
+ <name>rpc.client.socekt.linger.timeout</name>
+ <value>-1</value>
+ <description>
+ set value to socket SO_LINGER when connect to RPC server. -1 means default OS value. default is -1.
+ </description>
+ </property>
+
+ <!-- dfs client configuration -->
+ <property>
+ <name>dfs.client.read.shortcircuit</name>
+ <value>true</value>
+ <description>
+ whether reading block file bypass datanode if the block and the client are on the same node. default is true.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.default.replica</name>
+ <value>3</value>
+ <description>
+ the default number of replica. default is 3.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.prefetchsize</name>
+ <value>10</value>
+ <description>
+ the default number of blocks which information will be prefetched. default is 10.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.client.failover.max.attempts</name>
+ <value>15</value>
+ <description>
+ if multiple namenodes are configured, this is the maximum number of retries when the dfs client tries to issue an RPC call. default is 15.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.default.blocksize</name>
+ <value>134217728</value>
+ <description>
+ default block size. default is 134217728.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.client.log.severity</name>
+ <value>INFO</value>
+ <description>
+ the minimal log severity level, valid values include FATAL, ERROR, INFO, DEBUG1, DEBUG2, DEBUG3. default is INFO.
+ </description>
+ </property>
+
+ <!-- input client configuration -->
+ <property>
+ <name>input.connect.timeout</name>
+ <value>600000</value>
+ <description>
+ the timeout interval in millisecond when the input stream is trying to setup the connection to datanode. default is 600000.
+ </description>
+ </property>
+
+ <property>
+ <name>input.read.timeout</name>
+ <value>3600000</value>
+ <description>
+ the timeout interval in millisecond when the input stream is trying to read from datanode. default is 3600000.
+ </description>
+ </property>
+
+ <property>
+ <name>input.write.timeout</name>
+ <value>3600000</value>
+ <description>
+ the timeout interval in millisecond when the input stream is trying to write to datanode. default is 3600000.
+ </description>
+ </property>
+
+ <property>
+ <name>input.localread.default.buffersize</name>
+ <value>2097152</value>
+ <description>
+ number of bytes of the buffer which is used to hold the data from block file and verify checksum.
+ it is only used when "dfs.client.read.shortcircuit" is set to true. default is 1048576.
+ </description>
+ </property>
+
+ <property>
+ <name>input.localread.blockinfo.cachesize</name>
+ <value>1000</value>
+ <description>
+ the size of block file path information cache. default is 1000.
+ </description>
+ </property>
+
+ <property>
+ <name>input.read.getblockinfo.retry</name>
+ <value>3</value>
+ <description>
+ the maximum number of retries when the client fails to get block information from the namenode. default is 3.
+ </description>
+ </property>
+
+ <!-- output client configuration -->
+ <property>
+ <name>output.replace-datanode-on-failure</name>
+ <value>false</value>
+ <description>
+ whether the client adds a new datanode to the pipeline if the number of nodes in the pipeline is less than the specified number of replicas. default is true.
+ </description>
+ </property>
+
+ <property>
+ <name>output.default.chunksize</name>
+ <value>512</value>
+ <description>
+ the number of bytes of a chunk in pipeline. default is 512.
+ </description>
+ </property>
+
+ <property>
+ <name>output.default.packetsize</name>
+ <value>65536</value>
+ <description>
+ the number of bytes of a packet in pipeline. default is 65536.
+ </description>
+ </property>
+
+ <property>
+ <name>output.default.write.retry</name>
+ <value>10</value>
+ <description>
+ the maximum number of retries when the client fails to set up the pipeline. default is 10.
+ </description>
+ </property>
+
+ <property>
+ <name>output.connect.timeout</name>
+ <value>600000</value>
+ <description>
+ the timeout interval in millisecond when the output stream is trying to setup the connection to datanode. default is 600000.
+ </description>
+ </property>
+
+ <property>
+ <name>output.read.timeout</name>
+ <value>3600000</value>
+ <description>
+ the timeout interval in millisecond when the output stream is trying to read from datanode. default is 3600000.
+ </description>
+ </property>
+
+ <property>
+ <name>output.write.timeout</name>
+ <value>3600000</value>
+ <description>
+ the timeout interval in millisecond when the output stream is trying to write to datanode. default is 3600000.
+ </description>
+ </property>
+
+ <property>
+ <name>output.packetpool.size</name>
+ <value>1024</value>
+ <description>
+ the max number of packets in a file's packet pool. default is 1024.
+ </description>
+ </property>
+
+ <property>
+ <name>output.close.timeout</name>
+ <value>900000</value>
+ <description>
+ the timeout interval in millisecond when close an output stream. default is 900000.
+ </description>
+ </property>
+</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/utils/misc/guc.c
----------------------------------------------------------------------
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 1e91d56..5a2c90a 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -4402,15 +4402,6 @@ static struct config_bool ConfigureNamesBool[] =
},
{
- {"hawq_rm_domain_comm_enable", PGC_USERSET, DEVELOPER_OPTIONS,
- gettext_noop("Indicate whether domain socket for RM communication is allowed"),
- NULL,
- },
- &rm_domain_comm_enable,
- false, NULL, NULL
- },
-
- {
{"hawq_resourceenforcer_cpu_enable", PGC_POSTMASTER, RESOURCES_MGM,
gettext_noop("enable enforcing cpu resource consumption."),
NULL
@@ -4447,6 +4438,15 @@ static struct config_bool ConfigureNamesBool[] =
true, NULL, NULL
},
+ {
+ {"hawq_rm_session_lease_heartbeat_enable", PGC_USERSET, RESOURCES_MGM,
+ gettext_noop("enable or disable session lease heartbeat for test."),
+ NULL
+ },
+ &rm_session_lease_heartbeat_enable,
+ true, NULL, NULL
+ },
+
/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL
@@ -6294,32 +6294,32 @@ static struct config_int ConfigureNamesInt[] =
1, 1, 65535, NULL, NULL
},
- {
- {"hawq_resourcemanager_master_address_port", PGC_POSTMASTER, RESOURCES_MGM,
- gettext_noop("master resource manager server address port number"),
- NULL
- },
- &rm_master_addr_port,
- 5437, 1, 65535, NULL, NULL
- },
+ {
+ {"hawq_rm_master_port", PGC_POSTMASTER, RESOURCES_MGM,
+ gettext_noop("resource manager master server port number"),
+ NULL
+ },
+ &rm_master_port,
+ 5437, 1, 65535, NULL, NULL
+ },
- {
- {"hawq_resourcemanager_master_address_domainsocket_port", PGC_POSTMASTER, RESOURCES_MGM,
- gettext_noop("master resource manager server address domain socket port number"),
- NULL
- },
- &rm_master_addr_domain_port,
- 5436, 1, 65535, NULL, NULL
- },
+ {
+ {"hawq_rm_segment_port", PGC_POSTMASTER, RESOURCES_MGM,
+ gettext_noop("resource manager segment server port number"),
+ NULL
+ },
+ &rm_segment_port,
+ 5438, 1, 65535, NULL, NULL
+ },
- {
- {"hawq_resourcemanager_segment_address_port", PGC_POSTMASTER, RESOURCES_MGM,
- gettext_noop("segment resource manager server address port number"),
- NULL
- },
- &rm_seg_addr_port,
- 5438, 1, 65535, NULL, NULL
- },
+ {
+ {"hawq_rm_master_domain_port", PGC_POSTMASTER, RESOURCES_MGM,
+ gettext_noop("resource manager master domain socket port number"),
+ NULL
+ },
+ &rm_master_domain_port,
+ 5436, 1, 65535, NULL, NULL
+ },
{
{"hawq_resourcemanager_log_level", PGC_USERSET, DEVELOPER_OPTIONS,
@@ -6391,32 +6391,34 @@ static struct config_int ConfigureNamesInt[] =
0, 0, 10, NULL, NULL
},
- {
- {"hawq_resourcemanager_query_vsegment_number_limit", PGC_USERSET, RESOURCES_MGM,
- gettext_noop("the limit of the number of virtual segments for one query."),
- NULL
- },
- &rm_query_vseg_num_limit,
- 1000, 1, 65535, NULL, NULL
- },
+ {
+ {"hawq_rm_nvseg_perquery_limit", PGC_USERSET, RESOURCES_MGM,
+ gettext_noop("the limit of the number of virtual segments for one query."),
+ NULL
+ },
+ &rm_nvseg_perquery_limit,
+ 1000, 1, 65535, NULL, NULL
+ },
- {
- {"hawq_resourcemanager_query_vsegment_number_per_segment_limit", PGC_USERSET, RESOURCES_MGM,
- gettext_noop("the limit of the number of virtual segments in one segment for one query."),
- NULL
- },
- &rm_query_vseg_num_per_seg_limit,
- 8, 1, 65535, NULL, NULL
- },
+ {
+ {"hawq_rm_nvseg_perquery_perseg_limit", PGC_USERSET, RESOURCES_MGM,
+ gettext_noop("the limit of the number of virtual segments in one "
+ "segment for one query."),
+ NULL
+ },
+ &rm_nvseg_perquery_perseg_limit,
+ 8, 1, 65535, NULL, NULL
+ },
- {
- {"hawq_resourcemanager_segment_slice_number_limit", PGC_POSTMASTER, RESOURCES_MGM,
- gettext_noop("the limit of the number of slice number in one segment for one query."),
- NULL
- },
- &rm_slice_num_per_seg_limit,
- 3000, 1, 65535, NULL, NULL
- },
+ {
+ {"hawq_rm_nslice_perseg_limit", PGC_POSTMASTER, RESOURCES_MGM,
+ gettext_noop("the limit of the number of slice number in one segment "
+ "for one query."),
+ NULL
+ },
+ &rm_nslice_perseg_limit,
+ 3000, 1, 65535, NULL, NULL
+ },
{
{"hawq_resourcemanager_segment_container_waterlevel", PGC_POSTMASTER, RESOURCES_MGM,
@@ -6430,12 +6432,13 @@ static struct config_int ConfigureNamesInt[] =
},
{
- {"hawq_resourcemanager_resource_noacition_timeout", PGC_POSTMASTER, RESOURCES_MGM,
- gettext_noop("timeout for closing active connection for resource negotiation."),
+ {"hawq_rm_session_lease_timeout", PGC_POSTMASTER, RESOURCES_MGM,
+ gettext_noop("timeout for closing a session lease if dispatcher does "
+ "not send heart-beat for a while."),
NULL
},
- &rm_resource_noaction_timeout,
- 10, -1, 65535, NULL, NULL
+ &rm_session_lease_timeout,
+ 10, 5, 65535, NULL, NULL
},
{
@@ -6466,6 +6469,24 @@ static struct config_int ConfigureNamesInt[] =
},
{
+ {"hawq_rm_tolerate_nseg_limit", PGC_POSTMASTER, RESOURCES_MGM,
+ gettext_noop("the size of down segments that resource manager should tolerate at most ."),
+ NULL
+ },
+ &rm_tolerate_nseg_limit,
+ 2, 0, 65535, NULL, NULL
+ },
+
+ {
+ {"hawq_rm_nvseg_variance_amon_seg_limit", PGC_POSTMASTER, RESOURCES_MGM,
+ gettext_noop("the variance of vseg number in each segment that resource manager should tolerate at most."),
+ NULL
+ },
+ &rm_nvseg_variance_among_seg_limit,
+ 1, 0, 65535, NULL, NULL
+ },
+
+ {
{"hawq_reourcemanager_max_resourcequeue_number", PGC_POSTMASTER, RESOURCES_MGM,
gettext_noop("the maximum number of resource queue."),
NULL
@@ -8003,14 +8024,14 @@ static struct config_string ConfigureNamesString[] =
"", NULL, NULL
},
- {
- {"hawq_resourcemanager_server_type", PGC_POSTMASTER, RESOURCES_MGM,
- gettext_noop("set resource management server type"),
- NULL
+ {
+ {"hawq_global_rm_type", PGC_POSTMASTER, RESOURCES_MGM,
+ gettext_noop("set resource management server type"),
+ NULL
},
- &rm_grm_server_type,
+ &rm_global_rm_type,
"none", NULL, NULL
- },
+ },
{
{"hawq_resourcemanager_yarn_resourcemanager_address", PGC_POSTMASTER, RESOURCES_MGM,
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/bin/pg_dump/pg_dumpall.c
----------------------------------------------------------------------
diff --git a/src/bin/pg_dump/pg_dumpall.c b/src/bin/pg_dump/pg_dumpall.c
index 1686936..fdbbd46 100644
--- a/src/bin/pg_dump/pg_dumpall.c
+++ b/src/bin/pg_dump/pg_dumpall.c
@@ -589,7 +589,7 @@ dumpResQueues(PGconn *conn)
"rsq_allocation_policy::text as ressetting, "
"6 as ord FROM pg_resqueue "
"UNION "
- "SELECT rsqname, 'vsegment_resource_quota' as resname, "
+ "SELECT rsqname, 'vseg_resource_quota' as resname, "
"rsq_vseg_resource_quota::text as ressetting, "
"7 as ord FROM pg_resqueue "
"UNION "
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/include/catalog/indexing.h
----------------------------------------------------------------------
diff --git a/src/include/catalog/indexing.h b/src/include/catalog/indexing.h
index 41d7e40..73a69d1 100644
--- a/src/include/catalog/indexing.h
+++ b/src/include/catalog/indexing.h
@@ -377,42 +377,22 @@ DECLARE_UNIQUE_INDEX(pg_filesystem_fsysname_index, 7184, on pg_filesystem using
/* relation id: 7076 - pg_remote_credentials 20140205 */
DECLARE_UNIQUE_INDEX(pg_remote_credentials_owner_service_index, 7081, on pg_remote_credentials using btree(rcowner oid_ops, rcservice text_ops));
#define RemoteCredentialsOwnerServiceIndexId 7081
-/* relation id: 6026 - pg_resqueue 20140807 */
-DECLARE_UNIQUE_INDEX(pg_resqueue_oid_index, 6027, on pg_resqueue using btree(oid oid_ops));
-#define ResQueueOidIndexId 6027
-/* relation id: 6026 - pg_resqueue 20140807 */
-DECLARE_UNIQUE_INDEX(pg_resqueue_rsqname_index, 6028, on pg_resqueue using btree(rsqname name_ops));
-#define ResQueueRsqnameIndexId 6028
-/* relation id: 6059 - pg_resourcetype 20140807 */
-DECLARE_UNIQUE_INDEX(pg_resourcetype_oid_index, 6061, on pg_resourcetype using btree(oid oid_ops));
-#define ResourceTypeOidIndexId 6061
-/* relation id: 6059 - pg_resourcetype 20140807 */
-DECLARE_UNIQUE_INDEX(pg_resourcetype_restypid_index, 6062, on pg_resourcetype using btree(restypid int2_ops));
-#define ResourceTypeRestypidIndexId 6062
-/* relation id: 6059 - pg_resourcetype 20140807 */
-DECLARE_UNIQUE_INDEX(pg_resourcetype_resname_index, 6063, on pg_resourcetype using btree(resname name_ops));
-#define ResourceTypeResnameIndexId 6063
-/* relation id: 6060 - pg_resqueuecapability 20140807 */
-DECLARE_UNIQUE_INDEX(pg_resqueuecapability_oid_index, 6064, on pg_resqueuecapability using btree(oid oid_ops));
-#define ResQueueCapabilityOidIndexId 6064
-/* relation id: 6060 - pg_resqueuecapability 20140807 */
-DECLARE_INDEX(pg_resqueuecapability_resqueueid_index, 6065, on pg_resqueuecapability using btree(resqueueid oid_ops));
-#define ResQueueCapabilityResqueueidIndexId 6065
-/* relation id: 6060 - pg_resqueuecapability 20140807 */
-DECLARE_INDEX(pg_resqueuecapability_restypid_index, 6066, on pg_resqueuecapability using btree(restypid int2_ops));
-#define ResQueueCapabilityRestypidIndexId 6066
/* relation id: 5036 - gp_segment_configuration 20150207 */
DECLARE_UNIQUE_INDEX(gp_segment_config_registration_order_index, 6106, on gp_segment_configuration using btree(registration_order int4_ops));
#define GpSegmentConfigRegistration_orderIndexId 6106
-
/* relation id: 5036 - gp_segment_configuration 20150207 */
DECLARE_INDEX(gp_segment_config_role_index, 6107, on gp_segment_configuration using btree(role char_ops));
#define GpSegmentConfigRoleIndexId 6107
-
/* relation id: 6119 - pg_attribute_encoding 20141112 */
DECLARE_INDEX(pg_attribute_attrelid_index, 6119, on pg_attribute using btree(attrelid oid_ops));
#define AttributeAttrelidIndexId 6119
+/* relation id: 6026 - pg_resqueue 20151014 */
+DECLARE_UNIQUE_INDEX(pg_resqueue_oid_index, 6027, on pg_resqueue using btree(oid oid_ops));
+#define ResQueueOidIndexId 6027
+/* relation id: 6026 - pg_resqueue 20151014 */
+DECLARE_UNIQUE_INDEX(pg_resqueue_rsqname_index, 6028, on pg_resqueue using btree(rsqname name_ops));
+#define ResQueueRsqnameIndexId 6028
/* TIDYCAT_END_CODEGEN */