You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by yj...@apache.org on 2015/10/16 03:08:38 UTC
[4/4] incubator-hawq git commit: HAWQ-25. Add resource queue new ddl
statement implementation, refine partial GUC variable names,
use libyarn supporting kerberos.
HAWQ-25. Add the new resource queue DDL statement implementation, refine some GUC variable names, and use a libyarn that supports Kerberos.
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/a413a426
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/a413a426
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/a413a426
Branch: refs/heads/master
Commit: a413a4265f2ed4ecf7a4199248f57824d279427a
Parents: 7e6838f
Author: Yi Jin <yj...@pivotal.io>
Authored: Thu Oct 15 20:40:39 2015 +0800
Committer: Yi Jin <yj...@pivotal.io>
Committed: Thu Oct 15 20:40:39 2015 +0800
----------------------------------------------------------------------
src/Makefile.global.in | 2 +-
src/backend/Makefile | 2 +-
src/backend/bootstrap/bootparse.y | 14 +-
src/backend/catalog/caql/Makefile | 1 -
src/backend/catalog/catalog.c | 40 +-
src/backend/catalog/toasting.c | 3 +-
src/backend/cdb/cdbdatalocality.c | 8 +
src/backend/cdb/cdbvars.c | 54 +-
src/backend/commands/Makefile | 2 +-
src/backend/commands/queue.c | 132 ---
src/backend/commands/user.c | 159 +---
src/backend/optimizer/plan/planner.c | 9 +
src/backend/postmaster/identity.c | 4 +-
.../communication/rmcomm_QD2RM.c | 297 +++----
.../communication/rmcomm_QE2RMSEG.c | 6 +-
.../communication/rmcomm_RM2RMSEG.c | 14 +-
.../communication/rmcomm_RMSEG2RM.c | 2 +-
src/backend/resourcemanager/errorcode.c | 2 +
.../include/communication/rmcomm_QD2RM.h | 17 +-
.../communication/rmcomm_QD_RM_Protocol.h | 10 +-
src/backend/resourcemanager/include/errorcode.h | 1 +
.../resourcemanager/include/resourcepool.h | 7 +
.../resourcemanager/include/resqueuemanager.h | 46 +-
src/backend/resourcemanager/include/rmcommon.h | 8 +-
src/backend/resourcemanager/requesthandler.c | 76 +-
.../resourcemanager/requesthandler_ddl.c | 350 ++++----
.../resourcebroker_LIBYARN_proc.c | 120 ++-
src/backend/resourcemanager/resourcemanager.c | 400 +++++----
.../resourcemanager/resourcemanager_RMSEG.c | 6 +-
src/backend/resourcemanager/resourcepool.c | 128 ++-
src/backend/resourcemanager/resqueuecommand.c | 181 ++--
src/backend/resourcemanager/resqueuemanager.c | 877 +++++++++++--------
.../resourcemanager/utils/network_utils.c | 3 -
src/backend/tcop/utility.c | 8 -
src/backend/utils/misc/etc/slaves.exclude | 1 -
src/backend/utils/misc/etc/yarn-client.xml | 288 ++++++
src/backend/utils/misc/guc.c | 155 ++--
src/bin/pg_dump/pg_dumpall.c | 2 +-
src/include/catalog/indexing.h | 32 +-
src/include/catalog/pg_resqueue.h | 389 +++-----
src/include/catalog/pg_type.h | 12 +-
src/include/catalog/tidycat.pl | 3 -
src/include/catalog/toasting.h | 2 +-
src/include/cdb/cdbvars.h | 22 +-
.../data/upgrade20/upg2_catupgrade_20.sql.in | 2 +-
.../regress/expected/parquet_compression.out | 4 +-
src/test/regress/sql/parquet_compression.sql | 4 +-
tools/bin/generate-greenplum-path.sh | 11 +-
tools/bin/gppylib/data/2.0.json | 326 ++-----
49 files changed, 2254 insertions(+), 1988 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/Makefile.global.in
----------------------------------------------------------------------
diff --git a/src/Makefile.global.in b/src/Makefile.global.in
index ad920f8..a619d74 100644
--- a/src/Makefile.global.in
+++ b/src/Makefile.global.in
@@ -533,7 +533,7 @@ ifeq ($(BLD_ARCH),osx106_x86)
LIBS := $(LIBS) -framework CoreServices -framework IOKit
endif
-LIBS := $(LIBS) -lyarn
+LIBS := $(LIBS) -lyarn -lkrb5
##########################################################################
#
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/Makefile
----------------------------------------------------------------------
diff --git a/src/backend/Makefile b/src/backend/Makefile
index f648f3c..44f13a2 100644
--- a/src/backend/Makefile
+++ b/src/backend/Makefile
@@ -222,8 +222,8 @@ endif
${INSTALL_SCRIPT} -d ${sysconfdir}
${INSTALL_DATA} $(srcdir)/utils/misc/etc/hawq-site.xml ${sysconfdir}
${INSTALL_DATA} $(srcdir)/utils/misc/etc/hdfs-client.xml ${sysconfdir}
+ ${INSTALL_DATA} $(srcdir)/utils/misc/etc/yarn-client.xml ${sysconfdir}
${INSTALL_DATA} $(srcdir)/utils/misc/etc/slaves ${sysconfdir}
- ${INSTALL_DATA} $(srcdir)/utils/misc/etc/slaves.exclude ${sysconfdir}
${INSTALL_DATA} $(srcdir)/utils/misc/etc/template-hawq-site.xml ${sysconfdir}
${INSTALL_DATA} $(srcdir)/utils/misc/etc/gpcheck.cnf ${sysconfdir}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/bootstrap/bootparse.y
----------------------------------------------------------------------
diff --git a/src/backend/bootstrap/bootparse.y b/src/backend/bootstrap/bootparse.y
index 1c7bbfd..9680912 100755
--- a/src/backend/bootstrap/bootparse.y
+++ b/src/backend/bootstrap/bootparse.y
@@ -370,23 +370,11 @@ Boot_CreateStmt:
typid = PG_REMOTE_CREDENTIALS_RELTYPE_OID;
break;
-/* relation id: 6026 - pg_resqueue 20140807 */
+/* relation id: 6026 - pg_resqueue 20150917 */
case ResQueueRelationId:
typid = PG_RESQUEUE_RELTYPE_OID;
break;
-
-/* relation id: 6059 - pg_resourcetype 20140807 */
- case ResourceTypeRelationId:
- typid = PG_RESOURCETYPE_RELTYPE_OID;
- break;
-
-
-/* relation id: 6060 - pg_resqueuecapability 20140807 */
- case ResQueueCapabilityRelationId:
- typid = PG_RESQUEUECAPABILITY_RELTYPE_OID;
- break;
-
/* TIDYCAT_END_CODEGEN */
default:
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/catalog/caql/Makefile
----------------------------------------------------------------------
diff --git a/src/backend/catalog/caql/Makefile b/src/backend/catalog/caql/Makefile
index 9e0cd7f..de513a3 100644
--- a/src/backend/catalog/caql/Makefile
+++ b/src/backend/catalog/caql/Makefile
@@ -62,7 +62,6 @@ CAQL_CQL_SRCS := $(addprefix $(top_srcdir)/src/backend/,\
commands/opclasscmds.c \
commands/operatorcmds.c \
commands/proclang.c \
- commands/queue.c \
commands/schemacmds.c \
commands/sequence.c \
commands/tablecmds.c \
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/catalog/catalog.c
----------------------------------------------------------------------
diff --git a/src/backend/catalog/catalog.c b/src/backend/catalog/catalog.c
index 20c80f4..ba65d88 100755
--- a/src/backend/catalog/catalog.c
+++ b/src/backend/catalog/catalog.c
@@ -793,15 +793,11 @@ relationId == GpVerificationHistoryRelationId ||
relationId == AuthTimeConstraintRelationId ||
/* relation id: 6112 - pg_filesystem 20130123 */
relationId == FileSystemRelationId ||
-/* relation id: 6026 - pg_resqueue 20140807 */
-relationId == ResQueueRelationId ||
-/* relation id: 6059 - pg_resourcetype 20140807 */
-relationId == ResourceTypeRelationId ||
-/* relation id: 6060 - pg_resqueuecapability 20140807 */
-relationId == ResQueueCapabilityRelationId ||
-
/* relation id: 5036 - gp_segment_configuration 20150204 */
relationId == GpSegmentConfigRelationId ||
+
+/* relation id: 6026 - pg_resqueue 20151014 */
+relationId == ResQueueRelationId ||
/* TIDYCAT_END_CODEGEN */
0 /* OR ZERO */
)
@@ -859,28 +855,16 @@ relationId == GpVerificationHistoryVertokenIndexId ||
relationId == FileSystemOidIndexId ||
/* relation id: 6112 - pg_filesystem 20130123 */
relationId == FileSystemFsysnameIndexId ||
-/* relation id: 6026 - pg_resqueue 20140807 */
-relationId == ResQueueOidIndexId ||
-/* relation id: 6026 - pg_resqueue 20140807 */
-relationId == ResQueueRsqnameIndexId ||
-/* relation id: 6059 - pg_resourcetype 20140807 */
-relationId == ResourceTypeOidIndexId ||
-/* relation id: 6059 - pg_resourcetype 20140807 */
-relationId == ResourceTypeRestypidIndexId ||
-/* relation id: 6059 - pg_resourcetype 20140807 */
-relationId == ResourceTypeResnameIndexId ||
-/* relation id: 6060 - pg_resqueuecapability 20140807 */
-relationId == ResQueueCapabilityOidIndexId ||
-/* relation id: 6060 - pg_resqueuecapability 20140807 */
-relationId == ResQueueCapabilityResqueueidIndexId ||
-/* relation id: 6060 - pg_resqueuecapability 20140807 */
-relationId == ResQueueCapabilityRestypidIndexId ||
-
/* relation id: 5036 - gp_segment_configuration 20150207 */
relationId == GpSegmentConfigRegistration_orderIndexId ||
/* relation id: 5036 - gp_segment_configuration 20150207 */
relationId == GpSegmentConfigRoleIndexId ||
+/* relation id: 6026 - pg_resqueue 20151014 */
+relationId == ResQueueOidIndexId ||
+/* relation id: 6026 - pg_resqueue 20151014 */
+relationId == ResQueueRsqnameIndexId ||
+
/* TIDYCAT_END_CODEGEN */
0 /* OR ZERO */
@@ -902,13 +886,13 @@ relationId == PgFileSpaceEntryToastIndex ||
/* relation id: 6112 - pg_filesystem 20130123 */
relationId == PgFileSystemToastTable ||
relationId == PgFileSystemToastIndex ||
-/* relation id: 6026 - pg_resqueue 20140807 */
-relationId == PgResQueueToastTable ||
-relationId == PgResQueueToastIndex ||
-
/* relation id: 5036 - gp_segment_configuration 20150204 */
relationId == GpSegmentConfigToastTable ||
relationId == GpSegmentConfigToastIndex ||
+
+/* relation id: 6026 - pg_resqueue 20151014 */
+relationId == PgResQueueToastTable ||
+relationId == PgResQueueToastIndex ||
/* TIDYCAT_END_CODEGEN */
0 /* OR ZERO */
)
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/catalog/toasting.c
----------------------------------------------------------------------
diff --git a/src/backend/catalog/toasting.c b/src/backend/catalog/toasting.c
index 56bcc30..24b0846 100644
--- a/src/backend/catalog/toasting.c
+++ b/src/backend/catalog/toasting.c
@@ -162,8 +162,7 @@ BootstrapToastTable(char *relName, Oid toastOid, Oid toastIndexOid)
typid = PG_REMOTE_CREDENTIALS_TOAST_RELTYPE_OID;
break;
-
-/* relation id: 6026 - pg_resqueue 20140807 */
+/* relation id: 6026 - pg_resqueue 20140917 */
case PgResQueueToastTable:
typid = PG_RESQUEUE_TOAST_RELTYPE_OID;
break;
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/cdb/cdbdatalocality.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbdatalocality.c b/src/backend/cdb/cdbdatalocality.c
index c46257a..b894629 100644
--- a/src/backend/cdb/cdbdatalocality.c
+++ b/src/backend/cdb/cdbdatalocality.c
@@ -3715,6 +3715,14 @@ run_allocation_algorithm(SplitAllocResult *result, List *virtual_segments, Query
}
/*allocate hash relation as a random relation*/
else{
+ MemoryContextSwitchTo(context->old_memorycontext);
+ CurrentRelType* relType = (CurrentRelType *) palloc(
+ sizeof(CurrentRelType));
+ relType->relid = rel_data->relid;
+ relType->isHash = false;
+ result->relsType = lappend(result->relsType, relType);
+ MemoryContextSwitchTo(context->datalocality_memorycontext);
+
result->forbid_optimizer = true;
allocate_random_relation(rel_data, &log_context,&idMap, &assignment_context, context);
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/cdb/cdbvars.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbvars.c b/src/backend/cdb/cdbvars.c
index ee6ec26..d6a784d 100644
--- a/src/backend/cdb/cdbvars.c
+++ b/src/backend/cdb/cdbvars.c
@@ -291,36 +291,41 @@ bool debug_fake_segmentnum;
/* New HAWQ 2.0 basic GUCs. Some of these are duplicate variables, they are
* reserved to facilitate showing settings in hawq-site.xml. */
-char *master_addr_host; /* hawq.master.address.host */
-int master_addr_port; /* hawq.master.address.port */
-char *standby_addr_host; /* hawq.standby.address.host */
-int seg_addr_port; /* hawq.segment.address.port */
-char *dfs_url; /* hawq.dfs.url */
-char *master_directory; /* hawq.master.directory */
-char *seg_directory; /* hawq.segment.directory */
-bool rm_domain_comm_enable; /* enable domain socket for RM communication */
+char *master_addr_host;
+int master_addr_port;
+char *standby_addr_host;
+int seg_addr_port;
+char *dfs_url;
+char *master_directory;
+char *seg_directory;
/* HAWQ 2.0 resource manager GUCs */
-char *rm_seg_memory_use; /* hawq.resourcemanager.segment.limit.memory.use */
-double rm_seg_core_use; /* hawq.resourcemanager.segment.limit.core.use */
-int rm_master_addr_port; /* hawq.resourcemanager.master.address.port */
-int rm_master_addr_domain_port;
-int rm_seg_addr_port;
-
-char *rm_grm_server_type; /* hawq.resourcemanager.server.type */
-char *rm_grm_yarn_rm_addr; /* hawq.resourcemanager.yarn.resourcemanager.address*/
-char *rm_grm_yarn_sched_addr; /* hawq.resourcemanager.yarn.resourcemanager.scheduler.address*/
-char *rm_grm_yarn_queue; /* hawq.resourcemanager.yarn.queue */
-char *rm_grm_yarn_app_name; /* hawq.resourcemanager.yarn.application.name*/
+int rm_master_port;
+int rm_segment_port;
+int rm_master_domain_port;
+
+int rm_nvseg_perquery_limit;
+int rm_nvseg_perquery_perseg_limit;
+int rm_nslice_perseg_limit;
+
+char *rm_seg_memory_use;
+double rm_seg_core_use;
+
+
+
+char *rm_global_rm_type;
+char *rm_grm_yarn_rm_addr;
+char *rm_grm_yarn_sched_addr;
+char *rm_grm_yarn_queue;
+char *rm_grm_yarn_app_name;
int rm_grm_breath_return_percentage;
-int rm_query_vseg_num_limit;
-int rm_query_vseg_num_per_seg_limit;
-int rm_slice_num_per_seg_limit;
+
int rm_seg_container_default_waterlevel;
bool rm_force_fifo_queue;
-int rm_resource_noaction_timeout; /* How many seconds to wait before expiring
+bool rm_session_lease_heartbeat_enable;
+int rm_session_lease_timeout; /* How many seconds to wait before expiring
allocated resource. */
int rm_query_resource_noresource_timeout; /* How may seconds to wait before
expiring queuing query resource
@@ -330,6 +335,9 @@ int rm_resource_timeout; /* How many seconds to wait before returning
int rm_resource_heartbeat_interval; /* How many seconds to wait before sending
another heart-beat to resource manager. */
+int rm_tolerate_nseg_limit;
+int rm_nvseg_variance_among_seg_limit;
+
char *rm_resourcepool_test_filename;
bool rm_enforce_cpu_enable; /* hawq_resourceenforcer_cpu_enable */
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/commands/Makefile
----------------------------------------------------------------------
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index 09666a1..c59098d 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -17,7 +17,7 @@ OBJS = aggregatecmds.o alter.o analyze.o analyzeutils.o async.o cluster.o commen
conversioncmds.o copy.o \
dbcommands.o define.o explain.o extprotocolcmds.o filespace.o filesystemcmds.o foreigncmds.o functioncmds.o \
indexcmds.o lockcmds.o operatorcmds.o opclasscmds.o \
- portalcmds.o prepare.o proclang.o queue.o \
+ portalcmds.o prepare.o proclang.o \
schemacmds.o sequence.o tablecmds.o tablespace.o trigger.o \
typecmds.o user.o vacuum.o vacuumlazy.o variable.o view.o
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/commands/queue.c
----------------------------------------------------------------------
diff --git a/src/backend/commands/queue.c b/src/backend/commands/queue.c
deleted file mode 100644
index 0240581..0000000
--- a/src/backend/commands/queue.c
+++ /dev/null
@@ -1,132 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * queue.c
- * Commands for manipulating resource queues.
- *
- * Copyright (c) 2006-2010, Greenplum inc.
- * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
- * Portions Copyright (c) 1994, Regents of the University of California
- *
- * $PostgreSQL$
- *
- *-------------------------------------------------------------------------
- */
-#include "postgres.h"
-
-#include "access/genam.h"
-#include "access/heapam.h"
-#include "catalog/catquery.h"
-#include "catalog/dependency.h"
-#include "catalog/heap.h"
-#include "catalog/indexing.h"
-#include "catalog/pg_authid.h"
-#include "catalog/pg_resqueue.h"
-#include "nodes/makefuncs.h"
-#include "cdb/cdbvars.h"
-#include "cdb/cdbdisp.h"
-#include "commands/comment.h"
-#include "commands/defrem.h"
-#include "commands/queue.h"
-#include "libpq/crypt.h"
-#include "miscadmin.h"
-#include "utils/acl.h"
-#include "utils/builtins.h"
-#include "utils/flatfiles.h"
-#include "utils/fmgroids.h"
-#include "utils/formatting.h"
-#include "utils/guc.h"
-#include "utils/lsyscache.h"
-#include "executor/execdesc.h"
-#include "utils/resscheduler.h"
-#include "utils/syscache.h"
-#include "cdb/memquota.h"
-#include "utils/guc_tables.h"
-
-#define INVALID_RES_LIMIT_STRING "-1"
-
-/**
- * Establish a lower bound on what memory limit may be set on a queue.
- */
-#define MIN_RESOURCEQUEUE_MEMORY_LIMIT_KB (10 * 1024L)
-
-/* MPP-6923: */
-List *
-GetResqueueCapabilityEntry(Oid queueid)
-{
- List *elems = NIL;
- HeapTuple tuple;
- cqContext *pcqCtx;
- cqContext cqc;
- Relation rel;
- TupleDesc tupdesc;
-
- rel = heap_open(ResQueueCapabilityRelationId, AccessShareLock);
-
- tupdesc = RelationGetDescr(rel);
-
- pcqCtx = caql_beginscan(
- caql_addrel(cqclr(&cqc), rel),
- cql("SELECT * FROM pg_resqueuecapability"
- " WHERE resqueueid = :1 ",
- ObjectIdGetDatum(queueid)));
-
- while (HeapTupleIsValid(tuple = caql_getnext(pcqCtx)))
- {
- if (HeapTupleIsValid(tuple))
- {
- List *pentry = NIL;
- int resTypeInt = 0;
- text *resSet_text = NULL;
- Datum resSet_datum;
- char *resSetting = NULL;
- bool isnull = false;
-
- resTypeInt =
- ((Form_pg_resqueuecapability) GETSTRUCT(tuple))->restypid;
-
- resSet_datum = heap_getattr(tuple,
- Anum_pg_resqueuecapability_ressetting,
- tupdesc,
- &isnull);
- Assert(!isnull);
- resSet_text = DatumGetTextP(resSet_datum);
- resSetting = DatumGetCString(DirectFunctionCall1(textout,
- PointerGetDatum(resSet_text)));
-
- pentry = list_make2(
- makeInteger(resTypeInt),
- makeString(resSetting));
-
- /* list of lists */
- elems = lappend(elems, pentry);
- }
- }
- caql_endscan(pcqCtx);
-
- heap_close(rel, AccessShareLock);
-
- return (elems);
-} /* end GetResqueueCapabilityEntry */
-
-/**
- * Given a queue id, return its name
- */
-char *GetResqueueName(Oid resqueueOid)
-{
- int fetchCount;
- char *result = NULL;
-
- result = caql_getcstring_plus(
- NULL,
- &fetchCount,
- NULL,
- cql("SELECT rsqname FROM pg_resqueue"
- " WHERE oid = :1 ",
- ObjectIdGetDatum(resqueueOid)));
-
- /* If we cannot find a resource queue id for any reason */
- if (!fetchCount)
- result = pstrdup("Unknown");
-
- return result;
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/commands/user.c
----------------------------------------------------------------------
diff --git a/src/backend/commands/user.c b/src/backend/commands/user.c
index 32b7417..a673b75 100644
--- a/src/backend/commands/user.c
+++ b/src/backend/commands/user.c
@@ -146,7 +146,7 @@ static bool resourceQueueIsBranch(Oid queueid)
Assert(tuple != NULL);
- status = heap_getattr(tuple, Anum_pg_resqueue_rsq_status, RelationGetDescr(pg_resqueue_rel), &isNull);
+ status = heap_getattr(tuple, Anum_pg_resqueue_status, RelationGetDescr(pg_resqueue_rel), &isNull);
if (!isNull && strncmp(TextDatumGetCString(status), "branch", strlen("branch")) == 0)
res = true;
@@ -206,9 +206,8 @@ CreateRole(CreateRoleStmt *stmt)
cqContext *pcqCtx;
Oid queueid = InvalidOid;
- int res = FUNC_RETURN_OK;
- int errorcode = FUNC_RETURN_OK;
- char errorbuf[1024];
+ int res = FUNC_RETURN_OK;
+ static char errorbuf[1024] = "";
/* The defaults can vary depending on the original statement type */
switch (stmt->stmt_type)
@@ -449,22 +448,10 @@ CreateRole(CreateRoleStmt *stmt)
GetUserId(),
errorbuf,
sizeof(errorbuf));
- errorcode = res;
-
- if (res != FUNC_RETURN_OK) {
- releaseResourceContextWithErrorReport(resourceid);
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Can not apply CREATE ROLE. "
- "Because fail to communicate with resource manager.")));
- }
- if (errorcode != FUNC_RETURN_OK) {
+ if (res != FUNC_RETURN_OK)
+ {
releaseResourceContextWithErrorReport(resourceid);
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Can not register connection in resource manager "
- "for applying CREATE ROLE. Because [%d]%s",
- errorcode, errorbuf)));
+ ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("%s", errorbuf)));
}
/*
@@ -554,31 +541,34 @@ CreateRole(CreateRoleStmt *stmt)
if (resqueue)
{
- if (strcmp(resqueue, "none") == 0) {
+ if (strcmp(resqueue, "none") == 0)
+ {
unregisterConnectionInRMWithErrorReport(resourceid);
releaseResourceContextWithErrorReport(resourceid);
ereport(ERROR,
(errcode(ERRCODE_RESERVED_NAME),
- errmsg("resource queue name \"%s\" is reserved",
+ errmsg("Resource queue name \"%s\" is reserved",
resqueue), errOmitLocation(true)));
}
queueid = GetResQueueIdForName(resqueue);
- if (queueid == InvalidOid) {
+ if (queueid == InvalidOid)
+ {
unregisterConnectionInRMWithErrorReport(resourceid);
releaseResourceContextWithErrorReport(resourceid);
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
- errmsg("resource queue \"%s\" does not exist",
+ errmsg("Resource queue \"%s\" does not exist",
resqueue), errOmitLocation(true)));
}
- if(resourceQueueIsBranch(queueid)) {
+ if(resourceQueueIsBranch(queueid))
+ {
unregisterConnectionInRMWithErrorReport(resourceid);
releaseResourceContextWithErrorReport(resourceid);
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
- errmsg("can't assign a branch resource queue \"%s\" to role",
+ errmsg("Can't assign a branch resource queue \"%s\" to role",
resqueue), errOmitLocation(true)));
}
@@ -614,7 +604,6 @@ CreateRole(CreateRoleStmt *stmt)
MANIPULATE_ROLE_RESQUEUE_CREATE,
issuper,
stmt->role,
- &errorcode,
errorbuf,
sizeof(errorbuf));
}
@@ -627,19 +616,13 @@ CreateRole(CreateRoleStmt *stmt)
if (resqueue && queueid != InvalidOid)
{
- if (res != FUNC_RETURN_OK) {
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Can not apply CREATE ROLE. "
- "Because fail to communicate with resource "
- "manager.")));
- }
- if (errorcode != FUNC_RETURN_OK) {
- /* TODO: Check if we should use another error code. */
+ if ( res != FUNC_RETURN_OK )
+ {
ereport(ERROR,
- (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
- errmsg("Can not apply CREATE ROLE. Because [%d]%s",
- errorcode, errorbuf)));
+ (errcode(IS_TO_RM_RPC_ERROR(res) ?
+ ERRCODE_INTERNAL_ERROR :
+ ERRCODE_INVALID_OBJECT_DEFINITION),
+ errmsg("Can not apply CREATE ROLE because of %s", errorbuf)));
}
}
@@ -774,11 +757,10 @@ AlterRole(AlterRoleStmt *stmt)
cqContext cqc;
cqContext *pcqCtx;
- Oid queueid = InvalidOid;
+ Oid queueid = InvalidOid;
- int res = FUNC_RETURN_OK;
- int errorcode = FUNC_RETURN_OK;
- char errorbuf[1024];
+ int res = FUNC_RETURN_OK;
+ static char errorbuf[1024] = "";
numopts = list_length(stmt->options);
@@ -977,7 +959,8 @@ AlterRole(AlterRoleStmt *stmt)
*/
int resourceid = 0;
res = createNewResourceContext(&resourceid);
- if ( res != FUNC_RETURN_OK ) {
+ if ( res != FUNC_RETURN_OK )
+ {
Assert( res == COMM2RM_CLIENT_FULL_RESOURCECONTEXT );
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
@@ -989,21 +972,10 @@ AlterRole(AlterRoleStmt *stmt)
GetUserId(),
errorbuf,
sizeof(errorbuf));
- errorcode = res;
- if (res != FUNC_RETURN_OK) {
- releaseResourceContextWithErrorReport(resourceid);
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Can not apply ALTER ROLE. "
- "Because fail to communicate with resource manager.")));
- }
- if (errorcode != FUNC_RETURN_OK) {
+ if (res != FUNC_RETURN_OK)
+ {
releaseResourceContextWithErrorReport(resourceid);
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Can not register connection in resource manager "
- "for applying ALTER ROLE. Because [%d]%s",
- errorcode, errorbuf)));
+ ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("%s", errorbuf)));
}
/*
@@ -1278,7 +1250,6 @@ AlterRole(AlterRoleStmt *stmt)
MANIPULATE_ROLE_RESQUEUE_ALTER,
issuper,
stmt->role,
- &errorcode,
errorbuf,
sizeof(errorbuf));
}
@@ -1291,19 +1262,14 @@ AlterRole(AlterRoleStmt *stmt)
if (resqueue && queueid != InvalidOid)
{
- if ( res != FUNC_RETURN_OK ) {
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Can not apply ALTER ROLE. "
- "Because fail to communicate with resource "
- "manager.")));
- }
- if (errorcode != FUNC_RETURN_OK) {
- /* TODO: Check if we should use another error code. */
+ if ( res != FUNC_RETURN_OK )
+ {
+
ereport(ERROR,
- (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
- errmsg("Can not apply ALTER ROLE. Because [%d]%s",
- errorcode, errorbuf)));
+ (errcode(IS_TO_RM_RPC_ERROR(res) ?
+ ERRCODE_INTERNAL_ERROR :
+ ERRCODE_INVALID_OBJECT_DEFINITION),
+ errmsg("Can not apply ALTER ROLE because of %s", errorbuf)));
}
}
@@ -1541,9 +1507,8 @@ DropRole(DropRoleStmt *stmt)
pg_auth_members_rel;
ListCell *item;
- int res = FUNC_RETURN_OK;
- int errorcode = FUNC_RETURN_OK;
- char errorbuf[1024];
+ int res = FUNC_RETURN_OK;
+ static char errorbuf[1024] = "";
if (!have_createrole_privilege())
ereport(ERROR,
@@ -1662,22 +1627,10 @@ DropRole(DropRoleStmt *stmt)
GetUserId(),
errorbuf,
sizeof(errorbuf));
- errorcode = res;
- if ( res != FUNC_RETURN_OK ) {
- releaseResourceContextWithErrorReport(resourceid);
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Can not apply DROP ROLE. "
- "Because fail to communicate with resource manager.")));
- }
-
- if ( errorcode != FUNC_RETURN_OK ) {
+ if (res != FUNC_RETURN_OK)
+ {
releaseResourceContextWithErrorReport(resourceid);
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Can not register connection in resource manager "
- "for applying DROP ROLE. Because [%d]%s",
- errorcode, errorbuf)));
+ ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("%s", errorbuf)));
}
/* Notify resource manager to drop role. */
@@ -1687,7 +1640,6 @@ DropRole(DropRoleStmt *stmt)
MANIPULATE_ROLE_RESQUEUE_DROP,
0, // not used when drop role
(char*)role,
- &errorcode,
errorbuf,
sizeof(errorbuf));
/* We always unregister connection. */
@@ -1696,19 +1648,13 @@ DropRole(DropRoleStmt *stmt)
/* We always release resource context. */
releaseResourceContextWithErrorReport(resourceid);
- if ( res != FUNC_RETURN_OK ) {
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Can not apply DROP ROLE. "
- "Because fail to communicate with resource "
- "manager.")));
- }
- if (errorcode != FUNC_RETURN_OK) {
- /* TODO: Check if we should use another error code. */
+ if ( res != FUNC_RETURN_OK )
+ {
ereport(ERROR,
- (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
- errmsg("Can not apply DROP ROLE. Because [%d]%s",
- errorcode, errorbuf)));
+ (errcode(IS_TO_RM_RPC_ERROR(res) ?
+ ERRCODE_INTERNAL_ERROR :
+ ERRCODE_INVALID_OBJECT_DEFINITION),
+ errmsg("Can not apply DROP ROLE because of %s", errorbuf)));
}
/*
@@ -2145,16 +2091,7 @@ AddRoleMems(const char *rolename, Oid roleid,
rolename)));
}
- /*
- * The role membership grantor of record has little significance at
- * present. Nonetheless, inasmuch as users might look to it for a crude
- * audit trail, let only superusers impute the grant to a third party.
- *
- * Before lifting this restriction, give the member == role case of
- * is_admin_of_role() a fresh look. Ensure that the current role cannot
- * use an explicit grantor specification to take advantage of the session
- * user's self-admin right.
- */
+ /* XXX not sure about this check */
if (grantorId != GetUserId() && !superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/optimizer/plan/planner.c
----------------------------------------------------------------------
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index e0162a9..ae81adf 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -328,6 +328,8 @@ planner(Query *parse, int cursorOptions,
}
}
+ int optimizer_segments_saved_value = optimizer_segments;
+
PG_TRY();
{
if (resourceNegotiateDone)
@@ -347,6 +349,11 @@ planner(Query *parse, int cursorOptions,
}
START_MEMORY_ACCOUNT(MemoryAccounting_CreateAccount(0, MEMORY_OWNER_TYPE_Optimizer));
{
+ if (optimizer_segments == 0) // value not set by user
+ {
+ optimizer_segments = gp_segments_for_planner;
+ }
+
result = optimize_query(parse, boundParams);
if (ppResult->stmt && ppResult->stmt->intoPolicy
&& result && result->intoPolicy)
@@ -354,6 +361,7 @@ planner(Query *parse, int cursorOptions,
result->intoPolicy->bucketnum =
ppResult->stmt->intoPolicy->bucketnum;
}
+ optimizer_segments = optimizer_segments_saved_value;
}
END_MEMORY_ACCOUNT();
@@ -405,6 +413,7 @@ planner(Query *parse, int cursorOptions,
*/
resourceNegotiateDone = false;
plannerLevel = 0;
+ optimizer_segments = optimizer_segments_saved_value;
if (savedQueryResource)
{
gp_segments_for_planner = list_length(savedQueryResource->segments);
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/postmaster/identity.c
----------------------------------------------------------------------
diff --git a/src/backend/postmaster/identity.c b/src/backend/postmaster/identity.c
index 737c31a..7d1bcf0 100644
--- a/src/backend/postmaster/identity.c
+++ b/src/backend/postmaster/identity.c
@@ -376,7 +376,7 @@ SetupProcessIdentity(const char *str)
/* If QE is under one segment. */
if ( GetQEIndex() != -1 ) {
GetLocalTmpDirFromRM("127.0.0.1",//DRMGlobalInstance->SocketLocalHostName.Str,
- rm_seg_addr_port,
+ rm_segment_port,
gp_session_id,
gp_command_count,
GetQEIndex());
@@ -384,7 +384,7 @@ SetupProcessIdentity(const char *str)
/* QE is under master. */
else {
GetLocalTmpDirFromRM("127.0.0.1",//DRMGlobalInstance->SocketLocalHostName.Str,
- rm_master_addr_port,
+ rm_master_port,
gp_session_id,
gp_command_count,
GetQEIndex());
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/communication/rmcomm_QD2RM.c b/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
index e8583ce..10189e0 100644
--- a/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
+++ b/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
@@ -210,7 +210,7 @@ void initializeQD2RMComm(void)
}
/* Get UNIX domain socket file. */
- UNIXSOCK_PATH(QD2RM_SocketFile, rm_master_addr_domain_port, UnixSocketDir);
+ UNIXSOCK_PATH(QD2RM_SocketFile, rm_master_domain_port, UnixSocketDir);
/* Initialize global variables for maintaining a list of resource sets. */
QD2RM_ResourceSets = rm_palloc0(QD2RM_CommContext,
@@ -232,13 +232,16 @@ void initializeQD2RMComm(void)
}
/* Start resource heart-beat thread. */
- if ( pthread_create(&ResourceHeartBeatThreadHandle,
- NULL,
- generateResourceRefreshHeartBeat,
- NULL) != 0)
+ if ( rm_session_lease_heartbeat_enable )
{
- elog(ERROR, "Fail to create background thread for communication with "
- "resource manager.");
+ if ( pthread_create(&ResourceHeartBeatThreadHandle,
+ NULL,
+ generateResourceRefreshHeartBeat,
+ NULL) != 0)
+ {
+ elog(ERROR, "Fail to create background thread for communication with "
+ "resource manager.");
+ }
}
initializeMessageHandlers();
@@ -530,6 +533,7 @@ int unregisterConnectionInRM(int index,
(RPCResponseHeadUnregisterConnectionInRM)(recvbuffer->Buffer);
if ( response->Result != FUNC_RETURN_OK )
{
+ res = response->Result;
snprintf(errorbuf, errorbufsize,
"Fail to unregister in HAWQ resource manager because of "
"remote error %s.",
@@ -540,7 +544,7 @@ int unregisterConnectionInRM(int index,
QD2RM_ResourceSets[index]->QD_Conn_ID);
QD2RM_ResourceSets[index]->QD_Conn_ID = INVALID_CONNID;
- return FUNC_RETURN_OK;
+ return res;
}
void unregisterConnectionInRMWithErrorReport(int index)
@@ -598,8 +602,8 @@ int acquireResourceFromRM(int index,
requesthead.MaxSegCountFix = max_seg_count_fix;
requesthead.MinSegCountFix = min_seg_count_fix;
requesthead.SliceSize = slice_size;
- requesthead.VSegLimitPerSeg = rm_query_vseg_num_per_seg_limit;
- requesthead.VSegLimit = rm_query_vseg_num_limit;
+ requesthead.VSegLimitPerSeg = rm_nvseg_perquery_perseg_limit;
+ requesthead.VSegLimit = rm_nvseg_perquery_limit;
requesthead.Reserved = 0;
requesthead.IOBytes = iobytes;
@@ -781,7 +785,9 @@ int returnResource(int index,
/* Parse response. */
RPCResponseHeadReturnResource response =
(RPCResponseHeadReturnResource)(recvbuffer->Buffer);
- if ( response->Result != FUNC_RETURN_OK ) {
+ if ( response->Result != FUNC_RETURN_OK )
+ {
+ res = response->Result;
snprintf(errorbuf, errorbufsize,
"Fail to return resource to HAWQ resource manager because of "
"remote error %s.",
@@ -845,9 +851,7 @@ int manipulateResourceQueue(int index,
recvbuffer);
if ( res != FUNC_RETURN_OK )
{
- snprintf(errorbuf, errorbufsize,
- "Fail to manipulate resource queue because of RPC error %s.",
- getErrorCodeExplain(res));
+ snprintf(errorbuf, errorbufsize, "%s", getErrorCodeExplain(res));
return res;
}
@@ -856,27 +860,29 @@ int manipulateResourceQueue(int index,
(RPCResponseHeadManipulateResQueue)(recvbuffer->Buffer);
/* CASE 1. The response contains error message. */
- if ( response->Result != FUNC_RETURN_OK ) {
-
+ if ( response->Result != FUNC_RETURN_OK )
+ {
RPCResponseHeadManipulateResQueueERROR error =
(RPCResponseHeadManipulateResQueueERROR)(recvbuffer->Buffer);
- elog(LOG, "Fail to manipulate resource queue because %s",
- error->ErrorText);
+ elog(WARNING, "Fail to manipulate resource queue because %s",
+ error->ErrorText);
snprintf(errorbuf, errorbufsize, "%s", error->ErrorText);
}
+
+ elog(DEBUG3, "Manipulated resource queue and got result %d", response->Result);
+
return response->Result;
}
-int manipulateRoleForResourceQueue (int index,
- Oid roleid,
- Oid queueid,
- uint16_t action,
- uint8_t isSuperUser,
- char *rolename,
- int *errorcode,
- char *errorbuf,
- int errorbufsize)
+int manipulateRoleForResourceQueue (int index,
+ Oid roleid,
+ Oid queueid,
+ uint16_t action,
+ uint8_t isSuperUser,
+ char *rolename,
+ char *errorbuf,
+ int errorbufsize)
{
initializeQD2RMComm();
@@ -902,65 +908,51 @@ int manipulateRoleForResourceQueue (int index,
request.isSuperUser = isSuperUser;
request.Action = action;
if (strlen(rolename) < sizeof(request.Name))
+ {
strncpy(request.Name, rolename, strlen(rolename));
- else {
- snprintf(errorbuf, errorbufsize, "Invalid role name.");
- *errorcode = res;
- return res;
+ }
+ else
+ {
+ elog(WARNING, "Resource manager finds in valid role name %s.", rolename);
+ snprintf(errorbuf, errorbufsize, "invalid role name %s.", rolename);
+ return RESQUEMGR_NO_USERID;
}
- elog(RMLOG, "HAWQ RM: manipulateRoleForResourceQueue "
- "role oid:%d, queueID:%d, isSuper:%d, roleName:%s, action:%d",
- request.RoleOID, request.QueueOID, request.isSuperUser,
- request.Name, request.Action);
+ elog(DEBUG3, "Resource manager (manipulateRoleForResourceQueue) "
+ "role oid:%d, queueID:%d, isSuper:%d, roleName:%s, action:%d",
+ request.RoleOID, request.QueueOID, request.isSuperUser,
+ request.Name, request.Action);
appendSMBVar(sendbuffer, request);
- if (rm_domain_comm_enable)
- {
- res = callSyncRPCDomain(QD2RM_SocketFile,
- sendbuffer->Buffer,
- sendbuffer->Cursor + 1,
- REQUEST_QD_DDL_MANIPULATEROLE,
- RESPONSE_QD_DDL_MANIPULATEROLE,
- recvbuffer);
- }
- else
- {
- res = callSyncRPCRemote(master_addr_host,
- rm_master_addr_port,
- sendbuffer->Buffer,
- sendbuffer->Cursor + 1,
- REQUEST_QD_DDL_MANIPULATEROLE,
- RESPONSE_QD_DDL_MANIPULATEROLE,
- recvbuffer);
- }
- if ( res != FUNC_RETURN_OK ) {
- snprintf(errorbuf, errorbufsize,
- "Fail to get response from resource manager RPC.");
- *errorcode = res;
+ res = callSyncRPCToRM(sendbuffer->Buffer,
+ sendbuffer->Cursor + 1,
+ REQUEST_QD_DDL_MANIPULATEROLE,
+ RESPONSE_QD_DDL_MANIPULATEROLE,
+ recvbuffer);
+
+ if ( res != FUNC_RETURN_OK )
+ {
+ snprintf(errorbuf, errorbufsize, "%s", getErrorCodeExplain(res));
return res;
}
/* Start parsing response. */
- RPCResponseHeadManipulateRole response =
- (RPCResponseHeadManipulateRole)(recvbuffer->Buffer);
+ RPCResponseHeadManipulateRole response = (RPCResponseHeadManipulateRole)
+ (recvbuffer->Buffer);
/* The response contains error message. */
- if ( response->Result != FUNC_RETURN_OK ) {
-
+ if ( response->Result != FUNC_RETURN_OK )
+ {
RPCResponseHeadManipulateRoleERROR error =
(RPCResponseHeadManipulateRoleERROR)(recvbuffer->Buffer);
- elog(RMLOG, "HAWQ RM :: Fail to manipulate role. %s",
- error->ErrorText);
+ elog(WARNING, "Resource manager failed to manipulate role %s. %s",
+ rolename,
+ error->ErrorText);
snprintf(errorbuf, errorbufsize, "%s", error->ErrorText);
- *errorcode = error->Result;
- return FUNC_RETURN_OK;
}
-
- *errorcode = FUNC_RETURN_OK;
- return res;
+ return response->Result;
}
void buildManipulateResQueueRequest(SelfMaintainBuffer sendbuffer,
@@ -972,8 +964,7 @@ void buildManipulateResQueueRequest(SelfMaintainBuffer sendbuffer,
Assert( sendbuffer != NULL );
Assert( connid != 0XFFFFFFFF );
Assert( queuename != NULL );
- Assert( action >= MANIPULATE_RESQUEUE_CREATE &&
- action <= MANIPULATE_RESQUEUE_DROP );
+ Assert( action >= MANIPULATE_RESQUEUE_CREATE && action <= MANIPULATE_RESQUEUE_DROP );
uint16_t withlength = 0;
bool nowIsWithOption = false;
@@ -1069,26 +1060,14 @@ void sendFailedNodeToResourceManager(int hostNum, char **pghost) {
elog(LOG, "HAWQ RM :: QD sends %d failed host(s) to resource manager.",
hostNum);
- if (rm_domain_comm_enable)
- {
- res = callSyncRPCDomain(QD2RM_SocketFile,
- sendBuffer.Buffer,
- sendBuffer.Cursor + 1,
- REQUEST_QD_SEGMENT_ISDOWN,
- RESPONSE_QD_SEGMENT_ISDOWN,
- &recvBuffer);
- }
- else
- {
- res = callSyncRPCRemote(master_addr_host,
- rm_master_addr_port,
- sendBuffer.Buffer,
- sendBuffer.Cursor + 1,
- REQUEST_QD_SEGMENT_ISDOWN,
- RESPONSE_QD_SEGMENT_ISDOWN,
- &recvBuffer);
- }
- if ( res != FUNC_RETURN_OK ) {
+ res = callSyncRPCToRM(sendBuffer.Buffer,
+ sendBuffer.Cursor + 1,
+ REQUEST_QD_SEGMENT_ISDOWN,
+ RESPONSE_QD_SEGMENT_ISDOWN,
+ &recvBuffer);
+
+ if ( res != FUNC_RETURN_OK )
+ {
elog(LOG, "Fail to get response from resource manager RPC. %d", res);
goto exit;
}
@@ -1114,25 +1093,11 @@ int getLocalTmpDirFromMasterRM()
request.Reserved = 0;
appendSMBVar(&sendBuffer, request);
- if (rm_domain_comm_enable)
- {
- res = callSyncRPCDomain(QD2RM_SocketFile,
- sendBuffer.Buffer,
- sendBuffer.Cursor + 1,
- REQUEST_QD_TMPDIR,
- RESPONSE_QD_TMPDIR,
- &recvBuffer);
- }
- else
- {
- res = callSyncRPCRemote(master_addr_host,
- rm_master_addr_port,
- sendBuffer.Buffer,
- sendBuffer.Cursor + 1,
- REQUEST_QD_TMPDIR,
- RESPONSE_QD_TMPDIR,
- &recvBuffer);
- }
+ res = callSyncRPCToRM(sendBuffer.Buffer,
+ sendBuffer.Cursor + 1,
+ REQUEST_QD_TMPDIR,
+ RESPONSE_QD_TMPDIR,
+ &recvBuffer);
if ( res != FUNC_RETURN_OK )
{
elog(ERROR, "getLocalTmpDirFromMasterRM fail");
@@ -1185,35 +1150,23 @@ int acquireResourceQuotaFromRM(int64_t user_oid,
request.UseridOid = user_oid;
request.MaxSegCountFix = max_seg_count_fix;
request.MinSegCountFix = min_seg_count_fix;
- request.VSegLimitPerSeg = rm_query_vseg_num_per_seg_limit;
- request.VSegLimit = rm_query_vseg_num_limit;
+ request.VSegLimitPerSeg = rm_nvseg_perquery_perseg_limit;
+ request.VSegLimit = rm_nvseg_perquery_limit;
appendSMBVar(&sendBuffer, request);
- elog(DEBUG3, "HAWQ RM :: Acquire resource quota for query with %d splits, %d preferred virtual segments by user "INT64_FORMAT,
+ elog(DEBUG3, "HAWQ RM :: Acquire resource quota for query with %d splits, "
+ "%d preferred virtual segments by user "INT64_FORMAT,
max_seg_count_fix,
min_seg_count_fix,
user_oid);
- if (rm_domain_comm_enable)
- {
- res = callSyncRPCDomain(QD2RM_SocketFile,
- sendBuffer.Buffer,
- sendBuffer.Cursor + 1,
- REQUEST_QD_ACQUIRE_RESOURCE_QUOTA,
- RESPONSE_QD_ACQUIRE_RESOURCE_QUOTA,
- &recvBuffer);
- }
- else
- {
- res = callSyncRPCRemote(master_addr_host,
- rm_master_addr_port,
- sendBuffer.Buffer,
- sendBuffer.Cursor + 1,
- REQUEST_QD_ACQUIRE_RESOURCE_QUOTA,
- RESPONSE_QD_ACQUIRE_RESOURCE_QUOTA,
- &recvBuffer);
- }
- if ( res != FUNC_RETURN_OK ) {
+ res = callSyncRPCToRM(sendBuffer.Buffer,
+ sendBuffer.Cursor + 1,
+ REQUEST_QD_ACQUIRE_RESOURCE_QUOTA,
+ RESPONSE_QD_ACQUIRE_RESOURCE_QUOTA,
+ &recvBuffer);
+ if ( res != FUNC_RETURN_OK )
+ {
snprintf(errorbuf, errorbufsize,
"Fail to get response from resource manager RPC.");
*errorcode = res;
@@ -1245,7 +1198,6 @@ exit:
void *generateResourceRefreshHeartBeat(void *arg)
{
- static char dfilename[256];
static char messagehead[16] = {'M' ,'S' ,'G' ,'S' ,'T' ,'A' ,'R' ,'T' ,
'\0','\0','\0','\0','\0','\0','\0','\0'};
static char messagetail[8] = {'M' ,'S' ,'G' ,'E' ,'N' ,'D' ,'S' ,'!' };
@@ -1292,11 +1244,12 @@ void *generateResourceRefreshHeartBeat(void *arg)
/* Build final request content and send out. */
appendSelfMaintainBufferTill64bitAligned(&contbuffer);
- if ( sendcontent ) {
+ if ( sendcontent )
+ {
int fd = -1;
- int res = connectToServerDomain(QD2RM_SocketFile, 0, &fd, 1, dfilename);
- if ( res == FUNC_RETURN_OK ) {
-
+ int res = connectToServerRemote(master_addr_host, rm_master_port, &fd);
+ if ( res == FUNC_RETURN_OK )
+ {
RMMessageHead phead = (RMMessageHead)messagehead;
RMMessageTail ptail = (RMMessageTail)messagetail;
phead->Mark1 = 0;
@@ -1322,7 +1275,7 @@ void *generateResourceRefreshHeartBeat(void *arg)
write_log("generateResourceRefreshHeartBeat send error (errno %d)", errno);
}
}
- closeConnectionDomain(&fd, dfilename);
+ closeConnectionRemote(&fd);
}
pg_usleep(rm_resource_heartbeat_interval * 1000000);
}
@@ -1362,24 +1315,17 @@ Datum pg_resqueue_status(PG_FUNCTION_ARGS)
request.Reserved = 0;
appendSMBVar(&sendBuffer, request);
- if (rm_domain_comm_enable)
+ res = callSyncRPCToRM(sendBuffer.Buffer,
+ sendBuffer.Cursor + 1,
+ REQUEST_QD_DUMP_RESQUEUE_STATUS,
+ RESPONSE_QD_DUMP_RESQUEUE_STATUS,
+ &recvBuffer);
+ if ( res != FUNC_RETURN_OK )
{
- res = callSyncRPCDomain(QD2RM_SocketFile,
- sendBuffer.Buffer,
- sendBuffer.Cursor + 1,
- REQUEST_QD_DUMP_RESQUEUE_STATUS,
- RESPONSE_QD_DUMP_RESQUEUE_STATUS,
- &recvBuffer);
- }
- else
- {
- res = callSyncRPCRemote(master_addr_host,
- rm_master_addr_port,
- sendBuffer.Buffer,
- sendBuffer.Cursor + 1,
- REQUEST_QD_DUMP_RESQUEUE_STATUS,
- RESPONSE_QD_DUMP_RESQUEUE_STATUS,
- &recvBuffer);
+ destroySelfMaintainBuffer(&sendBuffer);
+ destroySelfMaintainBuffer(&recvBuffer);
+ funcctx->max_calls = 0;
+ SRF_RETURN_DONE(funcctx);
}
RPCResponseResQueueStatus response = (RPCResponseResQueueStatus)(recvBuffer.Buffer);
@@ -1507,25 +1453,15 @@ void dumpResourceManagerStatus(uint32_t type, const char *dump_file)
strncpy(request.dump_file, dump_file, sizeof(request.dump_file) - 1);
appendSMBVar(&sendBuffer, request);
- if (rm_domain_comm_enable)
- {
- res = callSyncRPCDomain(QD2RM_SocketFile,
- sendBuffer.Buffer,
- sendBuffer.Cursor + 1,
- REQUEST_QD_DUMP_STATUS,
- RESPONSE_QD_DUMP_STATUS,
- &recvBuffer);
- }
- else
- {
- res = callSyncRPCRemote(master_addr_host,
- rm_master_addr_port,
- sendBuffer.Buffer,
- sendBuffer.Cursor + 1,
- REQUEST_QD_DUMP_STATUS,
- RESPONSE_QD_DUMP_STATUS,
- &recvBuffer);
- }
+ res = callSyncRPCToRM(sendBuffer.Buffer,
+ sendBuffer.Cursor + 1,
+ REQUEST_QD_DUMP_STATUS,
+ RESPONSE_QD_DUMP_STATUS,
+ &recvBuffer);
+ if ( res != FUNC_RETURN_OK )
+ {
+ goto exit;
+ }
RPCResponseDumpStatus response = (RPCResponseDumpStatus)(recvBuffer.Buffer);
@@ -2494,23 +2430,20 @@ int callSyncRPCToRM(const char *sendbuff,
uint16_t exprecvmsgid,
SelfMaintainBuffer recvsmb)
{
- if (rm_domain_comm_enable)
- {
+#ifdef ENABLE_DOMAINSERVER
return callSyncRPCDomain(QD2RM_SocketFile,
sendbuff,
sendbuffsize,
sendmsgid,
exprecvmsgid,
recvsmb);
- }
- else
- {
+#else
return callSyncRPCRemote(master_addr_host,
- rm_master_addr_port,
+ rm_master_port,
sendbuff,
sendbuffsize,
sendmsgid,
exprecvmsgid,
recvsmb);
- }
+#endif
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/communication/rmcomm_QE2RMSEG.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/communication/rmcomm_QE2RMSEG.c b/src/backend/resourcemanager/communication/rmcomm_QE2RMSEG.c
index 9b0ae17..874b53d 100644
--- a/src/backend/resourcemanager/communication/rmcomm_QE2RMSEG.c
+++ b/src/backend/resourcemanager/communication/rmcomm_QE2RMSEG.c
@@ -28,7 +28,7 @@ MoveToCGroupForQE(TimestampTz masterStartTime,
int res = FUNC_RETURN_OK;
char *serverHost = "127.0.0.1";
- uint16_t serverPort = rm_seg_addr_port;
+ uint16_t serverPort = rm_segment_port;
SelfMaintainBuffer sendBuffer = createSelfMaintainBuffer(CurrentMemoryContext);
SelfMaintainBuffer recvBuffer = createSelfMaintainBuffer(CurrentMemoryContext);
@@ -71,7 +71,7 @@ MoveOutCGroupForQE(TimestampTz masterStartTime,
int res = FUNC_RETURN_OK;
char *serverHost = "127.0.0.1";
- uint16_t serverPort = rm_seg_addr_port;
+ uint16_t serverPort = rm_segment_port;
SelfMaintainBuffer sendBuffer = createSelfMaintainBuffer(CurrentMemoryContext);
SelfMaintainBuffer recvBuffer = createSelfMaintainBuffer(CurrentMemoryContext);
@@ -116,7 +116,7 @@ SetWeightCGroupForQE(TimestampTz masterStartTime,
int res = FUNC_RETURN_OK;
char *serverHost = "127.0.0.1";
- uint16_t serverPort = rm_seg_addr_port;
+ uint16_t serverPort = rm_segment_port;
SelfMaintainBuffer sendBuffer = createSelfMaintainBuffer(CurrentMemoryContext);
SelfMaintainBuffer recvBuffer = createSelfMaintainBuffer(CurrentMemoryContext);
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c b/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c
index d9d82ca..c87694a 100644
--- a/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c
+++ b/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c
@@ -142,7 +142,7 @@ int sendRUAlive(char *seghostname)
/* Connect to HAWQ RM server */
res = registerAsyncConnectionFileDesc(NULL,
seghostname,
- rm_seg_addr_port,
+ rm_segment_port,
ASYNCCOMM_READBYTES | ASYNCCOMM_WRITEBYTES,
&AsyncCommBufferHandlersMessage,
context,
@@ -333,7 +333,7 @@ int increaseMemoryQuota(char *seghostname, GRMContainerSet containerset)
res = registerAsyncConnectionFileDesc(NULL,
seghostname,
- rm_seg_addr_port,
+ rm_segment_port,
ASYNCCOMM_READBYTES | ASYNCCOMM_WRITEBYTES,
&AsyncCommBufferHandlersMessage,
context,
@@ -343,7 +343,7 @@ int increaseMemoryQuota(char *seghostname, GRMContainerSet containerset)
elog(LOG, "Resource manager failed to set connection to segment host %s "
"on port %d to increase memory quota.",
seghostname,
- rm_seg_addr_port);
+ rm_segment_port);
processContainersAfterIncreaseMemoryQuota(containerset, false);
freeGRMContainerSet(newctns);
rm_pfree(AsyncCommContext, context);
@@ -354,7 +354,7 @@ int increaseMemoryQuota(char *seghostname, GRMContainerSet containerset)
elog(DEBUG3, "Resource manager succeeded set connection to segment host %s "
"on port %d to increase memory quota.",
seghostname,
- rm_seg_addr_port);
+ rm_segment_port);
}
buildMessageToCommBuffer(commbuffer,
@@ -504,7 +504,7 @@ int decreaseMemoryQuota(char *seghostname,
res = registerAsyncConnectionFileDesc(NULL,
seghostname,
- rm_seg_addr_port,
+ rm_segment_port,
ASYNCCOMM_READBYTES | ASYNCCOMM_WRITEBYTES,
&AsyncCommBufferHandlersMessage,
context,
@@ -514,7 +514,7 @@ int decreaseMemoryQuota(char *seghostname,
elog(LOG, "Resource manager failed to set connection to segment host %s "
"on port %d to decrease memory quota.",
seghostname,
- rm_seg_addr_port);
+ rm_segment_port);
processContainersAfterDecreaseMemoryQuota(ctns, false);
freeGRMContainerSet(ctns);
rm_pfree(AsyncCommContext, context);
@@ -525,7 +525,7 @@ int decreaseMemoryQuota(char *seghostname,
elog(DEBUG3, "Resource manager succeeded set connection to segment host %s "
"on port %d to decrease memory quota.",
seghostname,
- rm_seg_addr_port);
+ rm_segment_port);
}
buildMessageToCommBuffer(commbuffer,
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/communication/rmcomm_RMSEG2RM.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/communication/rmcomm_RMSEG2RM.c b/src/backend/resourcemanager/communication/rmcomm_RMSEG2RM.c
index 9a35cce..4adeb44 100644
--- a/src/backend/resourcemanager/communication/rmcomm_RMSEG2RM.c
+++ b/src/backend/resourcemanager/communication/rmcomm_RMSEG2RM.c
@@ -129,7 +129,7 @@ int sendIMAlive(int *errorcode,
DRMGlobalInstance->SendToStandby?
standby_addr_host:
master_addr_host,
- rm_master_addr_port,
+ rm_master_port,
ASYNCCOMM_READBYTES | ASYNCCOMM_WRITEBYTES,
&AsyncCommBufferHandlersMessage,
context,
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/errorcode.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/errorcode.c b/src/backend/resourcemanager/errorcode.c
index 81ec92b..49c1a1e 100644
--- a/src/backend/resourcemanager/errorcode.c
+++ b/src/backend/resourcemanager/errorcode.c
@@ -12,6 +12,8 @@ ErrorDetailData ErrorDetailsPreset[] = {
{RESQUEMGR_NOCLUSTER_TIMEOUT, "no available cluster to run query"},
{RESQUEMGR_NORESOURCE_TIMEOUT, "no available resource to run query"},
{RESQUEMGR_TOO_MANY_FIXED_SEGNUM, "expecting too many virtual segments"},
+ {REQUESTHANDLER_WRONG_CONNSTAT, "that resource context maybe recycled due to timeout"},
+ {CONNTRACK_NO_CONNID, "that resource context does not exist"},
{-1, ""}
};
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/include/communication/rmcomm_QD2RM.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/communication/rmcomm_QD2RM.h b/src/backend/resourcemanager/include/communication/rmcomm_QD2RM.h
index 524399a..bdb9400 100644
--- a/src/backend/resourcemanager/include/communication/rmcomm_QD2RM.h
+++ b/src/backend/resourcemanager/include/communication/rmcomm_QD2RM.h
@@ -136,15 +136,14 @@ int manipulateResourceQueue(int index,
char *errorbuf,
int errorbufsize);
-int manipulateRoleForResourceQueue (int index,
- Oid roleid,
- Oid queueid,
- uint16_t action,
- uint8_t isSuperUser,
- char *rolename,
- int *errorcode,
- char *errorbuf,
- int errorbufsize);
+int manipulateRoleForResourceQueue (int index,
+ Oid roleid,
+ Oid queueid,
+ uint16_t action,
+ uint8_t isSuperUser,
+ char *rolename,
+ char *errorbuf,
+ int errorbufsize);
void SendResourceRefreshHeartBeat(void);
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/include/communication/rmcomm_QD_RM_Protocol.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/communication/rmcomm_QD_RM_Protocol.h b/src/backend/resourcemanager/include/communication/rmcomm_QD_RM_Protocol.h
index 9d063d9..17d397b 100644
--- a/src/backend/resourcemanager/include/communication/rmcomm_QD_RM_Protocol.h
+++ b/src/backend/resourcemanager/include/communication/rmcomm_QD_RM_Protocol.h
@@ -107,15 +107,13 @@ enum CatResQueueAction_Action_Enum {
};
RPC_PROTOCOL_STRUCT_BEGIN(RPCResponseHeadManipulateResQueueERROR)
- uint32_t Result;
- uint32_t Reserved;
- char ErrorText[1];
+ RPCResponseHeadManipulateResQueueData Result;
+ char ErrorText[1];
RPC_PROTOCOL_STRUCT_END(RPCResponseHeadManipulateResQueueERROR)
RPC_PROTOCOL_STRUCT_BEGIN(RPCResponseHeadManipulateRoleERROR)
- uint32_t Result;
- uint32_t Reserved;
- char ErrorText[1];
+ RPCResponseHeadManipulateRoleData Result;
+ char ErrorText[1];
RPC_PROTOCOL_STRUCT_END(RPCResponseHeadManipulateRoleERROR)
/*******************************************************************************
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/include/errorcode.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/errorcode.h b/src/backend/resourcemanager/include/errorcode.h
index 5eb627d..8c86011 100644
--- a/src/backend/resourcemanager/include/errorcode.h
+++ b/src/backend/resourcemanager/include/errorcode.h
@@ -74,6 +74,7 @@ enum DRM_ERROR_CODE {
RESQUEMGR_DEADLOCK_DETECTED,
RESQUEMGR_NOCLUSTER_TIMEOUT,
RESQUEMGR_NORESOURCE_TIMEOUT,
+ RESQUEMGR_WRONG_RES_QUOTA_EXP,
REQUESTHANDLER_START_TAG = 200,
REQUESTHANDLER_WAIT_RESOURCE,
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/include/resourcepool.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/resourcepool.h b/src/backend/resourcemanager/include/resourcepool.h
index c502429..66c334d 100644
--- a/src/backend/resourcemanager/include/resourcepool.h
+++ b/src/backend/resourcemanager/include/resourcepool.h
@@ -460,6 +460,9 @@ struct ResourcePoolData {
*/
ALLOC_RES_FROM_RESPOOL_FUNC allocateResFuncs[RESOURCEPOOL_MAX_ALLOC_POLICY_SIZE];
+ /* Slaves file content. */
+ int64_t SlavesFileTimestamp;
+ int SlavesHostCount;
};
typedef struct ResourcePoolData *ResourcePool;
@@ -601,6 +604,9 @@ typedef struct RB_GRMContainerStatData *RB_GRMContainerStat;
void checkGRMContainerStatus(RB_GRMContainerStat ctnstats, int size);
int getClusterGRMContainerSize(void);
+
+void checkSlavesFile(void);
+
/*
*------------------------------------------------------------------------------
* gp_segment_configuration catalog operating APIs.
@@ -617,6 +623,7 @@ void add_segment_config_row(int32_t id,
char *address,
uint32_t port,
char role);
+
/*
* In resource pool, segment's id starts from 0, however in gp_segment_configuration table,
* segment registration order starts from 1(0 is for master, -1 is for standby).
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/include/resqueuemanager.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/resqueuemanager.h b/src/backend/resourcemanager/include/resqueuemanager.h
index 0ba03f7..38de157 100644
--- a/src/backend/resourcemanager/include/resqueuemanager.h
+++ b/src/backend/resourcemanager/include/resqueuemanager.h
@@ -38,14 +38,17 @@ enum RESOURCE_QUEUE_DDL_ATTR_INDEX
RSQ_DDL_ATTR_ACTIVE_STATMENTS,
RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER,
RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER,
- RSQ_DDL_ATTR_VSEGMENT_RESOURCE_QUOTA,
+ RSQ_DDL_ATTR_VSEG_RESOURCE_QUOTA,
RSQ_DDL_ATTR_ALLOCATION_POLICY,
- RSQ_DDL_ATTR_RESOURCE_UPPER_FACTOR,
- RSQ_DDL_ATTR_VSEGMENT_UPPER_LIMIT,
+ RSQ_DDL_ATTR_RESOURCE_OVERCOMMIT_FACTOR,
+ RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT,
+ RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT,
+ RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT_PERSEG,
+ RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT_PERSEG,
RSQ_DDL_ATTR_COUNT
};
-extern char RSQDDLAttrNames[RSQ_DDL_ATTR_COUNT][RESOURCE_QUEUE_DDL_ATTR_LENGTH_MAX];
+extern char RSQDDLAttrNames[RSQ_DDL_ATTR_COUNT][RESOURCE_QUEUE_DDL_ATTR_LENGTH_MAX+1];
/*
* The attributes for expressing one complete resource queue definition.
@@ -57,10 +60,13 @@ enum RESOURCE_QUEUE_TABLE_ATTR_INDEX {
RSQ_TBL_ATTR_ACTIVE_STATMENTS,
RSQ_TBL_ATTR_MEMORY_LIMIT_CLUSTER,
RSQ_TBL_ATTR_CORE_LIMIT_CLUSTER,
- RSQ_TBL_ATTR_VSEGMENT_RESOURCE_QUOTA,
+ RSQ_TBL_ATTR_VSEG_RESOURCE_QUOTA,
RSQ_TBL_ATTR_ALLOCATION_POLICY,
- RSQ_TBL_ATTR_RESORUCE_UPPER_FACTOR,
- RSQ_TBL_ATTR_VSEGMENT_UPPER_LIMIT,
+ RSQ_TBL_ATTR_RESORUCE_OVERCOMMIT_FACTOR,
+ RSQ_TBL_ATTR_NVSEG_UPPER_LIMIT,
+ RSQ_TBL_ATTR_NVSEG_LOWER_LIMIT,
+ RSQ_TBL_ATTR_NVSEG_UPPER_LIMIT_PERSEG,
+ RSQ_TBL_ATTR_NVSEG_LOWER_LIMIT_PERSEG,
/* The attributes automatically generated. */
RSQ_TBL_ATTR_OID,
@@ -73,17 +79,6 @@ enum RESOURCE_QUEUE_TABLE_ATTR_INDEX {
};
/*
- * The possible resource allocation policies.
- */
-enum RESOURCE_QUEUE_ALLOCATION_POLICY_INDEX {
- RSQ_ALLOCATION_POLICY_EVEN = 0,
- RSQ_ALLOCATION_POLICY_FIFO,
-
- RSQ_ALLOCATION_POLICY_COUNT
-};
-
-
-/*
* The attributes for expressing one complete role/user definition.
*/
enum USER_ATTR_INDEX {
@@ -114,8 +109,11 @@ struct DynResourceQueueData {
int32_t SegResourceQuotaMemoryMB;/* Segment resource quota */
int32_t Reserved2;
- double ResourceUpperFactor;
- int32_t VSegUpperLimit; /* vseg upper limit. */
+ int32_t NVSegUpperLimit; /* vseg upper limit. */
+ int32_t NVSegLowerLimit; /* vseg lower limit. */
+ double NVSegUpperLimitPerSeg; /* vseg upper limit per seg. */
+ double NVSegLowerLimitPerSeg; /* vseg lower limit per seg. */
+ double ResourceOvercommit;
int8_t AllocatePolicy; /* Allocation policy */
int8_t QueuingPolicy; /* Temporary unused. */
@@ -334,6 +332,7 @@ int shallowparseResourceQueueWithAttributes(List *rawattr,
int parseResourceQueueAttributes( List *attributes,
DynResourceQueue queue,
+ bool checkformatonly,
char *errorbuf,
int errorbufsize);
@@ -367,8 +366,7 @@ void setQueueTrackIndexedByQueueName(DynResourceQueueTrack queuetrack);
void removeQueueTrackIndexedByQueueName(DynResourceQueueTrack queuetrack);
-DynResourceQueueTrack getQueueTrackByQueueOID (int64_t queoid,
- bool *exist);
+DynResourceQueueTrack getQueueTrackByQueueOID (int64_t queoid, bool *exist);
DynResourceQueueTrack getQueueTrackByQueueName(char *quename,
int quenamelen,
@@ -382,7 +380,7 @@ int parseUserAttributes( List *attributes,
int checkUserAttributes(UserInfo user, char *errorbuf, int errorbufsize);
-int createUser(UserInfo userinfo, char *errorbuf, int errorbufsize);
+void createUser(UserInfo userinfo);
void setUserIndexedByUserOID(UserInfo userinfo);
void setUserIndexedByUserName(UserInfo userinfo);
@@ -420,7 +418,7 @@ const char *getUSRTBLAttributeName(int attrindex);
int registerConnectionByUserID(ConnectionTrack conntrack);
int acquireResourceFromResQueMgr(ConnectionTrack conntrack);
int returnResourceToResQueMgr(ConnectionTrack conntrack);
-void returnConnectionToQueue(ConnectionTrack conntrack, bool normally);
+void returnConnectionToQueue(ConnectionTrack conntrack, bool istimeout);
int acquireResourceQuotaFromResQueMgr(ConnectionTrack conntrack);
void cancelResourceAllocRequest(ConnectionTrack conntrack);
/*
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/include/rmcommon.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/rmcommon.h b/src/backend/resourcemanager/include/rmcommon.h
index 246454b..6150e89 100644
--- a/src/backend/resourcemanager/include/rmcommon.h
+++ b/src/backend/resourcemanager/include/rmcommon.h
@@ -1,15 +1,8 @@
#ifndef HAWQ_RESOURCEMANAGER_COMMON_DEFINITIONS
#define HAWQ_RESOURCEMANAGER_COMMON_DEFINITIONS
-#define RESOURCE_QUEUE_DEFAULT_QUEUE_NAME "pg_default"
-#define RESOURCE_QUEUE_ROOT_QUEUE_NAME "pg_root"
-#define RESOURCE_QUEUE_SEG_RES_QUOTA_MEM "mem:"
-#define RESOURCE_QUEUE_SEG_RES_QUOTA_CORE "core:"
#define RESOURCE_QUEUE_RATIO_SIZE 32
#define USER_DEFAULT_PRIORITY 3
-#define RESOURCE_QUEUE_PARALLEL_COUNT_DEF 100
-#define RESOURCE_QUEUE_SEG_RES_QUOTA_DEF 128
-#define RESOURCE_QUEUE_RES_UPPER_FACTOR_DEF 2
/* The queue is a valid leaf queue. */
#define RESOURCE_QUEUE_STATUS_VALID_LEAF 0x00000001
@@ -30,6 +23,7 @@
#define RESOURCE_QUEUE_STATUS_IS_VER1X 0x20000000
#define RESOURCE_QUEUE_DDL_ATTR_LENGTH_MAX 64
+#define RESOURCE_QUEUE_TBL_COLNAME_LENGTH_MAX 64
#define RESOURCE_QUEUE_DDL_POLICY_LENGTH_MAX 64
#define RESOURCE_ROLE_DDL_ATTR_LENGTH_MAX 64
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/requesthandler.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/requesthandler.c b/src/backend/resourcemanager/requesthandler.c
index bc3ec27..de8131f 100644
--- a/src/backend/resourcemanager/requesthandler.c
+++ b/src/backend/resourcemanager/requesthandler.c
@@ -72,7 +72,7 @@ bool handleRMRequestConnectionReg(void **arg)
else
{
/* No connection id resource. Return occupation in resource queue. */
- returnConnectionToQueue(conntrack, true);
+ returnConnectionToQueue(conntrack, false);
elog(LOG, "Resource manager can not accept more connections.");
response.Result = res;
response.ConnID = INVALID_CONNID;
@@ -150,7 +150,7 @@ bool handleRMRequestConnectionRegByOID(void **arg)
}
else {
/* No connection id resource. Return occupation in resource queue. */
- returnConnectionToQueue(conntrack, true);
+ returnConnectionToQueue(conntrack, false);
elog(LOG, "Resource manager can not accept more connections.");
response.Result = res;
response.ConnID = INVALID_CONNID;
@@ -194,9 +194,11 @@ bool handleRMRequestConnectionUnReg(void **arg)
elog(DEBUG5, "HAWQ RM :: Connection id %d try to unregister.",
request->ConnID);
- if ( (*conntrack)->ConnID == INVALID_CONNID ) {
+ if ( (*conntrack)->ConnID == INVALID_CONNID )
+ {
res = retrieveConnectionTrack((*conntrack), request->ConnID);
- if ( res != FUNC_RETURN_OK ) {
+ if ( res != FUNC_RETURN_OK )
+ {
elog(LOG, "Not valid resource context with id %d.", request->ConnID);
goto sendresponse;
}
@@ -213,15 +215,16 @@ bool handleRMRequestConnectionUnReg(void **arg)
elog(DEBUG5, "HAWQ RM :: Connection id %d unregisters connection.",
request->ConnID);
- if ( !canTransformConnectionTrackProgress((*conntrack), CONN_PP_ESTABLISHED) ) {
- elog(RMLOG, "HAWQ RM :: Wrong connection status for unregistering. "
+ if ( !canTransformConnectionTrackProgress((*conntrack), CONN_PP_ESTABLISHED) )
+ {
+ elog(DEBUG5, "HAWQ RM :: Wrong connection status for unregistering. "
"Current connection status is %d.",
(*conntrack)->Progress);
res = REQUESTHANDLER_WRONG_CONNSTAT;
goto sendresponse;
}
- returnConnectionToQueue(*conntrack, true);
+ returnConnectionToQueue(*conntrack, false);
elog(DEBUG3, "One connection is unregistered. ConnID=%d", (*conntrack)->ConnID);
@@ -299,7 +302,8 @@ bool handleRMRequestAcquireResource(void **arg)
* request list.
*/
if ( PQUEMGR->RootTrack != NULL &&
- PQUEMGR->RootTrack->ClusterSegNumberMax == 0 ) {
+ PQUEMGR->RootTrack->ClusterSegNumberMax == 0 )
+ {
return false;
}
@@ -314,9 +318,11 @@ bool handleRMRequestAcquireResource(void **arg)
request->ConnID,
request->SessionID);
- if ( (*conntrack)->ConnID == INVALID_CONNID ) {
+ if ( (*conntrack)->ConnID == INVALID_CONNID )
+ {
res = retrieveConnectionTrack((*conntrack), request->ConnID);
- if ( res != FUNC_RETURN_OK ) {
+ if ( res != FUNC_RETURN_OK )
+ {
elog(LOG, "Not valid resource context with id %d.", request->ConnID);
goto sendresponse;
}
@@ -326,21 +332,23 @@ bool handleRMRequestAcquireResource(void **arg)
(*conntrack)->Progress);
}
- request = (RPCRequestHeadAcquireResourceFromRM)
- ((*conntrack)->MessageBuff.Buffer);
- if ( (*conntrack)->Progress == CONN_PP_RESOURCE_QUEUE_ALLOC_DONE ) {
+ request = (RPCRequestHeadAcquireResourceFromRM)((*conntrack)->MessageBuff.Buffer);
+ if ( (*conntrack)->Progress == CONN_PP_RESOURCE_QUEUE_ALLOC_DONE )
+ {
elog(DEBUG5, "HAWQ RM :: The connection track already has allocated "
"resource. Send again. ConnID=%d",
request->ConnID);
goto sendagain;
}
- else if ( (*conntrack)->Progress == CONN_PP_RESOURCE_QUEUE_ALLOC_WAIT ) {
+ else if ( (*conntrack)->Progress == CONN_PP_RESOURCE_QUEUE_ALLOC_WAIT )
+ {
elog(DEBUG5, "HAWQ RM :: The connection track already accepted "
"acquire resource request. Ignore. ConnID=%d",
request->ConnID);
goto sendignore;
}
- else if ( (*conntrack)->Progress != CONN_PP_REGISTER_DONE ) {
+ else if ( (*conntrack)->Progress != CONN_PP_REGISTER_DONE )
+ {
elog(DEBUG5, "HAWQ RM :: Wrong connection status for acquiring resource. "
"Current connection status is %d.",
(*conntrack)->Progress);
@@ -397,7 +405,8 @@ sendresponse:
(*conntrack)->MessageMark2,
RESPONSE_QD_ACQUIRE_RESOURCE);
- if ( res == CONNTRACK_NO_CONNID ) {
+ if ( res == CONNTRACK_NO_CONNID )
+ {
transformConnectionTrackProgress((*conntrack), CONN_PP_TRANSFORM_ERROR);
}
@@ -440,10 +449,12 @@ bool handleRMRequestReturnResource(void **arg)
elog(DEBUG5, "HAWQ RM :: Connection id %d returns query resource.",
request->ConnID);
- if ( (*conntrack)->ConnID == INVALID_CONNID ) {
+ if ( (*conntrack)->ConnID == INVALID_CONNID )
+ {
res = retrieveConnectionTrack((*conntrack), request->ConnID);
- if ( res != FUNC_RETURN_OK ) {
- elog(LOG, "Not valid resource context with id %d.", request->ConnID);
+ if ( res != FUNC_RETURN_OK )
+ {
+ elog(WARNING, "Not valid resource context with id %d.", request->ConnID);
goto sendresponse;
}
elog(DEBUG5, "HAWQ RM :: Fetched existing connection track "
@@ -456,14 +467,16 @@ bool handleRMRequestReturnResource(void **arg)
request = (RPCRequestHeadReturnResource)
((*conntrack)->MessageBuff.Buffer);
elog(DEBUG5, "HAWQ RM :: Connection id %d returns query resource.",
- request->ConnID);
+ request->ConnID);
- if ( (*conntrack)->Progress == CONN_PP_REGISTER_DONE ) {
+ if ( (*conntrack)->Progress == CONN_PP_REGISTER_DONE )
+ {
elog(DEBUG5, "HAWQ RM :: The resource has been returned or has not been "
"acquired.");
goto sendresponse;
}
- else if ( (*conntrack)->Progress != CONN_PP_RESOURCE_QUEUE_ALLOC_DONE ) {
+ else if ( (*conntrack)->Progress != CONN_PP_RESOURCE_QUEUE_ALLOC_DONE )
+ {
elog(DEBUG5, "HAWQ RM :: Wrong connection status for acquiring resource. "
"Current connection status is %d.",
(*conntrack)->Progress);
@@ -478,6 +491,8 @@ bool handleRMRequestReturnResource(void **arg)
sendresponse:
{
+ elog(DEBUG3, "Return resource result %d.", res);
+
RPCResponseHeadReturnResourceData response;
response.Result = res;
response.Reserved = 0;
@@ -488,7 +503,8 @@ sendresponse:
(*conntrack)->MessageMark2,
RESPONSE_QD_RETURN_RESOURCE );
- if ( res == CONNTRACK_NO_CONNID ) {
+ if ( res == CONNTRACK_NO_CONNID )
+ {
transformConnectionTrackProgress((*conntrack), CONN_PP_TRANSFORM_ERROR);
}
@@ -797,14 +813,20 @@ bool handleRMRequestRefreshResource(void **arg)
uint32_t *connids = (uint32_t *)
(conntrack->MessageBuff.Buffer +
sizeof(RPCRequestHeadRefreshResourceHeartBeatData));
- for ( int i = 0 ; i < request->ConnIDCount ; ++i ) {
+
+ elog(DEBUG3, "Resource manager refreshes %d ConnIDs.", request->ConnIDCount);
+
+ for ( int i = 0 ; i < request->ConnIDCount ; ++i )
+ {
/* Find connection track identified by ConnID */
res = getInUseConnectionTrack(connids[i], &oldct);
- if ( res == FUNC_RETURN_OK ) {
+ if ( res == FUNC_RETURN_OK )
+ {
oldct->LastActTime = curmsec;
- elog(DEBUG5, "Refreshed resource of connection id %d", connids[i]);
+ elog(DEBUG3, "Refreshed resource of connection id %d", connids[i]);
}
- else {
+ else
+ {
elog(DEBUG3, "Can not find connection id %d for resource refreshing.",
connids[i]);
}