You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by yj...@apache.org on 2015/10/16 03:08:37 UTC
[3/4] incubator-hawq git commit: HAWQ-25. Add resource queue new ddl
statement implementation, refine partial GUC variable names,
use libyarn supporting kerberos.
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/requesthandler_ddl.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/requesthandler_ddl.c b/src/backend/resourcemanager/requesthandler_ddl.c
index df038ea..9005a22 100644
--- a/src/backend/resourcemanager/requesthandler_ddl.c
+++ b/src/backend/resourcemanager/requesthandler_ddl.c
@@ -45,18 +45,21 @@ void freeUpdateActionList(MCTYPE context, List **actions);
* mapping with the definition of table pg_resqueue in pg_resqueue.h
*/
const char* PG_Resqueue_Column_Names[Natts_pg_resqueue] = {
- "rsqname",
- "rsq_parent",
- "rsq_active_stats_cluster",
- "rsq_memory_limit_cluster",
- "rsq_core_limit_cluster",
- "rsq_resource_upper_factor",
- "rsq_allocation_policy",
- "rsq_vseg_resource_quota",
- "rsq_vseg_upper_limit",
- "rsq_creation_time",
- "rsq_update_time",
- "rsq_status"
+ "name",
+ "parentoid",
+ "activestats",
+ "memorylimit",
+ "corelimit",
+ "resovercommit",
+ "allocpolicy",
+ "vsegresourcequota",
+ "nvsegupperlimit",
+ "nvseglowerlimit",
+ "nvsegupperlimitperseg",
+ "nvseglowerlimitperseg",
+ "creationtime",
+ "updatetime",
+ "status"
};
/**
@@ -72,7 +75,7 @@ bool handleRMDDLRequestManipulateResourceQueue(void **arg)
DynResourceQueueTrack todroptrack = NULL;
DynResourceQueueTrack toupdatetrack = NULL;
SelfMaintainBufferData responsebuff;
- char errorbuf[1024] = "";
+ static char errorbuf[1024] = "";
bool exist = false;
List *fineattr = NULL;
List *rsqattr = NULL;
@@ -80,8 +83,8 @@ bool handleRMDDLRequestManipulateResourceQueue(void **arg)
DynResourceQueue oldqueue = NULL;
/* Check context and retrieve the connection track based on connection id.*/
- RPCRequestHeadManipulateResQueue request =
- (RPCRequestHeadManipulateResQueue)((*conntrack)->MessageBuff.Buffer);
+ RPCRequestHeadManipulateResQueue request = (RPCRequestHeadManipulateResQueue)
+ ((*conntrack)->MessageBuff.Buffer);
elog(LOG, "Resource manager gets a request from ConnID %d to submit resource "
"queue DDL statement.",
@@ -155,8 +158,8 @@ bool handleRMDDLRequestManipulateResourceQueue(void **arg)
foreach(cell, rsqattr)
{
KVProperty attribute = lfirst(cell);
- elog(LOG, "Resource manager received DDL Request: %s=%s",
- attribute->Key.Str, attribute->Val.Str);
+ elog(DEBUG3, "Resource manager received DDL Request: %s=%s",
+ attribute->Key.Str, attribute->Val.Str);
}
/* Shallow parse the 'withlist' attributes. */
@@ -167,10 +170,7 @@ bool handleRMDDLRequestManipulateResourceQueue(void **arg)
if (res != FUNC_RETURN_OK)
{
ddlres = res;
- elog(WARNING, ERRORPOS_FORMAT
- "Can not recognize DDL attribute because %s",
- ERRREPORTPOS,
- errorbuf);
+ elog(WARNING, "Can not recognize DDL attribute because %s", errorbuf);
goto senderr;
}
@@ -202,6 +202,7 @@ bool handleRMDDLRequestManipulateResourceQueue(void **arg)
newqueue = rm_palloc0(PCONTEXT, sizeof(DynResourceQueueData));
res = parseResourceQueueAttributes(fineattr,
newqueue,
+ false,
errorbuf,
sizeof(errorbuf));
if (res != FUNC_RETURN_OK)
@@ -228,10 +229,7 @@ bool handleRMDDLRequestManipulateResourceQueue(void **arg)
}
newtrack = NULL;
- res = createQueueAndTrack(newqueue,
- &newtrack,
- errorbuf,
- sizeof(errorbuf));
+ res = createQueueAndTrack(newqueue, &newtrack, errorbuf, sizeof(errorbuf));
if (res != FUNC_RETURN_OK)
{
rm_pfree(PCONTEXT, newqueue);
@@ -267,6 +265,23 @@ bool handleRMDDLRequestManipulateResourceQueue(void **arg)
break;
case MANIPULATE_RESQUEUE_ALTER:
+ newqueue = rm_palloc0(PCONTEXT, sizeof(DynResourceQueueData));
+ res = parseResourceQueueAttributes(fineattr,
+ newqueue,
+ true,
+ errorbuf,
+ sizeof(errorbuf));
+ if (res != FUNC_RETURN_OK)
+ {
+ rm_pfree(PCONTEXT, newqueue);
+ ddlres = res;
+ elog(WARNING, "Resource manager can not alter resource queue "
+ "with its attributes because %s",
+ errorbuf);
+ goto senderr;
+ }
+ rm_pfree(PCONTEXT, newqueue);
+
toupdatetrack = getQueueTrackByQueueName((char *)(nameattr->Val.Str),
nameattr->Val.Len,
&exist);
@@ -428,10 +443,8 @@ bool handleRMDDLRequestManipulateResourceQueue(void **arg)
if (res != FUNC_RETURN_OK)
{
ddlres = res;
- elog(WARNING, ERRORPOS_FORMAT
- "Resource manager can not dropQueueAndTrack because %s",
- ERRREPORTPOS,
- errorbuf);
+ elog(WARNING, "Resource manager can not dropQueueAndTrack because %s",
+ errorbuf);
goto senderr;
}
@@ -471,58 +484,62 @@ bool handleRMDDLRequestManipulateResourceQueue(void **arg)
return true;
senderr:
- initializeSelfMaintainBuffer(&responsebuff, PCONTEXT);
- appendSelfMaintainBuffer(&responsebuff, (void *)&ddlres, sizeof(uint32_t));
- appendSelfMaintainBufferTill64bitAligned(&responsebuff);
-
- if (ddlres != FUNC_RETURN_OK) {
- appendSelfMaintainBuffer(&responsebuff, errorbuf, strlen(errorbuf)+1);
- }
-
- appendSelfMaintainBufferTill64bitAligned(&responsebuff);
-
- /* Build message saved in the connection track instance. */
- buildResponseIntoConnTrack((*conntrack),
- responsebuff.Buffer,
- responsebuff.Cursor+1,
- (*conntrack)->MessageMark1,
- (*conntrack)->MessageMark2,
- RESPONSE_QD_DDL_MANIPULATERESQUEUE);
- (*conntrack)->ResponseSent = false;
{
+ initializeSelfMaintainBuffer(&responsebuff, PCONTEXT);
+
+ RPCResponseHeadManipulateResQueueERRORData response;
+ response.Result.Result = ddlres;
+ response.Result.Reserved = 0;
+
+ appendSMBVar(&responsebuff, response.Result);
+ appendSMBStr(&responsebuff, errorbuf);
+ appendSelfMaintainBufferTill64bitAligned(&responsebuff);
+
+ /* Build message saved in the connection track instance. */
+ buildResponseIntoConnTrack((*conntrack),
+ responsebuff.Buffer,
+ responsebuff.Cursor + 1,
+ (*conntrack)->MessageMark1,
+ (*conntrack)->MessageMark2,
+ RESPONSE_QD_DDL_MANIPULATERESQUEUE);
+ (*conntrack)->ResponseSent = false;
MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, *conntrack);
MEMORY_CONTEXT_SWITCH_BACK
- }
- destroySelfMaintainBuffer(&responsebuff);
+ destroySelfMaintainBuffer(&responsebuff);
- /* Clean up temporary variable. */
- cleanPropertyList(PCONTEXT, &fineattr);
- cleanPropertyList(PCONTEXT, &rsqattr);
- return true;
+ /* Clean up temporary variable. */
+ cleanPropertyList(PCONTEXT, &fineattr);
+ cleanPropertyList(PCONTEXT, &rsqattr);
+ return true;
+ }
}
bool handleRMDDLRequestManipulateRole(void **arg)
{
RPCResponseHeadManipulateRoleData response;
- ConnectionTrack conntrack = (ConnectionTrack)(*arg);
- UserInfo user;
- int res = FUNC_RETURN_OK;
+ ConnectionTrack conntrack = (ConnectionTrack)(*arg);
+ UserInfo user = NULL;
+ int res = FUNC_RETURN_OK;
- RPCRequestHeadManipulateRole request =
- (RPCRequestHeadManipulateRole )conntrack->MessageBuff.Buffer;
+ RPCRequestHeadManipulateRole request = (RPCRequestHeadManipulateRole)
+ (conntrack->MessageBuff.Buffer);
switch(request->Action)
{
case MANIPULATE_ROLE_RESQUEUE_CREATE:
{
+ /*
+ * When creating a new role, resource manager expects no error, because
+ * the validation has already passed on the QD side.
+ */
user = rm_palloc0(PCONTEXT, sizeof(UserInfoData));
user->OID = request->RoleOID;
user->QueueOID = request->QueueOID;
user->isSuperUser = request->isSuperUser;
strncpy(user->Name, request->Name, sizeof(user->Name)-1);
- res = createUser(user, NULL, 0);
+ createUser(user);
elog(LOG, "Resource manager handles request CREATE ROLE oid:%d, "
"queueID:%d, isSuper:%d, roleName:%s",
request->RoleOID,
@@ -533,14 +550,13 @@ bool handleRMDDLRequestManipulateRole(void **arg)
}
case MANIPULATE_ROLE_RESQUEUE_ALTER:
{
- res = dropUser((int64_t)request->RoleOID, request->Name);
- if ( res != FUNC_RETURN_OK )
- {
- elog(WARNING, "Resource manager cannot find user "INT64_FORMAT
- " to alter.",
- (int64_t)(request->RoleOID));
- goto exit;
- }
+ /*
+ * When altering a role, the old one is deleted first.
+ * Resource manager expects the role to always exist.
+ */
+ int64_t roleoid = request->RoleOID;
+ res = dropUser(roleoid, request->Name);
+ Assert(res == FUNC_RETURN_OK);
/* Create new user instance. */
user = (UserInfo)rm_palloc0(PCONTEXT, sizeof(UserInfoData));
@@ -548,7 +564,7 @@ bool handleRMDDLRequestManipulateRole(void **arg)
user->QueueOID = request->QueueOID;
user->isSuperUser = request->isSuperUser;
strncpy(user->Name, request->Name, sizeof(user->Name)-1);
- res = createUser(user, NULL, 0);
+ createUser(user);
elog(LOG, "Resource manager handles request ALTER ROLE oid:%d, "
"queueID:%d, isSuper:%d, roleName:%s",
request->RoleOID,
@@ -559,14 +575,10 @@ bool handleRMDDLRequestManipulateRole(void **arg)
}
case MANIPULATE_ROLE_RESQUEUE_DROP:
{
- res = dropUser((int64_t)request->RoleOID, request->Name);
- if ( res != FUNC_RETURN_OK )
- {
- elog(WARNING, "Resource manager cannot find user "INT64_FORMAT
- " to drop.",
- (int64_t)(request->RoleOID));
- goto exit;
- }
+ /* Resource manager expects the role to always exist. */
+ int64_t roleoid = request->RoleOID;
+ res = dropUser(roleoid, request->Name);
+ Assert(res == FUNC_RETURN_OK);
elog(LOG, "Resource manager handles request drop role oid:%d, "
"roleName:%s",
request->RoleOID,
@@ -579,7 +591,6 @@ bool handleRMDDLRequestManipulateRole(void **arg)
}
}
-exit:
/* Build response. */
response.Result = res;
response.Reserved = 0;
@@ -821,18 +832,21 @@ int buildInsertActionForPGResqueue(DynResourceQueue queue,
List *rsqattr,
List **insvalues)
{
+ static char defaultActiveStats[] = DEFAULT_RESQUEUE_ACTIVESTATS;
+ static char defaultResOvercommit[] = DEFAULT_RESQUEUE_OVERCOMMIT;
+ static char defaultNVSegUpperLimit[] = DEFAULT_RESQUEUE_NVSEG_UPPER_LIMIT;
+ static char defaultNVSegLowerLimit[] = DEFAULT_RESQUEUE_NVSEG_LOWER_LIMIT;
+ static char defaultNVSegUpperLimitPerSeg[] = DEFAULT_RESQUEUE_NVSEG_UPPER_PERSEG_LIMIT;
+ static char defaultNVSegLowerLimitPerSeg[] = DEFAULT_RESQUEUE_NVSEG_LOWER_PERSEG_LIMIT;
+ static char defaultAllocPolicy[] = DEFAULT_RESQUEUE_ALLOCPOLICY;
+ static char defaultVSegResourceQuota[] = DEFAULT_RESQUEUE_VSEGRESOURCEQUOTA;
+ int res = FUNC_RETURN_OK;
+ PAIR newpair = NULL;
+
Assert( queue != NULL );
Assert( rsqattr != NULL );
Assert( insvalues != NULL );
- int res = FUNC_RETURN_OK;
- char defaultActiveStats[] = DEFAULT_RESQUEUE_ACTIVESTATS;
- char defaultUpperFactor[] = DEFAULT_RESQUEUE_UPPERFACTOR;
- char defaultVSegUpperFactor[] = DEFAULT_RESQUEUE_VSEG_UPPER_LIMIT;
- char defaultPolicy[] = DEFAULT_RESQUEUE_POLICY;
- char defaultSegQuota[] = DEFAULT_RESQUEUE_SEG_QUOTA;
- PAIR newpair = NULL;
-
/* Insert resource queue column value. */
newpair = createPAIR(PCONTEXT,
TYPCONVERT(void *, Anum_pg_resqueue_rsqname),
@@ -846,67 +860,101 @@ int buildInsertActionForPGResqueue(DynResourceQueue queue,
*insvalues = lappend(*insvalues, newpair);
- /* Default value for rsq_active_stats_cluster if not set */
SimpStringPtr colvalue = NULL;
- if (findPropertyValue(
- rsqattr,
- getRSQDDLAttributeName(RSQ_DDL_ATTR_ACTIVE_STATMENTS),
- &colvalue) != FUNC_RETURN_OK)
+ if (findPropertyValue(rsqattr,
+ getRSQDDLAttributeName(RSQ_DDL_ATTR_ACTIVE_STATMENTS),
+ &colvalue) != FUNC_RETURN_OK)
+ {
+ ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues,
+ defaultActiveStats,
+ Anum_pg_resqueue_activestats);
+ }
+
+ if (findPropertyValue(rsqattr,
+ getRSQDDLAttributeName(RSQ_DDL_ATTR_RESOURCE_OVERCOMMIT_FACTOR),
+ &colvalue) != FUNC_RETURN_OK)
+ {
+ ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues,
+ defaultResOvercommit,
+ Anum_pg_resqueue_resovercommit);
+ }
+
+ if (findPropertyValue(rsqattr,
+ getRSQDDLAttributeName(RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT),
+ &colvalue) != FUNC_RETURN_OK)
+ {
+ ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues,
+ defaultNVSegUpperLimit,
+ Anum_pg_resqueue_nvsegupperlimit);
+ }
+
+ if (findPropertyValue(rsqattr,
+ getRSQDDLAttributeName(RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT),
+ &colvalue) != FUNC_RETURN_OK)
{
- ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues, defaultActiveStats, Anum_pg_resqueue_rsq_active_stats_cluster);
+ ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues,
+ defaultNVSegLowerLimit,
+ Anum_pg_resqueue_nvseglowerlimit);
}
- /* Default value for rsq_resource_upper_factor if not set */
- if (findPropertyValue(
- rsqattr,
- getRSQDDLAttributeName(RSQ_DDL_ATTR_RESOURCE_UPPER_FACTOR),
- &colvalue) != FUNC_RETURN_OK)
+
+ if (findPropertyValue(rsqattr,
+ getRSQDDLAttributeName(RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT_PERSEG),
+ &colvalue) != FUNC_RETURN_OK)
{
- ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues, defaultUpperFactor, Anum_pg_resqueue_rsq_resource_upper_factor);
+ ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues,
+ defaultNVSegUpperLimitPerSeg,
+ Anum_pg_resqueue_nvsegupperlimitperseg);
}
/* Default value for rsq_vseg_upper_limit if not set */
- if (findPropertyValue(
- rsqattr,
- getRSQDDLAttributeName(RSQ_DDL_ATTR_VSEGMENT_UPPER_LIMIT),
- &colvalue) != FUNC_RETURN_OK)
+ if (findPropertyValue(rsqattr,
+ getRSQDDLAttributeName(RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT_PERSEG),
+ &colvalue) != FUNC_RETURN_OK)
{
- ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues, defaultVSegUpperFactor, Anum_pg_resqueue_rsq_vseg_upper_limit);
+ ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues,
+ defaultNVSegLowerLimitPerSeg,
+ Anum_pg_resqueue_nvseglowerlimitperseg);
}
/* Default value for rsq_allocation_policy if not set */
- if (findPropertyValue(
- rsqattr,
- getRSQDDLAttributeName(RSQ_DDL_ATTR_ALLOCATION_POLICY),
- &colvalue) != FUNC_RETURN_OK)
+ if (findPropertyValue(rsqattr,
+ getRSQDDLAttributeName(RSQ_DDL_ATTR_ALLOCATION_POLICY),
+ &colvalue) != FUNC_RETURN_OK)
{
- ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues, defaultPolicy, Anum_pg_resqueue_rsq_allocation_policy);
+ ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues,
+ defaultAllocPolicy,
+ Anum_pg_resqueue_allocpolicy);
}
/* Default value for rsq_vseg_resource_quota if not set */
- if (findPropertyValue(
- rsqattr,
- getRSQDDLAttributeName(RSQ_DDL_ATTR_VSEGMENT_RESOURCE_QUOTA),
- &colvalue) != FUNC_RETURN_OK)
+ if (findPropertyValue(rsqattr,
+ getRSQDDLAttributeName(RSQ_DDL_ATTR_VSEG_RESOURCE_QUOTA),
+ &colvalue) != FUNC_RETURN_OK)
{
- ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues, defaultSegQuota, Anum_pg_resqueue_rsq_vseg_resource_quota);
+ ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues,
+ defaultVSegResourceQuota,
+ Anum_pg_resqueue_vsegresourcequota);
}
- ADD_PG_RESQUEUE_COLVALUE_OID(insvalues, queue->ParentOID, Anum_pg_resqueue_rsq_parent);
+ ADD_PG_RESQUEUE_COLVALUE_OID(insvalues, queue->ParentOID, Anum_pg_resqueue_parentoid);
- ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(insvalues, rsqattr, RSQ_DDL_ATTR_ACTIVE_STATMENTS, Anum_pg_resqueue_rsq_active_stats_cluster);
- ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(insvalues, rsqattr, RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER, Anum_pg_resqueue_rsq_memory_limit_cluster);
- ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(insvalues, rsqattr, RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER, Anum_pg_resqueue_rsq_core_limit_cluster);
- ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(insvalues, rsqattr, RSQ_DDL_ATTR_RESOURCE_UPPER_FACTOR, Anum_pg_resqueue_rsq_resource_upper_factor);
- ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(insvalues, rsqattr, RSQ_DDL_ATTR_VSEGMENT_UPPER_LIMIT, Anum_pg_resqueue_rsq_vseg_upper_limit);
- ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(insvalues, rsqattr, RSQ_DDL_ATTR_ALLOCATION_POLICY, Anum_pg_resqueue_rsq_allocation_policy);
- ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(insvalues, rsqattr, RSQ_DDL_ATTR_VSEGMENT_RESOURCE_QUOTA, Anum_pg_resqueue_rsq_vseg_resource_quota);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(insvalues, rsqattr, RSQ_DDL_ATTR_ACTIVE_STATMENTS, Anum_pg_resqueue_activestats);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(insvalues, rsqattr, RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER, Anum_pg_resqueue_memorylimit);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(insvalues, rsqattr, RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER, Anum_pg_resqueue_corelimit);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(insvalues, rsqattr, RSQ_DDL_ATTR_RESOURCE_OVERCOMMIT_FACTOR, Anum_pg_resqueue_resovercommit);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(insvalues, rsqattr, RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT, Anum_pg_resqueue_nvsegupperlimit);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(insvalues, rsqattr, RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT, Anum_pg_resqueue_nvseglowerlimit);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(insvalues, rsqattr, RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT_PERSEG, Anum_pg_resqueue_nvsegupperlimitperseg);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(insvalues, rsqattr, RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT_PERSEG, Anum_pg_resqueue_nvseglowerlimitperseg);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(insvalues, rsqattr, RSQ_DDL_ATTR_ALLOCATION_POLICY, Anum_pg_resqueue_allocpolicy);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(insvalues, rsqattr, RSQ_DDL_ATTR_VSEG_RESOURCE_QUOTA, Anum_pg_resqueue_vsegresourcequota);
/* creation time and update time */
TimestampTz curtime = GetCurrentTimestamp();
const char *curtimestr = timestamptz_to_str(curtime);
- ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues, curtimestr, Anum_pg_resqueue_rsq_creation_time);
- ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues, curtimestr, Anum_pg_resqueue_rsq_update_time);
+ ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues, curtimestr, Anum_pg_resqueue_creationtime);
+ ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues, curtimestr, Anum_pg_resqueue_updatetime);
/* status */
char statusstr[256];
@@ -914,7 +962,7 @@ int buildInsertActionForPGResqueue(DynResourceQueue queue,
if ( RESQUEUE_IS_BRANCH(queue) )
strcat(statusstr, "branch");
- ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues, statusstr, Anum_pg_resqueue_rsq_status);
+ ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues, statusstr, Anum_pg_resqueue_status);
MEMORY_CONTEXT_SWITCH_BACK
return res;
@@ -926,18 +974,21 @@ int buildUpdateActionForPGResqueue(DynResourceQueue queue,
{
int res = FUNC_RETURN_OK;
/* Insert resource queue column value. */
- ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(updvalues, rsqattr, RSQ_DDL_ATTR_ACTIVE_STATMENTS, Anum_pg_resqueue_rsq_active_stats_cluster);
- ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(updvalues, rsqattr, RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER, Anum_pg_resqueue_rsq_memory_limit_cluster);
- ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(updvalues, rsqattr, RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER, Anum_pg_resqueue_rsq_core_limit_cluster);
- ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(updvalues, rsqattr, RSQ_DDL_ATTR_RESOURCE_UPPER_FACTOR, Anum_pg_resqueue_rsq_resource_upper_factor);
- ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(updvalues, rsqattr, RSQ_DDL_ATTR_ALLOCATION_POLICY, Anum_pg_resqueue_rsq_allocation_policy);
- ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(updvalues, rsqattr, RSQ_DDL_ATTR_VSEGMENT_RESOURCE_QUOTA, Anum_pg_resqueue_rsq_vseg_resource_quota);
- ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(updvalues, rsqattr, RSQ_DDL_ATTR_VSEGMENT_UPPER_LIMIT, Anum_pg_resqueue_rsq_vseg_upper_limit);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(updvalues, rsqattr, RSQ_DDL_ATTR_ACTIVE_STATMENTS, Anum_pg_resqueue_activestats);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(updvalues, rsqattr, RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER, Anum_pg_resqueue_memorylimit);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(updvalues, rsqattr, RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER, Anum_pg_resqueue_corelimit);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(updvalues, rsqattr, RSQ_DDL_ATTR_RESOURCE_OVERCOMMIT_FACTOR, Anum_pg_resqueue_resovercommit);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(updvalues, rsqattr, RSQ_DDL_ATTR_ALLOCATION_POLICY, Anum_pg_resqueue_allocpolicy);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(updvalues, rsqattr, RSQ_DDL_ATTR_VSEG_RESOURCE_QUOTA, Anum_pg_resqueue_vsegresourcequota);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(updvalues, rsqattr, RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT, Anum_pg_resqueue_nvsegupperlimit);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(updvalues, rsqattr, RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT, Anum_pg_resqueue_nvseglowerlimit);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(updvalues, rsqattr, RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT_PERSEG, Anum_pg_resqueue_nvsegupperlimitperseg);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(updvalues, rsqattr, RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT_PERSEG, Anum_pg_resqueue_nvseglowerlimitperseg);
/* creation time and update time */
TimestampTz curtime = GetCurrentTimestamp();
const char *curtimestr = timestamptz_to_str(curtime);
- ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(updvalues, curtimestr, Anum_pg_resqueue_rsq_update_time);
+ ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(updvalues, curtimestr, Anum_pg_resqueue_updatetime);
/* status */
char statusstr[256];
@@ -945,7 +996,7 @@ int buildUpdateActionForPGResqueue(DynResourceQueue queue,
if ( RESQUEUE_IS_BRANCH(queue) )
strcat(statusstr, "branch");
- ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(updvalues, statusstr, Anum_pg_resqueue_rsq_status);
+ ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(updvalues, statusstr, Anum_pg_resqueue_status);
return res;
}
@@ -969,10 +1020,10 @@ int buildUpdateStatusActionForPGResqueue(DynResourceQueue queue,
TimestampTz curtime = GetCurrentTimestamp();
const char *curtimestr = timestamptz_to_str(curtime);
- ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(updvalues,curtimestr, Anum_pg_resqueue_rsq_update_time);
+ ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(updvalues,curtimestr, Anum_pg_resqueue_updatetime);
/* status */
- ADD_PG_RESQUEUE_COLVALUE_INATTR(updvalues, rsqattr, RSQ_TBL_ATTR_STATUS, Anum_pg_resqueue_rsq_status);
+ ADD_PG_RESQUEUE_COLVALUE_INATTR(updvalues, rsqattr, RSQ_TBL_ATTR_STATUS, Anum_pg_resqueue_status);
return res;
}
@@ -1352,19 +1403,6 @@ int performDeleteActionForPGResqueue(char *queuename)
sql->data,
PQresultErrorMessage(result));
- /* MPP-6923: drop the extended attributes for this queue */
- PQclear(result);
- resetPQExpBuffer(sql);
- appendPQExpBuffer(sql,
- "DELETE FROM pg_resqueuecapability WHERE resqueueid = %d",
- queueid);
- result = PQexec(conn, sql->data);
- if (!result || PQresultStatus(result) != PGRES_COMMAND_OK)
- elog(WARNING, "Resource manager failed to run SQL: %s "
- "when delete a row from pg_resqueue, reason : %s",
- sql->data,
- PQresultErrorMessage(result));
-
PQclear(result);
result = PQexec(conn, "COMMIT");
if (!result || PQresultStatus(result) != PGRES_COMMAND_OK) {
@@ -1405,25 +1443,25 @@ Datum getDatumFromStringValuForPGResqueue(int colindex,
return DirectFunctionCall1(namein, CStringGetDatum(colvaluestr));
- case Anum_pg_resqueue_rsq_creation_time:
- case Anum_pg_resqueue_rsq_update_time:
+ case Anum_pg_resqueue_creationtime:
+ case Anum_pg_resqueue_updatetime:
return 0;
- case Anum_pg_resqueue_rsq_memory_limit_cluster:
- case Anum_pg_resqueue_rsq_core_limit_cluster:
- case Anum_pg_resqueue_rsq_allocation_policy:
- case Anum_pg_resqueue_rsq_vseg_resource_quota:
- case Anum_pg_resqueue_rsq_status:
+ case Anum_pg_resqueue_memorylimit:
+ case Anum_pg_resqueue_corelimit:
+ case Anum_pg_resqueue_allocpolicy:
+ case Anum_pg_resqueue_vsegresourcequota:
+ case Anum_pg_resqueue_status:
/* Set value as text format */
return DirectFunctionCall1(textin, CStringGetDatum(colvaluestr));
- case Anum_pg_resqueue_rsq_active_stats_cluster:
+ case Anum_pg_resqueue_activestats:
{
int32_t tmpvalue;
sscanf(colvaluestr, "%d", &tmpvalue);
return Int32GetDatum(tmpvalue);
}
- case Anum_pg_resqueue_rsq_parent:
+ case Anum_pg_resqueue_parentoid:
{
int64_t tmpoid;
Oid parentoid;
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c b/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c
index 0fe4412..ecb9e92 100644
--- a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c
+++ b/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c
@@ -11,11 +11,16 @@
#include "libyarn/LibYarnClientC.h"
+#include <krb5.h>
+#include "cdb/cdbfilesystemcredential.h"
+
/*
*------------------------------------------------------------------------------
* Internal functions
*------------------------------------------------------------------------------
*/
+char * ExtractPrincipalFromTicketCache(const char* cache);
+
int ResBrokerMainInternal(void);
int loadParameters(void);
@@ -66,6 +71,12 @@ void quitResBroker(SIGNAL_ARGS);
uint64_t ResBrokerStartTime;
+/* The user who submits the hawq application to Hadoop YARN.
+ * Defaults to postgres; if Kerberos is enabled, it should be the principal name.
+ * */
+char* YARNUser;
+bool YARNUserShouldFree;
+
SimpString YARNServer;
SimpString YARNPort;
SimpString YARNSchedulerServer;
@@ -286,6 +297,85 @@ int ResBrokerMainInternal(void)
return FUNC_RETURN_OK;
}
+/*
+ * Extract the principal name from the kerberos ticket cache.
+ *
+ * When cache is not NULL it is exported as KRB5CCNAME before the default
+ * credential cache is opened. Returns a strdup()'ed principal name that the
+ * caller must free(), or NULL on failure.
+ */
+char * ExtractPrincipalFromTicketCache(const char* cache)
+{
+	krb5_context cxt = NULL;
+	krb5_ccache ccache = NULL;
+	krb5_principal principal = NULL;
+	krb5_error_code ec = 0;
+	char *priName = NULL, *retval = NULL;
+	const char *errorMsg = NULL;
+
+	/* refresh kerberos ticket */
+	if (!login()) {
+		elog(WARNING, "Cannot login kerberos.");
+		return NULL;
+	}
+
+	/* Override KRB5CCNAME only when a cache is given; a NULL value is invalid. */
+	if (cache) {
+		if (0 != setenv("KRB5CCNAME", cache, 1)) {
+			elog(WARNING, "Cannot set env parameter \"KRB5CCNAME\" when extract principal from cache:%s", cache);
+			return NULL;
+		}
+	}
+
+	do {
+		if (0 != (ec = krb5_init_context(&cxt))) {
+			break;
+		}
+
+		if (0 != (ec = krb5_cc_default(cxt, &ccache))) {
+			break;
+		}
+
+		if (0 != (ec = krb5_cc_get_principal(cxt, ccache, &principal))) {
+			break;
+		}
+
+		if (0 != (ec = krb5_unparse_name(cxt, principal, &priName))) {
+			break;
+		}
+	} while (0);
+
+	if (!ec) {
+		retval = strdup(priName);
+	} else if (cxt) {
+		/* Log while the context is alive so the message can be released. */
+		errorMsg = krb5_get_error_message(cxt, ec);
+		elog(WARNING, "Fail to extract principal from cache, because : %s", errorMsg);
+		krb5_free_error_message(cxt, errorMsg);
+	} else {
+		elog(WARNING, "Fail to extract principal from cache, because : %s",
+			 "Cannot initialize kerberos context");
+	}
+
+	if (priName != NULL) {
+		krb5_free_unparsed_name(cxt, priName);
+	}
+
+	if (principal != NULL) {
+		krb5_free_principal(cxt, principal);
+	}
+
+	if (ccache != NULL) {
+		krb5_cc_close(cxt, ccache);
+	}
+
+	if (cxt != NULL) {
+		krb5_free_context(cxt);
+	}
+
+	return retval;
+}
+
int loadParameters(void)
{
int res = FUNC_RETURN_OK;
@@ -296,6 +386,8 @@ int loadParameters(void)
initSimpleString(&YARNSchedulerPort, PCONTEXT);
initSimpleString(&YARNQueueName, PCONTEXT);
initSimpleString(&YARNAppName, PCONTEXT);
+ YARNUser = NULL;
+ YARNUserShouldFree = false;
/* Get server and port */
char *pcolon = NULL;
@@ -363,16 +455,31 @@ int loadParameters(void)
setSimpleStringNoLen(&YARNAppName, rm_grm_yarn_app_name);
+ /* If kerberos is enabled, fetch the principal from the ticket cache file. */
+ if (enable_secure_filesystem)
+ {
+ YARNUser = ExtractPrincipalFromTicketCache(krb5_ccname);
+ YARNUserShouldFree = true;
+ }
+
+ if (YARNUser == NULL)
+ {
+ YARNUser = "postgres";
+ YARNUserShouldFree = false;
+ }
+
elog(LOG, "YARN mode resource broker accepted YARN connection arguments : "
"YARN Server %s:%s "
"Scheduler server %s:%s "
- "Queue %s Application name %s",
+ "Queue %s Application name %s, "
+ "by user:%s",
YARNServer.Str,
YARNPort.Str,
YARNSchedulerServer.Str,
YARNSchedulerPort.Str,
YARNQueueName.Str,
- YARNAppName.Str);
+ YARNAppName.Str,
+ YARNUser);
exit:
if ( res != FUNC_RETURN_OK ) {
elog(LOG, "YARN mode resource broker failed to load YARN connection arguments.");
@@ -1139,7 +1246,8 @@ int RB2YARN_connectToYARN(void)
int yarnres = FUNCTION_SUCCEEDED;
/* Setup YARN client. */
- yarnres = newLibYarnClient(YARNServer.Str,
+ yarnres = newLibYarnClient(YARNUser,
+ YARNServer.Str,
YARNPort.Str,
YARNSchedulerServer.Str,
YARNSchedulerPort.Str,
@@ -1665,7 +1773,13 @@ int RB2YARN_disconnectFromYARN(void)
if ( YARNJobID != NULL ) {
free(YARNJobID);
}
+ if (YARNUser != NULL && YARNUserShouldFree )
+ {
+ free(YARNUser);
+ }
LIBYARNClient = NULL;
YARNJobID = NULL;
+ YARNUser = NULL;
+ YARNUserShouldFree = true;
return FUNCTION_SUCCEEDED;
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/resourcemanager.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcemanager.c b/src/backend/resourcemanager/resourcemanager.c
index 5e4fe0f..5577e83 100644
--- a/src/backend/resourcemanager/resourcemanager.c
+++ b/src/backend/resourcemanager/resourcemanager.c
@@ -145,7 +145,6 @@ bool cleanedAllGRMContainers(void);
colname##IsNull ? -1 : \
DatumGetInt64(colname##Datum)));
-
int loadUserPropertiesFromCatalog(List **users);
int loadQueuePropertiesFromCatalog(List **queues);
@@ -618,7 +617,9 @@ int MainHandlerLoop(void)
/* STEP 10. Dispatch resource to queries and send the messages out.*/
if ( PRESPOOL->Segments.NodeCount > 0 && PQUEMGR->RatioCount > 0 &&
PQUEMGR->toRunQueryDispatch &&
- PQUEMGR->ForcedReturnGRMContainerCount == 0 )
+ PQUEMGR->ForcedReturnGRMContainerCount == 0 &&
+ PRESPOOL->AddPendingContainerCount == 0 &&
+ PRESPOOL->SlavesHostCount > 0 )
{
dispatchResourceToQueries();
}
@@ -646,6 +647,12 @@ int MainHandlerLoop(void)
/* STEP 13. Notify segments to decrease resource. */
notifyToBeKickedGRMContainersToRMSEG();
+
+ /*
+ * STEP 14. Check slaves file if the content is not checked or is
+ * updated.
+ */
+ checkSlavesFile();
}
elog(LOG, "Resource manager main event handler exits.");
@@ -960,43 +967,55 @@ static void InitTemporaryDirs(DQueue tmpdirs_list, char *tmpdirs_string)
*/
int loadDynamicResourceManagerConfigure(void)
{
- elog(DEBUG5, "HAWQ RM :: Unix Domain Socket Port %d", rm_master_addr_domain_port);
- elog(DEBUG5, "HAWQ RM :: Socket Listening Port %d", rm_master_addr_port);
- elog(DEBUG5, "HAWQ RM :: Segment Socket Listening Port %d", rm_seg_addr_port);
+#ifdef ENABLE_DOMAINSERVER
+ elog(DEBUG3, "Resource manager loads Unix Domain Socket Port %d",
+ rm_master_addr_domain_port);
+#endif
+ elog(DEBUG3, "Resource manager loads Socket Listening Port %d",
+ rm_master_port);
+ elog(DEBUG3, "Resource manager loads Segment Socket Listening Port %d",
+ rm_segment_port);
/* Decide global resource manager mode. */
- if ( strcmp(rm_grm_server_type, HAWQDRM_CONFFILE_SVRTYPE_VAL_YARN) == 0 ) {
+ if ( strcmp(rm_global_rm_type, HAWQDRM_CONFFILE_SVRTYPE_VAL_YARN) == 0 )
+ {
DRMGlobalInstance->ImpType = YARN_LIBYARN;
}
- else if ( strcmp(rm_grm_server_type, HAWQDRM_CONFFILE_SVRTYPE_VAL_NONE) == 0 ) {
+ else if ( strcmp(rm_global_rm_type, HAWQDRM_CONFFILE_SVRTYPE_VAL_NONE) == 0 )
+ {
DRMGlobalInstance->ImpType = NONE_HAWQ2;
}
- else {
- elog(LOG, "Wrong global resource manager type set in %s.",
- HAWQDRM_CONFFILE_SERVER_TYPE);
+ else
+ {
+ elog(WARNING, "Wrong global resource manager type set in %s.",
+ HAWQDRM_CONFFILE_SERVER_TYPE);
return MAIN_CONF_UNSET_ROLE;
}
- elog(DEBUG5, "HAWQ RM :: Resource broker implement mode : %d", DRMGlobalInstance->ImpType);
+ elog(DEBUG3, "Resource manager loads resource broker implement mode : %d",
+ DRMGlobalInstance->ImpType);
SimpString segmem;
- if ( rm_seg_memory_use[0] == '\0' ) {
- elog(LOG, "%s is not set", HAWQDRM_CONFFILE_LIMIT_MEMORY_USE);
+ if ( rm_seg_memory_use[0] == '\0' )
+ {
+ elog(WARNING, "%s is not set", HAWQDRM_CONFFILE_LIMIT_MEMORY_USE);
return MAIN_CONF_UNSET_SEGMENT_MEMORY_USE;
}
+
setSimpleStringRefNoLen(&segmem, rm_seg_memory_use);
int res = SimpleStringToStorageSizeMB(&segmem,
&(DRMGlobalInstance->SegmentMemoryMB));
- if ( res != FUNC_RETURN_OK) {
- elog(LOG, "Can not understand the value '%s' of property %s.",
- rm_seg_memory_use,
- HAWQDRM_CONFFILE_LIMIT_MEMORY_USE);
+ if ( res != FUNC_RETURN_OK)
+ {
+ elog(WARNING, "Can not understand the value '%s' of property %s.",
+ rm_seg_memory_use,
+ HAWQDRM_CONFFILE_LIMIT_MEMORY_USE);
return MAIN_CONF_UNSET_SEGMENT_MEMORY_USE;
}
DRMGlobalInstance->SegmentCore = rm_seg_core_use;
- elog(DEBUG5, "HAWQ RM :: Accepted NONE mode resource management : "
- "each host has (%d MB,%lf).\n",
+ elog(DEBUG3, "HAWQ RM :: Accepted NONE mode resource management setting, "
+ "each host has (%d MB,%lf) resource capacity.\n",
DRMGlobalInstance->SegmentMemoryMB,
DRMGlobalInstance->SegmentCore);
@@ -1043,7 +1062,6 @@ int loadDynamicResourceManagerConfigure(void)
DRMGlobalInstance->ResourceEnforcerCleanupPeriod = rm_enforce_cleanup_period;
/****** Resource enforcement GUCs ends ******/
-
return FUNC_RETURN_OK;
}
@@ -1249,44 +1267,51 @@ cleanup:
*****************************************************************************/
int loadQueuePropertiesFromCatalog(List **queues)
{
- int libpqres = CONNECTION_OK;
- int ret = FUNC_RETURN_OK;
- PGconn *conn = NULL;
+ int libpqres = CONNECTION_OK;
+ int ret = FUNC_RETURN_OK;
+ PGconn *conn = NULL;
static char conninfo[1024];
- PQExpBuffer sql = NULL;
- PGresult* result = NULL;
- int ntups = 0;
- int i_oid = 0,
- i_name = 0,
- i_parent = 0,
- i_active_stats_cluster = 0,
- i_memory_limit_cluster = 0,
- i_core_limit_cluster = 0,
- i_resource_upper_factor = 0,
- i_allocation_policy = 0,
- i_vseg_resource_quota = 0,
- i_vseg_upper_limit = 0,
- i_creation_time = 0,
- i_update_time = 0,
- i_status = 0;
-
- Oid oid = 0,
- rsq_parent = 0;
-
- char *rsqname = NULL,
+ PQExpBuffer sql = NULL;
+ PGresult* result = NULL;
+ int ntups = 0;
+ int i_oid = 0,
+ i_name = 0,
+ i_parent = 0,
+ i_active_stats_cluster = 0,
+ i_memory_limit_cluster = 0,
+ i_core_limit_cluster = 0,
+ i_resource_overcommit = 0,
+ i_allocation_policy = 0,
+ i_vseg_resource_quota = 0,
+ i_nvseg_upper_limit = 0,
+ i_nvseg_lower_limit = 0,
+ i_nvseg_upper_limit_perseg = 0,
+ i_nvseg_lower_limit_perseg = 0,
+ i_creation_time = 0,
+ i_update_time = 0,
+ i_status = 0;
+
+ Oid oid = 0,
+ parentoid = 0;
+
+ char *name = NULL,
*parent = NULL,
- *rsq_memory_limit_cluster = NULL,
- *rsq_core_limit_cluster = NULL,
- *rsq_allocation_policy = NULL,
- *rsq_vseg_resource_quota = NULL,
- *rsq_status = NULL;
+ *memory_limit_cluster = NULL,
+ *core_limit_cluster = NULL,
+ *allocation_policy = NULL,
+ *vseg_resource_quota = NULL,
+ *status = NULL;
+
+ int active_stats_cluster = 0,
+ nvseg_upper_limit = 0,
+ nvseg_lower_limit = 0;
- int rsq_active_stats_cluster = 0,
- rsq_vseg_upper_limit = 0;
+ float nvseg_upper_limit_perseg = 0.0,
+ nvseg_lower_limit_perseg = 0.0,
+ resource_overcommit = 0.0;
- float rsq_resource_upper_factor = 0.0;
- int64 rsq_creation_time = 0,
- rsq_update_time = 0;
+ int64 creation_time = 0,
+ update_time = 0;
snprintf(conninfo, sizeof(conninfo),
"options='-c gp_session_role=UTILITY' "
@@ -1313,19 +1338,23 @@ int loadQueuePropertiesFromCatalog(List **queues)
"sql statement.");
goto cleanup;
}
+
appendPQExpBuffer(sql,"SELECT oid,"
"rsqname,"
- "rsq_parent,"
- "rsq_active_stats_cluster,"
- "rsq_memory_limit_cluster, "
- "rsq_core_limit_cluster, "
- "rsq_resource_upper_factor,"
- "rsq_allocation_policy, "
- "rsq_vseg_resource_quota, "
- "rsq_vseg_upper_limit, "
- "rsq_creation_time, "
- "rsq_update_time, "
- "rsq_status "
+ "parentoid,"
+ "activestats,"
+ "memorylimit, "
+ "corelimit, "
+ "resovercommit,"
+ "allocpolicy, "
+ "vsegresourcequota, "
+ "nvsegupperlimit, "
+ "nvseglowerlimit, "
+ "nvsegupperlimitperseg, "
+ "nvseglowerlimitperseg, "
+ "creationtime, "
+ "updatetime, "
+ "status "
"FROM pg_resqueue");
result = PQexec(conn, sql->data);
@@ -1342,45 +1371,51 @@ int loadQueuePropertiesFromCatalog(List **queues)
ntups = PQntuples(result);
- i_oid = PQfnumber(result, PG_RESQUEUE_COL_OID);
- i_name = PQfnumber(result, PG_RESQUEUE_COL_RSQNAME);
- i_parent = PQfnumber(result, PG_RESQUEUE_COL_PARENT);
- i_active_stats_cluster = PQfnumber(result, PG_RESQUEUE_COL_ACTIVE_STATS_CLUSTER);
- i_memory_limit_cluster = PQfnumber(result, PG_RESQUEUE_COL_MEMORY_LIMIT_CLUSTER);
- i_core_limit_cluster = PQfnumber(result, PG_RESQUEUE_COL_CORE_LIMIT_CLUSTER);
- i_resource_upper_factor = PQfnumber(result, PG_RESQUEUE_COL_RESOURCE_UPPER_FACTOR);
- i_allocation_policy = PQfnumber(result, PG_RESQUEUE_COL_ALLOCATION_POLICY);
- i_vseg_resource_quota = PQfnumber(result, PG_RESQUEUE_COL_VSEG_RESOURCE_QUOTA);
- i_vseg_upper_limit = PQfnumber(result, PG_RESQUEUE_COL_VSEG_UPPER_LIMIT);
- i_creation_time = PQfnumber(result, PG_RESQUEUE_COL_CREATION_TIME);
- i_update_time = PQfnumber(result, PG_RESQUEUE_COL_UPDATE_TIME);
- i_status = PQfnumber(result, PG_RESQUEUE_COL_STATUS);
+ i_oid = PQfnumber(result, PG_RESQUEUE_COL_OID);
+ i_name = PQfnumber(result, PG_RESQUEUE_COL_RSQNAME);
+ i_parent = PQfnumber(result, PG_RESQUEUE_COL_PARENTOID);
+ i_active_stats_cluster = PQfnumber(result, PG_RESQUEUE_COL_ACTIVESTATS);
+ i_memory_limit_cluster = PQfnumber(result, PG_RESQUEUE_COL_MEMORYLIMIT);
+ i_core_limit_cluster = PQfnumber(result, PG_RESQUEUE_COL_CORELIMIT);
+ i_resource_overcommit = PQfnumber(result, PG_RESQUEUE_COL_RESOVERCOMMIT);
+ i_allocation_policy = PQfnumber(result, PG_RESQUEUE_COL_ALLOCPOLICY);
+ i_vseg_resource_quota = PQfnumber(result, PG_RESQUEUE_COL_VSEGRESOURCEQUOTA);
+ i_nvseg_upper_limit = PQfnumber(result, PG_RESQUEUE_COL_NVSEGUPPERLIMIT);
+ i_nvseg_lower_limit = PQfnumber(result, PG_RESQUEUE_COL_NVSEGLOWERLIMIT);
+ i_nvseg_upper_limit_perseg = PQfnumber(result, PG_RESQUEUE_COL_NVSEGUPPERLIMITPERSEG);
+ i_nvseg_lower_limit_perseg = PQfnumber(result, PG_RESQUEUE_COL_NVSEGLOWERLIMITPERSEG);
+ i_creation_time = PQfnumber(result, PG_RESQUEUE_COL_CREATIONTIME);
+ i_update_time = PQfnumber(result, PG_RESQUEUE_COL_UPDATETIME);
+ i_status = PQfnumber(result, PG_RESQUEUE_COL_STATUS);
for (int i = 0; i < ntups; i++)
{
oid = (Oid)strtoul(PQgetvalue(result, i, i_oid), NULL, 10);
- rsqname = PQgetvalue(result, i, i_name);
+ name = PQgetvalue(result, i, i_name);
parent = PQgetvalue(result, i, i_parent);
if (parent == NULL || strlen(parent) == 0)
{
- rsq_parent = InvalidOid;
+ parentoid = InvalidOid;
}
else
{
- rsq_parent = (Oid)strtoul(parent, NULL, 10);
+ parentoid = (Oid)strtoul(parent, NULL, 10);
}
- rsq_parent = (Oid)strtoul(PQgetvalue(result, i, i_parent), NULL, 10);
- rsq_active_stats_cluster = atoi(PQgetvalue(result, i, i_active_stats_cluster));
- rsq_memory_limit_cluster = PQgetvalue(result, i, i_memory_limit_cluster);
- rsq_core_limit_cluster = PQgetvalue(result, i, i_core_limit_cluster);
- rsq_resource_upper_factor = atof(PQgetvalue(result, i, i_resource_upper_factor));
- rsq_allocation_policy = PQgetvalue(result, i, i_allocation_policy);
- rsq_vseg_resource_quota = PQgetvalue(result, i, i_vseg_resource_quota);
- rsq_vseg_upper_limit = atoi(PQgetvalue(result, i, i_vseg_upper_limit));
- rsq_creation_time = atol(PQgetvalue(result, i, i_creation_time));
- rsq_update_time = atol(PQgetvalue(result, i, i_update_time));
- rsq_status = PQgetvalue(result, i, i_status);
+ parentoid = (Oid)strtoul(PQgetvalue(result, i, i_parent), NULL, 10);
+ active_stats_cluster = atoi(PQgetvalue(result, i, i_active_stats_cluster));
+ memory_limit_cluster = PQgetvalue(result, i, i_memory_limit_cluster);
+ core_limit_cluster = PQgetvalue(result, i, i_core_limit_cluster);
+ resource_overcommit = atof(PQgetvalue(result, i, i_resource_overcommit));
+ allocation_policy = PQgetvalue(result, i, i_allocation_policy);
+ vseg_resource_quota = PQgetvalue(result, i, i_vseg_resource_quota);
+ nvseg_upper_limit = atoi(PQgetvalue(result, i, i_nvseg_upper_limit));
+ nvseg_lower_limit = atoi(PQgetvalue(result, i, i_nvseg_lower_limit));
+ nvseg_upper_limit_perseg = atof(PQgetvalue(result, i, i_nvseg_upper_limit_perseg));
+ nvseg_lower_limit_perseg = atof(PQgetvalue(result, i, i_nvseg_lower_limit_perseg));
+ creation_time = atol(PQgetvalue(result, i, i_creation_time));
+ update_time = atol(PQgetvalue(result, i, i_update_time));
+ status = PQgetvalue(result, i, i_status);
MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
*queues = lappend(*queues,
@@ -1397,7 +1432,7 @@ int loadQueuePropertiesFromCatalog(List **queues)
"queue",
getRSQTBLAttributeName(RSQ_TBL_ATTR_NAME),
&i,
- (Name)rsqname));
+ (Name)name));
*queues = lappend(*queues,
createPropertyOID(
@@ -1405,7 +1440,7 @@ int loadQueuePropertiesFromCatalog(List **queues)
"queue",
getRSQTBLAttributeName(RSQ_TBL_ATTR_PARENT),
&i,
- rsq_parent));
+ parentoid));
*queues = lappend(*queues,
createPropertyInt32(
@@ -1413,7 +1448,7 @@ int loadQueuePropertiesFromCatalog(List **queues)
"queue",
getRSQTBLAttributeName(RSQ_TBL_ATTR_ACTIVE_STATMENTS),
&i,
- rsq_active_stats_cluster));
+ active_stats_cluster));
*queues = lappend(*queues,
createPropertyString(
@@ -1421,7 +1456,7 @@ int loadQueuePropertiesFromCatalog(List **queues)
"queue",
getRSQTBLAttributeName(RSQ_TBL_ATTR_MEMORY_LIMIT_CLUSTER),
&i,
- rsq_memory_limit_cluster));
+ memory_limit_cluster));
*queues = lappend(*queues,
createPropertyString(
@@ -1429,7 +1464,7 @@ int loadQueuePropertiesFromCatalog(List **queues)
"queue",
getRSQTBLAttributeName(RSQ_TBL_ATTR_CORE_LIMIT_CLUSTER),
&i,
- rsq_core_limit_cluster));
+ core_limit_cluster));
*queues = lappend(*queues,
createPropertyString(
@@ -1437,31 +1472,55 @@ int loadQueuePropertiesFromCatalog(List **queues)
"queue",
getRSQTBLAttributeName(RSQ_TBL_ATTR_ALLOCATION_POLICY),
&i,
- rsq_allocation_policy));
+ allocation_policy));
*queues = lappend(*queues,
createPropertyFloat(
PCONTEXT,
"queue",
- getRSQTBLAttributeName(RSQ_DDL_ATTR_RESOURCE_UPPER_FACTOR),
+ getRSQTBLAttributeName(RSQ_DDL_ATTR_RESOURCE_OVERCOMMIT_FACTOR),
&i,
- rsq_resource_upper_factor));
+ resource_overcommit));
*queues = lappend(*queues,
createPropertyString(
PCONTEXT,
"queue",
- getRSQTBLAttributeName(RSQ_TBL_ATTR_VSEGMENT_RESOURCE_QUOTA),
+ getRSQTBLAttributeName(RSQ_TBL_ATTR_VSEG_RESOURCE_QUOTA),
&i,
- rsq_vseg_resource_quota));
+ vseg_resource_quota));
*queues = lappend(*queues,
createPropertyInt32(
PCONTEXT,
"queue",
- getRSQTBLAttributeName(RSQ_TBL_ATTR_VSEGMENT_UPPER_LIMIT),
+ getRSQTBLAttributeName(RSQ_TBL_ATTR_NVSEG_UPPER_LIMIT),
+ &i,
+ nvseg_upper_limit));
+
+ *queues = lappend(*queues,
+ createPropertyInt32(
+ PCONTEXT,
+ "queue",
+ getRSQTBLAttributeName(RSQ_TBL_ATTR_NVSEG_LOWER_LIMIT),
+ &i,
+ nvseg_lower_limit));
+
+ *queues = lappend(*queues,
+ createPropertyFloat(
+ PCONTEXT,
+ "queue",
+ getRSQTBLAttributeName(RSQ_TBL_ATTR_NVSEG_UPPER_LIMIT_PERSEG),
+ &i,
+ nvseg_upper_limit_perseg));
+
+ *queues = lappend(*queues,
+ createPropertyFloat(
+ PCONTEXT,
+ "queue",
+ getRSQTBLAttributeName(RSQ_TBL_ATTR_NVSEG_LOWER_LIMIT_PERSEG),
&i,
- rsq_vseg_upper_limit));
+ nvseg_lower_limit_perseg));
*queues = lappend(*queues,
createPropertyInt32(
@@ -1469,7 +1528,7 @@ int loadQueuePropertiesFromCatalog(List **queues)
"queue",
getRSQTBLAttributeName(RSQ_TBL_ATTR_CREATION_TIME),
&i,
- rsq_creation_time));
+ creation_time));
*queues = lappend(*queues,
createPropertyInt32(
@@ -1477,7 +1536,7 @@ int loadQueuePropertiesFromCatalog(List **queues)
"queue",
getRSQTBLAttributeName(RSQ_TBL_ATTR_UPDATE_TIME),
&i,
- rsq_update_time));
+ update_time));
*queues = lappend(*queues,
createPropertyString(
@@ -1485,7 +1544,7 @@ int loadQueuePropertiesFromCatalog(List **queues)
"queue",
getRSQTBLAttributeName(RSQ_TBL_ATTR_STATUS),
&i,
- rsq_status));
+ status));
MEMORY_CONTEXT_SWITCH_BACK
}
@@ -1558,12 +1617,12 @@ int addResourceQueueAndUserFromProperties(List *queueprops, List *userprops)
{
KVProperty value = lfirst(cell);
- elog(DEBUG3, "Loads queue property %s=%s", value->Key.Str, value->Val.Str);
+ elog(RMLOG, "Loads queue property %s=%s", value->Key.Str, value->Val.Str);
/* Split key string into (attribute, index) */
if ( SimpleStringStartWith(&(value->Key), "queue.") != FUNC_RETURN_OK )
{
- elog(DEBUG3, "Ignore property %s=%s", value->Key.Str, value->Val.Str);
+ elog(RMLOG, "Ignore property %s=%s", value->Key.Str, value->Val.Str);
continue;
}
@@ -1619,9 +1678,9 @@ int addResourceQueueAndUserFromProperties(List *queueprops, List *userprops)
currentindex = queueindex;
}
- elog(DEBUG3, "Resource manager loaded attribute for creating queue %s=%s",
- newprop->Key.Str,
- newprop->Val.Str);
+ elog(RMLOG, "Resource manager loaded attribute for creating queue %s=%s",
+ newprop->Key.Str,
+ newprop->Val.Str);
currentattrs = lappend(currentattrs, newprop);
MEMORY_CONTEXT_SWITCH_BACK
@@ -1641,12 +1700,12 @@ int addResourceQueueAndUserFromProperties(List *queueprops, List *userprops)
{
KVProperty value = lfirst(cell);
- elog(DEBUG3, "Loads user property %s=%s", value->Key.Str, value->Val.Str);
+ elog(RMLOG, "Loads user property %s=%s", value->Key.Str, value->Val.Str);
/* Split key string into (attribute, index) */
if ( SimpleStringStartWith(&(value->Key), "user.") != FUNC_RETURN_OK )
{
- elog(DEBUG3, "Ignore property %s=%s", value->Key.Str, value->Val.Str);
+ elog(RMLOG, "Ignore property %s=%s", value->Key.Str, value->Val.Str);
continue;
}
@@ -1698,9 +1757,9 @@ int addResourceQueueAndUserFromProperties(List *queueprops, List *userprops)
currentindex = userindex;
}
- elog(DEBUG3, "Resource manager loaded attribute for creating role %s=%s",
- newprop->Key.Str,
- newprop->Val.Str);
+ elog(RMLOG, "Resource manager loaded attribute for creating role %s=%s",
+ newprop->Key.Str,
+ newprop->Val.Str);
currentattrs = lappend(currentattrs, newprop);
MEMORY_CONTEXT_SWITCH_BACK
@@ -1726,7 +1785,7 @@ int addResourceQueueAndUserFromProperties(List *queueprops, List *userprops)
foreach(cell2, attrs)
{
KVProperty attrkv = lfirst(cell2);
- elog(DEBUG3, "To parse : %s=%s", attrkv->Key.Str, attrkv->Val.Str);
+ elog(RMLOG, "To parse : %s=%s", attrkv->Key.Str, attrkv->Val.Str);
}
DynResourceQueue newqueue = rm_palloc0(PCONTEXT,
@@ -1734,6 +1793,7 @@ int addResourceQueueAndUserFromProperties(List *queueprops, List *userprops)
res = parseResourceQueueAttributes(attrs,
newqueue,
+ false,
errorbuf,
sizeof(errorbuf));
if ( res != FUNC_RETURN_OK )
@@ -1748,7 +1808,7 @@ int addResourceQueueAndUserFromProperties(List *queueprops, List *userprops)
MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
rawrsqs = lappend(rawrsqs, newqueue);
MEMORY_CONTEXT_SWITCH_BACK
- DQUEUE_LOOP_END
+ }
/*
* STEP 2.2. Reorder the resource queue sequence to ensure that every time
@@ -1796,8 +1856,8 @@ int addResourceQueueAndUserFromProperties(List *queueprops, List *userprops)
orderchanged = false;
if ( toreordrsq != NULL )
{
- elog(DEBUG3, "Find one resource queue valid to continue loading %s.",
- toreordrsq->Name);
+ elog(RMLOG, "Find one resource queue valid to continue loading %s.",
+ toreordrsq->Name);
orderchanged = true;
MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
orderedrsqs = lappend(orderedrsqs, toreordrsq);
@@ -1842,23 +1902,29 @@ int addResourceQueueAndUserFromProperties(List *queueprops, List *userprops)
orderedrsqs = list_delete_first(orderedrsqs);
MEMORY_CONTEXT_SWITCH_BACK
+ elog(RMLOG, "Load queue %s.", partqueue->Name);
+
res = checkAndCompleteNewResourceQueueAttributes(partqueue,
errorbuf,
sizeof(errorbuf));
if ( res != FUNC_RETURN_OK )
{
+ elog(RMLOG, "res=%d error=%s, after check and complete queue %s.",
+ res,
+ errorbuf,
+ partqueue->Name);
+
rm_pfree(PCONTEXT, partqueue);
- elog( WARNING, "Resource manager can not complete resource queue's "
- "attributes because %s",
- errorbuf);
+ elog(WARNING, "Resource manager can not complete resource queue's "
+ "attributes because %s",
+ errorbuf);
continue;
}
+ elog(RMLOG, "Checked and completed queue %s.", partqueue->Name);
+
DynResourceQueueTrack newtrack = NULL;
- res = createQueueAndTrack(partqueue,
- &newtrack,
- errorbuf,
- sizeof(errorbuf));
+ res = createQueueAndTrack(partqueue, &newtrack, errorbuf, sizeof(errorbuf));
if ( res != FUNC_RETURN_OK )
{
@@ -1875,6 +1941,8 @@ int addResourceQueueAndUserFromProperties(List *queueprops, List *userprops)
continue;
}
+ elog(RMLOG, "Created queue %s.", partqueue->Name);
+
char buffer[1024];
generateQueueReport(partqueue->OID, buffer, sizeof(buffer));
elog(LOG, "Resource manager created resource queue instance : %s",
@@ -1912,15 +1980,7 @@ int addResourceQueueAndUserFromProperties(List *queueprops, List *userprops)
continue;
}
- res = createUser(newuser, errorbuf, sizeof(errorbuf));
- if ( res != FUNC_RETURN_OK )
- {
- elog(WARNING, "Can not create user %s because %s",
- newuser->Name,
- errorbuf);
- rm_pfree(PCONTEXT, newuser);
- continue;
- }
+ createUser(newuser);
char buffer[256];
generateUserReport(newuser->Name,
@@ -2125,10 +2185,12 @@ int initializeSocketServer(void)
char *allip = "0.0.0.0";
pgsocket RMListenSocket[HAWQRM_SERVER_PORT_COUNT];
- for ( int i = 0 ; i < HAWQRM_SERVER_PORT_COUNT ; ++i ) {
+ for ( int i = 0 ; i < HAWQRM_SERVER_PORT_COUNT ; ++i )
+ {
RMListenSocket[i] = PGINVALID_SOCKET;
}
+#ifdef ENABLE_DOMAINSERVER
/* Listen local unix domain socket port. */
netres = StreamServerPort(AF_UNIX,
NULL,
@@ -2142,7 +2204,8 @@ int initializeSocketServer(void)
* This condition is for double-checking the server is successfully
* created.
*/
- (netres == STATUS_OK && RMListenSocket[0] == PGINVALID_SOCKET) ) {
+ (netres == STATUS_OK && RMListenSocket[0] == PGINVALID_SOCKET) )
+ {
res = REQUESTHANDLER_FAIL_START_SOCKET_SERVER;
elog(LOG, "Resource manager cannot create UNIX domain socket server. Port=%d",
rm_master_addr_domain_port);
@@ -2152,15 +2215,23 @@ int initializeSocketServer(void)
/* Listen normal socket addresses. */
netres = StreamServerPort(AF_UNSPEC,
allip,
- rm_master_addr_port,
+ rm_master_port,
NULL,
&(RMListenSocket[1]),
HAWQRM_SERVER_PORT_COUNT-1);
-
- if ( netres != STATUS_OK ) {
+#else
+ netres = StreamServerPort(AF_UNSPEC,
+ allip,
+ rm_master_port,
+ NULL,
+ RMListenSocket,
+ HAWQRM_SERVER_PORT_COUNT);
+#endif
+ if ( netres != STATUS_OK )
+ {
res = REQUESTHANDLER_FAIL_START_SOCKET_SERVER;
elog(LOG, "Resource manager cannot create socket server. Port=%d",
- rm_master_addr_port);
+ rm_master_port);
return res;
}
@@ -2168,39 +2239,54 @@ int initializeSocketServer(void)
initializeAsyncComm();
int validfdcount = 0;
AsyncCommBuffer newbuffer = NULL;
- for ( int i = 0 ; i < HAWQRM_SERVER_PORT_COUNT ; ++i ) {
- if (RMListenSocket[i] != PGINVALID_SOCKET) {
+ for ( int i = 0 ; i < HAWQRM_SERVER_PORT_COUNT ; ++i )
+ {
+ if (RMListenSocket[i] != PGINVALID_SOCKET)
+ {
netres = registerFileDesc(RMListenSocket[i],
NULL,
ASYNCCOMM_READ,
&AsyncCommBufferHandlersMsgServer,
NULL,
&newbuffer);
- if ( netres != FUNC_RETURN_OK ) {
+ if ( netres != FUNC_RETURN_OK )
+ {
res = REQUESTHANDLER_FAIL_START_SOCKET_SERVER;
elog(WARNING, "Resource manager cannot track socket server.");
break;
}
validfdcount++;
-
InitHandler_Message(newbuffer);
}
}
- if ( res != FUNC_RETURN_OK ) {
- for ( int i = 0 ; i < HAWQRM_SERVER_PORT_COUNT ; ++i ) {
- if ( RMListenSocket[i] != PGINVALID_SOCKET ) close(RMListenSocket[i]);
+ if ( res != FUNC_RETURN_OK )
+ {
+ for ( int i = 0 ; i < HAWQRM_SERVER_PORT_COUNT ; ++i )
+ {
+ if ( RMListenSocket[i] != PGINVALID_SOCKET )
+ {
+ close(RMListenSocket[i]);
+ }
}
return res;
}
- elog(DEBUG5, "HAWQ RM :: Start accepting resource request. "
- "Listening unix domain socket port %d. "
- "Listening normal socket port %d. "
- "Total listened %d FDs.",
- rm_master_addr_domain_port,
- rm_master_addr_port,
- validfdcount);
+#ifdef ENABLE_DOMAINSERVER
+ elog(LOG, "Resource manager starts accepting resource request. "
+ "Listening unix domain socket port %d. "
+ "Listening normal socket port %d. "
+ "Total listened %d FDs.",
+ rm_master_addr_domain_port,
+ rm_master_port,
+ validfdcount);
+#else
+ elog(LOG, "Resource manager starts accepting resource request. "
+ "Listening normal socket port %d. "
+ "Total listened %d FDs.",
+ rm_master_port,
+ validfdcount);
+#endif
return res;
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/resourcemanager_RMSEG.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcemanager_RMSEG.c b/src/backend/resourcemanager/resourcemanager_RMSEG.c
index b62c14d..df589bb 100644
--- a/src/backend/resourcemanager/resourcemanager_RMSEG.c
+++ b/src/backend/resourcemanager/resourcemanager_RMSEG.c
@@ -73,7 +73,7 @@ int initializeSocketServer_RMSEG(void)
/* Listen normal socket addresses. */
netres = StreamServerPort( AF_UNSPEC,
allip,
- rm_seg_addr_port,
+ rm_segment_port,
NULL,
RMListenSocket,
HAWQRM_SERVER_PORT_COUNT);
@@ -87,7 +87,7 @@ int initializeSocketServer_RMSEG(void)
res = REQUESTHANDLER_FAIL_START_SOCKET_SERVER;
elog( LOG, "Can not create socket server. HostName=%s, Port=%d",
allip,
- rm_seg_addr_port);
+ rm_segment_port);
return res;
}
@@ -125,7 +125,7 @@ int initializeSocketServer_RMSEG(void)
"Listening normal socket port %s:%d. "
"Total listened %d FDs.",
allip,
- rm_seg_addr_port,
+ rm_segment_port,
validfdcount);
return res;
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/resourcepool.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcepool.c b/src/backend/resourcemanager/resourcepool.c
index 7e413df..004cd67 100644
--- a/src/backend/resourcemanager/resourcepool.c
+++ b/src/backend/resourcemanager/resourcepool.c
@@ -47,6 +47,8 @@ void getSegResResourceCountersByMemCoreCounters(SegResource resinfo,
VSegmentCounterInternal createVSegmentCounter(uint32_t hdfsnameindex,
SegResource segres);
+void refreshSlavesFileHostSize(FILE *fp);
+
/* Functions for BBST indices. */
int __DRM_NODERESPOOL_comp_ratioFree(void *arg, void *val1, void *val2);
int __DRM_NODERESPOOL_comp_ratioAlloc(void *arg, void *val1, void *val2);
@@ -331,6 +333,9 @@ void initializeResourcePoolManager(void)
PRESPOOL->allocateResFuncs[i] = NULL;
}
PRESPOOL->allocateResFuncs[0] = allocateResourceFromResourcePoolIOBytes;
+
+ PRESPOOL->SlavesFileTimestamp = 0;
+ PRESPOOL->SlavesHostCount = 0;
}
#define CONNECT_TIMEOUT 60
@@ -1838,7 +1843,7 @@ int allocateResourceFromResourcePoolIOBytes(int32_t nodecount,
* slice limit. Because we will gothrough all segments later
* if not enough segments are found in this loop.
*/
- if ( segresource->SliceWorkload + slicesize > rm_slice_num_per_seg_limit )
+ if ( segresource->SliceWorkload + slicesize > rm_nslice_perseg_limit )
{
elog(DEBUG3, "Segment %s contains %d slices working now, it can "
"not afford %d more slices.",
@@ -1968,8 +1973,7 @@ int allocateResourceFromResourcePoolIOBytes(int32_t nodecount,
else
{
- if ( !fixnodecount &&
- curres->SliceWorkload + slicesize > rm_slice_num_per_seg_limit )
+ if ( curres->SliceWorkload + slicesize > rm_nslice_perseg_limit )
{
elog(LOG, "Segment %s contains %d slices working now, "
"it can not afford %d more slices.",
@@ -3553,6 +3557,124 @@ int getSegmentGRMContainerSize(SegResource segres)
return segres->GRMContainerCount;
}
+/*
+ * Re-read $GPHOME/etc/slaves when its modification time differs from
+ * PRESPOOL->SlavesFileTimestamp. If GPHOME is unset, or the file cannot be
+ * opened or stat'ed, a warning is logged and nothing is updated.
+ */
+void checkSlavesFile(void)
+{
+    /* The path is built on the first successful call and then reused. */
+    static char *filename = NULL;
+    if ( filename == NULL )
+    {
+        char *gphome = getenv("GPHOME");
+        if ( gphome == NULL )
+        {
+            elog(WARNING, "The environment variable GPHOME is not set. "
+                          "Resource manager can not find file slaves.");
+            return;
+        }
+        /* sizeof() of the suffix literal already counts the terminating NUL. */
+        filename = rm_palloc0(PCONTEXT, strlen(gphome) + sizeof("/etc/slaves"));
+        sprintf(filename, "%s%s", gphome, "/etc/slaves");
+    }
+
+    elog(DEBUG3, "Resource manager reads slaves file %s.", filename);
+
+    FILE *fp = fopen(filename, "r");
+    if ( fp == NULL )
+    {
+        elog(WARNING, "Fail to open slaves file %s. errno %d", filename, errno);
+        return;
+    }
+
+    struct stat filestat;
+    if ( fstat(fileno(fp), &filestat) != 0 )
+    {
+        /* Capture errno before fclose(), which may overwrite it. */
+        int staterrno = errno;
+        fclose(fp);
+        elog(WARNING, "Fail to get slaves file stat %s. errno %d", filename, staterrno);
+        return;
+    }
+    int64_t filechangetime = filestat.st_mtime;
+    elog(DEBUG3, "Current file change time stamp " INT64_FORMAT, filechangetime);
+    /* NOTE(review): the timestamp is recorded even if the refresh hit a read error. */
+    if ( filechangetime != PRESPOOL->SlavesFileTimestamp )
+    {
+        refreshSlavesFileHostSize(fp);
+        PRESPOOL->SlavesFileTimestamp = filechangetime;
+    }
+    fclose(fp);
+}
+
+/*
+ * Count the host entries in the already-opened slaves file fp, one entry per
+ * line; tabs, spaces and carriage returns are dropped before buffering. The
+ * result is stored in PRESPOOL->SlavesHostCount, which is left unchanged
+ * when the stream stops on a read error instead of end-of-file.
+ */
+void refreshSlavesFileHostSize(FILE *fp)
+{
+    static char zero[1] = "";
+    int newcnt = 0;
+    bool haserror = false;
+    SelfMaintainBufferData smb;
+    elog(DEBUG3, "Refresh slaves file host size now.");
+    initializeSelfMaintainBuffer(&smb, PCONTEXT);
+    while( true )
+    {
+        /*
+         * fgetc() returns an int so that EOF differs from every byte value.
+         * Holding it in a char never matches EOF where char is unsigned and
+         * mistakes byte 0xFF for EOF where char is signed.
+         */
+        int ch = fgetc(fp);
+        if ( ch == EOF )
+        {
+            if ( feof(fp) == 0 )
+            {
+                elog(WARNING, "Failed to read slaves file, ferror() gets %d",
+                              ferror(fp));
+                haserror = true;
+            }
+            break;
+        }
+        if ( ch == '\t' || ch == ' ' || ch == '\r' )
+        {
+            continue;
+        }
+        if ( ch == '\n' )
+        {
+            /* NOTE(review): presumably Cursor is -1 for an empty buffer - confirm. */
+            if ( smb.Cursor + 1 > 0 )
+            {
+                appendSelfMaintainBuffer(&smb, zero, 1);
+                elog(DEBUG3, "Loaded slaves host %s", smb.Buffer);
+                resetSelfMaintainBuffer(&smb);
+                newcnt++;
+            }
+            continue;
+        }
+        /* Ordinary character: buffer it as part of the current host entry. */
+        char c = (char) ch;
+        appendSelfMaintainBuffer(&smb, &c, 1);
+    }
+    /* A final entry that is not terminated by a newline is counted too. */
+    if ( smb.Cursor + 1 > 0 )
+    {
+        appendSelfMaintainBuffer(&smb, zero, 1);
+        elog(DEBUG3, "Loaded slaves host %s (last one)", smb.Buffer);
+        newcnt++;
+    }
+    destroySelfMaintainBuffer(&smb);
+    if ( !haserror )
+    {
+        elog(LOG, "Resource manager refreshed slaves host size from %d to %d.",
+                  PRESPOOL->SlavesHostCount, newcnt);
+        PRESPOOL->SlavesHostCount = newcnt;
+    }
+}
void getSegResResourceCountersByMemCoreCounters(SegResource resinfo,
int32_t *allocmem,
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/resqueuecommand.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resqueuecommand.c b/src/backend/resourcemanager/resqueuecommand.c
index 0f69b81..0247fef 100644
--- a/src/backend/resourcemanager/resqueuecommand.c
+++ b/src/backend/resourcemanager/resqueuecommand.c
@@ -56,7 +56,7 @@ void createResourceQueue(CreateQueueStmt *stmt)
{
ereport(ERROR,
(errcode(ERRCODE_RESERVED_NAME),
- errmsg("resource queue name \"%s\" is reserved",
+ errmsg("resource queue name %s is reserved",
stmt->queue),
errOmitLocation(true)));
}
@@ -73,7 +73,7 @@ void createResourceQueue(CreateQueueStmt *stmt)
{
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
- errmsg("resource queue \"%s\" already exists",
+ errmsg("resource queue %s already exists",
stmt->queue)));
}
@@ -86,7 +86,8 @@ void createResourceQueue(CreateQueueStmt *stmt)
*/
int resourceid = 0;
res = createNewResourceContext(&resourceid);
- if ( res != FUNC_RETURN_OK ) {
+ if ( res != FUNC_RETURN_OK )
+ {
Assert( res == COMM2RM_CLIENT_FULL_RESOURCECONTEXT );
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
@@ -99,7 +100,8 @@ void createResourceQueue(CreateQueueStmt *stmt)
GetUserId(),
errorbuf,
sizeof(errorbuf));
- if ( res != FUNC_RETURN_OK ) {
+ if ( res != FUNC_RETURN_OK )
+ {
releaseResourceContextWithErrorReport(resourceid);
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("%s", errorbuf)));
}
@@ -123,7 +125,7 @@ void createResourceQueue(CreateQueueStmt *stmt)
(errcode(IS_TO_RM_RPC_ERROR(res) ?
ERRCODE_INTERNAL_ERROR :
ERRCODE_INVALID_OBJECT_DEFINITION),
- errmsg("%s", errorbuf)));
+ errmsg("Can not apply CREATE RESOURCE QUEUE because %s", errorbuf)));
}
elog(LOG, "Complete applying CREATE RESOURCE QUEUE statement.");
}
@@ -144,9 +146,21 @@ void dropResourceQueue(DropQueueStmt *stmt)
/* Permission check - only superuser can create queues. */
if (!superuser())
+ {
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to create resource queues")));
+ }
+
+ /* Cannot DROP default and root queue */
+ if ( strcmp(stmt->queue, RESOURCE_QUEUE_DEFAULT_QUEUE_NAME) == 0 ||
+ strcmp(stmt->queue, RESOURCE_QUEUE_ROOT_QUEUE_NAME) == 0 )
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot drop system resource queue %s",
+ stmt->queue)));
+ }
/*
* Check the pg_resqueue relation to be certain the queue already
@@ -156,30 +170,19 @@ void dropResourceQueue(DropQueueStmt *stmt)
pcqCtx = caql_addrel(cqclr(&cqc), pg_resqueue_rel);
- tuple = caql_getfirst(
- pcqCtx,
- cql("SELECT * FROM pg_resqueue"
- " WHERE rsqname = :1 FOR UPDATE",
- CStringGetDatum(stmt->queue)));
+ tuple = caql_getfirst(pcqCtx,
+ cql("SELECT * FROM pg_resqueue WHERE rsqname = :1 FOR UPDATE",
+ CStringGetDatum(stmt->queue)));
if (!HeapTupleIsValid(tuple))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
- errmsg("resource queue \"%s\" does not exist",
+ errmsg("resource queue %s does not exist",
stmt->queue)));
- /*
- * Remember the Oid
- */
+ /* Remember the Oid */
queueid = HeapTupleGetOid(tuple);
- /* MPP-6926: cannot DROP default queue */
- if (queueid == DEFAULTRESQUEUE_OID || queueid == ROOTRESQUEUE_OID)
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("cannot drop system resource queue \"%s\"",
- stmt->queue)));
-
/*
* Check to see if any roles are in this queue.
*/
@@ -190,7 +193,7 @@ void dropResourceQueue(DropQueueStmt *stmt)
{
ereport(ERROR,
(errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
- errmsg("resource queue \"%s\" is used by at least one role",
+ errmsg("resource queue %s is used by at least one role",
stmt->queue)));
}
@@ -252,7 +255,7 @@ void dropResourceQueue(DropQueueStmt *stmt)
(errcode(IS_TO_RM_RPC_ERROR(res) ?
ERRCODE_INTERNAL_ERROR :
ERRCODE_INVALID_OBJECT_DEFINITION),
- errmsg("%s", errorbuf)));
+ errmsg("Can not apply DROP RESOURCE QUEUE because %s", errorbuf)));
}
elog(LOG, "Completed applying DROP RESOURCE QUEUE statement.");
@@ -272,9 +275,20 @@ void alterResourceQueue(AlterQueueStmt *stmt)
/* Permission check - only superuser can create queues. */
if (!superuser())
+ {
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to create resource queues")));
+ }
+
+ /* Cannot ALTER root queue */
+ if ( strcmp(stmt->queue, RESOURCE_QUEUE_ROOT_QUEUE_NAME) == 0 )
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot alter system resource queue %s",
+ stmt->queue)));
+ }
/*
* MPP-7960: We cannot run ALTER RESOURCE QUEUE inside a user transaction
@@ -282,7 +296,8 @@ void alterResourceQueue(AlterQueueStmt *stmt)
* resulting in "leaked", unreachable queues.
*/
- if (Gp_role == GP_ROLE_DISPATCH) {
+ if (Gp_role == GP_ROLE_DISPATCH)
+ {
PreventTransactionChain((void *) stmt, "ALTER RESOURCE QUEUE");
}
@@ -301,7 +316,7 @@ void alterResourceQueue(AlterQueueStmt *stmt)
{
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
- errmsg("resource queue \"%s\" does not exist",
+ errmsg("resource queue %s does not exist",
stmt->queue)));
}
@@ -350,20 +365,20 @@ void alterResourceQueue(AlterQueueStmt *stmt)
(errcode(IS_TO_RM_RPC_ERROR(res) ?
ERRCODE_INTERNAL_ERROR :
ERRCODE_INVALID_OBJECT_DEFINITION),
- errmsg("%s", errorbuf)));
+ errmsg("Can not apply ALTER RESOURCE QUEUE because %s", errorbuf)));
}
elog(LOG, "Completed applying ALTER RESOURCE QUEUE statement.");
}
-#define VALIDATE_DDL_DUPLICATE_ATTRIBUTE(index, defel, targref) \
+#define VALID_DDL_DUP(index, defel, targref) \
if (strcmp((defel)->defname, RSQDDLAttrNames[(index)]) == 0) \
{ \
if ((targref) != NULL) \
{ \
ereport(ERROR, \
(errcode(ERRCODE_SYNTAX_ERROR), \
- errmsg("redundant option %s", \
+ errmsg("redundant attribute %s", \
RSQDDLAttrNames[(index)]))); \
} \
(targref) = (defel); \
@@ -372,13 +387,17 @@ void alterResourceQueue(AlterQueueStmt *stmt)
void validateDDLAttributeOptions(List *options)
{
- DefElem *dactivelimit = NULL;
- DefElem *dmemorylimit = NULL;
- DefElem *dcorelimit = NULL;
- DefElem *dvsegresquota = NULL;
- DefElem *dallocpolicy = NULL;
- DefElem *dresupperfactor = NULL;
- DefElem *dvsegupperlimit = NULL;
+ DefElem *dparent = NULL;
+ DefElem *dactivelimit = NULL;
+ DefElem *dmemorylimit = NULL;
+ DefElem *dcorelimit = NULL;
+ DefElem *dvsegresquota = NULL;
+ DefElem *dallocpolicy = NULL;
+ DefElem *dresovercommit = NULL;
+ DefElem *dnvsegupperlimit = NULL;
+ DefElem *dnvseglowerlimit = NULL;
+ DefElem *dnvsegupperlimitpseg = NULL;
+ DefElem *dnvseglowerlimitpseg = NULL;
Cost activelimit = INVALID_RES_LIMIT_THRESHOLD;
ListCell *option = NULL;
@@ -387,13 +406,17 @@ void validateDDLAttributeOptions(List *options)
foreach(option, options)
{
DefElem *defel = (DefElem *) lfirst(option);
- VALIDATE_DDL_DUPLICATE_ATTRIBUTE(RSQ_DDL_ATTR_ACTIVE_STATMENTS, defel, dactivelimit)
- VALIDATE_DDL_DUPLICATE_ATTRIBUTE(RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER, defel, dmemorylimit)
- VALIDATE_DDL_DUPLICATE_ATTRIBUTE(RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER, defel, dcorelimit)
- VALIDATE_DDL_DUPLICATE_ATTRIBUTE(RSQ_DDL_ATTR_VSEGMENT_RESOURCE_QUOTA, defel, dvsegresquota)
- VALIDATE_DDL_DUPLICATE_ATTRIBUTE(RSQ_DDL_ATTR_ALLOCATION_POLICY, defel, dallocpolicy)
- VALIDATE_DDL_DUPLICATE_ATTRIBUTE(RSQ_DDL_ATTR_RESOURCE_UPPER_FACTOR, defel, dresupperfactor)
- VALIDATE_DDL_DUPLICATE_ATTRIBUTE(RSQ_DDL_ATTR_VSEGMENT_UPPER_LIMIT, defel, dvsegupperlimit)
+ VALID_DDL_DUP(RSQ_DDL_ATTR_PARENT, defel, dparent)
+ VALID_DDL_DUP(RSQ_DDL_ATTR_ACTIVE_STATMENTS, defel, dactivelimit)
+ VALID_DDL_DUP(RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER, defel, dmemorylimit)
+ VALID_DDL_DUP(RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER, defel, dcorelimit)
+ VALID_DDL_DUP(RSQ_DDL_ATTR_VSEG_RESOURCE_QUOTA, defel, dvsegresquota)
+ VALID_DDL_DUP(RSQ_DDL_ATTR_ALLOCATION_POLICY, defel, dallocpolicy)
+ VALID_DDL_DUP(RSQ_DDL_ATTR_RESOURCE_OVERCOMMIT_FACTOR, defel, dresovercommit)
+ VALID_DDL_DUP(RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT, defel, dnvsegupperlimit)
+ VALID_DDL_DUP(RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT, defel, dnvseglowerlimit)
+ VALID_DDL_DUP(RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT_PERSEG, defel, dnvsegupperlimitpseg)
+ VALID_DDL_DUP(RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT_PERSEG, defel, dnvseglowerlimitpseg)
}
/* Perform range checks on the various thresholds.*/
@@ -440,31 +463,83 @@ void validateDDLAttributeOptions(List *options)
}
}
- /* The vsegment upper limit must be an integer and no less than -1. */
- if (dvsegupperlimit != NULL)
+ /*
+ * NVSEG_UPPER_LIMIT and NVSEG_LOWER_LIMIT default to 0, which means the
+ * setting is not in effect; otherwise the value must be greater than 0.
+ */
+ int64_t nvsegupperlimit = -1;
+ int64_t nvseglowerlimit = -1;
+ if (dnvsegupperlimit != NULL)
{
- int64_t vsegupperlimit = defGetInt64(dvsegupperlimit);
- if (vsegupperlimit < DEFAULT_RESQUEUE_VSEG_UPPER_LIMIT_N)
+ nvsegupperlimit = defGetInt64(dnvsegupperlimit);
+ if (nvsegupperlimit < MINIMUM_RESQUEUE_NVSEG_UPPER_LIMIT_N)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("%s cannot be less than %s",
- RSQDDLAttrNames[RSQ_DDL_ATTR_VSEGMENT_UPPER_LIMIT],
- DEFAULT_RESQUEUE_VSEG_UPPER_LIMIT)));
+ RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT],
+ MINIMUM_RESQUEUE_NVSEG_UPPER_LIMIT)));
}
}
+ if (dnvseglowerlimit != NULL)
+ {
+ nvseglowerlimit = defGetInt64(dnvseglowerlimit);
+ if (nvseglowerlimit < MINIMUM_RESQUEUE_NVSEG_LOWER_LIMIT_N)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("%s cannot be less than %s",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT],
+ MINIMUM_RESQUEUE_NVSEG_LOWER_LIMIT)));
+ }
+ }
+
+ /*
+ * NVSEG_UPPER_LIMIT_PERSEG and NVSEG_LOWER_LIMIT_PERSEG default to 0, which
+ * means the setting is not in effect; otherwise the value must be greater
+ * than 0.
+ */
+ double nvsegupperlimitpseg = -1.0;
+ double nvseglowerlimitpseg = -1.0;
+ if (dnvsegupperlimitpseg != NULL)
+ {
+ nvsegupperlimitpseg = defGetNumeric(dnvsegupperlimitpseg);
+ if (nvsegupperlimitpseg < MINIMUM_RESQUEUE_NVSEG_UPPER_PERSEG_LIMIT_N)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("%s cannot be less than %s",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT_PERSEG],
+ MINIMUM_RESQUEUE_NVSEG_UPPER_PERSEG_LIMIT)));
+ }
+ }
+
+ if (dnvseglowerlimitpseg != NULL)
+ {
+ nvseglowerlimitpseg = defGetNumeric(dnvseglowerlimitpseg);
+ if (nvseglowerlimitpseg < MINIMUM_RESQUEUE_NVSEG_LOWER_PERSEG_LIMIT_N)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("%s cannot be less than %s",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT_PERSEG],
+ MINIMUM_RESQUEUE_NVSEG_LOWER_PERSEG_LIMIT)));
+ }
+ }
+
+
/* The resource upper factor must be no less than 1. */
- if( dresupperfactor != NULL)
+ if( dresovercommit != NULL)
{
- double resupperfactor = defGetNumeric(dresupperfactor);
- if (resupperfactor < MINIMUM_RESQUEUE_UPPER_FACTOR_LIMIT_N)
+ double resovercommit = defGetNumeric(dresovercommit);
+ if (resovercommit < MINIMUM_RESQUEUE_OVERCOMMIT_N)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("%s cannot be less than %s",
- RSQDDLAttrNames[RSQ_DDL_ATTR_RESOURCE_UPPER_FACTOR],
- MINIMUM_RESQUEUE_UPPER_FACTOR_LIMIT)));
+ RSQDDLAttrNames[RSQ_DDL_ATTR_RESOURCE_OVERCOMMIT_FACTOR],
+ MINIMUM_RESQUEUE_OVERCOMMIT)));
}
}
}