You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by yj...@apache.org on 2015/10/16 03:08:35 UTC
[1/4] incubator-hawq git commit: HAWQ-25. Add resource queue new ddl
statement implementation, refine partial GUC variable names,
use libyarn supporting kerberos.
Repository: incubator-hawq
Updated Branches:
refs/heads/master 7e6838f84 -> a413a4265
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/include/catalog/pg_resqueue.h
----------------------------------------------------------------------
diff --git a/src/include/catalog/pg_resqueue.h b/src/include/catalog/pg_resqueue.h
index 41961cb..38bb6a6 100644
--- a/src/include/catalog/pg_resqueue.h
+++ b/src/include/catalog/pg_resqueue.h
@@ -28,17 +28,20 @@
with (camelcase=ResQueue, shared=true, relid=6026, reltype_oid=9830, toast_oid=9820, toast_index=9821, toast_reltype=9822)
(
rsqname name, -- name of resource queue
- rsq_parent oid, -- oid of resource queue
- rsq_active_stats_cluster integer, -- active statement count limit
- rsq_memory_limit_cluster text, -- memory limit in cluster
- rsq_core_limit_cluster text, -- core limit in cluster
- rsq_resource_upper_factor real, -- resource upper limit in cluster
- rsq_allocation_policy text, -- query resource allocation policy
- rsq_vseg_resource_quota text, -- vsegment resource quota
- rsq_vseg_upper_limit integer, -- vsegment size upper limit
- rsq_creation_time timestamp with time zone, -- when the queue is created
- rsq_update_time timestamp with time zone, -- when the queue is updated ( create or alter )
- rsq_status text, -- the status of resource queue.
+ parentoid oid, -- oid of resource queue
+ activestats integer, -- active statement count limit
+ memorylimit text, -- memory limit in cluster
+ corelimit text, -- core limit in cluster
+ resovercommit real, -- resource upper limit in cluster
+ allocpolicy text, -- query resource allocation policy
+ vsegresourcequota text, -- vsegment resource quota
+ nvsegupperlimit integer, -- vsegment size upper limit
+ nvseglowerlimit integer, -- vsegment size lower limit
+ nvsegupperlimitperseg real, -- vsegment size upper limit per segment
+ nvseglowerlimitperseg real, -- vsegment size lower limit per segment
+ creationtime timestamp with time zone, -- when the queue is created
+ updatetime timestamp with time zone, -- when the queue is updated ( create or alter )
+ status text, -- the status of resource queue
);
create unique index on pg_resqueue(oid) with (indexid=6027);
@@ -51,7 +54,7 @@
WARNING: DO NOT MODIFY THE FOLLOWING SECTION:
Generated by ./tidycat.pl version 34
- on Thu Aug 7 17:27:40 2014
+ on Thu Sep 17 15:51:40 2015
*/
@@ -66,7 +69,7 @@
*/
/*
- * The CATALOG definition has to refer to the type of "rsq_creation_time" et al as
+ * The CATALOG definition has to refer to the type of "creationtime" et al as
* "timestamptz" (lower case) so that bootstrap mode recognizes it. But
* the C header files define this type as TimestampTz. Since the field is
* potentially-null and therefore cannot be accessed directly from C code,
@@ -85,18 +88,21 @@
CATALOG(pg_resqueue,6026) BKI_SHARED_RELATION
{
- NameData rsqname; /* name of resource queue */
- Oid rsq_parent; /* oid of resource queue */
- int4 rsq_active_stats_cluster; /* active statement count limit in cluster */
- text rsq_memory_limit_cluster; /* memory limit in cluster */
- text rsq_core_limit_cluster; /* core limit in cluster */
- float4 rsq_resource_upper_factor; /* resource upper limit in cluster */
- text rsq_allocation_policy; /* query resource allocation policy */
- text rsq_vseg_resource_quota; /* vsegment resource quota */
- int4 rsq_vseg_upper_limit; /* vsegment size upper limit */
- timestamptz rsq_creation_time; /* when the queue is created */
- timestamptz rsq_update_time; /* when the queue is updated ( create or alter ) */
- text rsq_status; /* the status of resource queue. */
+ NameData rsqname; /* name of resource queue */
+ Oid parentoid; /* oid of resource queue */
+ int4 activestats; /* active statement count limit */
+ text memorylimit; /* memory limit in cluster */
+ text corelimit; /* core limit in cluster */
+ float4 resovercommit; /* resource upper limit in cluster */
+ text allocpolicy; /* query resource allocation policy */
+ text vsegresourcequota; /* vsegment resource quota */
+ int4 nvsegupperlimit; /* vsegment size upper limit */
+ int4 nvseglowerlimit; /* vsegment size lower limit */
+ float4 nvsegupperlimitperseg; /* vsegment size upper limit per segment */
+ float4 nvseglowerlimitperseg; /* vsegment size lower limit per segment */
+ timestamptz creationtime; /* when the queue is created */
+ timestamptz updatetime; /* when the queue is updated ( create or alter ) */
+ text status; /* the status of resource queue */
} FormData_pg_resqueue;
#undef timestamptz
@@ -114,268 +120,95 @@ typedef FormData_pg_resqueue *Form_pg_resqueue;
* compiler constants for pg_resqueue
* ----------------
*/
-#define Natts_pg_resqueue 12
-#define Anum_pg_resqueue_rsqname 1
-#define Anum_pg_resqueue_rsq_parent 2
-#define Anum_pg_resqueue_rsq_active_stats_cluster 3
-#define Anum_pg_resqueue_rsq_memory_limit_cluster 4
-#define Anum_pg_resqueue_rsq_core_limit_cluster 5
-#define Anum_pg_resqueue_rsq_resource_upper_factor 6
-#define Anum_pg_resqueue_rsq_allocation_policy 7
-#define Anum_pg_resqueue_rsq_vseg_resource_quota 8
-#define Anum_pg_resqueue_rsq_vseg_upper_limit 9
-#define Anum_pg_resqueue_rsq_creation_time 10
-#define Anum_pg_resqueue_rsq_update_time 11
-#define Anum_pg_resqueue_rsq_status 12
-
-/* TIDYCAT_END_CODEGEN */
-
-
-/* Create initial default resource queue */
-DATA(insert OID = 9800 ( "pg_root" 0 "-1" "100%" "100%" 2 "even" _null_ _null_ _null_ _null_ "branch"));
-DATA(insert OID = 6055 ( "pg_default" 9800 "100" "50%" "50%" 2 "even" "mem:128mb" -1 _null_ _null_ _null_));
-
-#define DEFAULT_RESQUEUE_ACTIVESTATS "100"
-#define DEFAULT_RESQUEUE_UPPERFACTOR "2"
-#define DEFAULT_RESQUEUE_VSEG_UPPER_LIMIT "-1"
-#define DEFAULT_RESQUEUE_VSEG_UPPER_LIMIT_N -1
-
-#define MINIMUM_RESQUEUE_UPPER_FACTOR_LIMIT "1"
-#define MINIMUM_RESQUEUE_UPPER_FACTOR_LIMIT_N 1.0
-
-#define DEFAULT_RESQUEUE_POLICY "even"
-#define DEFAULT_RESQUEUE_SEG_QUOTA "mem:128mb"
-
-#define PG_RESQUEUE_COL_OID "oid"
-#define PG_RESQUEUE_COL_RSQNAME "rsqname"
-#define PG_RESQUEUE_COL_PARENT "rsq_parent"
-#define PG_RESQUEUE_COL_ACTIVE_STATS_CLUSTER "rsq_active_stats_cluster"
-#define PG_RESQUEUE_COL_MEMORY_LIMIT_CLUSTER "rsq_memory_limit_cluster"
-#define PG_RESQUEUE_COL_CORE_LIMIT_CLUSTER "rsq_core_limit_cluster"
-#define PG_RESQUEUE_COL_RESOURCE_UPPER_FACTOR "rsq_resource_upper_factor"
-#define PG_RESQUEUE_COL_ALLOCATION_POLICY "rsq_allocation_policy"
-#define PG_RESQUEUE_COL_VSEG_RESOURCE_QUOTA "rsq_vseg_resource_quota"
-#define PG_RESQUEUE_COL_VSEG_UPPER_LIMIT "rsq_vseg_upper_limit"
-#define PG_RESQUEUE_COL_CREATION_TIME "rsq_creation_time"
-#define PG_RESQUEUE_COL_UPDATE_TIME "rsq_update_time"
-#define PG_RESQUEUE_COL_STATUS "rsq_status"
-
-#define ROOTRESQUEUE_OID 9800
-#define DEFAULTRESQUEUE_OID 6055
-
-/*
- The flavors of resource types:
-
- required: user must specify this type during CREATE. Every queue
- must always have this type entry in pg_resqueuecapability. It is
- not required to have a default value. It may or may not have an
- offvalue, depending on the "has disable" setting.
-
- optional (ie not required): user does not have to specify this type.
- If hasdefault is false, then no entry is required for
- pg_resqueuecapability. If hasdefault is true, then CREATE will add
- the default entry to pg_resqueuecapability during CREATE.
-
- has disable: whether the resource type has an OFF switch, ie what is
- the WITHOUT behavior. For a required type, if it can be disabled,
- then it must have an off value. For an optional type, if it can be
- disabled, there are two options:
-
- 1) if the optional type has a default value, then if must have an
- off value. The off value can be the same as the default value.
-
- 2) if the optional type does not have a default value, then the
- assumption is that it gets "shut off" by removing the
- pg_resqueuecapability entry. The off value is ignored. Which
- means if hasdefault is false, and required is false, then
- hasdisable must be true (because the CREATE statement won't add
- an entry for the type, so it is de facto disabled).
-
- */
-
-/* MPP-6923: Resource Queue attribute flexibility */
-
-/* TIDYCAT_BEGINDEF
-
- CREATE TABLE pg_resourcetype
- with (camelcase=ResourceType, shared=true, relid=6059, reltype_oid=6445)
- (
- resname name, -- name of resource type
- restypid smallint, -- resource type id
- resrequired boolean, -- if required, user must specify during CREATE
- reshasdefault boolean, -- create a default entry for optional type
- reshasdisable boolean, -- whether the type can be removed or shut off
- resdefaultsetting text, -- default resource setting
- resdisabledsetting text -- value that turns it off
-
- );
-
- create unique index on pg_resourcetype(oid) with (indexid=6061);
- create unique index on pg_resourcetype(restypid) with (indexid=6062);
- create unique index on pg_resourcetype(resname) with (indexid=6063);
-
- TIDYCAT_ENDDEF
-*/
-/* TIDYCAT_BEGIN_CODEGEN
-
- WARNING: DO NOT MODIFY THE FOLLOWING SECTION:
- Generated by ./tidycat.pl version 34
- on Thu Aug 7 17:27:40 2014
-*/
-
-
-/*
- TidyCat Comments for pg_resourcetype:
- Table is shared, so catalog.c:IsSharedRelation is updated.
- Table has an Oid column.
- Table has static type (see pg_types.h).
- Table has TOASTable columns, but NO TOAST table.
-
-*/
-
-/* ----------------
- * pg_resourcetype definition. cpp turns this into
- * typedef struct FormData_pg_resourcetype
- * ----------------
- */
-#define ResourceTypeRelationId 6059
-
-CATALOG(pg_resourcetype,6059) BKI_SHARED_RELATION
-{
- NameData resname; /* name of resource type */
- int2 restypid; /* resource type id */
- bool resrequired; /* if required, user must specify during CREATE */
- bool reshasdefault; /* create a default entry for optional type */
- bool reshasdisable; /* whether the type can be removed or shut off */
- text resdefaultsetting; /* default resource setting */
- text resdisabledsetting; /* value that turns it off */
-} FormData_pg_resourcetype;
-
-
-/* ----------------
- * Form_pg_resourcetype corresponds to a pointer to a tuple with
- * the format of pg_resourcetype relation.
- * ----------------
- */
-typedef FormData_pg_resourcetype *Form_pg_resourcetype;
-
-
-/* ----------------
- * compiler constants for pg_resourcetype
- * ----------------
- */
-#define Natts_pg_resourcetype 7
-#define Anum_pg_resourcetype_resname 1
-#define Anum_pg_resourcetype_restypid 2
-#define Anum_pg_resourcetype_resrequired 3
-#define Anum_pg_resourcetype_reshasdefault 4
-#define Anum_pg_resourcetype_reshasdisable 5
-#define Anum_pg_resourcetype_resdefaultsetting 6
-#define Anum_pg_resourcetype_resdisabledsetting 7
+#define Natts_pg_resqueue 15
+#define Anum_pg_resqueue_rsqname 1
+#define Anum_pg_resqueue_parentoid 2
+#define Anum_pg_resqueue_activestats 3
+#define Anum_pg_resqueue_memorylimit 4
+#define Anum_pg_resqueue_corelimit 5
+#define Anum_pg_resqueue_resovercommit 6
+#define Anum_pg_resqueue_allocpolicy 7
+#define Anum_pg_resqueue_vsegresourcequota 8
+#define Anum_pg_resqueue_nvsegupperlimit 9
+#define Anum_pg_resqueue_nvseglowerlimit 10
+#define Anum_pg_resqueue_nvsegupperlimitperseg 11
+#define Anum_pg_resqueue_nvseglowerlimitperseg 12
+#define Anum_pg_resqueue_creationtime 13
+#define Anum_pg_resqueue_updatetime 14
+#define Anum_pg_resqueue_status 15
/* TIDYCAT_END_CODEGEN */
-/* Create entry in pg_resourcetype for PRIORITY */
-DATA(insert OID = 6454 ( active_statements 1 f t f -1 -1 ));
-DATA(insert OID = 6455 ( parent 2 t f f -1 -1 ));
-DATA(insert OID = 6456 ( memory_limit_cluster 3 t f f -1 -1));
-DATA(insert OID = 6457 ( core_limit_cluster 4 t f f -1 -1 ));
-DATA(insert OID = 6458 ( vsegment_resource_quota 5 f t f -1 -1 ));
-DATA(insert OID = 6459 ( resource_upper_factor 6 f t f -1 -1 ));
-/*
- The first four entries of pg_resourcetype are special mappings for
- the original pg_resqueue columns. The following table shows the
- correspondence between the original grammar, the pg_resqueue column
- name, and the WITH clause defnames.
-
- grammar colname orig_defname new_defname
- ------------------ ----------------- --------------- -----------------
- "ACTIVE THRESHOLD" rsqcountlimit activelimit active_statements
- "COST THRESHOLD" rsqcostlimit costlimit max_cost
- "IGNORE THRESHOLD" rsqignorecostlimit ignorecostlimit min_cost
- "OVERCOMMIT" rsqovercommit overcommit cost_overcommit
-*/
-
-/* Note: the restypid is used by pg_dumpall.c to build CREATE statements */
-#define PG_RESRCTYPE_ACTIVE_STATEMENTS 1 /* rsqcountlimit: count */
-#define PG_RESRCTYPE_MAX_COST 2 /* rsqcostlimit: max_cost */
-#define PG_RESRCTYPE_MIN_COST 3 /* rsqignorecostlimit: min_cost */
-#define PG_RESRCTYPE_COST_OVERCOMMIT 4 /* rsqovercommit: cost_overcommit*/
-/* start of "pg_resourcetype" entries... */
-#define PG_RESRCTYPE_PRIORITY 5 /* backoff.c: priority queue */
-#define PG_RESRCTYPE_MEMORY_LIMIT 6 /* memquota.c: memory quota */
-
-/* TIDYCAT_BEGINDEF
-
- CREATE TABLE pg_resqueuecapability
- with (camelcase=ResQueueCapability, shared=true, relid=6060, reltype_oid=6446)
- (
- resqueueid oid, -- OID of the queue with this capability
- restypid smallint, -- resource type id (key to pg_resourcetype)
- ressetting text -- resource setting (opaque type)
- );
-
- create unique index on pg_resqueuecapability(oid) with (indexid=6064);
- create index on pg_resqueuecapability(resqueueid) with (indexid=6065);
- create index on pg_resqueuecapability(restypid) with (indexid=6066);
-
- alter table pg_resqueuecapability add fk resqueueid on pg_resqueue(oid);
- alter table pg_resqueuecapability add fk restypid on pg_resourcetype(restypid);
-
- TIDYCAT_ENDDEF
-*/
-/* TIDYCAT_BEGIN_CODEGEN
-
- WARNING: DO NOT MODIFY THE FOLLOWING SECTION:
- Generated by ./tidycat.pl version 34
- on Thu Aug 7 17:27:40 2014
-*/
+/* Create initial default resource queue */
+DATA(insert OID = 9800 ( "pg_root" 0 "-1" "100%" "100%" 2 "even" _null_ 0 0 0 1 _null_ _null_ "branch"));
+DATA(insert OID = 6055 ( "pg_default" 9800 "100" "50%" "50%" 2 "even" "mem:128mb" 0 0 0 1 _null_ _null_ _null_));
/*
- TidyCat Comments for pg_resqueuecapability:
- Table is shared, so catalog.c:IsSharedRelation is updated.
- Table has an Oid column.
- Table has static type (see pg_types.h).
- Table has TOASTable columns, but NO TOAST table.
-
-*/
-
-/* ----------------
- * pg_resqueuecapability definition. cpp turns this into
- * typedef struct FormData_pg_resqueuecapability
- * ----------------
- */
-#define ResQueueCapabilityRelationId 6060
-
-CATALOG(pg_resqueuecapability,6060) BKI_SHARED_RELATION
-{
- Oid resqueueid; /* OID of the queue with this capability */
- int2 restypid; /* resource type id (key to pg_resourcetype) */
- text ressetting; /* resource setting (opaque type) */
-} FormData_pg_resqueuecapability;
-
-
-/* ----------------
- * Form_pg_resqueuecapability corresponds to a pointer to a tuple with
- * the format of pg_resqueuecapability relation.
- * ----------------
- */
-typedef FormData_pg_resqueuecapability *Form_pg_resqueuecapability;
-
-
-/* ----------------
- * compiler constants for pg_resqueuecapability
- * ----------------
+ * The possible resource allocation policies.
*/
-#define Natts_pg_resqueuecapability 3
-#define Anum_pg_resqueuecapability_resqueueid 1
-#define Anum_pg_resqueuecapability_restypid 2
-#define Anum_pg_resqueuecapability_ressetting 3
-
+enum RESOURCE_QUEUE_ALLOCATION_POLICY_INDEX {
+ RSQ_ALLOCATION_POLICY_EVEN = 0,
+
+ RSQ_ALLOCATION_POLICY_COUNT
+};
+
+#define DEFAULT_RESQUEUE_ACTIVESTATS "100"
+#define DEFAULT_RESQUEUE_OVERCOMMIT "2"
+#define DEFAULT_RESQUEUE_NVSEG_UPPER_LIMIT "0"
+#define DEFAULT_RESQUEUE_NVSEG_LOWER_LIMIT "0"
+#define DEFAULT_RESQUEUE_NVSEG_UPPER_PERSEG_LIMIT "0"
+#define DEFAULT_RESQUEUE_NVSEG_LOWER_PERSEG_LIMIT "1"
+
+#define DEFAULT_RESQUEUE_ALLOCPOLICY "even"
+#define DEFAULT_RESQUEUE_VSEGRESOURCEQUOTA "mem:128mb"
+
+#define DEFAULT_RESQUEUE_ACTIVESTATS_N 100
+#define DEFAULT_RESQUEUE_OVERCOMMIT_N 2.0
+#define DEFAULT_RESQUEUE_NVSEG_UPPER_LIMIT_N 0
+#define DEFAULT_RESQUEUE_NVSEG_LOWER_LIMIT_N 0
+#define DEFAULT_RESQUEUE_NVSEG_UPPER_PERSEG_LIMIT_N 0.0
+#define DEFAULT_RESQUEUE_NVSEG_LOWER_PERSEG_LIMIT_N 1.0
+
+#define DEFAULT_RESQUEUE_ALLOCPOLICY_N RSQ_ALLOCATION_POLICY_EVEN
+#define DEFAULT_RESQUEUE_VSEGRESOURCEQUOTA_N 128
+
+#define MINIMUM_RESQUEUE_OVERCOMMIT "1"
+#define MINIMUM_RESQUEUE_NVSEG_UPPER_LIMIT "0"
+#define MINIMUM_RESQUEUE_NVSEG_LOWER_LIMIT "0"
+#define MINIMUM_RESQUEUE_NVSEG_UPPER_PERSEG_LIMIT "0"
+#define MINIMUM_RESQUEUE_NVSEG_LOWER_PERSEG_LIMIT "0"
+
+#define MINIMUM_RESQUEUE_OVERCOMMIT_N 1.0
+#define MINIMUM_RESQUEUE_NVSEG_UPPER_LIMIT_N 0
+#define MINIMUM_RESQUEUE_NVSEG_LOWER_LIMIT_N 0
+#define MINIMUM_RESQUEUE_NVSEG_UPPER_PERSEG_LIMIT_N 0.0
+#define MINIMUM_RESQUEUE_NVSEG_LOWER_PERSEG_LIMIT_N 0.0
+
+#define PG_RESQUEUE_COL_OID "oid"
+#define PG_RESQUEUE_COL_RSQNAME "rsqname"
+#define PG_RESQUEUE_COL_PARENTOID "parentoid"
+#define PG_RESQUEUE_COL_ACTIVESTATS "activestats"
+#define PG_RESQUEUE_COL_MEMORYLIMIT "memorylimit"
+#define PG_RESQUEUE_COL_CORELIMIT "corelimit"
+#define PG_RESQUEUE_COL_RESOVERCOMMIT "resovercommit"
+#define PG_RESQUEUE_COL_ALLOCPOLICY "allocpolicy"
+#define PG_RESQUEUE_COL_VSEGRESOURCEQUOTA "vsegresourcequota"
+#define PG_RESQUEUE_COL_NVSEGUPPERLIMIT "nvsegupperlimit"
+#define PG_RESQUEUE_COL_NVSEGLOWERLIMIT "nvsegupperlimit"
+#define PG_RESQUEUE_COL_NVSEGUPPERLIMITPERSEG "nvsegupperlimitperseg"
+#define PG_RESQUEUE_COL_NVSEGLOWERLIMITPERSEG "nvseglowerlimitperseg"
+#define PG_RESQUEUE_COL_CREATIONTIME "creationtime"
+#define PG_RESQUEUE_COL_UPDATETIME "updatetime"
+#define PG_RESQUEUE_COL_STATUS "status"
-/* TIDYCAT_END_CODEGEN */
+#define ROOTRESQUEUE_OID 9800
+#define DEFAULTRESQUEUE_OID 6055
+#define RESOURCE_QUEUE_DEFAULT_QUEUE_NAME "pg_default"
+#define RESOURCE_QUEUE_ROOT_QUEUE_NAME "pg_root"
+#define RESOURCE_QUEUE_SEG_RES_QUOTA_MEM "mem:"
+#define RESOURCE_QUEUE_SEG_RES_QUOTA_CORE "core:" /* Reserved. */
#endif /* PG_RESQUEUE_H */
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/include/catalog/pg_type.h
----------------------------------------------------------------------
diff --git a/src/include/catalog/pg_type.h b/src/include/catalog/pg_type.h
index 591c991..c356f14 100644
--- a/src/include/catalog/pg_type.h
+++ b/src/include/catalog/pg_type.h
@@ -793,22 +793,14 @@ DATA(insert OID = 7077 ( pg_remote_credentials PGNSP PGUID -1 f c t \054 707
/* relation id: 7076 - pg_remote_credentials 20140205 */
DATA(insert OID = 7080 (pg_toast_7076 TOASTNSP PGUID -1 f c t \054 7078 0 record_in record_out record_recv record_send - d x f 0 -1 0 _null_ _null_));
#define PG_REMOTE_CREDENTIALS_TOAST_RELTYPE_OID 7080
-/* relation id: 6026 - pg_resqueue 20140807 */
+/* relation id: 6026 - pg_resqueue 20150917 */
DATA(insert OID = 9830 ( pg_resqueue PGNSP PGUID -1 f c t \054 6026 0 record_in record_out record_recv record_send - d x f 0 -1 0 _null_ _null_ ));
#define PG_RESQUEUE_RELTYPE_OID 9830
-/* relation id: 6026 - pg_resqueue 20140807 */
+/* relation id: 6026 - pg_resqueue 20150917 */
DATA(insert OID = 9822 (pg_toast_6026 TOASTNSP PGUID -1 f c t \054 9820 0 record_in record_out record_recv record_send - d x f 0 -1 0 _null_ _null_));
#define PG_RESQUEUE_TOAST_RELTYPE_OID 9822
-/* relation id: 6059 - pg_resourcetype 20140807 */
-DATA(insert OID = 6445 ( pg_resourcetype PGNSP PGUID -1 f c t \054 6059 0 record_in record_out record_recv record_send - d x f 0 -1 0 _null_ _null_ ));
-#define PG_RESOURCETYPE_RELTYPE_OID 6445
-
-/* relation id: 6060 - pg_resqueuecapability 20140807 */
-DATA(insert OID = 6446 ( pg_resqueuecapability PGNSP PGUID -1 f c t \054 6060 0 record_in record_out record_recv record_send - d x f 0 -1 0 _null_ _null_ ));
-#define PG_RESQUEUECAPABILITY_RELTYPE_OID 6446
-
/* TIDYCAT_END_CODEGEN */
DATA(insert OID = 6989 (gp_persistent_relfile_node PGNSP PGUID -1 f c t \054 5089 0 record_in record_out record_recv record_send - d x f 0 -1 0 _null_ _null_));
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/include/catalog/tidycat.pl
----------------------------------------------------------------------
diff --git a/src/include/catalog/tidycat.pl b/src/include/catalog/tidycat.pl
index cd9deca..d613609 100755
--- a/src/include/catalog/tidycat.pl
+++ b/src/include/catalog/tidycat.pl
@@ -447,7 +447,6 @@ my %dynamic_reltype_h =
5011 => "pg_partition_rule",
1136 => "pg_pltemplate",
1255 => "pg_proc", # special bootstrap case
- #6026 => "pg_resqueue",
2618 => "pg_rewrite",
1214 => "pg_shdepend",
2396 => "pg_shdescription",
@@ -490,8 +489,6 @@ my %toast_tab_exception_h =
"pg_index" => 1,
"pg_partition_rule" => 1,
"pg_pltemplate" => 1,
- "pg_resqueuecapability" => 1,
- "pg_resourcetype" => 1,
"pg_stat_last_operation" => 1,
"pg_stat_last_shoperation" => 1,
"pg_tablespace" => 1,
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/include/catalog/toasting.h
----------------------------------------------------------------------
diff --git a/src/include/catalog/toasting.h b/src/include/catalog/toasting.h
index 0806928..401defb 100644
--- a/src/include/catalog/toasting.h
+++ b/src/include/catalog/toasting.h
@@ -101,7 +101,7 @@ DECLARE_TOAST(pg_remote_credentials, 7078, 7079);
#define PgRemoteCredentialsToastTable 7078
#define PgRemoteCredentialsToastIndex 7079
-/* relation id: 6026 - pg_resqueue 20140807 */
+/* relation id: 6026 - pg_resqueue 20150917 */
DECLARE_TOAST(pg_resqueue, 9820, 9821);
#define PgResQueueToastTable 9820
#define PgResQueueToastIndex 9821
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/include/cdb/cdbvars.h
----------------------------------------------------------------------
diff --git a/src/include/cdb/cdbvars.h b/src/include/cdb/cdbvars.h
index dcc7bdc..64d8b74 100644
--- a/src/include/cdb/cdbvars.h
+++ b/src/include/cdb/cdbvars.h
@@ -1155,13 +1155,12 @@ extern char *dfs_url;
extern char *master_directory;
extern char *seg_directory;
-extern bool rm_domain_comm_enable;
/* HAWQ 2.0 resource manager GUCs */
-extern int rm_master_addr_domain_port;
-extern int rm_master_addr_port;
-extern int rm_seg_addr_port;
+extern int rm_master_domain_port;
+extern int rm_master_port;
+extern int rm_segment_port;
-extern char *rm_grm_server_type;
+extern char *rm_global_rm_type;
extern char *rm_seg_memory_use;
extern double rm_seg_core_use;
@@ -1172,14 +1171,19 @@ extern char *rm_grm_yarn_queue;
extern char *rm_grm_yarn_app_name;
extern int rm_grm_breath_return_percentage;
-extern int rm_query_vseg_num_limit;
-extern int rm_query_vseg_num_per_seg_limit;
-extern int rm_slice_num_per_seg_limit;
+extern int rm_nvseg_perquery_limit;
+extern int rm_nvseg_perquery_perseg_limit;
+extern int rm_nslice_perseg_limit;
extern int rm_seg_container_default_waterlevel;
-extern int rm_resource_noaction_timeout;
+
+extern int rm_session_lease_timeout;
+extern bool rm_session_lease_heartbeat_enable;
+
extern int rm_query_resource_noresource_timeout;
extern int rm_resource_timeout;
extern int rm_resource_heartbeat_interval;
+extern int rm_tolerate_nseg_limit;
+extern int rm_nvseg_variance_among_seg_limit;
extern char *rm_resourcepool_test_filename;
extern bool rm_force_fifo_queue;
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/test/regress/data/upgrade20/upg2_catupgrade_20.sql.in
----------------------------------------------------------------------
diff --git a/src/test/regress/data/upgrade20/upg2_catupgrade_20.sql.in b/src/test/regress/data/upgrade20/upg2_catupgrade_20.sql.in
index bd8114d..74d5ffe 100644
--- a/src/test/regress/data/upgrade20/upg2_catupgrade_20.sql.in
+++ b/src/test/regress/data/upgrade20/upg2_catupgrade_20.sql.in
@@ -106,7 +106,7 @@ UPDATE pg_resourcetype SET resname='active_statements', resrequired=false, resha
UPDATE pg_resourcetype SET resname='parent', resrequired=true, reshasdefault=false, reshasdisable=false, resdefaultsetting=-1, resdisabledsetting=-1 WHERE restypid=2;
UPDATE pg_resourcetype SET resname='memory_limit_cluster', resrequired=true, reshasdefault=false, reshasdisable=false, resdefaultsetting=-1, resdisabledsetting=-1 WHERE restypid=3;
UPDATE pg_resourcetype SET resname='core_limit_cluster', resrequired=true, reshasdefault=false, reshasdisable=false, resdefaultsetting=-1, resdisabledsetting=-1 WHERE restypid=4;
-UPDATE pg_resourcetype SET resname='segment_resource_quota', resrequired=false, reshasdefault=true, reshasdisable=false, resdefaultsetting=-1, resdisabledsetting=-1 WHERE restypid=5;
+UPDATE pg_resourcetype SET resname='vseg_resource_quota', resrequired=false, reshasdefault=true, reshasdisable=false, resdefaultsetting=-1, resdisabledsetting=-1 WHERE restypid=5;
UPDATE pg_resourcetype SET resname='resource_upper_factor', resrequired=false, reshasdefault=true, reshasdisable=false, resdefaultsetting=-1, resdisabledsetting=-1 WHERE restypid=6;
-- change proc pg_stat_get_activity
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/test/regress/expected/parquet_compression.out
----------------------------------------------------------------------
diff --git a/src/test/regress/expected/parquet_compression.out b/src/test/regress/expected/parquet_compression.out
index dd5370f..bcb7cd8 100644
--- a/src/test/regress/expected/parquet_compression.out
+++ b/src/test/regress/expected/parquet_compression.out
@@ -26,7 +26,7 @@ drop table parquet_snappy_part_unc;
ERROR: table "parquet_snappy_part_unc" does not exist
drop table parquet_gzip_2;
ERROR: table "parquet_gzip_2" does not exist
-alter resource queue pg_default with ( segment_resource_quota='mem:2gb');
+alter resource queue pg_default with ( vseg_resource_quota='mem:2gb');
--set statement_mem='1999MB';
--end_ignore
--Datatypes covered: text,bytea,varchar,bit varying
@@ -145,4 +145,4 @@ Select count(*) from parquet_gzip_2;
1
(1 row)
-alter resource queue pg_default with ( vsegment_resource_quota='mem:128mb');
+alter resource queue pg_default with ( vseg_resource_quota='mem:128mb');
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/test/regress/sql/parquet_compression.sql
----------------------------------------------------------------------
diff --git a/src/test/regress/sql/parquet_compression.sql b/src/test/regress/sql/parquet_compression.sql
index 48170cc..92a6f5f 100644
--- a/src/test/regress/sql/parquet_compression.sql
+++ b/src/test/regress/sql/parquet_compression.sql
@@ -19,7 +19,7 @@ drop table parquet_snappy_part;
drop table parquet_snappy_part_unc;
drop table parquet_gzip_2;
-alter resource queue pg_default with ( vsegment_resource_quota='mem:2gb');
+alter resource queue pg_default with ( vseg_resource_quota='mem:2gb');
--set statement_mem='1999MB';
--end_ignore
@@ -122,4 +122,4 @@ insert into parquet_gzip_2 values(12,array_to_string(ARRAY(SELECT chr((65 + roun
Select count(*) from parquet_gzip_2;
-alter resource queue pg_default with ( vsegment_resource_quota='mem:128mb');
+alter resource queue pg_default with ( vseg_resource_quota='mem:128mb');
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/tools/bin/generate-greenplum-path.sh
----------------------------------------------------------------------
diff --git a/tools/bin/generate-greenplum-path.sh b/tools/bin/generate-greenplum-path.sh
index e5fff54..6bf5eae 100755
--- a/tools/bin/generate-greenplum-path.sh
+++ b/tools/bin/generate-greenplum-path.sh
@@ -88,9 +88,13 @@ cat << EOF
LIBHDFS3_CONF=\$GPHOME/etc/hdfs-client.xml
EOF
+# libyarn configuration file path
+cat << EOF
+LIBYARN_CONF=\$GPHOME/etc/yarn-client.xml
+EOF
+
# global resource manager configuration file path
cat << EOF
-RESOURCEMANAGER_CONF=\$GPHOME/etc/resourcemanager.xml
HAWQSITE_CONF=\$GPHOME/etc/hawq-site.xml
EOF
@@ -122,6 +126,9 @@ export LIBHDFS3_CONF
EOF
cat <<EOF
-export RESOURCEMANAGER_CONF
+export LIBYARN_CONF
+EOF
+
+cat <<EOF
export HAWQSITE_CONF
EOF
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/tools/bin/gppylib/data/2.0.json
----------------------------------------------------------------------
diff --git a/tools/bin/gppylib/data/2.0.json b/tools/bin/gppylib/data/2.0.json
index c45ebb7..bd69df2 100644
--- a/tools/bin/gppylib/data/2.0.json
+++ b/tools/bin/gppylib/data/2.0.json
@@ -1,7 +1,7 @@
{
- "__comment" : "Generated by tidycat.pl version 34 on Wed Jul 22 16:26:10 2015 CATALOG_VERSION_NO=201504151",
+ "__comment" : "Generated by ./tidycat.pl version 34 on Thu Sep 17 15:49:18 2015 CATALOG_VERSION_NO=201507221",
"__info" : {
- "CATALOG_VERSION_NO" : "201504151"
+ "CATALOG_VERSION_NO" : "201507221"
},
"gp_configuration" : {
"CamelCaseRelationId" : "GpConfigurationRelationId",
@@ -8392,121 +8392,6 @@
"toast_reltype" : "7080"
}
},
- "pg_resourcetype" : {
- "CamelCaseRelationId" : "ResourceTypeRelationId",
- "UppercaseReltypeOid" : "PG_RESOURCETYPE_RELTYPE_OID",
- "colh" : {
- "oid" : "Oid",
- "resdefaultsetting" : "text",
- "resdisabledsetting" : "text",
- "reshasdefault" : "bool",
- "reshasdisable" : "bool",
- "resname" : "NameData",
- "resrequired" : "bool",
- "restypid" : "int2"
- },
- "cols" : [
- {
- "colname" : "resname",
- "ctype" : "NameData",
- "postcomment" : "-- name of resource type ",
- "precomment" : "\n**TK_BLANK_LINE**",
- "sqltype" : "name"
- },
- {
- "colname" : "restypid",
- "ctype" : "int2",
- "postcomment" : "-- resource type id ",
- "sqltype" : "smallint"
- },
- {
- "colname" : "resrequired",
- "ctype" : "bool",
- "postcomment" : "-- if required, user must specify during CREATE",
- "sqltype" : "boolean"
- },
- {
- "colname" : "reshasdefault",
- "ctype" : "bool",
- "postcomment" : "-- create a default entry for optional type",
- "sqltype" : "boolean"
- },
- {
- "colname" : "reshasdisable",
- "ctype" : "bool",
- "postcomment" : "-- whether the type can be removed or shut off",
- "sqltype" : "boolean"
- },
- {
- "colname" : "resdefaultsetting",
- "ctype" : "text",
- "postcomment" : "-- default resource setting ",
- "sqltype" : "text"
- },
- {
- "colname" : "resdisabledsetting",
- "ctype" : "text",
- "postcomment" : "-- value that turns it off ",
- "sqltype" : "text"
- }
- ],
- "filename" : "pg_resqueue.h",
- "indexes" : [
- {
- "CamelCaseIndexId" : "ResourceTypeOidIndexId",
- "cols" : [
- [
- "oid",
- "oid_ops"
- ]
- ],
- "indexid" : "6061",
- "unique" : "1",
- "with" : {
- "indexid" : "6061"
- }
- },
- {
- "CamelCaseIndexId" : "ResourceTypeRestypidIndexId",
- "cols" : [
- [
- "restypid",
- "int2_ops"
- ]
- ],
- "indexid" : "6062",
- "unique" : "1",
- "with" : {
- "indexid" : "6062"
- }
- },
- {
- "CamelCaseIndexId" : "ResourceTypeResnameIndexId",
- "cols" : [
- [
- "resname",
- "name_ops"
- ]
- ],
- "indexid" : "6063",
- "unique" : "1",
- "with" : {
- "indexid" : "6063"
- }
- }
- ],
- "relid_comment_tag" : "/* relation id: 6059 - pg_resourcetype */\n",
- "tabdef_text" : "\n CREATE TABLE pg_resourcetype\n with (camelcase=ResourceType, shared=true, relid=6059, reltype_oid=6445)\n (\n resname name, -- name of resource type \n restypid smallint, -- resource type id \n resrequired boolean, -- if required, user must specify during CREATE\n reshasdefault boolean, -- create a default entry for optional type\n reshasdisable boolean, -- whether the type can be removed or shut off\n resdefaultsetting text,\t-- default resource setting \n resdisabledsetting text\t-- value that turns it off \n\t\n )",
- "with" : {
- "bootstrap" : 0,
- "camelcase" : "ResourceType",
- "oid" : 1,
- "relid" : "6059",
- "reltype_oid" : "6445",
- "shared" : "1",
- "text" : "with (camelcase=ResourceType, shared=true, relid=6059, reltype_oid=6445)"
- }
- },
"pg_resqueue" : {
"CamelCaseRelationId" : "ResQueueRelationId",
"CamelCaseToastInd" : "PgResQueueToastIndex",
@@ -8514,18 +8399,22 @@
"UppercaseReltypeOid" : "PG_RESQUEUE_RELTYPE_OID",
"UppercaseToastReltypeOid" : "PG_RESQUEUE_TOAST_RELTYPE_OID",
"colh" : {
+ "activestats" : "int4",
+ "allocpolicy" : "text",
+ "corelimit" : "text",
+ "creationtime" : "timestamptz",
+ "memorylimit" : "text",
+ "nvseglowerlimit" : "int4",
+ "nvseglowerlimitperseg" : "int4",
+ "nvsegupperlimit" : "int4",
+ "nvsegupperlimitperseg" : "int4",
"oid" : "Oid",
- "rsq_active_stats_cluster" : "int4",
- "rsq_allocation_policy" : "text",
- "rsq_core_limit_cluster" : "text",
- "rsq_creation_time" : "timestamptz",
- "rsq_memory_limit_cluster" : "text",
- "rsq_parent" : "Oid",
- "rsq_resource_upper_factor" : "float4",
- "rsq_seg_resource_quota" : "text",
- "rsq_status" : "text",
- "rsq_update_time" : "timestamptz",
- "rsqname" : "NameData"
+ "parentoid" : "Oid",
+ "resovercommit" : "float4",
+ "rsqname" : "NameData",
+ "status" : "text",
+ "updatetime" : "timestamptz",
+ "vsegresourcequota" : "text"
},
"cols" : [
{
@@ -8536,63 +8425,87 @@
"sqltype" : "name"
},
{
- "colname" : "rsq_parent",
+ "colname" : "parentoid",
"ctype" : "Oid",
"postcomment" : "-- oid of resource queue",
"sqltype" : "oid"
},
{
- "colname" : "rsq_active_stats_cluster",
+ "colname" : "activestats",
"ctype" : "int4",
"postcomment" : "-- active statement count limit",
"sqltype" : "integer"
},
{
- "colname" : "rsq_memory_limit_cluster",
+ "colname" : "memorylimit",
"ctype" : "text",
"postcomment" : "-- memory limit in cluster",
"sqltype" : "text"
},
{
- "colname" : "rsq_core_limit_cluster",
+ "colname" : "corelimit",
"ctype" : "text",
"postcomment" : "-- core limit in cluster",
"sqltype" : "text"
},
{
- "colname" : "rsq_resource_upper_factor",
+ "colname" : "resovercommit",
"ctype" : "float4",
"postcomment" : "-- resource upper limit in cluster",
"sqltype" : "real"
},
{
- "colname" : "rsq_allocation_policy",
+ "colname" : "allocpolicy",
"ctype" : "text",
"postcomment" : "-- query resource allocation policy",
"sqltype" : "text"
},
{
- "colname" : "rsq_seg_resource_quota",
+ "colname" : "vsegresourcequota",
"ctype" : "text",
- "postcomment" : "-- segment resource quota",
+ "postcomment" : "-- vsegment resource quota",
"sqltype" : "text"
},
{
- "colname" : "rsq_creation_time",
+ "colname" : "nvsegupperlimit",
+ "ctype" : "int4",
+ "postcomment" : "-- vsegment size upper limit",
+ "sqltype" : "integer"
+ },
+ {
+ "colname" : "nvseglowerlimit",
+ "ctype" : "int4",
+ "postcomment" : "-- vsegment size lower limit",
+ "sqltype" : "integer"
+ },
+ {
+ "colname" : "nvsegupperlimitperseg",
+ "ctype" : "int4",
+ "postcomment" : "-- vsegment size upper limit per segment",
+ "sqltype" : "integer"
+ },
+ {
+ "colname" : "nvseglowerlimitperseg",
+ "ctype" : "int4",
+ "postcomment" : "-- vsegment size lower limit per segment",
+ "sqltype" : "integer"
+ },
+ {
+ "colname" : "creationtime",
"ctype" : "timestamptz",
"postcomment" : "-- when the queue is created",
"sqltype" : "timestamp_with_time_zone"
},
{
- "colname" : "rsq_update_time",
+ "colname" : "updatetime",
"ctype" : "timestamptz",
"postcomment" : "-- when the queue is updated ( create or alter )",
"sqltype" : "timestamp_with_time_zone"
},
{
- "colname" : "rsq_status",
+ "colname" : "status",
"ctype" : "text",
- "postcomment" : "-- the status of resource queue.",
+ "postcomment" : "-- the status of resource queue",
"sqltype" : "text"
}
],
@@ -8628,8 +8541,8 @@
}
],
"relid_comment_tag" : "/* relation id: 6026 - pg_resqueue */\n",
- "tabdef_text" : "\n CREATE TABLE pg_resqueue\n with (camelcase=ResQueue, shared=true, relid=6026, reltype_oid=9830, toast_oid=9820, toast_index=9821, toast_reltype=9822)\n (\n rsqname name, -- name of resource queue\n rsq_parent oid, -- oid of resource queue\n rsq_active_stats_cluster integer, -- active statement count limit\n rsq_memory_limit_cluster text, -- memory limit in cluster\n rsq_core_limit_cluster text, -- core limit in cluster\n rsq_resource_upper_factor real, -- resource upper limit in cluster\n rsq_allocation_policy text, -- query resource allocation policy\n rsq_seg_resource_quota text, -- segment resource quota\n rsq_creation_time timestamp with time zone, -- when the queue is created\n rsq_update_time timestamp with time zone, -- when the queue is updated ( create or alter )\n rsq_status text, -- the status of resource queue.\n )",
- "tzhack" : "\"rsq_creation_time\" et al",
+ "tabdef_text" : "\n CREATE TABLE pg_resqueue\n with (camelcase=ResQueue, shared=true, relid=6026, reltype_oid=9830, toast_oid=9820, toast_index=9821, toast_reltype=9822)\n (\n name name, -- name of resource queue\n parentoid oid, -- oid of resource queue\n activestats integer, -- active statement count limit\n memorylimit text, -- memory limit in cluster\n corelimit text, -- core limit in cluster\n resovercommit real, -- resource upper limit in cluster\n allocpolicy text, -- query resource allocation policy\n vsegresourcequota text, -- vsegment resource quota\n nvsegupperlimit integer, -- vsegment size upper limit\n nvseglowerlimit integer, -- vsegment size lower limit\n nvsegupperlimitperseg integer, -- vsegment size upper limit per segment\n nvseglowerlimitperseg integer, -- vsegment size lower limit per segment\n creationtime timestamp with time zone, -- when the queue is created\n updatetime timestamp with time zone, -- when the queue is u
pdated ( create or alter )\n status text, -- the status of resource queue\n )",
+ "tzhack" : "\"creationtime\" et al",
"with" : {
"bootstrap" : 0,
"camelcase" : "ResQueue",
@@ -8643,135 +8556,6 @@
"toast_reltype" : "9822"
}
},
- "pg_resqueuecapability" : {
- "CamelCaseRelationId" : "ResQueueCapabilityRelationId",
- "UppercaseReltypeOid" : "PG_RESQUEUECAPABILITY_RELTYPE_OID",
- "colh" : {
- "oid" : "Oid",
- "resqueueid" : "Oid",
- "ressetting" : "text",
- "restypid" : "int2"
- },
- "cols" : [
- {
- "colname" : "resqueueid",
- "ctype" : "Oid",
- "postcomment" : "-- OID of the queue with this capability ",
- "precomment" : "\n**TK_BLANK_LINE**",
- "sqltype" : "oid"
- },
- {
- "colname" : "restypid",
- "ctype" : "int2",
- "postcomment" : "-- resource type id (key to pg_resourcetype) ",
- "sqltype" : "smallint"
- },
- {
- "colname" : "ressetting",
- "ctype" : "text",
- "postcomment" : "-- resource setting (opaque type) ",
- "sqltype" : "text"
- }
- ],
- "filename" : "pg_resqueue.h",
- "fk_list" : [
- {
- "fkcols" : [
- "resqueueid"
- ],
- "pkcols" : [
- "oid"
- ],
- "pktable" : "pg_resqueue",
- "type" : "scalar"
- },
- {
- "fkcols" : [
- "restypid"
- ],
- "pkcols" : [
- "restypid"
- ],
- "pktable" : "pg_resourcetype",
- "type" : "scalar"
- }
- ],
- "foreign_keys" : [
- [
- [
- "resqueueid"
- ],
- "pg_resqueue",
- [
- "oid"
- ]
- ],
- [
- [
- "restypid"
- ],
- "pg_resourcetype",
- [
- "restypid"
- ]
- ]
- ],
- "indexes" : [
- {
- "CamelCaseIndexId" : "ResQueueCapabilityOidIndexId",
- "cols" : [
- [
- "oid",
- "oid_ops"
- ]
- ],
- "indexid" : "6064",
- "unique" : "1",
- "with" : {
- "indexid" : "6064"
- }
- },
- {
- "CamelCaseIndexId" : "ResQueueCapabilityResqueueidIndexId",
- "cols" : [
- [
- "resqueueid",
- "oid_ops"
- ]
- ],
- "indexid" : "6065",
- "unique" : "",
- "with" : {
- "indexid" : "6065"
- }
- },
- {
- "CamelCaseIndexId" : "ResQueueCapabilityRestypidIndexId",
- "cols" : [
- [
- "restypid",
- "int2_ops"
- ]
- ],
- "indexid" : "6066",
- "unique" : "",
- "with" : {
- "indexid" : "6066"
- }
- }
- ],
- "relid_comment_tag" : "/* relation id: 6060 - pg_resqueuecapability */\n",
- "tabdef_text" : "\n CREATE TABLE pg_resqueuecapability\n with (camelcase=ResQueueCapability, shared=true, relid=6060, reltype_oid=6446)\n (\n resqueueid oid,\t\t-- OID of the queue with this capability \n restypid smallint,\t-- resource type id (key to pg_resourcetype) \n ressetting text\t\t-- resource setting (opaque type) \n )",
- "with" : {
- "bootstrap" : 0,
- "camelcase" : "ResQueueCapability",
- "oid" : 1,
- "relid" : "6060",
- "reltype_oid" : "6446",
- "shared" : "1",
- "text" : "with (camelcase=ResQueueCapability, shared=true, relid=6060, reltype_oid=6446)"
- }
- },
"pg_rewrite" : {
"CamelCaseRelationId" : "RewriteRelationId",
"CamelCaseToastInd" : "PgRewriteToastIndex",
[2/4] incubator-hawq git commit: HAWQ-25. Add resource queue new ddl
statement implementation, refine partial GUC variable names,
use libyarn supporting kerberos.
Posted by yj...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/resqueuemanager.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resqueuemanager.c b/src/backend/resourcemanager/resqueuemanager.c
index 44e5504..ef055c3 100644
--- a/src/backend/resourcemanager/resqueuemanager.c
+++ b/src/backend/resourcemanager/resqueuemanager.c
@@ -11,35 +11,41 @@
* The DDL statement attribute name strings.
*/
char RSQDDLAttrNames[RSQ_DDL_ATTR_COUNT]
- [RESOURCE_QUEUE_DDL_ATTR_LENGTH_MAX] = {
+ [RESOURCE_QUEUE_DDL_ATTR_LENGTH_MAX+1] = {
"parent",
"active_statements",
"memory_limit_cluster",
"core_limit_cluster",
- "vsegment_resource_quota",
+ "vseg_resource_quota",
"allocation_policy",
- "resource_upper_factor",
- "vsegment_upper_limit"
+ "resource_overcommit_factor",
+ "nvseg_upper_limit",
+ "nvseg_lower_limit",
+ "nvseg_upper_limit_perseg",
+ "nvseg_lower_limit_perseg"
};
/*
* The attribute names for expressing one complete resource queue definition.
*/
static char RSQTBLAttrNames[RSQ_TBL_ATTR_COUNT]
- [RESOURCE_QUEUE_DDL_ATTR_LENGTH_MAX] = {
- "parent",
- "active_statements",
- "memory_limit_cluster",
- "core_limit_cluster",
- "vsegment_resource_quota",
- "allocation_policy",
- "resource_upper_factor",
- "vsegment_upper_limit",
+ [RESOURCE_QUEUE_TBL_COLNAME_LENGTH_MAX+1] = {
+ "parentoid",
+ "activestats",
+ "memorylimit",
+ "corelimit",
+ "vsegresourcequota",
+ "allocpolicy",
+ "resovercommit",
+ "nvsegupperlimit",
+ "nvseglowerlimit",
+ "nvsegupperlimitperseg",
+ "nvseglowerlimitperseg",
"oid",
"name",
- "creation_time",
- "update_time",
+ "creationtime",
+ "updatetime",
"status"
};
@@ -48,8 +54,7 @@ static char RSQTBLAttrNames[RSQ_TBL_ATTR_COUNT]
*/
static char RSQDDLValueAllocationPolicy[RSQ_ALLOCATION_POLICY_COUNT]
[RESOURCE_QUEUE_DDL_POLICY_LENGTH_MAX] = {
- "even",
- "fifo"
+ "even"
};
/*
@@ -89,8 +94,7 @@ int computeQueryQuota_FIFO( DynResourceQueueTrack track,
int32_t min(int32_t a, int32_t b);
int32_t max(int32_t a, int32_t b);
computeQueryQuotaByPolicy AllocationPolicy[RSQ_ALLOCATION_POLICY_COUNT] = {
- computeQueryQuota_EVEN,
- computeQueryQuota_FIFO
+ computeQueryQuota_EVEN
};
int computeQueryQuota( DynResourceQueueTrack track,
@@ -111,8 +115,7 @@ int dispatchResourceToQueries_EVEN(DynResourceQueueTrack track);
int dispatchResourceToQueries_FIFO(DynResourceQueueTrack track);
dispatchResourceToQueriesByPolicy DispatchPolicy[RSQ_ALLOCATION_POLICY_COUNT] = {
- dispatchResourceToQueries_EVEN,
- dispatchResourceToQueries_FIFO
+ dispatchResourceToQueries_EVEN
};
void dispatchResourceToQueriesInOneQueue(DynResourceQueueTrack track);
@@ -151,6 +154,9 @@ void markMemoryCoreRatioWaterMark(DQueue marks,
void buildTimeoutResponseForQueuedRequest(ConnectionTrack conntrack,
uint32_t reason);
+bool isResourceAcceptable(ConnectionTrack conn, int segnumact);
+
+void adjustResourceExpectsByQueueNVSegLimits(ConnectionTrack conntrack);
/*----------------------------------------------------------------------------*/
/* RESOURCE QUEUE MANAGER EXTERNAL APIs */
/*----------------------------------------------------------------------------*/
@@ -269,7 +275,7 @@ int shallowparseResourceQueueWithAttributes(List *rawattr,
if ( attrindex == -1 )
{
snprintf(errorbuf, errorbufsize,
- "Not defined DDL attribute name [%s]",
+ "not defined DDL attribute name %s",
property->Key.Str);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return RMDDL_WRONG_ATTRNAME;
@@ -281,14 +287,14 @@ int shallowparseResourceQueueWithAttributes(List *rawattr,
{
/* Find oid of the parent resource queue. */
bool exist = false;
- DynResourceQueueTrack parentque = getQueueTrackByQueueName(
- property->Val.Str,
- property->Val.Len,
- &exist);
+ DynResourceQueueTrack parentque =
+ getQueueTrackByQueueName(property->Val.Str,
+ property->Val.Len,
+ &exist);
if ( !exist )
{
snprintf(errorbuf, errorbufsize,
- "Can not recognize parent resource queue name %s.",
+ "can not recognize parent resource queue name %s.",
property->Val.Str);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return RMDDL_WRONG_ATTRVALUE;
@@ -311,10 +317,13 @@ int shallowparseResourceQueueWithAttributes(List *rawattr,
case RSQ_DDL_ATTR_ACTIVE_STATMENTS:
case RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER:
case RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER:
- case RSQ_DDL_ATTR_VSEGMENT_RESOURCE_QUOTA:
+ case RSQ_DDL_ATTR_VSEG_RESOURCE_QUOTA:
case RSQ_DDL_ATTR_ALLOCATION_POLICY:
- case RSQ_DDL_ATTR_RESOURCE_UPPER_FACTOR:
- case RSQ_DDL_ATTR_VSEGMENT_UPPER_LIMIT:
+ case RSQ_DDL_ATTR_RESOURCE_OVERCOMMIT_FACTOR:
+ case RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT:
+ case RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT:
+ case RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT_PERSEG:
+ case RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT_PERSEG:
{
/*
* Build property.
@@ -353,6 +362,7 @@ int shallowparseResourceQueueWithAttributes(List *rawattr,
*/
int parseResourceQueueAttributes( List *attributes,
DynResourceQueue queue,
+ bool checkformatonly,
char *errorbuf,
int errorbufsize)
{
@@ -372,8 +382,8 @@ int parseResourceQueueAttributes( List *attributes,
Assert( queue != NULL );
/* Initialize attributes. */
- queue->OID = -1;
- queue->ParentOID = -1;
+ queue->OID = InvalidOid;
+ queue->ParentOID = InvalidOid;
queue->ParallelCount = -1;
queue->ClusterMemoryMB = -1;
queue->Status = RESOURCE_QUEUE_STATUS_VALID_LEAF;
@@ -382,8 +392,11 @@ int parseResourceQueueAttributes( List *attributes,
queue->SegResourceQuotaVCore = -1.0;
queue->SegResourceQuotaMemoryMB = -1;
- queue->ResourceUpperFactor = -1;
- queue->VSegUpperLimit = DEFAULT_RESQUEUE_VSEG_UPPER_LIMIT_N;
+ queue->ResourceOvercommit = DEFAULT_RESQUEUE_OVERCOMMIT_N;
+ queue->NVSegUpperLimit = DEFAULT_RESQUEUE_NVSEG_UPPER_LIMIT_N;
+ queue->NVSegLowerLimit = DEFAULT_RESQUEUE_NVSEG_LOWER_LIMIT_N;
+ queue->NVSegUpperLimitPerSeg = DEFAULT_RESQUEUE_NVSEG_UPPER_PERSEG_LIMIT_N;
+ queue->NVSegLowerLimitPerSeg = DEFAULT_RESQUEUE_NVSEG_LOWER_PERSEG_LIMIT_N;
queue->AllocatePolicy = -1;
queue->QueuingPolicy = -1;
@@ -394,7 +407,8 @@ int parseResourceQueueAttributes( List *attributes,
memset(queue->Name, '\0', sizeof(queue->Name));
- /* Go through each property content. */
+ /* Go through each attribute content. */
+ errorbuf[0] = '\0';
ListCell *cell = NULL;
foreach(cell, attributes)
{
@@ -414,7 +428,7 @@ int parseResourceQueueAttributes( List *attributes,
{
res = RESQUEMGR_WRONG_ATTRNAME;
snprintf(errorbuf, errorbufsize,
- "Can not recognize resource queue attribute %s",
+ "of not recognized resource queue attribute %s",
attrname->Str);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
@@ -437,17 +451,6 @@ int parseResourceQueueAttributes( List *attributes,
case RSQ_TBL_ATTR_ACTIVE_STATMENTS:
res = SimpleStringToInt32(attrvalue, &(queue->ParallelCount));
- if ( res != FUNC_RETURN_OK )
- {
- snprintf(errorbuf, errorbufsize,
- "Active statements %s is not valid.",
- attrvalue->Str);
- ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
- return res;
- }
- elog(DEBUG3, "Resource manager parseResourceQueueAttributes() parsed "
- "active statements %d",
- queue->ParallelCount);
break;
case RSQ_TBL_ATTR_MEMORY_LIMIT_CLUSTER:
@@ -483,98 +486,87 @@ int parseResourceQueueAttributes( List *attributes,
}
break;
- case RSQ_TBL_ATTR_VSEGMENT_RESOURCE_QUOTA:
+ case RSQ_TBL_ATTR_VSEG_RESOURCE_QUOTA:
/* Decide it is a memory quota or core quota. */
- if ( SimpleStringStartWith(
- attrvalue,
- RESOURCE_QUEUE_SEG_RES_QUOTA_MEM) == FUNC_RETURN_OK )
+ if ( SimpleStringStartWith(attrvalue,
+ RESOURCE_QUEUE_SEG_RES_QUOTA_MEM) == FUNC_RETURN_OK )
{
SimpString valuestr;
- setSimpleStringRef(
- &valuestr,
- attrvalue->Str+sizeof(RESOURCE_QUEUE_SEG_RES_QUOTA_MEM)-1,
- attrvalue->Len-sizeof(RESOURCE_QUEUE_SEG_RES_QUOTA_MEM)+1);
-
- res = SimpleStringToStorageSizeMB(
- &valuestr,
- &(queue->SegResourceQuotaMemoryMB));
+ setSimpleStringRef(&valuestr,
+ attrvalue->Str+sizeof(RESOURCE_QUEUE_SEG_RES_QUOTA_MEM)-1,
+ attrvalue->Len-sizeof(RESOURCE_QUEUE_SEG_RES_QUOTA_MEM)+1);
- elog(DEBUG3, "Resource manager parseResourceQueueAttributes() "
- "parsed segment resource quota %d MB",
- queue->SegResourceQuotaMemoryMB);
-
- }
- else if ( SimpleStringStartWith(
- attrvalue,
- RESOURCE_QUEUE_SEG_RES_QUOTA_CORE) == FUNC_RETURN_OK )
- {
- SimpString valuestr;
- setSimpleStringRef(
- &valuestr,
- attrvalue->Str+sizeof(RESOURCE_QUEUE_SEG_RES_QUOTA_CORE)-1,
- attrvalue->Len-sizeof(RESOURCE_QUEUE_SEG_RES_QUOTA_CORE)+1);
+ res = SimpleStringToStorageSizeMB(&valuestr,
+ &(queue->SegResourceQuotaMemoryMB));
- res = SimpleStringToDouble(&valuestr,
- &(queue->SegResourceQuotaVCore));
-
- elog(DEBUG3, "Resource manager parseResourceQueueAttributes() "
- "parsed segment resource quota %lf CORE",
- queue->SegResourceQuotaVCore);
+ /*
+ *--------------------------------------------------------------
+ * Check the value. We accept only :
+ * 64mb, 128mb, 256mb, 512mb, 1gb, 2gb, 4gb, 8gb, 16gb
+ *--------------------------------------------------------------
+ */
+ if ( res == FUNC_RETURN_OK )
+ {
+ elog(DEBUG3, "Resource manager parseResourceQueueAttributes() "
+ "parsed segment resource quota %d MB",
+ queue->SegResourceQuotaMemoryMB);
+
+ if ( !(queue->SegResourceQuotaMemoryMB == (2<<6)) &&
+ !(queue->SegResourceQuotaMemoryMB == (2<<7)) &&
+ !(queue->SegResourceQuotaMemoryMB == (2<<8)) &&
+ !(queue->SegResourceQuotaMemoryMB == (2<<9)) &&
+ !(queue->SegResourceQuotaMemoryMB == (2<<10)) &&
+ !(queue->SegResourceQuotaMemoryMB == (2<<11)) &&
+ !(queue->SegResourceQuotaMemoryMB == (2<<12)) &&
+ !(queue->SegResourceQuotaMemoryMB == (2<<13)) &&
+ !(queue->SegResourceQuotaMemoryMB == (2<<14)) )
+ {
+ res = RESQUEMGR_WRONG_RES_QUOTA_EXP;
+ snprintf(errorbuf, errorbufsize,
+ "%s value %s is not valid, only 64mb, 128mb, "
+ "256mb, 512mb, 1gb, 2gb, 4gb, 8gb, 16gb are "
+ "valid.",
+ RSQTBLAttrNames[RSQ_TBL_ATTR_VSEG_RESOURCE_QUOTA],
+ attrvalue->Str);
+ }
+ }
}
else
{
+ res = RESQUEMGR_WRONG_RES_QUOTA_EXP;
snprintf(errorbuf, errorbufsize,
- "Resource quota limit %s is not valid.",
+ "%s format %s is not valid.",
+ RSQTBLAttrNames[RSQ_TBL_ATTR_VSEG_RESOURCE_QUOTA],
attrvalue->Str);
- ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
- return res;
}
break;
- case RSQ_DDL_ATTR_RESOURCE_UPPER_FACTOR:
- res = SimpleStringToDouble(attrvalue, &(queue->ResourceUpperFactor));
- if ( res != FUNC_RETURN_OK ) {
- snprintf(errorbuf, errorbufsize,
- "Resource upper factor %s is not valid.",
- attrvalue->Str);
- ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
- return res;
- }
- elog(DEBUG3, "Resource manager parseResourceQueueAttributes() parsed "
- "resource upper factor %lf",
- queue->ResourceUpperFactor);
+ case RSQ_DDL_ATTR_RESOURCE_OVERCOMMIT_FACTOR:
+ res = SimpleStringToDouble(attrvalue, &(queue->ResourceOvercommit));
break;
- case RSQ_DDL_ATTR_VSEGMENT_UPPER_LIMIT:
- res = SimpleStringToInt32(attrvalue, &(queue->VSegUpperLimit));
- if ( res != FUNC_RETURN_OK )
- {
- snprintf(errorbuf, errorbufsize,
- "Virtual segment upper limit %s is not valid.",
- attrvalue->Str);
- ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
- return res;
- }
- elog(DEBUG3, "Resource manager parseResourceQueueAttributes() parsed "
- "virtual segment upper limit %d",
- queue->VSegUpperLimit);
+ case RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT:
+ res = SimpleStringToInt32(attrvalue, &(queue->NVSegUpperLimit));
+ break;
+
+ case RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT:
+ res = SimpleStringToInt32(attrvalue, &(queue->NVSegLowerLimit));
+ break;
+
+ case RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT_PERSEG:
+ res = SimpleStringToDouble(attrvalue, &(queue->NVSegUpperLimitPerSeg));
+ break;
+
+ case RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT_PERSEG:
+ res = SimpleStringToDouble(attrvalue, &(queue->NVSegLowerLimitPerSeg));
break;
case RSQ_TBL_ATTR_ALLOCATION_POLICY:
- res = SimpleStringToMapIndexInt8(
- attrvalue,
- (char *)RSQDDLValueAllocationPolicy,
- RSQ_ALLOCATION_POLICY_COUNT,
- sizeof(RSQDDLValueAllocationPolicy[0]),
- &(queue->AllocatePolicy));
- if ( res != FUNC_RETURN_OK )
- {
- snprintf(errorbuf, errorbufsize,
- "Allocation policy %s is not valid.",
- attrvalue->Str);
- ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
- return res;
- }
+ res = SimpleStringToMapIndexInt8(attrvalue,
+ (char *)RSQDDLValueAllocationPolicy,
+ RSQ_ALLOCATION_POLICY_COUNT,
+ sizeof(RSQDDLValueAllocationPolicy[0]),
+ &(queue->AllocatePolicy));
break;
case RSQ_TBL_ATTR_NAME:
@@ -629,14 +621,22 @@ int parseResourceQueueAttributes( List *attributes,
if ( res != FUNC_RETURN_OK )
{
res = RESQUEMGR_WRONG_ATTR;
- snprintf(errorbuf, errorbufsize,
- "Wrong resource queue attribute setting. %s=%s",
- attrname->Str, attrvalue->Str);
+ if ( errorbuf[0] == '\0' )
+ {
+ snprintf(errorbuf, errorbufsize,
+ "wrong resource queue attribute setting. %s=%s",
+ attrname->Str, attrvalue->Str);
+ }
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
}
+ if ( checkformatonly )
+ {
+ return res;
+ }
+
/*
* Memory and Core resource must be specified and they must use the same way
* to express the resource.
@@ -645,7 +645,8 @@ int parseResourceQueueAttributes( List *attributes,
{
res = RESQUEMGR_LACK_ATTR;
snprintf(errorbuf, errorbufsize,
- "MEMORY_LIMIT_CLUSTER must be specified.");
+ "%s must be specified.",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER]);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
@@ -654,7 +655,8 @@ int parseResourceQueueAttributes( List *attributes,
{
res = RESQUEMGR_LACK_ATTR;
snprintf(errorbuf, errorbufsize,
- "CORE_LIMIT_CLUSTER must be specified.");
+ "%s must be specified.",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER]);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
@@ -664,8 +666,9 @@ int parseResourceQueueAttributes( List *attributes,
{
res = RESQUEMGR_INCONSISTENT_RESOURCE_EXP;
snprintf(errorbuf, errorbufsize,
- "MEMORY_LIMIT_CLUSTER and CORE_LIMIT_CLUSTER "
- "must use the same way to express resource limit.");
+ "%s and %s must use the same way to express resource limit.",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER],
+ RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER]);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
@@ -703,24 +706,7 @@ int updateResourceQueueAttributes(List *attributes,
Assert(queue != NULL);
- /* We can not have duplicate property keys. */
ListCell *cell = NULL;
- foreach(cell, attributes)
- {
- KVProperty value1 = lfirst(cell);
-
- for ( ListCell *cell2 = lnext(cell) ; cell2 != NULL ; cell2 = lnext(cell2) )
- {
- KVProperty value2 = lfirst(cell2);
- if ( SimpleStringComp(&(value1->Key), value2->Key.Str) == 0 )
- {
- res = RESQUEMGR_DUPLICATE_ATTRNAME;
- snprintf(errorbuf, errorbufsize, "Duplicate attributes %s", value1->Key.Str);
- ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
- return res;
- }
- }
- }
/* Go through each property content. */
foreach(cell, attributes)
@@ -816,7 +802,7 @@ int updateResourceQueueAttributes(List *attributes,
}
break;
- case RSQ_TBL_ATTR_VSEGMENT_RESOURCE_QUOTA:
+ case RSQ_TBL_ATTR_VSEG_RESOURCE_QUOTA:
/* Decide it is a memory quota or core quota. */
if ( SimpleStringStartWith(
attrvalue,
@@ -866,23 +852,23 @@ int updateResourceQueueAttributes(List *attributes,
}
break;
- case RSQ_DDL_ATTR_RESOURCE_UPPER_FACTOR:
- res = SimpleStringToDouble(attrvalue, &(queue->ResourceUpperFactor));
+ case RSQ_DDL_ATTR_RESOURCE_OVERCOMMIT_FACTOR:
+ res = SimpleStringToDouble(attrvalue, &(queue->ResourceOvercommit));
if ( res != FUNC_RETURN_OK )
{
snprintf(errorbuf, errorbufsize,
- "Resource upper limit factor %s is not valid.",
+ "Resource overcommit limit factor %s is not valid.",
attrvalue->Str);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
elog(DEBUG3, "Resource manager updateResourceQueueAttributes() "
"updated Resource upper limit factor %lf",
- queue->ResourceUpperFactor);
+ queue->ResourceOvercommit);
break;
- case RSQ_DDL_ATTR_VSEGMENT_UPPER_LIMIT:
- res = SimpleStringToInt32(attrvalue, &(queue->VSegUpperLimit));
+ case RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT:
+ res = SimpleStringToInt32(attrvalue, &(queue->NVSegUpperLimit));
if ( res != FUNC_RETURN_OK )
{
snprintf(errorbuf, errorbufsize,
@@ -893,8 +879,8 @@ int updateResourceQueueAttributes(List *attributes,
}
elog(DEBUG3, "Resource manager updateResourceQueueAttributes() "
- "updated virtual segment upper limit %d",
- queue->VSegUpperLimit);
+ "updated virtual segment size upper limit %d",
+ queue->NVSegUpperLimit);
break;
case RSQ_TBL_ATTR_ALLOCATION_POLICY:
@@ -933,7 +919,7 @@ int updateResourceQueueAttributes(List *attributes,
{
res = RESQUEMGR_WRONG_ATTR;
snprintf(errorbuf, errorbufsize,
- "Wrong resource queue attribute setting. %s=%s",
+ "Wrong to parse resource queue attribute setting. %s=%s",
attrname->Str, attrvalue->Str);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
@@ -980,20 +966,11 @@ int updateResourceQueueAttributes(List *attributes,
}
-/**
+/*
* This is one API for checking if new resource queue definition is valid to be
* created.This functions does not generate logs higher than WARNING, the error
* is also saved in error buffer to make the caller able to pass the message to
* remote process.
- *
- * queue[in/out] The queue instance to be tested and completed.
- * errorbuf[out] The error string buffer.
- * errorbufsize[in] The maximum size of error string buffer.
- *
- * Return values:
- * FUNC_RETURN_OK : Succeed.
- * RESQUEMGR_LACK_ATTR : Necessary attributes are not specified.
- * RESQUEMGR_WRONG_ATTR : Unrecognized wrong attribute value.
*/
int checkAndCompleteNewResourceQueueAttributes(DynResourceQueue queue,
char *errorbuf,
@@ -1004,8 +981,8 @@ int checkAndCompleteNewResourceQueueAttributes(DynResourceQueue queue,
Assert( queue != NULL );
- if ( queue->Status & RESOURCE_QUEUE_STATUS_IS_VER1X ) {
-
+ if ( queue->Status & RESOURCE_QUEUE_STATUS_IS_VER1X )
+ {
/* TODO: Validate Version 1.x resource queue definition here. */
return res;
}
@@ -1015,25 +992,29 @@ int checkAndCompleteNewResourceQueueAttributes(DynResourceQueue queue,
/*
* STEP 1. Validate parent queue attribute.
*/
- if ( queue->ParentOID < 0 ) {
+ if ( !RESQUEUE_IS_ROOT(queue) && queue->ParentOID == InvalidOid )
+ {
res = RESQUEMGR_LACK_ATTR;
snprintf(errorbuf, errorbufsize,
- "Attribute %s must be specified.",
+ "attribute %s must be specified.",
RSQDDLAttrNames[RSQ_DDL_ATTR_PARENT]);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
- if ( queue->ParentOID != InvalidOid ) {
+ if ( !RESQUEUE_IS_ROOT(queue) && queue->ParentOID != InvalidOid )
+ {
bool exist = false;
parenttrack = getQueueTrackByQueueOID(queue->ParentOID, &exist);
- Assert((exist && parenttrack != NULL) || !exist);
+ Assert(exist && parenttrack != NULL);
/* pg_default can not be a parent queue. */
- if ( RESQUEUE_IS_DEFAULT(parenttrack->QueueInfo) ) {
+ if ( RESQUEUE_IS_DEFAULT(parenttrack->QueueInfo) )
+ {
res = RESQUEMGR_WRONG_ATTR;
snprintf(errorbuf, errorbufsize,
- "pg_default can not have children resource queues.");
+ "%s can not have children resource queues.",
+ RESOURCE_QUEUE_DEFAULT_QUEUE_NAME);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
@@ -1053,8 +1034,9 @@ int checkAndCompleteNewResourceQueueAttributes(DynResourceQueue queue,
* STEP 2. Validate active_statements attributes. For leaf queue only.
*/
- if ( queue->ParallelCount <= 0 ) {
- queue->ParallelCount = RESOURCE_QUEUE_PARALLEL_COUNT_DEF;
+ if ( queue->ParallelCount <= 0 )
+ {
+ queue->ParallelCount = DEFAULT_RESQUEUE_ACTIVESTATS_N;
}
/*
@@ -1067,22 +1049,22 @@ int checkAndCompleteNewResourceQueueAttributes(DynResourceQueue queue,
if ( RESQUEUE_IS_PERCENT(queue) )
{
/* MEMORY_LIMIT_CLUSTER and CORE_LIMIT_CLUSTER must be specified.*/
- if ( queue->ClusterMemoryPer == -1 ) {
+ if ( queue->ClusterMemoryPer == -1 )
+ {
res = RESQUEMGR_LACK_ATTR;
- snprintf(
- errorbuf, errorbufsize,
- "%s must be set.",
- RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER]);
+ snprintf(errorbuf, errorbufsize,
+ "%s must be set.",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER]);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
- if ( queue->ClusterVCorePer == -1 ) {
+ if ( queue->ClusterVCorePer == -1 )
+ {
res = RESQUEMGR_LACK_ATTR;
- snprintf(
- errorbuf, errorbufsize,
- "%s must be set.",
- RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER]);
+ snprintf(errorbuf, errorbufsize,
+ "%s must be set.",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER]);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
@@ -1092,26 +1074,26 @@ int checkAndCompleteNewResourceQueueAttributes(DynResourceQueue queue,
* than 0, less than 100. This is to guarantee the following automatic
* deduction of the limits.
*/
- if ( queue->ClusterVCorePer <= 0 || queue->ClusterVCorePer > 100 ) {
+ if ( queue->ClusterVCorePer <= 0 || queue->ClusterVCorePer > 100 )
+ {
res = RESQUEMGR_WRONG_ATTR;
- snprintf(
- errorbuf, errorbufsize,
- "The explicit value of %s must be between 1%% and 100%%. "
- "Wrong value = %lf%%",
- RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER],
- queue->ClusterVCorePer);
+ snprintf(errorbuf, errorbufsize,
+ "The explicit value of %s must be between 1%% and 100%%. "
+ "Wrong value = %lf%%",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER],
+ queue->ClusterVCorePer);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
- if ( queue->ClusterMemoryPer <= 0 || queue->ClusterMemoryPer > 100 ) {
+ if ( queue->ClusterMemoryPer <= 0 || queue->ClusterMemoryPer > 100 )
+ {
res = RESQUEMGR_WRONG_ATTR;
- snprintf(
- errorbuf, errorbufsize,
- "The explicit value of %s must be between 1%% and 100%%. "
- "Wrong value = %lf%%",
- RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER],
- queue->ClusterMemoryPer);
+ snprintf(errorbuf, errorbufsize,
+ "The explicit value of %s must be between 1%% and 100%%. "
+ "Wrong value = %lf%%",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER],
+ queue->ClusterMemoryPer);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
@@ -1123,17 +1105,16 @@ int checkAndCompleteNewResourceQueueAttributes(DynResourceQueue queue,
if ( queue->ClusterVCorePer != queue->ClusterMemoryPer )
{
res = RESQUEMGR_WRONG_ATTR;
- snprintf(
- errorbuf, errorbufsize,
- "The value of %s must be identical with the value of %s. "
- "Wrong value of %s = %lf%%. "
- "Wrong value of %s = %lf%%. ",
- RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER],
- RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER],
- RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER],
- queue->ClusterMemoryPer,
- RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER],
- queue->ClusterVCorePer);
+ snprintf(errorbuf, errorbufsize,
+ "The value of %s must be identical with the value of %s. "
+ "Wrong value of %s = %lf%%. "
+ "Wrong value of %s = %lf%%. ",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER],
+ RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER],
+ RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER],
+ queue->ClusterMemoryPer,
+ RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER],
+ queue->ClusterVCorePer);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
@@ -1161,13 +1142,12 @@ int checkAndCompleteNewResourceQueueAttributes(DynResourceQueue queue,
if ((current + queue->ClusterMemoryPer) > 100)
{
res = RESQUEMGR_WRONG_ATTR;
- snprintf(
- errorbuf, errorbufsize,
- "The value of %s and %s exceeds its parent's limit. "
- "Wrong value = %lf%%",
- RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER],
- RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER],
- queue->ClusterMemoryPer);
+ snprintf(errorbuf, errorbufsize,
+ "the value of %s and %s exceeds parent queue's limit. "
+ "Wrong value = %.0lf%%",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER],
+ RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER],
+ queue->ClusterMemoryPer);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
@@ -1180,22 +1160,22 @@ int checkAndCompleteNewResourceQueueAttributes(DynResourceQueue queue,
else {
/* MEMORY_LIMIT_CLUSTER and CORE_LIMIT_CLUSTER must be specified.*/
- if ( queue->ClusterMemoryMB == -1 ) {
+ if ( queue->ClusterMemoryMB == -1 )
+ {
res = RESQUEMGR_LACK_ATTR;
- snprintf(
- errorbuf, errorbufsize,
- "%s must be set.",
- RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER]);
+ snprintf(errorbuf, errorbufsize,
+ "%s must be set.",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER]);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
- if ( queue->ClusterVCore == -1 ) {
+ if ( queue->ClusterVCore == -1 )
+ {
res = RESQUEMGR_LACK_ATTR;
- snprintf(
- errorbuf, errorbufsize,
- "%s must be set.",
- RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER]);
+ snprintf(errorbuf, errorbufsize,
+ "%s must be set.",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER]);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
@@ -1205,26 +1185,26 @@ int checkAndCompleteNewResourceQueueAttributes(DynResourceQueue queue,
* than 0. This is to guarantee the following automatic deduction of the
* limits.
*/
- if ( queue->ClusterVCore <= 0 ) {
+ if ( queue->ClusterVCore <= 0 )
+ {
res = RESQUEMGR_WRONG_ATTR;
- snprintf(
- errorbuf, errorbufsize,
- "The explicit value of %s must be greater than 0. "
- "Wrong value = %f",
- RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER],
- queue->ClusterVCore);
+ snprintf(errorbuf, errorbufsize,
+ "The explicit value of %s must be greater than 0. "
+ "Wrong value = %f",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER],
+ queue->ClusterVCore);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
- if ( queue->ClusterMemoryMB <= 0 ) {
+ if ( queue->ClusterMemoryMB <= 0 )
+ {
res = RESQUEMGR_WRONG_ATTR;
- snprintf(
- errorbuf, errorbufsize,
- "The explicit value of %s must be greater than 0. "
- "Wrong value = %dMB",
- RSQTBLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER],
- queue->ClusterMemoryMB);
+ snprintf(errorbuf, errorbufsize,
+ "The explicit value of %s must be greater than 0. "
+ "Wrong value = %dMB",
+ RSQTBLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER],
+ queue->ClusterMemoryMB);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
@@ -1234,60 +1214,141 @@ int checkAndCompleteNewResourceQueueAttributes(DynResourceQueue queue,
* STEP 4: Check resource quota.
*/
if ( queue->SegResourceQuotaMemoryMB == -1 &&
- queue->SegResourceQuotaVCore == -1.0 ) {
- queue->SegResourceQuotaMemoryMB = RESOURCE_QUEUE_SEG_RES_QUOTA_DEF;
+ queue->SegResourceQuotaVCore == -1.0 )
+ {
+ queue->SegResourceQuotaMemoryMB = DEFAULT_RESQUEUE_VSEGRESOURCEQUOTA_N;
}
- if ( queue->SegResourceQuotaMemoryMB != -1 ) {
+ if ( queue->SegResourceQuotaMemoryMB != -1 )
+ {
/* The quota value must be greater than 0. */
- if ( queue->SegResourceQuotaMemoryMB <= 0 ) {
+ if ( queue->SegResourceQuotaMemoryMB <= 0 )
+ {
res = RESQUEMGR_WRONG_ATTR;
- snprintf(
- errorbuf, errorbufsize,
- "%s must be greater than 0.",
- RSQDDLAttrNames[RSQ_DDL_ATTR_VSEGMENT_RESOURCE_QUOTA]);
+ snprintf(errorbuf, errorbufsize,
+ "%s must be greater than 0.",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_VSEG_RESOURCE_QUOTA]);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
}
- else if ( queue->SegResourceQuotaVCore != -1.0 ) {
-
+ else if ( queue->SegResourceQuotaVCore != -1.0 )
+ {
/* The quota value must be greater than 0. */
- if ( queue->SegResourceQuotaVCore <= 0.0 ) {
+ if ( queue->SegResourceQuotaVCore <= 0.0 )
+ {
res = RESQUEMGR_WRONG_ATTR;
- snprintf(
- errorbuf, errorbufsize,
- "%s must be greater than 0.0.",
- RSQTBLAttrNames[RSQ_DDL_ATTR_VSEGMENT_RESOURCE_QUOTA]);
+ snprintf(errorbuf, errorbufsize,
+ "%s must be greater than 0.0.",
+ RSQTBLAttrNames[RSQ_DDL_ATTR_VSEG_RESOURCE_QUOTA]);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
}
- else {
+ else
+ {
Assert(0); /* Should never come here. */
}
/*
* STEP 5: Check policy and set default value.
*/
- if ( queue->AllocatePolicy == -1 ) {
+ if ( queue->AllocatePolicy == -1 )
+ {
queue->AllocatePolicy = RSQ_ALLOCATION_POLICY_EVEN;
}
/*
- * STEP 6: Check resource factors.
+ * STEP 6: Check resource over-commit factor.
+ */
+ if ( queue->ResourceOvercommit < MINIMUM_RESQUEUE_OVERCOMMIT_N )
+ {
+ res = RESQUEMGR_WRONG_ATTR;
+ snprintf(errorbuf, errorbufsize,
+ "%s must be no less than %lf. Wrong value %lf",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_RESOURCE_OVERCOMMIT_FACTOR],
+ MINIMUM_RESQUEUE_OVERCOMMIT_N,
+ queue->ResourceOvercommit);
+ ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
+ return res;
+ }
+
+ /*
+ * STEP 7. Check number of vseg limit.
+ */
+ if ( queue->NVSegUpperLimit < MINIMUM_RESQUEUE_NVSEG_UPPER_LIMIT_N )
+ {
+ res = RESQUEMGR_WRONG_ATTR;
+ snprintf(errorbuf, errorbufsize,
+ "%s must be no less than %d. Wrong value %d",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT],
+ MINIMUM_RESQUEUE_NVSEG_UPPER_LIMIT_N,
+ queue->NVSegUpperLimit);
+ ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
+ return res;
+ }
+
+ if ( queue->NVSegLowerLimit < MINIMUM_RESQUEUE_NVSEG_LOWER_LIMIT_N )
+ {
+ res = RESQUEMGR_WRONG_ATTR;
+ snprintf(errorbuf, errorbufsize,
+ "%s must be no less than %d. Wrong value %d",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT],
+ MINIMUM_RESQUEUE_NVSEG_LOWER_LIMIT_N,
+ queue->NVSegLowerLimit);
+ ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
+ return res;
+ }
+
+ if ( queue->NVSegUpperLimit > 0 &&
+ queue->NVSegLowerLimit > 0 &&
+ queue->NVSegUpperLimit < queue->NVSegLowerLimit )
+ {
+ res = RESQUEMGR_WRONG_ATTR;
+ snprintf(errorbuf, errorbufsize,
+ "%s must be no less than %s.",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT],
+ RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT]);
+ ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
+ return res;
+ }
+
+ /*
+ * STEP 8. Check number of vseg limit per segment.
*/
- if ( queue->ResourceUpperFactor == -1.0 ) {
- queue->ResourceUpperFactor = RESOURCE_QUEUE_RES_UPPER_FACTOR_DEF;
+ if ( queue->NVSegUpperLimitPerSeg < MINIMUM_RESQUEUE_NVSEG_UPPER_PERSEG_LIMIT_N )
+ {
+ res = RESQUEMGR_WRONG_ATTR;
+ snprintf(errorbuf, errorbufsize,
+ "%s must be no less than %lf. Wrong value %lf",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT_PERSEG],
+ MINIMUM_RESQUEUE_NVSEG_UPPER_PERSEG_LIMIT_N,
+ queue->NVSegUpperLimitPerSeg);
+ ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
+ return res;
+ }
+
+ if ( queue->NVSegLowerLimitPerSeg < MINIMUM_RESQUEUE_NVSEG_LOWER_PERSEG_LIMIT_N )
+ {
+ res = RESQUEMGR_WRONG_ATTR;
+ snprintf(errorbuf, errorbufsize,
+ "%s must be no less than %lf. Wrong value %lf",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT_PERSEG],
+ MINIMUM_RESQUEUE_NVSEG_LOWER_PERSEG_LIMIT_N,
+ queue->NVSegLowerLimitPerSeg);
+ ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
+ return res;
}
- if ( queue->ResourceUpperFactor < 1.0 ) {
+ if ( queue->NVSegUpperLimitPerSeg > 0 &&
+ queue->NVSegLowerLimitPerSeg > 0 &&
+ queue->NVSegUpperLimitPerSeg < queue->NVSegLowerLimitPerSeg )
+ {
res = RESQUEMGR_WRONG_ATTR;
- snprintf(
- errorbuf, errorbufsize,
- "%s must be no less than 1.0. Wrong value %lf",
- RSQDDLAttrNames[RSQ_DDL_ATTR_RESOURCE_UPPER_FACTOR],
- queue->ResourceUpperFactor);
+ snprintf(errorbuf, errorbufsize,
+ "%s must be no less than %s.",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT_PERSEG],
+ RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT_PERSEG]);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
return res;
}
@@ -1303,13 +1364,6 @@ int checkAndCompleteNewResourceQueueAttributes(DynResourceQueue queue,
* is a new created instance.
* errorbuf[out] The error message if something is wrong.
* errorbufsize[out] The limit of error message buffer.
- *
- * Return FUNC_RETURN_OK : Everything is ok.
- * RESQUEMGR_DUPLICATE_QUEID : Duplicate resource queue id.
- * RESQUEMGR_NO_QUENAME : No resource queue name specified.
- * RESQUEMGR_DUPLICATE_QUENAME : Duplicate queue name.
- * RESQUEMGR_WRONG_PARENT_QUEUE : The parent queue is wrongly specified.
- *
*/
int createQueueAndTrack( DynResourceQueue queue,
DynResourceQueueTrack *track,
@@ -1381,7 +1435,7 @@ int createQueueAndTrack( DynResourceQueue queue,
isDefaultQueue = RESQUEUE_IS_DEFAULT(queue);
isRootQueue = RESQUEUE_IS_ROOT(queue);
- elog(RMLOG, "HAWQ RM :: To create resource queue instance %s", queue->Name);
+ elog(DEBUG3, "HAWQ RM :: To create resource queue instance %s", queue->Name);
/*
* Check the queue parent-child relationship. No matter the queue is to be
@@ -1389,12 +1443,15 @@ int createQueueAndTrack( DynResourceQueue queue,
* queue has no parent is 'pg_root' say isRootQueue. The queue 'pg_default'
* must has 'pg_root' as the parent queue.
*/
- if ( !isRootQueue ) {
+ if ( !isRootQueue )
+ {
/* Check if the parent queue id exists. */
parenttrack = getQueueTrackByQueueOID(queue->ParentOID, &exist);
- if (exist) {
+ if (exist)
+ {
/* Can not set pg_default as parent queue. */
- if ( RESQUEUE_IS_DEFAULT(parenttrack->QueueInfo) ) {
+ if ( RESQUEUE_IS_DEFAULT(parenttrack->QueueInfo) )
+ {
res = RESQUEMGR_WRONG_PARENT_QUEUE;
snprintf( errorbuf, errorbufsize,
ERRORPOS_FORMAT
@@ -1405,8 +1462,8 @@ int createQueueAndTrack( DynResourceQueue queue,
}
/* 'pg_default' must has 'pg_root' as parent. */
- if ( isDefaultQueue &&
- !RESQUEUE_IS_ROOT(parenttrack->QueueInfo) ) {
+ if ( isDefaultQueue && !RESQUEUE_IS_ROOT(parenttrack->QueueInfo) )
+ {
res = RESQUEMGR_WRONG_PARENT_QUEUE;
snprintf( errorbuf, errorbufsize,
ERRORPOS_FORMAT
@@ -1417,7 +1474,8 @@ int createQueueAndTrack( DynResourceQueue queue,
}
/* The parent queue can not have connections. */
- if ( parenttrack->CurConnCounter > 0 ) {
+ if ( parenttrack->CurConnCounter > 0 )
+ {
res = RESQUEMGR_IN_USE;
snprintf( errorbuf, errorbufsize,
ERRORPOS_FORMAT
@@ -1428,11 +1486,11 @@ int createQueueAndTrack( DynResourceQueue queue,
goto exit;
}
}
- else {
+ else
+ {
res = RESQUEMGR_WRONG_PARENT_QUEUE;
snprintf(errorbuf, errorbufsize,
- ERRORPOS_FORMAT "No expected parent queue " INT64_FORMAT,
- ERRREPORTPOS,
+ "No expected parent queue " INT64_FORMAT,
queue->ParentOID);
ELOG_ERRBUF_MESSAGE(WARNING, errorbuf)
goto exit;
@@ -1443,8 +1501,8 @@ int createQueueAndTrack( DynResourceQueue queue,
* core ratio related information should also be updated.
*/
if ( RESQUEUE_IS_LEAF(parenttrack->QueueInfo) &&
- parenttrack->trackedMemCoreRatio ) {
-
+ parenttrack->trackedMemCoreRatio )
+ {
/* Remove parent track from memory core ratio track */
removeResourceQueueRatio(parenttrack);
@@ -1454,8 +1512,6 @@ int createQueueAndTrack( DynResourceQueue queue,
}
}
-
-
/* Set parent resource queue track reference. */
newqueuetrack->ParentTrack = parenttrack;
@@ -1481,16 +1537,17 @@ int createQueueAndTrack( DynResourceQueue queue,
MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
PQUEMGR->Queues = lappend(PQUEMGR->Queues, newqueuetrack);
MEMORY_CONTEXT_SWITCH_BACK
- if( newqueuetrack->QueueInfo->OID != InvalidOid ) {
+ if( newqueuetrack->QueueInfo->OID != InvalidOid )
+ {
setQueueTrackIndexedByQueueOID(newqueuetrack);
}
setQueueTrackIndexedByQueueName(newqueuetrack);
/* Update overall ratio index. */
- if ( !RESQUEUE_IS_PERCENT(newqueuetrack->QueueInfo) ) {
- newqueuetrack->MemCoreRatio =
- trunc(newqueuetrack->QueueInfo->ClusterMemoryMB /
- newqueuetrack->QueueInfo->ClusterVCore);
+ if ( !RESQUEUE_IS_PERCENT(newqueuetrack->QueueInfo) )
+ {
+ newqueuetrack->MemCoreRatio = trunc(newqueuetrack->QueueInfo->ClusterMemoryMB /
+ newqueuetrack->QueueInfo->ClusterVCore);
addResourceQueueRatio(newqueuetrack);
}
@@ -1498,7 +1555,8 @@ int createQueueAndTrack( DynResourceQueue queue,
*track = newqueuetrack;
exit:
- if ( res != FUNC_RETURN_OK ) {
+ if ( res != FUNC_RETURN_OK )
+ {
/* Free resource queue track instance. */
freeDynResourceQueueTrack(newqueuetrack);
*track = NULL;
@@ -1567,15 +1625,14 @@ int dropQueueAndTrack( DynResourceQueueTrack track,
}
-DynResourceQueueTrack getQueueTrackByQueueOID (int64_t queoid,
- bool *exist)
+DynResourceQueueTrack getQueueTrackByQueueOID (int64_t queoid, bool *exist)
{
PAIR pair = NULL;
SimpArray key;
setSimpleArrayRef(&key, (char *)&queoid, sizeof(int64_t));
- pair = getHASHTABLENode(&(PQUEMGR->QueuesIDIndex),
- (void *)&key);
- if ( pair == NULL ) {
+ pair = getHASHTABLENode(&(PQUEMGR->QueuesIDIndex), (void *)&key);
+ if ( pair == NULL )
+ {
*exist = false;
return NULL;
}
@@ -1997,15 +2054,21 @@ exit:
/**
* Return one connection to resource queue.
*/
-void returnConnectionToQueue(ConnectionTrack conntrack, bool normally)
+void returnConnectionToQueue(ConnectionTrack conntrack, bool istimeout)
{
DynResourceQueueTrack track = (DynResourceQueueTrack)(conntrack->QueueTrack);
- if ( normally )
+ if ( !istimeout )
{
transformConnectionTrackProgress(conntrack, CONN_PP_ESTABLISHED);
}
+ else
+ {
+ transformConnectionTrackProgress(conntrack, CONN_PP_TIMEOUT_FAIL);
+ }
+
track->CurConnCounter--;
- if ( track->CurConnCounter == 0 ) {
+ if ( track->CurConnCounter == 0 )
+ {
track->isBusy = false;
refreshMemoryCoreRatioLimits();
refreshMemoryCoreRatioWaterMark();
@@ -2045,7 +2108,8 @@ int acquireResourceFromResQueMgr(ConnectionTrack conntrack)
DynResourceQueueTrack queuetrack = conntrack->QueueTrack;
- if ( queuetrack->ClusterSegNumberMax == 0 ) {
+ if ( queuetrack->ClusterSegNumberMax == 0 )
+ {
elog(LOG, "The queue %s has no resource available to run queries.",
queuetrack->QueueInfo->Name);
return RESQUEMGR_NO_RESOURCE;
@@ -2061,7 +2125,8 @@ int acquireResourceFromResQueMgr(ConnectionTrack conntrack)
&(conntrack->SegNumMin),
conntrack->VSegLimit);
- if ( res == FUNC_RETURN_OK ) {
+ if ( res == FUNC_RETURN_OK )
+ {
int32_t Rmax = conntrack->SegNum;
int32_t RmaxL = conntrack->VSegLimitPerSeg * PRESPOOL->AvailNodeCount;
@@ -2074,14 +2139,6 @@ int acquireResourceFromResQueMgr(ConnectionTrack conntrack)
int32_t Gmax= conntrack->MaxSegCountFixed;
int32_t Gmin= conntrack->MinSegCountFixed;
- /* Apply upper vseg limit. */
- if ( conntrack->MaxSegCountFixed > queuetrack->QueueInfo->VSegUpperLimit &&
- conntrack->MinSegCountFixed <= queuetrack->QueueInfo->VSegUpperLimit )
- {
- Gmax = queuetrack->QueueInfo->VSegUpperLimit;
- elog(LOG, "Maximum vseg num is limited to %d", Gmax);
- }
-
if(Gmin==1)
{
/* case 1 */
@@ -2113,13 +2170,20 @@ int acquireResourceFromResQueMgr(ConnectionTrack conntrack)
}
}
- elog(LOG, "HAWQ RM :: Expect (%d MB, %lf CORE) x %d ( min %d ) resource.",
+ elog(LOG, "Query resource expects (%d MB, %lf CORE) x %d ( min %d ) resource.",
conntrack->SegMemoryMB,
conntrack->SegCore,
conntrack->SegNum,
conntrack->SegNumMin);
+ adjustResourceExpectsByQueueNVSegLimits(conntrack);
+ elog(LOG, "Query resource expects (%d MB, %lf CORE) x %d ( min %d ) "
+ "resource after adjusting based on queue NVSEG limits.",
+ conntrack->SegMemoryMB,
+ conntrack->SegCore,
+ conntrack->SegNum,
+ conntrack->SegNumMin);
/* Add request to the resource queue and return. */
res = addQueryResourceRequestToQueue(queuetrack, conntrack);
@@ -2144,11 +2208,13 @@ int acquireResourceQuotaFromResQueMgr(ConnectionTrack conntrack)
userinfo = getUserByUserName(conntrack->UserID,
strlen(conntrack->UserID),
&exist);
- if ( exist ) {
+ if ( exist )
+ {
/* Get the queue, and check if the parallel limit is achieved. */
queuetrack = getQueueTrackByQueueOID(userinfo->QueueOID, &exist);
}
- else {
+ else
+ {
elog(LOG, "No user %s defined for registering connection. Assign to "
"default queue.",
conntrack->UserID);
@@ -2156,7 +2222,8 @@ int acquireResourceQuotaFromResQueMgr(ConnectionTrack conntrack)
userinfo = NULL;
}
- if ( queuetrack == NULL ) {
+ if ( queuetrack == NULL )
+ {
elog(LOG, "Resource manager fails to find target resource queue for user %s.",
conntrack->UserID);
res = RESQUEMGR_NO_ASSIGNEDQUEUE;
@@ -2173,7 +2240,8 @@ int acquireResourceQuotaFromResQueMgr(ConnectionTrack conntrack)
&(conntrack->SegNumMin),
conntrack->VSegLimit);
- if ( res == FUNC_RETURN_OK ) {
+ if ( res == FUNC_RETURN_OK )
+ {
int32_t Rmax = conntrack->SegNum;
int32_t RmaxL =conntrack->VSegLimitPerSeg * PRESPOOL->AvailNodeCount;
@@ -2186,14 +2254,6 @@ int acquireResourceQuotaFromResQueMgr(ConnectionTrack conntrack)
int32_t Gmax= conntrack->MaxSegCountFixed;
int32_t Gmin= conntrack->MinSegCountFixed;
- /* Apply upper vseg limit. */
- if ( conntrack->MaxSegCountFixed > queuetrack->QueueInfo->VSegUpperLimit &&
- conntrack->MinSegCountFixed <= queuetrack->QueueInfo->VSegUpperLimit )
- {
- Gmax = queuetrack->QueueInfo->VSegUpperLimit;
- elog(LOG, "Maximum vseg num is limited to %d", Gmax);
- }
-
if(Gmin==1)
{
/* case 1 */
@@ -2230,14 +2290,79 @@ int acquireResourceQuotaFromResQueMgr(ConnectionTrack conntrack)
conntrack->SegCore,
conntrack->SegNum,
conntrack->SegNumMin);
+
+ adjustResourceExpectsByQueueNVSegLimits(conntrack);
+
+ elog(LOG, "Query resource expects (%d MB, %lf CORE) x %d ( min %d ) "
+ "resource after adjusting based on queue NVSEG limits.",
+ conntrack->SegMemoryMB,
+ conntrack->SegCore,
+ conntrack->SegNum,
+ conntrack->SegNumMin);
}
- else {
+ else
+ {
elog(LOG, "Not accepted resource acquiring request.");
}
exit:
return res;
}
+void adjustResourceExpectsByQueueNVSegLimits(ConnectionTrack conntrack)
+{
+ DynResourceQueueTrack queuetrack = conntrack->QueueTrack;
+
+ if ( queuetrack == NULL )
+ {
+ elog(WARNING, "Detected connection track without assigned queue. ConnID %d",
+ conntrack->ConnID);
+ return;
+ }
+
+ if ( queuetrack->QueueInfo->NVSegLowerLimit > MINIMUM_RESQUEUE_NVSEG_LOWER_LIMIT_N ||
+ queuetrack->QueueInfo->NVSegUpperLimit > MINIMUM_RESQUEUE_NVSEG_UPPER_LIMIT_N )
+ {
+ if ( queuetrack->QueueInfo->NVSegLowerLimit > MINIMUM_RESQUEUE_NVSEG_LOWER_LIMIT_N &&
+ queuetrack->QueueInfo->NVSegLowerLimit > conntrack->SegNumMin &&
+ queuetrack->QueueInfo->NVSegLowerLimit <= conntrack->SegNum )
+ {
+ conntrack->SegNumMin = queuetrack->QueueInfo->NVSegLowerLimit;
+ }
+
+ if ( queuetrack->QueueInfo->NVSegUpperLimit > MINIMUM_RESQUEUE_NVSEG_UPPER_LIMIT_N &&
+ queuetrack->QueueInfo->NVSegUpperLimit >= conntrack->SegNumMin &&
+ queuetrack->QueueInfo->NVSegUpperLimit < conntrack->SegNum )
+ {
+ conntrack->SegNum = queuetrack->QueueInfo->NVSegUpperLimit;
+ }
+ }
+ else if ( queuetrack->QueueInfo->NVSegLowerLimitPerSeg > 0 ||
+ queuetrack->QueueInfo->NVSegUpperLimitPerSeg > 0 )
+ {
+ if ( queuetrack->QueueInfo->NVSegLowerLimitPerSeg >
+ MINIMUM_RESQUEUE_NVSEG_LOWER_PERSEG_LIMIT_N )
+ {
+ int minnvseg = ceil(queuetrack->QueueInfo->NVSegLowerLimitPerSeg *
+ PRESPOOL->AvailNodeCount);
+ if ( minnvseg > conntrack->SegNumMin && minnvseg <= conntrack->SegNum)
+ {
+ conntrack->SegNumMin = minnvseg;
+ }
+ }
+
+ if ( queuetrack->QueueInfo->NVSegUpperLimitPerSeg >
+ MINIMUM_RESQUEUE_NVSEG_UPPER_PERSEG_LIMIT_N )
+ {
+ int maxnvseg = ceil(queuetrack->QueueInfo->NVSegUpperLimitPerSeg *
+ PRESPOOL->AvailNodeCount);
+ if ( maxnvseg >= conntrack->SegNumMin && maxnvseg < conntrack->SegNum)
+ {
+ conntrack->SegNum = maxnvseg;
+ }
+ }
+ }
+}
+
/* Resource is returned from query to resource queue. */
int returnResourceToResQueMgr(ConnectionTrack conntrack)
{
@@ -2650,11 +2775,9 @@ int checkUserAttributes( UserInfo user, char *errorbuf, int errorbufsize)
return FUNC_RETURN_OK;
}
-/* Create one user */
-int createUser(UserInfo userinfo, char *errorbuf, int errorbufsize)
+/* Create one user. Expect always no error. */
+void createUser(UserInfo userinfo)
{
- int res = FUNC_RETURN_OK;
-
MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
PQUEMGR->Users = lappend(PQUEMGR->Users, userinfo);
MEMORY_CONTEXT_SWITCH_BACK
@@ -2664,8 +2787,6 @@ int createUser(UserInfo userinfo, char *errorbuf, int errorbufsize)
setUserIndexedByUserOID(userinfo);
}
setUserIndexedByUserName(userinfo);
-
- return res;
}
void setUserIndexedByUserOID(UserInfo userinfo)
@@ -3114,8 +3235,10 @@ int getRSQTBLAttributeNameIndex(SimpStringPtr attrname)
int getRSQDDLAttributeNameIndex(SimpStringPtr attrname)
{
- for ( int i = 0 ; i < RSQ_DDL_ATTR_COUNT ; ++i ) {
- if ( SimpleStringComp(attrname, RSQDDLAttrNames[i]) == 0 ) {
+ for ( int i = 0 ; i < RSQ_DDL_ATTR_COUNT ; ++i )
+ {
+ if ( SimpleStringComp(attrname, RSQDDLAttrNames[i]) == 0 )
+ {
return i;
}
}
@@ -3354,12 +3477,12 @@ void refreshResourceQueuePercentageCapacityInternal(uint32_t clustermemmb,
}
track->ClusterMemoryMaxPer = track->ClusterMemoryActPer *
- track->QueueInfo->ResourceUpperFactor;
+ track->QueueInfo->ResourceOvercommit;
track->ClusterMemoryMaxPer = track->ClusterMemoryMaxPer > 100 ?
100.0 :
track->ClusterMemoryMaxPer;
track->ClusterVCoreMaxPer = track->ClusterVCoreActPer *
- track->QueueInfo->ResourceUpperFactor;
+ track->QueueInfo->ResourceOvercommit;
track->ClusterVCoreMaxPer = track->ClusterVCoreMaxPer > 100 ?
100.0 :
track->ClusterVCoreMaxPer;
@@ -3475,7 +3598,7 @@ void refreshResourceQueuePercentageCapacityInternal(uint32_t clustermemmb,
track->QueueInfo->SegResourceQuotaVCore,
track->ClusterSegNumber,
track->ClusterSegNumberMax,
- track->QueueInfo->ResourceUpperFactor);
+ track->QueueInfo->ResourceOvercommit);
}
}
@@ -3818,7 +3941,8 @@ int dispatchResourceToQueries_EVEN(DynResourceQueueTrack track)
&(conn->Resource),
&segnumact,
&(conn->SegIOBytes));
- if ( segnumact >= conn->SegNumMin )
+
+ if ( isResourceAcceptable(conn, segnumact) )
{
elog(DEBUG3, "Resource manager dispatched %d segment(s) to connection %d",
segnumact,
@@ -3905,6 +4029,65 @@ int dispatchResourceToQueries_EVEN(DynResourceQueueTrack track)
return FUNC_RETURN_OK;
}
+
+bool isResourceAcceptable(ConnectionTrack conn, int segnumact)
+{
+ /* Enough number of vsegments. */
+ if ( segnumact < conn->SegNumMin )
+ {
+ return false;
+ }
+
+ /*
+ *--------------------------------------------------------------------------
+ * Spread wide enough. If there is at least one segment containing 2 or more
+ * vsegments, the number of segments should be not be too small, i.e. the
+ * vsegments should not be assigned in a few segments.
+ *--------------------------------------------------------------------------
+ */
+ if ( segnumact > list_length(conn->Resource) )
+ {
+ if ( PRESPOOL->SlavesHostCount - rm_tolerate_nseg_limit >
+ list_length(conn->Resource) )
+ {
+ elog(WARNING, "Find virtual segments are dispatched to %d segments, "
+ "less than %d",
+ segnumact,
+ list_length(conn->Resource));
+ return false;
+ }
+ }
+
+ /*
+ *--------------------------------------------------------------------------
+ * Spread even enough. If the size of vsegments in each segment varies too
+ * much, the allocation result is not accepted.
+ *--------------------------------------------------------------------------
+ */
+ if ( segnumact > list_length(conn->Resource) )
+ {
+ int minval = segnumact;
+ int maxval = 0;
+ ListCell *cell = NULL;
+ foreach(cell, conn->Resource)
+ {
+ VSegmentCounterInternal vsegcnt = lfirst(cell);
+ minval = minval < vsegcnt->VSegmentCount ? minval : vsegcnt->VSegmentCount;
+ maxval = maxval > vsegcnt->VSegmentCount ? maxval : vsegcnt->VSegmentCount;
+ }
+ if ( rm_nvseg_variance_among_seg_limit < maxval - minval )
+ {
+ elog(WARNING, "Find virtual segments are not evenly dispatched to segments, "
+ "maximum virtual segment size is %d, "
+ "minimum virtual segment size is %d.",
+ maxval,
+ minval);
+ return false;
+ }
+ }
+ return true;
+}
+
int dispatchResourceToQueries_FIFO(DynResourceQueueTrack track)
{
return FUNC_RETURN_OK;
@@ -4100,13 +4283,13 @@ void timeoutDeadResourceAllocation(void)
curcon->ConnID);
if ( curmsec - curcon->LastActTime >
- 1000000L * rm_resource_noaction_timeout )
+ 1000000L * rm_session_lease_timeout )
{
elog(LOG, "The allocated resource timeout is detected. "
"ConnID %d",
curcon->ConnID);
returnResourceToResQueMgr(curcon);
- returnConnectionToQueue(curcon, false);
+ returnConnectionToQueue(curcon, true);
if ( curcon->CommBuffer != NULL )
{
curcon->CommBuffer->toClose = true;
@@ -4119,13 +4302,13 @@ void timeoutDeadResourceAllocation(void)
case CONN_PP_RESOURCE_QUEUE_ALLOC_WAIT:
{
if ( curmsec - curcon->LastActTime >
- 1000000L * rm_resource_noaction_timeout )
+ 1000000L * rm_session_lease_timeout )
{
elog(LOG, "The queued resource request timeout is detected. "
"ConnID %d",
curcon->ConnID);
cancelResourceAllocRequest(curcon);
- returnConnectionToQueue(curcon, false);
+ returnConnectionToQueue(curcon, true);
if ( curcon->CommBuffer != NULL )
{
curcon->CommBuffer->toClose = true;
@@ -4138,12 +4321,12 @@ void timeoutDeadResourceAllocation(void)
case CONN_PP_REGISTER_DONE:
{
if ( curmsec - curcon->LastActTime >
- 1000000L * rm_resource_noaction_timeout )
+ 1000000L * rm_session_lease_timeout )
{
- elog(LOG, "The registered connection timeout is detected. "
- "ConnID %d",
- curcon->ConnID);
- returnConnectionToQueue(curcon, false);
+ elog(WARNING, "The registered connection timeout is detected. "
+ "ConnID %d",
+ curcon->ConnID);
+ returnConnectionToQueue(curcon, true);
if ( curcon->CommBuffer != NULL )
{
curcon->CommBuffer->toClose = true;
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/utils/network_utils.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/utils/network_utils.c b/src/backend/resourcemanager/utils/network_utils.c
index 51f1294..462e085 100644
--- a/src/backend/resourcemanager/utils/network_utils.c
+++ b/src/backend/resourcemanager/utils/network_utils.c
@@ -1,10 +1,7 @@
#include "utils/network_utils.h"
#include "utils/memutilities.h"
#include "miscadmin.h"
-
-#ifdef GETIFADDRS_USING_SIGAR
#include "sigar.h"
-#endif
#include <arpa/inet.h>
#include <sys/socket.h>
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/tcop/utility.c
----------------------------------------------------------------------
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 19fd47a..986afb3 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -1646,18 +1646,10 @@ ProcessUtility(Node *parsetree,
case T_AlterQueueStmt:
/* if guc variable not set, or bootstrap mode, or utility mode connection, throw exception*/
- /*if (!(IsBootstrapProcessingMode() || (Gp_role == GP_ROLE_UTILITY)
- || gp_called_by_pgdump))
- {
- ereport(ERROR,
- (errcode(ERRCODE_CDB_FEATURE_NOT_YET), errmsg("Cannot support alter resource queue statement yet") ));
- }*/
alterResourceQueue((AlterQueueStmt *) parsetree);
- //AlterQueue((AlterQueueStmt *) parsetree);
break;
case T_DropQueueStmt:
- //DropQueue((DropQueueStmt *) parsetree);
dropResourceQueue((DropQueueStmt *) parsetree);
break;
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/utils/misc/etc/slaves.exclude
----------------------------------------------------------------------
diff --git a/src/backend/utils/misc/etc/slaves.exclude b/src/backend/utils/misc/etc/slaves.exclude
deleted file mode 100644
index 7338e8b..0000000
--- a/src/backend/utils/misc/etc/slaves.exclude
+++ /dev/null
@@ -1 +0,0 @@
-jiny2
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/utils/misc/etc/yarn-client.xml
----------------------------------------------------------------------
diff --git a/src/backend/utils/misc/etc/yarn-client.xml b/src/backend/utils/misc/etc/yarn-client.xml
new file mode 100644
index 0000000..b401bdf
--- /dev/null
+++ b/src/backend/utils/misc/etc/yarn-client.xml
@@ -0,0 +1,288 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<configuration>
+
+<!-- KDC
+ <property>
+ <name>hadoop.security.authentication</name>
+ <value>kerberos</value>
+ </property>
+KDC -->
+
+<!-- HA
+ <property>
+ <name>dfs.nameservices</name>
+ <value>phdcluster</value>
+ </property>
+
+ <property>
+ <name>dfs.ha.namenodes.phdcluster</name>
+ <value>nn1,nn2</value>
+ </property>
+
+ <property>
+ <name>dfs.namenode.rpc-address.phdcluster.nn1</name>
+ <value>mdw:9000</value>
+ </property>
+
+ <property>
+ <name>dfs.namenode.rpc-address.phdcluster.nn2</name>
+ <value>smdw:9000</value>
+ </property>
+
+ <property>
+ <name>dfs.namenode.http-address.phdcluster.nn1</name>
+ <value>mdw:50070</value>
+ </property>
+
+ <property>
+ <name>dfs.namenode.http-address.phdcluster.nn2</name>
+ <value>smdw:50070</value>
+ </property>
+
+HA -->
+
+ <!-- RPC client configuration -->
+ <property>
+ <name>rpc.client.timeout</name>
+ <value>3600000</value>
+ <description>
+ timeout interval of a RPC invocation in millisecond. default is 3600000.
+ </description>
+ </property>
+ <property>
+ <name>rpc.client.connect.tcpnodelay</name>
+ <value>true</value>
+ <description>
+ whether set socket TCP_NODELAY to true when connect to RPC server. default is true.
+ </description>
+ </property>
+
+ <property>
+ <name>rpc.client.max.idle</name>
+ <value>10000</value>
+ <description>
+ the max idle time of a RPC connection in millisecond. default is 10000.
+ </description>
+ </property>
+
+ <property>
+ <name>rpc.client.ping.interval</name>
+ <value>10000</value>
+ <description>
+ the interval which the RPC client send a heart beat to server. 0 means disable, default is 10000.
+ </description>
+ </property>
+
+ <property>
+ <name>rpc.client.connect.timeout</name>
+ <value>600000</value>
+ <description>
+ the timeout interval in millisecond when the RPC client is trying to setup the connection. default is 600000.
+ </description>
+ </property>
+
+ <property>
+ <name>rpc.client.connect.retry</name>
+ <value>10</value>
+ <description>
+ the max retry times if the RPC client fail to setup the connection to server. default is 10.
+ </description>
+ </property>
+
+ <property>
+ <name>rpc.client.read.timeout</name>
+ <value>3600000</value>
+ <description>
+ the timeout interval in millisecond when the RPC client is trying to read from server. default is 3600000.
+ </description>
+ </property>
+
+ <property>
+ <name>rpc.client.write.timeout</name>
+ <value>3600000</value>
+ <description>
+ the timeout interval in millisecond when the RPC client is trying to write to server. default is 3600000.
+ </description>
+ </property>
+
+ <property>
+ <name>rpc.client.socekt.linger.timeout</name>
+ <value>-1</value>
+ <description>
+ set value to socket SO_LINGER when connect to RPC server. -1 means default OS value. default is -1.
+ </description>
+ </property>
+
+ <!-- dfs client configuration -->
+ <property>
+ <name>dfs.client.read.shortcircuit</name>
+ <value>true</value>
+ <description>
+ whether reading block file bypass datanode if the block and the client are on the same node. default is true.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.default.replica</name>
+ <value>3</value>
+ <description>
+ the default number of replica. default is 3.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.prefetchsize</name>
+ <value>10</value>
+ <description>
+ the default number of blocks which information will be prefetched. default is 10.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.client.failover.max.attempts</name>
+ <value>15</value>
+ <description>
+ if multiply namenodes are configured, it is the max retry times when the dfs client try to issue a RPC call. default is 15.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.default.blocksize</name>
+ <value>134217728</value>
+ <description>
+ default block size. default is 134217728.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.client.log.severity</name>
+ <value>INFO</value>
+ <description>
+ the minimal log severity level, valid values include FATAL, ERROR, INFO, DEBUG1, DEBUG2, DEBUG3. default is INFO.
+ </description>
+ </property>
+
+ <!-- input client configuration -->
+ <property>
+ <name>input.connect.timeout</name>
+ <value>600000</value>
+ <description>
+ the timeout interval in millisecond when the input stream is trying to setup the connection to datanode. default is 600000.
+ </description>
+ </property>
+
+ <property>
+ <name>input.read.timeout</name>
+ <value>3600000</value>
+ <description>
+ the timeout interval in millisecond when the input stream is trying to read from datanode. default is 3600000.
+ </description>
+ </property>
+
+ <property>
+ <name>input.write.timeout</name>
+ <value>3600000</value>
+ <description>
+ the timeout interval in millisecond when the input stream is trying to write to datanode. default is 3600000.
+ </description>
+ </property>
+
+ <property>
+ <name>input.localread.default.buffersize</name>
+ <value>2097152</value>
+ <description>
+ number of bytes of the buffer which is used to hold the data from block file and verify checksum.
+ it is only used when "dfs.client.read.shortcircuit" is set to true. default is 1048576.
+ </description>
+ </property>
+
+ <property>
+ <name>input.localread.blockinfo.cachesize</name>
+ <value>1000</value>
+ <description>
+ the size of block file path information cache. default is 1000.
+ </description>
+ </property>
+
+ <property>
+ <name>input.read.getblockinfo.retry</name>
+ <value>3</value>
+ <description>
+ the max retry times when the client fail to get block information from namenode. default is 3.
+ </description>
+ </property>
+
+ <!-- output client configuration -->
+ <property>
+ <name>output.replace-datanode-on-failure</name>
+ <value>false</value>
+ <description>
+ whether the client add new datanode into pipeline if the number of nodes in pipeline is less the specified number of replicas. default is true.
+ </description>
+ </property>
+
+ <property>
+ <name>output.default.chunksize</name>
+ <value>512</value>
+ <description>
+ the number of bytes of a chunk in pipeline. default is 512.
+ </description>
+ </property>
+
+ <property>
+ <name>output.default.packetsize</name>
+ <value>65536</value>
+ <description>
+ the number of bytes of a packet in pipeline. default is 65536.
+ </description>
+ </property>
+
+ <property>
+ <name>output.default.write.retry</name>
+ <value>10</value>
+ <description>
+ the max retry times when the client fail to setup the pipeline. default is 10.
+ </description>
+ </property>
+
+ <property>
+ <name>output.connect.timeout</name>
+ <value>600000</value>
+ <description>
+ the timeout interval in millisecond when the output stream is trying to setup the connection to datanode. default is 600000.
+ </description>
+ </property>
+
+ <property>
+ <name>output.read.timeout</name>
+ <value>3600000</value>
+ <description>
+ the timeout interval in millisecond when the output stream is trying to read from datanode. default is 3600000.
+ </description>
+ </property>
+
+ <property>
+ <name>output.write.timeout</name>
+ <value>3600000</value>
+ <description>
+ the timeout interval in millisecond when the output stream is trying to write to datanode. default is 3600000.
+ </description>
+ </property>
+
+ <property>
+ <name>output.packetpool.size</name>
+ <value>1024</value>
+ <description>
+ the max number of packets in a file's packet pool. default is 1024.
+ </description>
+ </property>
+
+ <property>
+ <name>output.close.timeout</name>
+ <value>900000</value>
+ <description>
+ the timeout interval in millisecond when close an output stream. default is 900000.
+ </description>
+ </property>
+</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/utils/misc/guc.c
----------------------------------------------------------------------
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 1e91d56..5a2c90a 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -4402,15 +4402,6 @@ static struct config_bool ConfigureNamesBool[] =
},
{
- {"hawq_rm_domain_comm_enable", PGC_USERSET, DEVELOPER_OPTIONS,
- gettext_noop("Indicate whether domain socket for RM communication is allowed"),
- NULL,
- },
- &rm_domain_comm_enable,
- false, NULL, NULL
- },
-
- {
{"hawq_resourceenforcer_cpu_enable", PGC_POSTMASTER, RESOURCES_MGM,
gettext_noop("enable enforcing cpu resource consumption."),
NULL
@@ -4447,6 +4438,15 @@ static struct config_bool ConfigureNamesBool[] =
true, NULL, NULL
},
+ {
+ {"hawq_rm_session_lease_heartbeat_enable", PGC_USERSET, RESOURCES_MGM,
+ gettext_noop("enable or disable session lease heartbeat for test."),
+ NULL
+ },
+ &rm_session_lease_heartbeat_enable,
+ true, NULL, NULL
+ },
+
/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL
@@ -6294,32 +6294,32 @@ static struct config_int ConfigureNamesInt[] =
1, 1, 65535, NULL, NULL
},
- {
- {"hawq_resourcemanager_master_address_port", PGC_POSTMASTER, RESOURCES_MGM,
- gettext_noop("master resource manager server address port number"),
- NULL
- },
- &rm_master_addr_port,
- 5437, 1, 65535, NULL, NULL
- },
+ {
+ {"hawq_rm_master_port", PGC_POSTMASTER, RESOURCES_MGM,
+ gettext_noop("resource manager master server port number"),
+ NULL
+ },
+ &rm_master_port,
+ 5437, 1, 65535, NULL, NULL
+ },
- {
- {"hawq_resourcemanager_master_address_domainsocket_port", PGC_POSTMASTER, RESOURCES_MGM,
- gettext_noop("master resource manager server address domain socket port number"),
- NULL
- },
- &rm_master_addr_domain_port,
- 5436, 1, 65535, NULL, NULL
- },
+ {
+ {"hawq_rm_segment_port", PGC_POSTMASTER, RESOURCES_MGM,
+ gettext_noop("resource manager segment server port number"),
+ NULL
+ },
+ &rm_segment_port,
+ 5438, 1, 65535, NULL, NULL
+ },
- {
- {"hawq_resourcemanager_segment_address_port", PGC_POSTMASTER, RESOURCES_MGM,
- gettext_noop("segment resource manager server address port number"),
- NULL
- },
- &rm_seg_addr_port,
- 5438, 1, 65535, NULL, NULL
- },
+ {
+ {"hawq_rm_master_domain_port", PGC_POSTMASTER, RESOURCES_MGM,
+ gettext_noop("resource manager master domain socket port number"),
+ NULL
+ },
+ &rm_master_domain_port,
+ 5436, 1, 65535, NULL, NULL
+ },
{
{"hawq_resourcemanager_log_level", PGC_USERSET, DEVELOPER_OPTIONS,
@@ -6391,32 +6391,34 @@ static struct config_int ConfigureNamesInt[] =
0, 0, 10, NULL, NULL
},
- {
- {"hawq_resourcemanager_query_vsegment_number_limit", PGC_USERSET, RESOURCES_MGM,
- gettext_noop("the limit of the number of virtual segments for one query."),
- NULL
- },
- &rm_query_vseg_num_limit,
- 1000, 1, 65535, NULL, NULL
- },
+ {
+ {"hawq_rm_nvseg_perquery_limit", PGC_USERSET, RESOURCES_MGM,
+ gettext_noop("the limit of the number of virtual segments for one query."),
+ NULL
+ },
+ &rm_nvseg_perquery_limit,
+ 1000, 1, 65535, NULL, NULL
+ },
- {
- {"hawq_resourcemanager_query_vsegment_number_per_segment_limit", PGC_USERSET, RESOURCES_MGM,
- gettext_noop("the limit of the number of virtual segments in one segment for one query."),
- NULL
- },
- &rm_query_vseg_num_per_seg_limit,
- 8, 1, 65535, NULL, NULL
- },
+ {
+ {"hawq_rm_nvseg_perquery_perseg_limit", PGC_USERSET, RESOURCES_MGM,
+ gettext_noop("the limit of the number of virtual segments in one "
+ "segment for one query."),
+ NULL
+ },
+ &rm_nvseg_perquery_perseg_limit,
+ 8, 1, 65535, NULL, NULL
+ },
- {
- {"hawq_resourcemanager_segment_slice_number_limit", PGC_POSTMASTER, RESOURCES_MGM,
- gettext_noop("the limit of the number of slice number in one segment for one query."),
- NULL
- },
- &rm_slice_num_per_seg_limit,
- 3000, 1, 65535, NULL, NULL
- },
+ {
+ {"hawq_rm_nslice_perseg_limit", PGC_POSTMASTER, RESOURCES_MGM,
+ gettext_noop("the limit of the number of slice number in one segment "
+ "for one query."),
+ NULL
+ },
+ &rm_nslice_perseg_limit,
+ 3000, 1, 65535, NULL, NULL
+ },
{
{"hawq_resourcemanager_segment_container_waterlevel", PGC_POSTMASTER, RESOURCES_MGM,
@@ -6430,12 +6432,13 @@ static struct config_int ConfigureNamesInt[] =
},
{
- {"hawq_resourcemanager_resource_noacition_timeout", PGC_POSTMASTER, RESOURCES_MGM,
- gettext_noop("timeout for closing active connection for resource negotiation."),
+ {"hawq_rm_session_lease_timeout", PGC_POSTMASTER, RESOURCES_MGM,
+ gettext_noop("timeout for closing a session lease if dispatcher does "
+ "not send heart-beat for a while."),
NULL
},
- &rm_resource_noaction_timeout,
- 10, -1, 65535, NULL, NULL
+ &rm_session_lease_timeout,
+ 10, 5, 65535, NULL, NULL
},
{
@@ -6466,6 +6469,24 @@ static struct config_int ConfigureNamesInt[] =
},
{
+ {"hawq_rm_tolerate_nseg_limit", PGC_POSTMASTER, RESOURCES_MGM,
+ gettext_noop("the size of down segments that resource manager should tolerate at most ."),
+ NULL
+ },
+ &rm_tolerate_nseg_limit,
+ 2, 0, 65535, NULL, NULL
+ },
+
+ {
+ {"hawq_rm_nvseg_variance_amon_seg_limit", PGC_POSTMASTER, RESOURCES_MGM,
+ gettext_noop("the variance of vseg number in each segment that resource manager should tolerate at most."),
+ NULL
+ },
+ &rm_nvseg_variance_among_seg_limit,
+ 1, 0, 65535, NULL, NULL
+ },
+
+ {
{"hawq_reourcemanager_max_resourcequeue_number", PGC_POSTMASTER, RESOURCES_MGM,
gettext_noop("the maximum number of resource queue."),
NULL
@@ -8003,14 +8024,14 @@ static struct config_string ConfigureNamesString[] =
"", NULL, NULL
},
- {
- {"hawq_resourcemanager_server_type", PGC_POSTMASTER, RESOURCES_MGM,
- gettext_noop("set resource management server type"),
- NULL
+ {
+ {"hawq_global_rm_type", PGC_POSTMASTER, RESOURCES_MGM,
+ gettext_noop("set resource management server type"),
+ NULL
},
- &rm_grm_server_type,
+ &rm_global_rm_type,
"none", NULL, NULL
- },
+ },
{
{"hawq_resourcemanager_yarn_resourcemanager_address", PGC_POSTMASTER, RESOURCES_MGM,
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/bin/pg_dump/pg_dumpall.c
----------------------------------------------------------------------
diff --git a/src/bin/pg_dump/pg_dumpall.c b/src/bin/pg_dump/pg_dumpall.c
index 1686936..fdbbd46 100644
--- a/src/bin/pg_dump/pg_dumpall.c
+++ b/src/bin/pg_dump/pg_dumpall.c
@@ -589,7 +589,7 @@ dumpResQueues(PGconn *conn)
"rsq_allocation_policy::text as ressetting, "
"6 as ord FROM pg_resqueue "
"UNION "
- "SELECT rsqname, 'vsegment_resource_quota' as resname, "
+ "SELECT rsqname, 'vseg_resource_quota' as resname, "
"rsq_vseg_resource_quota::text as ressetting, "
"7 as ord FROM pg_resqueue "
"UNION "
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/include/catalog/indexing.h
----------------------------------------------------------------------
diff --git a/src/include/catalog/indexing.h b/src/include/catalog/indexing.h
index 41d7e40..73a69d1 100644
--- a/src/include/catalog/indexing.h
+++ b/src/include/catalog/indexing.h
@@ -377,42 +377,22 @@ DECLARE_UNIQUE_INDEX(pg_filesystem_fsysname_index, 7184, on pg_filesystem using
/* relation id: 7076 - pg_remote_credentials 20140205 */
DECLARE_UNIQUE_INDEX(pg_remote_credentials_owner_service_index, 7081, on pg_remote_credentials using btree(rcowner oid_ops, rcservice text_ops));
#define RemoteCredentialsOwnerServiceIndexId 7081
-/* relation id: 6026 - pg_resqueue 20140807 */
-DECLARE_UNIQUE_INDEX(pg_resqueue_oid_index, 6027, on pg_resqueue using btree(oid oid_ops));
-#define ResQueueOidIndexId 6027
-/* relation id: 6026 - pg_resqueue 20140807 */
-DECLARE_UNIQUE_INDEX(pg_resqueue_rsqname_index, 6028, on pg_resqueue using btree(rsqname name_ops));
-#define ResQueueRsqnameIndexId 6028
-/* relation id: 6059 - pg_resourcetype 20140807 */
-DECLARE_UNIQUE_INDEX(pg_resourcetype_oid_index, 6061, on pg_resourcetype using btree(oid oid_ops));
-#define ResourceTypeOidIndexId 6061
-/* relation id: 6059 - pg_resourcetype 20140807 */
-DECLARE_UNIQUE_INDEX(pg_resourcetype_restypid_index, 6062, on pg_resourcetype using btree(restypid int2_ops));
-#define ResourceTypeRestypidIndexId 6062
-/* relation id: 6059 - pg_resourcetype 20140807 */
-DECLARE_UNIQUE_INDEX(pg_resourcetype_resname_index, 6063, on pg_resourcetype using btree(resname name_ops));
-#define ResourceTypeResnameIndexId 6063
-/* relation id: 6060 - pg_resqueuecapability 20140807 */
-DECLARE_UNIQUE_INDEX(pg_resqueuecapability_oid_index, 6064, on pg_resqueuecapability using btree(oid oid_ops));
-#define ResQueueCapabilityOidIndexId 6064
-/* relation id: 6060 - pg_resqueuecapability 20140807 */
-DECLARE_INDEX(pg_resqueuecapability_resqueueid_index, 6065, on pg_resqueuecapability using btree(resqueueid oid_ops));
-#define ResQueueCapabilityResqueueidIndexId 6065
-/* relation id: 6060 - pg_resqueuecapability 20140807 */
-DECLARE_INDEX(pg_resqueuecapability_restypid_index, 6066, on pg_resqueuecapability using btree(restypid int2_ops));
-#define ResQueueCapabilityRestypidIndexId 6066
/* relation id: 5036 - gp_segment_configuration 20150207 */
DECLARE_UNIQUE_INDEX(gp_segment_config_registration_order_index, 6106, on gp_segment_configuration using btree(registration_order int4_ops));
#define GpSegmentConfigRegistration_orderIndexId 6106
-
/* relation id: 5036 - gp_segment_configuration 20150207 */
DECLARE_INDEX(gp_segment_config_role_index, 6107, on gp_segment_configuration using btree(role char_ops));
#define GpSegmentConfigRoleIndexId 6107
-
/* relation id: 6119 - pg_attribute_encoding 20141112 */
DECLARE_INDEX(pg_attribute_attrelid_index, 6119, on pg_attribute using btree(attrelid oid_ops));
#define AttributeAttrelidIndexId 6119
+/* relation id: 6026 - pg_resqueue 20151014 */
+DECLARE_UNIQUE_INDEX(pg_resqueue_oid_index, 6027, on pg_resqueue using btree(oid oid_ops));
+#define ResQueueOidIndexId 6027
+/* relation id: 6026 - pg_resqueue 20151014 */
+DECLARE_UNIQUE_INDEX(pg_resqueue_rsqname_index, 6028, on pg_resqueue using btree(rsqname name_ops));
+#define ResQueueRsqnameIndexId 6028
/* TIDYCAT_END_CODEGEN */
[4/4] incubator-hawq git commit: HAWQ-25. Add resource queue new ddl
statement implementation, refine partial GUC variable names,
use libyarn supporting kerberos.
Posted by yj...@apache.org.
HAWQ-25. Add resource queue new ddl statement implementation, refine partial GUC variable names, use libyarn supporting kerberos.
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/a413a426
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/a413a426
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/a413a426
Branch: refs/heads/master
Commit: a413a4265f2ed4ecf7a4199248f57824d279427a
Parents: 7e6838f
Author: Yi Jin <yj...@pivotal.io>
Authored: Thu Oct 15 20:40:39 2015 +0800
Committer: Yi Jin <yj...@pivotal.io>
Committed: Thu Oct 15 20:40:39 2015 +0800
----------------------------------------------------------------------
src/Makefile.global.in | 2 +-
src/backend/Makefile | 2 +-
src/backend/bootstrap/bootparse.y | 14 +-
src/backend/catalog/caql/Makefile | 1 -
src/backend/catalog/catalog.c | 40 +-
src/backend/catalog/toasting.c | 3 +-
src/backend/cdb/cdbdatalocality.c | 8 +
src/backend/cdb/cdbvars.c | 54 +-
src/backend/commands/Makefile | 2 +-
src/backend/commands/queue.c | 132 ---
src/backend/commands/user.c | 159 +---
src/backend/optimizer/plan/planner.c | 9 +
src/backend/postmaster/identity.c | 4 +-
.../communication/rmcomm_QD2RM.c | 297 +++----
.../communication/rmcomm_QE2RMSEG.c | 6 +-
.../communication/rmcomm_RM2RMSEG.c | 14 +-
.../communication/rmcomm_RMSEG2RM.c | 2 +-
src/backend/resourcemanager/errorcode.c | 2 +
.../include/communication/rmcomm_QD2RM.h | 17 +-
.../communication/rmcomm_QD_RM_Protocol.h | 10 +-
src/backend/resourcemanager/include/errorcode.h | 1 +
.../resourcemanager/include/resourcepool.h | 7 +
.../resourcemanager/include/resqueuemanager.h | 46 +-
src/backend/resourcemanager/include/rmcommon.h | 8 +-
src/backend/resourcemanager/requesthandler.c | 76 +-
.../resourcemanager/requesthandler_ddl.c | 350 ++++----
.../resourcebroker_LIBYARN_proc.c | 120 ++-
src/backend/resourcemanager/resourcemanager.c | 400 +++++----
.../resourcemanager/resourcemanager_RMSEG.c | 6 +-
src/backend/resourcemanager/resourcepool.c | 128 ++-
src/backend/resourcemanager/resqueuecommand.c | 181 ++--
src/backend/resourcemanager/resqueuemanager.c | 877 +++++++++++--------
.../resourcemanager/utils/network_utils.c | 3 -
src/backend/tcop/utility.c | 8 -
src/backend/utils/misc/etc/slaves.exclude | 1 -
src/backend/utils/misc/etc/yarn-client.xml | 288 ++++++
src/backend/utils/misc/guc.c | 155 ++--
src/bin/pg_dump/pg_dumpall.c | 2 +-
src/include/catalog/indexing.h | 32 +-
src/include/catalog/pg_resqueue.h | 389 +++-----
src/include/catalog/pg_type.h | 12 +-
src/include/catalog/tidycat.pl | 3 -
src/include/catalog/toasting.h | 2 +-
src/include/cdb/cdbvars.h | 22 +-
.../data/upgrade20/upg2_catupgrade_20.sql.in | 2 +-
.../regress/expected/parquet_compression.out | 4 +-
src/test/regress/sql/parquet_compression.sql | 4 +-
tools/bin/generate-greenplum-path.sh | 11 +-
tools/bin/gppylib/data/2.0.json | 326 ++-----
49 files changed, 2254 insertions(+), 1988 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/Makefile.global.in
----------------------------------------------------------------------
diff --git a/src/Makefile.global.in b/src/Makefile.global.in
index ad920f8..a619d74 100644
--- a/src/Makefile.global.in
+++ b/src/Makefile.global.in
@@ -533,7 +533,7 @@ ifeq ($(BLD_ARCH),osx106_x86)
LIBS := $(LIBS) -framework CoreServices -framework IOKit
endif
-LIBS := $(LIBS) -lyarn
+LIBS := $(LIBS) -lyarn -lkrb5
##########################################################################
#
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/Makefile
----------------------------------------------------------------------
diff --git a/src/backend/Makefile b/src/backend/Makefile
index f648f3c..44f13a2 100644
--- a/src/backend/Makefile
+++ b/src/backend/Makefile
@@ -222,8 +222,8 @@ endif
${INSTALL_SCRIPT} -d ${sysconfdir}
${INSTALL_DATA} $(srcdir)/utils/misc/etc/hawq-site.xml ${sysconfdir}
${INSTALL_DATA} $(srcdir)/utils/misc/etc/hdfs-client.xml ${sysconfdir}
+ ${INSTALL_DATA} $(srcdir)/utils/misc/etc/yarn-client.xml ${sysconfdir}
${INSTALL_DATA} $(srcdir)/utils/misc/etc/slaves ${sysconfdir}
- ${INSTALL_DATA} $(srcdir)/utils/misc/etc/slaves.exclude ${sysconfdir}
${INSTALL_DATA} $(srcdir)/utils/misc/etc/template-hawq-site.xml ${sysconfdir}
${INSTALL_DATA} $(srcdir)/utils/misc/etc/gpcheck.cnf ${sysconfdir}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/bootstrap/bootparse.y
----------------------------------------------------------------------
diff --git a/src/backend/bootstrap/bootparse.y b/src/backend/bootstrap/bootparse.y
index 1c7bbfd..9680912 100755
--- a/src/backend/bootstrap/bootparse.y
+++ b/src/backend/bootstrap/bootparse.y
@@ -370,23 +370,11 @@ Boot_CreateStmt:
typid = PG_REMOTE_CREDENTIALS_RELTYPE_OID;
break;
-/* relation id: 6026 - pg_resqueue 20140807 */
+/* relation id: 6026 - pg_resqueue 20150917 */
case ResQueueRelationId:
typid = PG_RESQUEUE_RELTYPE_OID;
break;
-
-/* relation id: 6059 - pg_resourcetype 20140807 */
- case ResourceTypeRelationId:
- typid = PG_RESOURCETYPE_RELTYPE_OID;
- break;
-
-
-/* relation id: 6060 - pg_resqueuecapability 20140807 */
- case ResQueueCapabilityRelationId:
- typid = PG_RESQUEUECAPABILITY_RELTYPE_OID;
- break;
-
/* TIDYCAT_END_CODEGEN */
default:
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/catalog/caql/Makefile
----------------------------------------------------------------------
diff --git a/src/backend/catalog/caql/Makefile b/src/backend/catalog/caql/Makefile
index 9e0cd7f..de513a3 100644
--- a/src/backend/catalog/caql/Makefile
+++ b/src/backend/catalog/caql/Makefile
@@ -62,7 +62,6 @@ CAQL_CQL_SRCS := $(addprefix $(top_srcdir)/src/backend/,\
commands/opclasscmds.c \
commands/operatorcmds.c \
commands/proclang.c \
- commands/queue.c \
commands/schemacmds.c \
commands/sequence.c \
commands/tablecmds.c \
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/catalog/catalog.c
----------------------------------------------------------------------
diff --git a/src/backend/catalog/catalog.c b/src/backend/catalog/catalog.c
index 20c80f4..ba65d88 100755
--- a/src/backend/catalog/catalog.c
+++ b/src/backend/catalog/catalog.c
@@ -793,15 +793,11 @@ relationId == GpVerificationHistoryRelationId ||
relationId == AuthTimeConstraintRelationId ||
/* relation id: 6112 - pg_filesystem 20130123 */
relationId == FileSystemRelationId ||
-/* relation id: 6026 - pg_resqueue 20140807 */
-relationId == ResQueueRelationId ||
-/* relation id: 6059 - pg_resourcetype 20140807 */
-relationId == ResourceTypeRelationId ||
-/* relation id: 6060 - pg_resqueuecapability 20140807 */
-relationId == ResQueueCapabilityRelationId ||
-
/* relation id: 5036 - gp_segment_configuration 20150204 */
relationId == GpSegmentConfigRelationId ||
+
+/* relation id: 6026 - pg_resqueue 20151014 */
+relationId == ResQueueRelationId ||
/* TIDYCAT_END_CODEGEN */
0 /* OR ZERO */
)
@@ -859,28 +855,16 @@ relationId == GpVerificationHistoryVertokenIndexId ||
relationId == FileSystemOidIndexId ||
/* relation id: 6112 - pg_filesystem 20130123 */
relationId == FileSystemFsysnameIndexId ||
-/* relation id: 6026 - pg_resqueue 20140807 */
-relationId == ResQueueOidIndexId ||
-/* relation id: 6026 - pg_resqueue 20140807 */
-relationId == ResQueueRsqnameIndexId ||
-/* relation id: 6059 - pg_resourcetype 20140807 */
-relationId == ResourceTypeOidIndexId ||
-/* relation id: 6059 - pg_resourcetype 20140807 */
-relationId == ResourceTypeRestypidIndexId ||
-/* relation id: 6059 - pg_resourcetype 20140807 */
-relationId == ResourceTypeResnameIndexId ||
-/* relation id: 6060 - pg_resqueuecapability 20140807 */
-relationId == ResQueueCapabilityOidIndexId ||
-/* relation id: 6060 - pg_resqueuecapability 20140807 */
-relationId == ResQueueCapabilityResqueueidIndexId ||
-/* relation id: 6060 - pg_resqueuecapability 20140807 */
-relationId == ResQueueCapabilityRestypidIndexId ||
-
/* relation id: 5036 - gp_segment_configuration 20150207 */
relationId == GpSegmentConfigRegistration_orderIndexId ||
/* relation id: 5036 - gp_segment_configuration 20150207 */
relationId == GpSegmentConfigRoleIndexId ||
+/* relation id: 6026 - pg_resqueue 20151014 */
+relationId == ResQueueOidIndexId ||
+/* relation id: 6026 - pg_resqueue 20151014 */
+relationId == ResQueueRsqnameIndexId ||
+
/* TIDYCAT_END_CODEGEN */
0 /* OR ZERO */
@@ -902,13 +886,13 @@ relationId == PgFileSpaceEntryToastIndex ||
/* relation id: 6112 - pg_filesystem 20130123 */
relationId == PgFileSystemToastTable ||
relationId == PgFileSystemToastIndex ||
-/* relation id: 6026 - pg_resqueue 20140807 */
-relationId == PgResQueueToastTable ||
-relationId == PgResQueueToastIndex ||
-
/* relation id: 5036 - gp_segment_configuration 20150204 */
relationId == GpSegmentConfigToastTable ||
relationId == GpSegmentConfigToastIndex ||
+
+/* relation id: 6026 - pg_resqueue 20151014 */
+relationId == PgResQueueToastTable ||
+relationId == PgResQueueToastIndex ||
/* TIDYCAT_END_CODEGEN */
0 /* OR ZERO */
)
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/catalog/toasting.c
----------------------------------------------------------------------
diff --git a/src/backend/catalog/toasting.c b/src/backend/catalog/toasting.c
index 56bcc30..24b0846 100644
--- a/src/backend/catalog/toasting.c
+++ b/src/backend/catalog/toasting.c
@@ -162,8 +162,7 @@ BootstrapToastTable(char *relName, Oid toastOid, Oid toastIndexOid)
typid = PG_REMOTE_CREDENTIALS_TOAST_RELTYPE_OID;
break;
-
-/* relation id: 6026 - pg_resqueue 20140807 */
+/* relation id: 6026 - pg_resqueue 20140917 */
case PgResQueueToastTable:
typid = PG_RESQUEUE_TOAST_RELTYPE_OID;
break;
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/cdb/cdbdatalocality.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbdatalocality.c b/src/backend/cdb/cdbdatalocality.c
index c46257a..b894629 100644
--- a/src/backend/cdb/cdbdatalocality.c
+++ b/src/backend/cdb/cdbdatalocality.c
@@ -3715,6 +3715,14 @@ run_allocation_algorithm(SplitAllocResult *result, List *virtual_segments, Query
}
/*allocate hash relation as a random relation*/
else{
+ MemoryContextSwitchTo(context->old_memorycontext);
+ CurrentRelType* relType = (CurrentRelType *) palloc(
+ sizeof(CurrentRelType));
+ relType->relid = rel_data->relid;
+ relType->isHash = false;
+ result->relsType = lappend(result->relsType, relType);
+ MemoryContextSwitchTo(context->datalocality_memorycontext);
+
result->forbid_optimizer = true;
allocate_random_relation(rel_data, &log_context,&idMap, &assignment_context, context);
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/cdb/cdbvars.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbvars.c b/src/backend/cdb/cdbvars.c
index ee6ec26..d6a784d 100644
--- a/src/backend/cdb/cdbvars.c
+++ b/src/backend/cdb/cdbvars.c
@@ -291,36 +291,41 @@ bool debug_fake_segmentnum;
/* New HAWQ 2.0 basic GUCs. Some of these are duplicate variables, they are
* reserved to facilitate showing settings in hawq-site.xml. */
-char *master_addr_host; /* hawq.master.address.host */
-int master_addr_port; /* hawq.master.address.port */
-char *standby_addr_host; /* hawq.standby.address.host */
-int seg_addr_port; /* hawq.segment.address.port */
-char *dfs_url; /* hawq.dfs.url */
-char *master_directory; /* hawq.master.directory */
-char *seg_directory; /* hawq.segment.directory */
-bool rm_domain_comm_enable; /* enable domain socket for RM communication */
+char *master_addr_host;
+int master_addr_port;
+char *standby_addr_host;
+int seg_addr_port;
+char *dfs_url;
+char *master_directory;
+char *seg_directory;
/* HAWQ 2.0 resource manager GUCs */
-char *rm_seg_memory_use; /* hawq.resourcemanager.segment.limit.memory.use */
-double rm_seg_core_use; /* hawq.resourcemanager.segment.limit.core.use */
-int rm_master_addr_port; /* hawq.resourcemanager.master.address.port */
-int rm_master_addr_domain_port;
-int rm_seg_addr_port;
-
-char *rm_grm_server_type; /* hawq.resourcemanager.server.type */
-char *rm_grm_yarn_rm_addr; /* hawq.resourcemanager.yarn.resourcemanager.address*/
-char *rm_grm_yarn_sched_addr; /* hawq.resourcemanager.yarn.resourcemanager.scheduler.address*/
-char *rm_grm_yarn_queue; /* hawq.resourcemanager.yarn.queue */
-char *rm_grm_yarn_app_name; /* hawq.resourcemanager.yarn.application.name*/
+int rm_master_port;
+int rm_segment_port;
+int rm_master_domain_port;
+
+int rm_nvseg_perquery_limit;
+int rm_nvseg_perquery_perseg_limit;
+int rm_nslice_perseg_limit;
+
+char *rm_seg_memory_use;
+double rm_seg_core_use;
+
+
+
+char *rm_global_rm_type;
+char *rm_grm_yarn_rm_addr;
+char *rm_grm_yarn_sched_addr;
+char *rm_grm_yarn_queue;
+char *rm_grm_yarn_app_name;
int rm_grm_breath_return_percentage;
-int rm_query_vseg_num_limit;
-int rm_query_vseg_num_per_seg_limit;
-int rm_slice_num_per_seg_limit;
+
int rm_seg_container_default_waterlevel;
bool rm_force_fifo_queue;
-int rm_resource_noaction_timeout; /* How many seconds to wait before expiring
+bool rm_session_lease_heartbeat_enable;
+int rm_session_lease_timeout; /* How many seconds to wait before expiring
allocated resource. */
int rm_query_resource_noresource_timeout; /* How may seconds to wait before
expiring queuing query resource
@@ -330,6 +335,9 @@ int rm_resource_timeout; /* How many seconds to wait before returning
int rm_resource_heartbeat_interval; /* How many seconds to wait before sending
another heart-beat to resource manager. */
+int rm_tolerate_nseg_limit;
+int rm_nvseg_variance_among_seg_limit;
+
char *rm_resourcepool_test_filename;
bool rm_enforce_cpu_enable; /* hawq_resourceenforcer_cpu_enable */
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/commands/Makefile
----------------------------------------------------------------------
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index 09666a1..c59098d 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -17,7 +17,7 @@ OBJS = aggregatecmds.o alter.o analyze.o analyzeutils.o async.o cluster.o commen
conversioncmds.o copy.o \
dbcommands.o define.o explain.o extprotocolcmds.o filespace.o filesystemcmds.o foreigncmds.o functioncmds.o \
indexcmds.o lockcmds.o operatorcmds.o opclasscmds.o \
- portalcmds.o prepare.o proclang.o queue.o \
+ portalcmds.o prepare.o proclang.o \
schemacmds.o sequence.o tablecmds.o tablespace.o trigger.o \
typecmds.o user.o vacuum.o vacuumlazy.o variable.o view.o
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/commands/queue.c
----------------------------------------------------------------------
diff --git a/src/backend/commands/queue.c b/src/backend/commands/queue.c
deleted file mode 100644
index 0240581..0000000
--- a/src/backend/commands/queue.c
+++ /dev/null
@@ -1,132 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * queue.c
- * Commands for manipulating resource queues.
- *
- * Copyright (c) 2006-2010, Greenplum inc.
- * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
- * Portions Copyright (c) 1994, Regents of the University of California
- *
- * $PostgreSQL$
- *
- *-------------------------------------------------------------------------
- */
-#include "postgres.h"
-
-#include "access/genam.h"
-#include "access/heapam.h"
-#include "catalog/catquery.h"
-#include "catalog/dependency.h"
-#include "catalog/heap.h"
-#include "catalog/indexing.h"
-#include "catalog/pg_authid.h"
-#include "catalog/pg_resqueue.h"
-#include "nodes/makefuncs.h"
-#include "cdb/cdbvars.h"
-#include "cdb/cdbdisp.h"
-#include "commands/comment.h"
-#include "commands/defrem.h"
-#include "commands/queue.h"
-#include "libpq/crypt.h"
-#include "miscadmin.h"
-#include "utils/acl.h"
-#include "utils/builtins.h"
-#include "utils/flatfiles.h"
-#include "utils/fmgroids.h"
-#include "utils/formatting.h"
-#include "utils/guc.h"
-#include "utils/lsyscache.h"
-#include "executor/execdesc.h"
-#include "utils/resscheduler.h"
-#include "utils/syscache.h"
-#include "cdb/memquota.h"
-#include "utils/guc_tables.h"
-
-#define INVALID_RES_LIMIT_STRING "-1"
-
-/**
- * Establish a lower bound on what memory limit may be set on a queue.
- */
-#define MIN_RESOURCEQUEUE_MEMORY_LIMIT_KB (10 * 1024L)
-
-/* MPP-6923: */
-List *
-GetResqueueCapabilityEntry(Oid queueid)
-{
- List *elems = NIL;
- HeapTuple tuple;
- cqContext *pcqCtx;
- cqContext cqc;
- Relation rel;
- TupleDesc tupdesc;
-
- rel = heap_open(ResQueueCapabilityRelationId, AccessShareLock);
-
- tupdesc = RelationGetDescr(rel);
-
- pcqCtx = caql_beginscan(
- caql_addrel(cqclr(&cqc), rel),
- cql("SELECT * FROM pg_resqueuecapability"
- " WHERE resqueueid = :1 ",
- ObjectIdGetDatum(queueid)));
-
- while (HeapTupleIsValid(tuple = caql_getnext(pcqCtx)))
- {
- if (HeapTupleIsValid(tuple))
- {
- List *pentry = NIL;
- int resTypeInt = 0;
- text *resSet_text = NULL;
- Datum resSet_datum;
- char *resSetting = NULL;
- bool isnull = false;
-
- resTypeInt =
- ((Form_pg_resqueuecapability) GETSTRUCT(tuple))->restypid;
-
- resSet_datum = heap_getattr(tuple,
- Anum_pg_resqueuecapability_ressetting,
- tupdesc,
- &isnull);
- Assert(!isnull);
- resSet_text = DatumGetTextP(resSet_datum);
- resSetting = DatumGetCString(DirectFunctionCall1(textout,
- PointerGetDatum(resSet_text)));
-
- pentry = list_make2(
- makeInteger(resTypeInt),
- makeString(resSetting));
-
- /* list of lists */
- elems = lappend(elems, pentry);
- }
- }
- caql_endscan(pcqCtx);
-
- heap_close(rel, AccessShareLock);
-
- return (elems);
-} /* end GetResqueueCapabilityEntry */
-
-/**
- * Given a queue id, return its name
- */
-char *GetResqueueName(Oid resqueueOid)
-{
- int fetchCount;
- char *result = NULL;
-
- result = caql_getcstring_plus(
- NULL,
- &fetchCount,
- NULL,
- cql("SELECT rsqname FROM pg_resqueue"
- " WHERE oid = :1 ",
- ObjectIdGetDatum(resqueueOid)));
-
- /* If we cannot find a resource queue id for any reason */
- if (!fetchCount)
- result = pstrdup("Unknown");
-
- return result;
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/commands/user.c
----------------------------------------------------------------------
diff --git a/src/backend/commands/user.c b/src/backend/commands/user.c
index 32b7417..a673b75 100644
--- a/src/backend/commands/user.c
+++ b/src/backend/commands/user.c
@@ -146,7 +146,7 @@ static bool resourceQueueIsBranch(Oid queueid)
Assert(tuple != NULL);
- status = heap_getattr(tuple, Anum_pg_resqueue_rsq_status, RelationGetDescr(pg_resqueue_rel), &isNull);
+ status = heap_getattr(tuple, Anum_pg_resqueue_status, RelationGetDescr(pg_resqueue_rel), &isNull);
if (!isNull && strncmp(TextDatumGetCString(status), "branch", strlen("branch")) == 0)
res = true;
@@ -206,9 +206,8 @@ CreateRole(CreateRoleStmt *stmt)
cqContext *pcqCtx;
Oid queueid = InvalidOid;
- int res = FUNC_RETURN_OK;
- int errorcode = FUNC_RETURN_OK;
- char errorbuf[1024];
+ int res = FUNC_RETURN_OK;
+ static char errorbuf[1024] = "";
/* The defaults can vary depending on the original statement type */
switch (stmt->stmt_type)
@@ -449,22 +448,10 @@ CreateRole(CreateRoleStmt *stmt)
GetUserId(),
errorbuf,
sizeof(errorbuf));
- errorcode = res;
-
- if (res != FUNC_RETURN_OK) {
- releaseResourceContextWithErrorReport(resourceid);
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Can not apply CREATE ROLE. "
- "Because fail to communicate with resource manager.")));
- }
- if (errorcode != FUNC_RETURN_OK) {
+ if (res != FUNC_RETURN_OK)
+ {
releaseResourceContextWithErrorReport(resourceid);
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Can not register connection in resource manager "
- "for applying CREATE ROLE. Because [%d]%s",
- errorcode, errorbuf)));
+ ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("%s", errorbuf)));
}
/*
@@ -554,31 +541,34 @@ CreateRole(CreateRoleStmt *stmt)
if (resqueue)
{
- if (strcmp(resqueue, "none") == 0) {
+ if (strcmp(resqueue, "none") == 0)
+ {
unregisterConnectionInRMWithErrorReport(resourceid);
releaseResourceContextWithErrorReport(resourceid);
ereport(ERROR,
(errcode(ERRCODE_RESERVED_NAME),
- errmsg("resource queue name \"%s\" is reserved",
+ errmsg("Resource queue name \"%s\" is reserved",
resqueue), errOmitLocation(true)));
}
queueid = GetResQueueIdForName(resqueue);
- if (queueid == InvalidOid) {
+ if (queueid == InvalidOid)
+ {
unregisterConnectionInRMWithErrorReport(resourceid);
releaseResourceContextWithErrorReport(resourceid);
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
- errmsg("resource queue \"%s\" does not exist",
+ errmsg("Resource queue \"%s\" does not exist",
resqueue), errOmitLocation(true)));
}
- if(resourceQueueIsBranch(queueid)) {
+ if(resourceQueueIsBranch(queueid))
+ {
unregisterConnectionInRMWithErrorReport(resourceid);
releaseResourceContextWithErrorReport(resourceid);
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
- errmsg("can't assign a branch resource queue \"%s\" to role",
+ errmsg("Can't assign a branch resource queue \"%s\" to role",
resqueue), errOmitLocation(true)));
}
@@ -614,7 +604,6 @@ CreateRole(CreateRoleStmt *stmt)
MANIPULATE_ROLE_RESQUEUE_CREATE,
issuper,
stmt->role,
- &errorcode,
errorbuf,
sizeof(errorbuf));
}
@@ -627,19 +616,13 @@ CreateRole(CreateRoleStmt *stmt)
if (resqueue && queueid != InvalidOid)
{
- if (res != FUNC_RETURN_OK) {
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Can not apply CREATE ROLE. "
- "Because fail to communicate with resource "
- "manager.")));
- }
- if (errorcode != FUNC_RETURN_OK) {
- /* TODO: Check if we should use another error code. */
+ if ( res != FUNC_RETURN_OK )
+ {
ereport(ERROR,
- (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
- errmsg("Can not apply CREATE ROLE. Because [%d]%s",
- errorcode, errorbuf)));
+ (errcode(IS_TO_RM_RPC_ERROR(res) ?
+ ERRCODE_INTERNAL_ERROR :
+ ERRCODE_INVALID_OBJECT_DEFINITION),
+ errmsg("Can not apply CREATE ROLE because of %s", errorbuf)));
}
}
@@ -774,11 +757,10 @@ AlterRole(AlterRoleStmt *stmt)
cqContext cqc;
cqContext *pcqCtx;
- Oid queueid = InvalidOid;
+ Oid queueid = InvalidOid;
- int res = FUNC_RETURN_OK;
- int errorcode = FUNC_RETURN_OK;
- char errorbuf[1024];
+ int res = FUNC_RETURN_OK;
+ static char errorbuf[1024] = "";
numopts = list_length(stmt->options);
@@ -977,7 +959,8 @@ AlterRole(AlterRoleStmt *stmt)
*/
int resourceid = 0;
res = createNewResourceContext(&resourceid);
- if ( res != FUNC_RETURN_OK ) {
+ if ( res != FUNC_RETURN_OK )
+ {
Assert( res == COMM2RM_CLIENT_FULL_RESOURCECONTEXT );
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
@@ -989,21 +972,10 @@ AlterRole(AlterRoleStmt *stmt)
GetUserId(),
errorbuf,
sizeof(errorbuf));
- errorcode = res;
- if (res != FUNC_RETURN_OK) {
- releaseResourceContextWithErrorReport(resourceid);
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Can not apply ALTER ROLE. "
- "Because fail to communicate with resource manager.")));
- }
- if (errorcode != FUNC_RETURN_OK) {
+ if (res != FUNC_RETURN_OK)
+ {
releaseResourceContextWithErrorReport(resourceid);
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Can not register connection in resource manager "
- "for applying ALTER ROLE. Because [%d]%s",
- errorcode, errorbuf)));
+ ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("%s", errorbuf)));
}
/*
@@ -1278,7 +1250,6 @@ AlterRole(AlterRoleStmt *stmt)
MANIPULATE_ROLE_RESQUEUE_ALTER,
issuper,
stmt->role,
- &errorcode,
errorbuf,
sizeof(errorbuf));
}
@@ -1291,19 +1262,14 @@ AlterRole(AlterRoleStmt *stmt)
if (resqueue && queueid != InvalidOid)
{
- if ( res != FUNC_RETURN_OK ) {
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Can not apply ALTER ROLE. "
- "Because fail to communicate with resource "
- "manager.")));
- }
- if (errorcode != FUNC_RETURN_OK) {
- /* TODO: Check if we should use another error code. */
+ if ( res != FUNC_RETURN_OK )
+ {
+
ereport(ERROR,
- (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
- errmsg("Can not apply ALTER ROLE. Because [%d]%s",
- errorcode, errorbuf)));
+ (errcode(IS_TO_RM_RPC_ERROR(res) ?
+ ERRCODE_INTERNAL_ERROR :
+ ERRCODE_INVALID_OBJECT_DEFINITION),
+ errmsg("Can not apply ALTER ROLE because of %s", errorbuf)));
}
}
@@ -1541,9 +1507,8 @@ DropRole(DropRoleStmt *stmt)
pg_auth_members_rel;
ListCell *item;
- int res = FUNC_RETURN_OK;
- int errorcode = FUNC_RETURN_OK;
- char errorbuf[1024];
+ int res = FUNC_RETURN_OK;
+ static char errorbuf[1024] = "";
if (!have_createrole_privilege())
ereport(ERROR,
@@ -1662,22 +1627,10 @@ DropRole(DropRoleStmt *stmt)
GetUserId(),
errorbuf,
sizeof(errorbuf));
- errorcode = res;
- if ( res != FUNC_RETURN_OK ) {
- releaseResourceContextWithErrorReport(resourceid);
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Can not apply DROP ROLE. "
- "Because fail to communicate with resource manager.")));
- }
-
- if ( errorcode != FUNC_RETURN_OK ) {
+ if (res != FUNC_RETURN_OK)
+ {
releaseResourceContextWithErrorReport(resourceid);
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Can not register connection in resource manager "
- "for applying DROP ROLE. Because [%d]%s",
- errorcode, errorbuf)));
+ ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("%s", errorbuf)));
}
/* Notify resource manager to drop role. */
@@ -1687,7 +1640,6 @@ DropRole(DropRoleStmt *stmt)
MANIPULATE_ROLE_RESQUEUE_DROP,
0, // not used when drop role
(char*)role,
- &errorcode,
errorbuf,
sizeof(errorbuf));
/* We always unregister connection. */
@@ -1696,19 +1648,13 @@ DropRole(DropRoleStmt *stmt)
/* We always release resource context. */
releaseResourceContextWithErrorReport(resourceid);
- if ( res != FUNC_RETURN_OK ) {
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Can not apply DROP ROLE. "
- "Because fail to communicate with resource "
- "manager.")));
- }
- if (errorcode != FUNC_RETURN_OK) {
- /* TODO: Check if we should use another error code. */
+ if ( res != FUNC_RETURN_OK )
+ {
ereport(ERROR,
- (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
- errmsg("Can not apply DROP ROLE. Because [%d]%s",
- errorcode, errorbuf)));
+ (errcode(IS_TO_RM_RPC_ERROR(res) ?
+ ERRCODE_INTERNAL_ERROR :
+ ERRCODE_INVALID_OBJECT_DEFINITION),
+ errmsg("Can not apply DROP ROLE because of %s", errorbuf)));
}
/*
@@ -2145,16 +2091,7 @@ AddRoleMems(const char *rolename, Oid roleid,
rolename)));
}
- /*
- * The role membership grantor of record has little significance at
- * present. Nonetheless, inasmuch as users might look to it for a crude
- * audit trail, let only superusers impute the grant to a third party.
- *
- * Before lifting this restriction, give the member == role case of
- * is_admin_of_role() a fresh look. Ensure that the current role cannot
- * use an explicit grantor specification to take advantage of the session
- * user's self-admin right.
- */
+ /* XXX not sure about this check */
if (grantorId != GetUserId() && !superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/optimizer/plan/planner.c
----------------------------------------------------------------------
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index e0162a9..ae81adf 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -328,6 +328,8 @@ planner(Query *parse, int cursorOptions,
}
}
+ int optimizer_segments_saved_value = optimizer_segments;
+
PG_TRY();
{
if (resourceNegotiateDone)
@@ -347,6 +349,11 @@ planner(Query *parse, int cursorOptions,
}
START_MEMORY_ACCOUNT(MemoryAccounting_CreateAccount(0, MEMORY_OWNER_TYPE_Optimizer));
{
+ if (optimizer_segments == 0) // value not set by user
+ {
+ optimizer_segments = gp_segments_for_planner;
+ }
+
result = optimize_query(parse, boundParams);
if (ppResult->stmt && ppResult->stmt->intoPolicy
&& result && result->intoPolicy)
@@ -354,6 +361,7 @@ planner(Query *parse, int cursorOptions,
result->intoPolicy->bucketnum =
ppResult->stmt->intoPolicy->bucketnum;
}
+ optimizer_segments = optimizer_segments_saved_value;
}
END_MEMORY_ACCOUNT();
@@ -405,6 +413,7 @@ planner(Query *parse, int cursorOptions,
*/
resourceNegotiateDone = false;
plannerLevel = 0;
+ optimizer_segments = optimizer_segments_saved_value;
if (savedQueryResource)
{
gp_segments_for_planner = list_length(savedQueryResource->segments);
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/postmaster/identity.c
----------------------------------------------------------------------
diff --git a/src/backend/postmaster/identity.c b/src/backend/postmaster/identity.c
index 737c31a..7d1bcf0 100644
--- a/src/backend/postmaster/identity.c
+++ b/src/backend/postmaster/identity.c
@@ -376,7 +376,7 @@ SetupProcessIdentity(const char *str)
/* If QE is under one segment. */
if ( GetQEIndex() != -1 ) {
GetLocalTmpDirFromRM("127.0.0.1",//DRMGlobalInstance->SocketLocalHostName.Str,
- rm_seg_addr_port,
+ rm_segment_port,
gp_session_id,
gp_command_count,
GetQEIndex());
@@ -384,7 +384,7 @@ SetupProcessIdentity(const char *str)
/* QE is under master. */
else {
GetLocalTmpDirFromRM("127.0.0.1",//DRMGlobalInstance->SocketLocalHostName.Str,
- rm_master_addr_port,
+ rm_master_port,
gp_session_id,
gp_command_count,
GetQEIndex());
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/communication/rmcomm_QD2RM.c b/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
index e8583ce..10189e0 100644
--- a/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
+++ b/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
@@ -210,7 +210,7 @@ void initializeQD2RMComm(void)
}
/* Get UNIX domain socket file. */
- UNIXSOCK_PATH(QD2RM_SocketFile, rm_master_addr_domain_port, UnixSocketDir);
+ UNIXSOCK_PATH(QD2RM_SocketFile, rm_master_domain_port, UnixSocketDir);
/* Initialize global variables for maintaining a list of resource sets. */
QD2RM_ResourceSets = rm_palloc0(QD2RM_CommContext,
@@ -232,13 +232,16 @@ void initializeQD2RMComm(void)
}
/* Start resource heart-beat thread. */
- if ( pthread_create(&ResourceHeartBeatThreadHandle,
- NULL,
- generateResourceRefreshHeartBeat,
- NULL) != 0)
+ if ( rm_session_lease_heartbeat_enable )
{
- elog(ERROR, "Fail to create background thread for communication with "
- "resource manager.");
+ if ( pthread_create(&ResourceHeartBeatThreadHandle,
+ NULL,
+ generateResourceRefreshHeartBeat,
+ NULL) != 0)
+ {
+ elog(ERROR, "Fail to create background thread for communication with "
+ "resource manager.");
+ }
}
initializeMessageHandlers();
@@ -530,6 +533,7 @@ int unregisterConnectionInRM(int index,
(RPCResponseHeadUnregisterConnectionInRM)(recvbuffer->Buffer);
if ( response->Result != FUNC_RETURN_OK )
{
+ res = response->Result;
snprintf(errorbuf, errorbufsize,
"Fail to unregister in HAWQ resource manager because of "
"remote error %s.",
@@ -540,7 +544,7 @@ int unregisterConnectionInRM(int index,
QD2RM_ResourceSets[index]->QD_Conn_ID);
QD2RM_ResourceSets[index]->QD_Conn_ID = INVALID_CONNID;
- return FUNC_RETURN_OK;
+ return res;
}
void unregisterConnectionInRMWithErrorReport(int index)
@@ -598,8 +602,8 @@ int acquireResourceFromRM(int index,
requesthead.MaxSegCountFix = max_seg_count_fix;
requesthead.MinSegCountFix = min_seg_count_fix;
requesthead.SliceSize = slice_size;
- requesthead.VSegLimitPerSeg = rm_query_vseg_num_per_seg_limit;
- requesthead.VSegLimit = rm_query_vseg_num_limit;
+ requesthead.VSegLimitPerSeg = rm_nvseg_perquery_perseg_limit;
+ requesthead.VSegLimit = rm_nvseg_perquery_limit;
requesthead.Reserved = 0;
requesthead.IOBytes = iobytes;
@@ -781,7 +785,9 @@ int returnResource(int index,
/* Parse response. */
RPCResponseHeadReturnResource response =
(RPCResponseHeadReturnResource)(recvbuffer->Buffer);
- if ( response->Result != FUNC_RETURN_OK ) {
+ if ( response->Result != FUNC_RETURN_OK )
+ {
+ res = response->Result;
snprintf(errorbuf, errorbufsize,
"Fail to return resource to HAWQ resource manager because of "
"remote error %s.",
@@ -845,9 +851,7 @@ int manipulateResourceQueue(int index,
recvbuffer);
if ( res != FUNC_RETURN_OK )
{
- snprintf(errorbuf, errorbufsize,
- "Fail to manipulate resource queue because of RPC error %s.",
- getErrorCodeExplain(res));
+ snprintf(errorbuf, errorbufsize, "%s", getErrorCodeExplain(res));
return res;
}
@@ -856,27 +860,29 @@ int manipulateResourceQueue(int index,
(RPCResponseHeadManipulateResQueue)(recvbuffer->Buffer);
/* CASE 1. The response contains error message. */
- if ( response->Result != FUNC_RETURN_OK ) {
-
+ if ( response->Result != FUNC_RETURN_OK )
+ {
RPCResponseHeadManipulateResQueueERROR error =
(RPCResponseHeadManipulateResQueueERROR)(recvbuffer->Buffer);
- elog(LOG, "Fail to manipulate resource queue because %s",
- error->ErrorText);
+ elog(WARNING, "Fail to manipulate resource queue because %s",
+ error->ErrorText);
snprintf(errorbuf, errorbufsize, "%s", error->ErrorText);
}
+
+ elog(DEBUG3, "Manipulated resource queue and got result %d", response->Result);
+
return response->Result;
}
-int manipulateRoleForResourceQueue (int index,
- Oid roleid,
- Oid queueid,
- uint16_t action,
- uint8_t isSuperUser,
- char *rolename,
- int *errorcode,
- char *errorbuf,
- int errorbufsize)
+int manipulateRoleForResourceQueue (int index,
+ Oid roleid,
+ Oid queueid,
+ uint16_t action,
+ uint8_t isSuperUser,
+ char *rolename,
+ char *errorbuf,
+ int errorbufsize)
{
initializeQD2RMComm();
@@ -902,65 +908,51 @@ int manipulateRoleForResourceQueue (int index,
request.isSuperUser = isSuperUser;
request.Action = action;
if (strlen(rolename) < sizeof(request.Name))
+ {
strncpy(request.Name, rolename, strlen(rolename));
- else {
- snprintf(errorbuf, errorbufsize, "Invalid role name.");
- *errorcode = res;
- return res;
+ }
+ else
+ {
+ elog(WARNING, "Resource manager finds in valid role name %s.", rolename);
+ snprintf(errorbuf, errorbufsize, "invalid role name %s.", rolename);
+ return RESQUEMGR_NO_USERID;
}
- elog(RMLOG, "HAWQ RM: manipulateRoleForResourceQueue "
- "role oid:%d, queueID:%d, isSuper:%d, roleName:%s, action:%d",
- request.RoleOID, request.QueueOID, request.isSuperUser,
- request.Name, request.Action);
+ elog(DEBUG3, "Resource manager (manipulateRoleForResourceQueue) "
+ "role oid:%d, queueID:%d, isSuper:%d, roleName:%s, action:%d",
+ request.RoleOID, request.QueueOID, request.isSuperUser,
+ request.Name, request.Action);
appendSMBVar(sendbuffer, request);
- if (rm_domain_comm_enable)
- {
- res = callSyncRPCDomain(QD2RM_SocketFile,
- sendbuffer->Buffer,
- sendbuffer->Cursor + 1,
- REQUEST_QD_DDL_MANIPULATEROLE,
- RESPONSE_QD_DDL_MANIPULATEROLE,
- recvbuffer);
- }
- else
- {
- res = callSyncRPCRemote(master_addr_host,
- rm_master_addr_port,
- sendbuffer->Buffer,
- sendbuffer->Cursor + 1,
- REQUEST_QD_DDL_MANIPULATEROLE,
- RESPONSE_QD_DDL_MANIPULATEROLE,
- recvbuffer);
- }
- if ( res != FUNC_RETURN_OK ) {
- snprintf(errorbuf, errorbufsize,
- "Fail to get response from resource manager RPC.");
- *errorcode = res;
+ res = callSyncRPCToRM(sendbuffer->Buffer,
+ sendbuffer->Cursor + 1,
+ REQUEST_QD_DDL_MANIPULATEROLE,
+ RESPONSE_QD_DDL_MANIPULATEROLE,
+ recvbuffer);
+
+ if ( res != FUNC_RETURN_OK )
+ {
+ snprintf(errorbuf, errorbufsize, "%s", getErrorCodeExplain(res));
return res;
}
/* Start parsing response. */
- RPCResponseHeadManipulateRole response =
- (RPCResponseHeadManipulateRole)(recvbuffer->Buffer);
+ RPCResponseHeadManipulateRole response = (RPCResponseHeadManipulateRole)
+ (recvbuffer->Buffer);
/* The response contains error message. */
- if ( response->Result != FUNC_RETURN_OK ) {
-
+ if ( response->Result != FUNC_RETURN_OK )
+ {
RPCResponseHeadManipulateRoleERROR error =
(RPCResponseHeadManipulateRoleERROR)(recvbuffer->Buffer);
- elog(RMLOG, "HAWQ RM :: Fail to manipulate role. %s",
- error->ErrorText);
+ elog(WARNING, "Resource manager failed to manipulate role %s. %s",
+ rolename,
+ error->ErrorText);
snprintf(errorbuf, errorbufsize, "%s", error->ErrorText);
- *errorcode = error->Result;
- return FUNC_RETURN_OK;
}
-
- *errorcode = FUNC_RETURN_OK;
- return res;
+ return response->Result;
}
void buildManipulateResQueueRequest(SelfMaintainBuffer sendbuffer,
@@ -972,8 +964,7 @@ void buildManipulateResQueueRequest(SelfMaintainBuffer sendbuffer,
Assert( sendbuffer != NULL );
Assert( connid != 0XFFFFFFFF );
Assert( queuename != NULL );
- Assert( action >= MANIPULATE_RESQUEUE_CREATE &&
- action <= MANIPULATE_RESQUEUE_DROP );
+ Assert( action >= MANIPULATE_RESQUEUE_CREATE && action <= MANIPULATE_RESQUEUE_DROP );
uint16_t withlength = 0;
bool nowIsWithOption = false;
@@ -1069,26 +1060,14 @@ void sendFailedNodeToResourceManager(int hostNum, char **pghost) {
elog(LOG, "HAWQ RM :: QD sends %d failed host(s) to resource manager.",
hostNum);
- if (rm_domain_comm_enable)
- {
- res = callSyncRPCDomain(QD2RM_SocketFile,
- sendBuffer.Buffer,
- sendBuffer.Cursor + 1,
- REQUEST_QD_SEGMENT_ISDOWN,
- RESPONSE_QD_SEGMENT_ISDOWN,
- &recvBuffer);
- }
- else
- {
- res = callSyncRPCRemote(master_addr_host,
- rm_master_addr_port,
- sendBuffer.Buffer,
- sendBuffer.Cursor + 1,
- REQUEST_QD_SEGMENT_ISDOWN,
- RESPONSE_QD_SEGMENT_ISDOWN,
- &recvBuffer);
- }
- if ( res != FUNC_RETURN_OK ) {
+ res = callSyncRPCToRM(sendBuffer.Buffer,
+ sendBuffer.Cursor + 1,
+ REQUEST_QD_SEGMENT_ISDOWN,
+ RESPONSE_QD_SEGMENT_ISDOWN,
+ &recvBuffer);
+
+ if ( res != FUNC_RETURN_OK )
+ {
elog(LOG, "Fail to get response from resource manager RPC. %d", res);
goto exit;
}
@@ -1114,25 +1093,11 @@ int getLocalTmpDirFromMasterRM()
request.Reserved = 0;
appendSMBVar(&sendBuffer, request);
- if (rm_domain_comm_enable)
- {
- res = callSyncRPCDomain(QD2RM_SocketFile,
- sendBuffer.Buffer,
- sendBuffer.Cursor + 1,
- REQUEST_QD_TMPDIR,
- RESPONSE_QD_TMPDIR,
- &recvBuffer);
- }
- else
- {
- res = callSyncRPCRemote(master_addr_host,
- rm_master_addr_port,
- sendBuffer.Buffer,
- sendBuffer.Cursor + 1,
- REQUEST_QD_TMPDIR,
- RESPONSE_QD_TMPDIR,
- &recvBuffer);
- }
+ res = callSyncRPCToRM(sendBuffer.Buffer,
+ sendBuffer.Cursor + 1,
+ REQUEST_QD_TMPDIR,
+ RESPONSE_QD_TMPDIR,
+ &recvBuffer);
if ( res != FUNC_RETURN_OK )
{
elog(ERROR, "getLocalTmpDirFromMasterRM fail");
@@ -1185,35 +1150,23 @@ int acquireResourceQuotaFromRM(int64_t user_oid,
request.UseridOid = user_oid;
request.MaxSegCountFix = max_seg_count_fix;
request.MinSegCountFix = min_seg_count_fix;
- request.VSegLimitPerSeg = rm_query_vseg_num_per_seg_limit;
- request.VSegLimit = rm_query_vseg_num_limit;
+ request.VSegLimitPerSeg = rm_nvseg_perquery_perseg_limit;
+ request.VSegLimit = rm_nvseg_perquery_limit;
appendSMBVar(&sendBuffer, request);
- elog(DEBUG3, "HAWQ RM :: Acquire resource quota for query with %d splits, %d preferred virtual segments by user "INT64_FORMAT,
+ elog(DEBUG3, "HAWQ RM :: Acquire resource quota for query with %d splits, "
+ "%d preferred virtual segments by user "INT64_FORMAT,
max_seg_count_fix,
min_seg_count_fix,
user_oid);
- if (rm_domain_comm_enable)
- {
- res = callSyncRPCDomain(QD2RM_SocketFile,
- sendBuffer.Buffer,
- sendBuffer.Cursor + 1,
- REQUEST_QD_ACQUIRE_RESOURCE_QUOTA,
- RESPONSE_QD_ACQUIRE_RESOURCE_QUOTA,
- &recvBuffer);
- }
- else
- {
- res = callSyncRPCRemote(master_addr_host,
- rm_master_addr_port,
- sendBuffer.Buffer,
- sendBuffer.Cursor + 1,
- REQUEST_QD_ACQUIRE_RESOURCE_QUOTA,
- RESPONSE_QD_ACQUIRE_RESOURCE_QUOTA,
- &recvBuffer);
- }
- if ( res != FUNC_RETURN_OK ) {
+ res = callSyncRPCToRM(sendBuffer.Buffer,
+ sendBuffer.Cursor + 1,
+ REQUEST_QD_ACQUIRE_RESOURCE_QUOTA,
+ RESPONSE_QD_ACQUIRE_RESOURCE_QUOTA,
+ &recvBuffer);
+ if ( res != FUNC_RETURN_OK )
+ {
snprintf(errorbuf, errorbufsize,
"Fail to get response from resource manager RPC.");
*errorcode = res;
@@ -1245,7 +1198,6 @@ exit:
void *generateResourceRefreshHeartBeat(void *arg)
{
- static char dfilename[256];
static char messagehead[16] = {'M' ,'S' ,'G' ,'S' ,'T' ,'A' ,'R' ,'T' ,
'\0','\0','\0','\0','\0','\0','\0','\0'};
static char messagetail[8] = {'M' ,'S' ,'G' ,'E' ,'N' ,'D' ,'S' ,'!' };
@@ -1292,11 +1244,12 @@ void *generateResourceRefreshHeartBeat(void *arg)
/* Build final request content and send out. */
appendSelfMaintainBufferTill64bitAligned(&contbuffer);
- if ( sendcontent ) {
+ if ( sendcontent )
+ {
int fd = -1;
- int res = connectToServerDomain(QD2RM_SocketFile, 0, &fd, 1, dfilename);
- if ( res == FUNC_RETURN_OK ) {
-
+ int res = connectToServerRemote(master_addr_host, rm_master_port, &fd);
+ if ( res == FUNC_RETURN_OK )
+ {
RMMessageHead phead = (RMMessageHead)messagehead;
RMMessageTail ptail = (RMMessageTail)messagetail;
phead->Mark1 = 0;
@@ -1322,7 +1275,7 @@ void *generateResourceRefreshHeartBeat(void *arg)
write_log("generateResourceRefreshHeartBeat send error (errno %d)", errno);
}
}
- closeConnectionDomain(&fd, dfilename);
+ closeConnectionRemote(&fd);
}
pg_usleep(rm_resource_heartbeat_interval * 1000000);
}
@@ -1362,24 +1315,17 @@ Datum pg_resqueue_status(PG_FUNCTION_ARGS)
request.Reserved = 0;
appendSMBVar(&sendBuffer, request);
- if (rm_domain_comm_enable)
+ res = callSyncRPCToRM(sendBuffer.Buffer,
+ sendBuffer.Cursor + 1,
+ REQUEST_QD_DUMP_RESQUEUE_STATUS,
+ RESPONSE_QD_DUMP_RESQUEUE_STATUS,
+ &recvBuffer);
+ if ( res != FUNC_RETURN_OK )
{
- res = callSyncRPCDomain(QD2RM_SocketFile,
- sendBuffer.Buffer,
- sendBuffer.Cursor + 1,
- REQUEST_QD_DUMP_RESQUEUE_STATUS,
- RESPONSE_QD_DUMP_RESQUEUE_STATUS,
- &recvBuffer);
- }
- else
- {
- res = callSyncRPCRemote(master_addr_host,
- rm_master_addr_port,
- sendBuffer.Buffer,
- sendBuffer.Cursor + 1,
- REQUEST_QD_DUMP_RESQUEUE_STATUS,
- RESPONSE_QD_DUMP_RESQUEUE_STATUS,
- &recvBuffer);
+ destroySelfMaintainBuffer(&sendBuffer);
+ destroySelfMaintainBuffer(&recvBuffer);
+ funcctx->max_calls = 0;
+ SRF_RETURN_DONE(funcctx);
}
RPCResponseResQueueStatus response = (RPCResponseResQueueStatus)(recvBuffer.Buffer);
@@ -1507,25 +1453,15 @@ void dumpResourceManagerStatus(uint32_t type, const char *dump_file)
strncpy(request.dump_file, dump_file, sizeof(request.dump_file) - 1);
appendSMBVar(&sendBuffer, request);
- if (rm_domain_comm_enable)
- {
- res = callSyncRPCDomain(QD2RM_SocketFile,
- sendBuffer.Buffer,
- sendBuffer.Cursor + 1,
- REQUEST_QD_DUMP_STATUS,
- RESPONSE_QD_DUMP_STATUS,
- &recvBuffer);
- }
- else
- {
- res = callSyncRPCRemote(master_addr_host,
- rm_master_addr_port,
- sendBuffer.Buffer,
- sendBuffer.Cursor + 1,
- REQUEST_QD_DUMP_STATUS,
- RESPONSE_QD_DUMP_STATUS,
- &recvBuffer);
- }
+ res = callSyncRPCToRM(sendBuffer.Buffer,
+ sendBuffer.Cursor + 1,
+ REQUEST_QD_DUMP_STATUS,
+ RESPONSE_QD_DUMP_STATUS,
+ &recvBuffer);
+ if ( res != FUNC_RETURN_OK )
+ {
+ goto exit;
+ }
RPCResponseDumpStatus response = (RPCResponseDumpStatus)(recvBuffer.Buffer);
@@ -2494,23 +2430,20 @@ int callSyncRPCToRM(const char *sendbuff,
uint16_t exprecvmsgid,
SelfMaintainBuffer recvsmb)
{
- if (rm_domain_comm_enable)
- {
+#ifdef ENABLE_DOMAINSERVER
return callSyncRPCDomain(QD2RM_SocketFile,
sendbuff,
sendbuffsize,
sendmsgid,
exprecvmsgid,
recvsmb);
- }
- else
- {
+#else
return callSyncRPCRemote(master_addr_host,
- rm_master_addr_port,
+ rm_master_port,
sendbuff,
sendbuffsize,
sendmsgid,
exprecvmsgid,
recvsmb);
- }
+#endif
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/communication/rmcomm_QE2RMSEG.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/communication/rmcomm_QE2RMSEG.c b/src/backend/resourcemanager/communication/rmcomm_QE2RMSEG.c
index 9b0ae17..874b53d 100644
--- a/src/backend/resourcemanager/communication/rmcomm_QE2RMSEG.c
+++ b/src/backend/resourcemanager/communication/rmcomm_QE2RMSEG.c
@@ -28,7 +28,7 @@ MoveToCGroupForQE(TimestampTz masterStartTime,
int res = FUNC_RETURN_OK;
char *serverHost = "127.0.0.1";
- uint16_t serverPort = rm_seg_addr_port;
+ uint16_t serverPort = rm_segment_port;
SelfMaintainBuffer sendBuffer = createSelfMaintainBuffer(CurrentMemoryContext);
SelfMaintainBuffer recvBuffer = createSelfMaintainBuffer(CurrentMemoryContext);
@@ -71,7 +71,7 @@ MoveOutCGroupForQE(TimestampTz masterStartTime,
int res = FUNC_RETURN_OK;
char *serverHost = "127.0.0.1";
- uint16_t serverPort = rm_seg_addr_port;
+ uint16_t serverPort = rm_segment_port;
SelfMaintainBuffer sendBuffer = createSelfMaintainBuffer(CurrentMemoryContext);
SelfMaintainBuffer recvBuffer = createSelfMaintainBuffer(CurrentMemoryContext);
@@ -116,7 +116,7 @@ SetWeightCGroupForQE(TimestampTz masterStartTime,
int res = FUNC_RETURN_OK;
char *serverHost = "127.0.0.1";
- uint16_t serverPort = rm_seg_addr_port;
+ uint16_t serverPort = rm_segment_port;
SelfMaintainBuffer sendBuffer = createSelfMaintainBuffer(CurrentMemoryContext);
SelfMaintainBuffer recvBuffer = createSelfMaintainBuffer(CurrentMemoryContext);
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c b/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c
index d9d82ca..c87694a 100644
--- a/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c
+++ b/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c
@@ -142,7 +142,7 @@ int sendRUAlive(char *seghostname)
/* Connect to HAWQ RM server */
res = registerAsyncConnectionFileDesc(NULL,
seghostname,
- rm_seg_addr_port,
+ rm_segment_port,
ASYNCCOMM_READBYTES | ASYNCCOMM_WRITEBYTES,
&AsyncCommBufferHandlersMessage,
context,
@@ -333,7 +333,7 @@ int increaseMemoryQuota(char *seghostname, GRMContainerSet containerset)
res = registerAsyncConnectionFileDesc(NULL,
seghostname,
- rm_seg_addr_port,
+ rm_segment_port,
ASYNCCOMM_READBYTES | ASYNCCOMM_WRITEBYTES,
&AsyncCommBufferHandlersMessage,
context,
@@ -343,7 +343,7 @@ int increaseMemoryQuota(char *seghostname, GRMContainerSet containerset)
elog(LOG, "Resource manager failed to set connection to segment host %s "
"on port %d to increase memory quota.",
seghostname,
- rm_seg_addr_port);
+ rm_segment_port);
processContainersAfterIncreaseMemoryQuota(containerset, false);
freeGRMContainerSet(newctns);
rm_pfree(AsyncCommContext, context);
@@ -354,7 +354,7 @@ int increaseMemoryQuota(char *seghostname, GRMContainerSet containerset)
elog(DEBUG3, "Resource manager succeeded set connection to segment host %s "
"on port %d to increase memory quota.",
seghostname,
- rm_seg_addr_port);
+ rm_segment_port);
}
buildMessageToCommBuffer(commbuffer,
@@ -504,7 +504,7 @@ int decreaseMemoryQuota(char *seghostname,
res = registerAsyncConnectionFileDesc(NULL,
seghostname,
- rm_seg_addr_port,
+ rm_segment_port,
ASYNCCOMM_READBYTES | ASYNCCOMM_WRITEBYTES,
&AsyncCommBufferHandlersMessage,
context,
@@ -514,7 +514,7 @@ int decreaseMemoryQuota(char *seghostname,
elog(LOG, "Resource manager failed to set connection to segment host %s "
"on port %d to decrease memory quota.",
seghostname,
- rm_seg_addr_port);
+ rm_segment_port);
processContainersAfterDecreaseMemoryQuota(ctns, false);
freeGRMContainerSet(ctns);
rm_pfree(AsyncCommContext, context);
@@ -525,7 +525,7 @@ int decreaseMemoryQuota(char *seghostname,
elog(DEBUG3, "Resource manager succeeded set connection to segment host %s "
"on port %d to decrease memory quota.",
seghostname,
- rm_seg_addr_port);
+ rm_segment_port);
}
buildMessageToCommBuffer(commbuffer,
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/communication/rmcomm_RMSEG2RM.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/communication/rmcomm_RMSEG2RM.c b/src/backend/resourcemanager/communication/rmcomm_RMSEG2RM.c
index 9a35cce..4adeb44 100644
--- a/src/backend/resourcemanager/communication/rmcomm_RMSEG2RM.c
+++ b/src/backend/resourcemanager/communication/rmcomm_RMSEG2RM.c
@@ -129,7 +129,7 @@ int sendIMAlive(int *errorcode,
DRMGlobalInstance->SendToStandby?
standby_addr_host:
master_addr_host,
- rm_master_addr_port,
+ rm_master_port,
ASYNCCOMM_READBYTES | ASYNCCOMM_WRITEBYTES,
&AsyncCommBufferHandlersMessage,
context,
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/errorcode.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/errorcode.c b/src/backend/resourcemanager/errorcode.c
index 81ec92b..49c1a1e 100644
--- a/src/backend/resourcemanager/errorcode.c
+++ b/src/backend/resourcemanager/errorcode.c
@@ -12,6 +12,8 @@ ErrorDetailData ErrorDetailsPreset[] = {
{RESQUEMGR_NOCLUSTER_TIMEOUT, "no available cluster to run query"},
{RESQUEMGR_NORESOURCE_TIMEOUT, "no available resource to run query"},
{RESQUEMGR_TOO_MANY_FIXED_SEGNUM, "expecting too many virtual segments"},
+ {REQUESTHANDLER_WRONG_CONNSTAT, "that resource context maybe recycled due to timeout"},
+ {CONNTRACK_NO_CONNID, "that resource context does not exist"},
{-1, ""}
};
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/include/communication/rmcomm_QD2RM.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/communication/rmcomm_QD2RM.h b/src/backend/resourcemanager/include/communication/rmcomm_QD2RM.h
index 524399a..bdb9400 100644
--- a/src/backend/resourcemanager/include/communication/rmcomm_QD2RM.h
+++ b/src/backend/resourcemanager/include/communication/rmcomm_QD2RM.h
@@ -136,15 +136,14 @@ int manipulateResourceQueue(int index,
char *errorbuf,
int errorbufsize);
-int manipulateRoleForResourceQueue (int index,
- Oid roleid,
- Oid queueid,
- uint16_t action,
- uint8_t isSuperUser,
- char *rolename,
- int *errorcode,
- char *errorbuf,
- int errorbufsize);
+int manipulateRoleForResourceQueue (int index,
+ Oid roleid,
+ Oid queueid,
+ uint16_t action,
+ uint8_t isSuperUser,
+ char *rolename,
+ char *errorbuf,
+ int errorbufsize);
void SendResourceRefreshHeartBeat(void);
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/include/communication/rmcomm_QD_RM_Protocol.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/communication/rmcomm_QD_RM_Protocol.h b/src/backend/resourcemanager/include/communication/rmcomm_QD_RM_Protocol.h
index 9d063d9..17d397b 100644
--- a/src/backend/resourcemanager/include/communication/rmcomm_QD_RM_Protocol.h
+++ b/src/backend/resourcemanager/include/communication/rmcomm_QD_RM_Protocol.h
@@ -107,15 +107,13 @@ enum CatResQueueAction_Action_Enum {
};
RPC_PROTOCOL_STRUCT_BEGIN(RPCResponseHeadManipulateResQueueERROR)
- uint32_t Result;
- uint32_t Reserved;
- char ErrorText[1];
+ RPCResponseHeadManipulateResQueueData Result;
+ char ErrorText[1];
RPC_PROTOCOL_STRUCT_END(RPCResponseHeadManipulateResQueueERROR)
RPC_PROTOCOL_STRUCT_BEGIN(RPCResponseHeadManipulateRoleERROR)
- uint32_t Result;
- uint32_t Reserved;
- char ErrorText[1];
+ RPCResponseHeadManipulateRoleData Result;
+ char ErrorText[1];
RPC_PROTOCOL_STRUCT_END(RPCResponseHeadManipulateRoleERROR)
/*******************************************************************************
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/include/errorcode.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/errorcode.h b/src/backend/resourcemanager/include/errorcode.h
index 5eb627d..8c86011 100644
--- a/src/backend/resourcemanager/include/errorcode.h
+++ b/src/backend/resourcemanager/include/errorcode.h
@@ -74,6 +74,7 @@ enum DRM_ERROR_CODE {
RESQUEMGR_DEADLOCK_DETECTED,
RESQUEMGR_NOCLUSTER_TIMEOUT,
RESQUEMGR_NORESOURCE_TIMEOUT,
+ RESQUEMGR_WRONG_RES_QUOTA_EXP,
REQUESTHANDLER_START_TAG = 200,
REQUESTHANDLER_WAIT_RESOURCE,
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/include/resourcepool.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/resourcepool.h b/src/backend/resourcemanager/include/resourcepool.h
index c502429..66c334d 100644
--- a/src/backend/resourcemanager/include/resourcepool.h
+++ b/src/backend/resourcemanager/include/resourcepool.h
@@ -460,6 +460,9 @@ struct ResourcePoolData {
*/
ALLOC_RES_FROM_RESPOOL_FUNC allocateResFuncs[RESOURCEPOOL_MAX_ALLOC_POLICY_SIZE];
+ /* Slaves file content. */
+ int64_t SlavesFileTimestamp;
+ int SlavesHostCount;
};
typedef struct ResourcePoolData *ResourcePool;
@@ -601,6 +604,9 @@ typedef struct RB_GRMContainerStatData *RB_GRMContainerStat;
void checkGRMContainerStatus(RB_GRMContainerStat ctnstats, int size);
int getClusterGRMContainerSize(void);
+
+void checkSlavesFile(void);
+
/*
*------------------------------------------------------------------------------
* gp_segment_configuration catalog operating APIs.
@@ -617,6 +623,7 @@ void add_segment_config_row(int32_t id,
char *address,
uint32_t port,
char role);
+
/*
* In resource pool, segment's id starts from 0, however in gp_segment_configuration table,
* segment registration order starts from 1(0 is for master, -1 is for standby).
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/include/resqueuemanager.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/resqueuemanager.h b/src/backend/resourcemanager/include/resqueuemanager.h
index 0ba03f7..38de157 100644
--- a/src/backend/resourcemanager/include/resqueuemanager.h
+++ b/src/backend/resourcemanager/include/resqueuemanager.h
@@ -38,14 +38,17 @@ enum RESOURCE_QUEUE_DDL_ATTR_INDEX
RSQ_DDL_ATTR_ACTIVE_STATMENTS,
RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER,
RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER,
- RSQ_DDL_ATTR_VSEGMENT_RESOURCE_QUOTA,
+ RSQ_DDL_ATTR_VSEG_RESOURCE_QUOTA,
RSQ_DDL_ATTR_ALLOCATION_POLICY,
- RSQ_DDL_ATTR_RESOURCE_UPPER_FACTOR,
- RSQ_DDL_ATTR_VSEGMENT_UPPER_LIMIT,
+ RSQ_DDL_ATTR_RESOURCE_OVERCOMMIT_FACTOR,
+ RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT,
+ RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT,
+ RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT_PERSEG,
+ RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT_PERSEG,
RSQ_DDL_ATTR_COUNT
};
-extern char RSQDDLAttrNames[RSQ_DDL_ATTR_COUNT][RESOURCE_QUEUE_DDL_ATTR_LENGTH_MAX];
+extern char RSQDDLAttrNames[RSQ_DDL_ATTR_COUNT][RESOURCE_QUEUE_DDL_ATTR_LENGTH_MAX+1];
/*
* The attributes for expressing one complete resource queue definition.
@@ -57,10 +60,13 @@ enum RESOURCE_QUEUE_TABLE_ATTR_INDEX {
RSQ_TBL_ATTR_ACTIVE_STATMENTS,
RSQ_TBL_ATTR_MEMORY_LIMIT_CLUSTER,
RSQ_TBL_ATTR_CORE_LIMIT_CLUSTER,
- RSQ_TBL_ATTR_VSEGMENT_RESOURCE_QUOTA,
+ RSQ_TBL_ATTR_VSEG_RESOURCE_QUOTA,
RSQ_TBL_ATTR_ALLOCATION_POLICY,
- RSQ_TBL_ATTR_RESORUCE_UPPER_FACTOR,
- RSQ_TBL_ATTR_VSEGMENT_UPPER_LIMIT,
+ RSQ_TBL_ATTR_RESORUCE_OVERCOMMIT_FACTOR,
+ RSQ_TBL_ATTR_NVSEG_UPPER_LIMIT,
+ RSQ_TBL_ATTR_NVSEG_LOWER_LIMIT,
+ RSQ_TBL_ATTR_NVSEG_UPPER_LIMIT_PERSEG,
+ RSQ_TBL_ATTR_NVSEG_LOWER_LIMIT_PERSEG,
/* The attributes automatically generated. */
RSQ_TBL_ATTR_OID,
@@ -73,17 +79,6 @@ enum RESOURCE_QUEUE_TABLE_ATTR_INDEX {
};
/*
- * The possible resource allocation policies.
- */
-enum RESOURCE_QUEUE_ALLOCATION_POLICY_INDEX {
- RSQ_ALLOCATION_POLICY_EVEN = 0,
- RSQ_ALLOCATION_POLICY_FIFO,
-
- RSQ_ALLOCATION_POLICY_COUNT
-};
-
-
-/*
* The attributes for expressing one complete role/user definition.
*/
enum USER_ATTR_INDEX {
@@ -114,8 +109,11 @@ struct DynResourceQueueData {
int32_t SegResourceQuotaMemoryMB;/* Segment resource quota */
int32_t Reserved2;
- double ResourceUpperFactor;
- int32_t VSegUpperLimit; /* vseg upper limit. */
+ int32_t NVSegUpperLimit; /* vseg upper limit. */
+ int32_t NVSegLowerLimit; /* vseg upper limit. */
+ double NVSegUpperLimitPerSeg; /* vseg upper limit per seg. */
+ double NVSegLowerLimitPerSeg; /* vseg lower limit per seg. */
+ double ResourceOvercommit;
int8_t AllocatePolicy; /* Allocation policy */
int8_t QueuingPolicy; /* Temporary unused. */
@@ -334,6 +332,7 @@ int shallowparseResourceQueueWithAttributes(List *rawattr,
int parseResourceQueueAttributes( List *attributes,
DynResourceQueue queue,
+ bool checkformatonly,
char *errorbuf,
int errorbufsize);
@@ -367,8 +366,7 @@ void setQueueTrackIndexedByQueueName(DynResourceQueueTrack queuetrack);
void removeQueueTrackIndexedByQueueName(DynResourceQueueTrack queuetrack);
-DynResourceQueueTrack getQueueTrackByQueueOID (int64_t queoid,
- bool *exist);
+DynResourceQueueTrack getQueueTrackByQueueOID (int64_t queoid, bool *exist);
DynResourceQueueTrack getQueueTrackByQueueName(char *quename,
int quenamelen,
@@ -382,7 +380,7 @@ int parseUserAttributes( List *attributes,
int checkUserAttributes(UserInfo user, char *errorbuf, int errorbufsize);
-int createUser(UserInfo userinfo, char *errorbuf, int errorbufsize);
+void createUser(UserInfo userinfo);
void setUserIndexedByUserOID(UserInfo userinfo);
void setUserIndexedByUserName(UserInfo userinfo);
@@ -420,7 +418,7 @@ const char *getUSRTBLAttributeName(int attrindex);
int registerConnectionByUserID(ConnectionTrack conntrack);
int acquireResourceFromResQueMgr(ConnectionTrack conntrack);
int returnResourceToResQueMgr(ConnectionTrack conntrack);
-void returnConnectionToQueue(ConnectionTrack conntrack, bool normally);
+void returnConnectionToQueue(ConnectionTrack conntrack, bool istimeout);
int acquireResourceQuotaFromResQueMgr(ConnectionTrack conntrack);
void cancelResourceAllocRequest(ConnectionTrack conntrack);
/*
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/include/rmcommon.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/rmcommon.h b/src/backend/resourcemanager/include/rmcommon.h
index 246454b..6150e89 100644
--- a/src/backend/resourcemanager/include/rmcommon.h
+++ b/src/backend/resourcemanager/include/rmcommon.h
@@ -1,15 +1,8 @@
#ifndef HAWQ_RESOURCEMANAGER_COMMON_DEFINITIONS
#define HAWQ_RESOURCEMANAGER_COMMON_DEFINITIONS
-#define RESOURCE_QUEUE_DEFAULT_QUEUE_NAME "pg_default"
-#define RESOURCE_QUEUE_ROOT_QUEUE_NAME "pg_root"
-#define RESOURCE_QUEUE_SEG_RES_QUOTA_MEM "mem:"
-#define RESOURCE_QUEUE_SEG_RES_QUOTA_CORE "core:"
#define RESOURCE_QUEUE_RATIO_SIZE 32
#define USER_DEFAULT_PRIORITY 3
-#define RESOURCE_QUEUE_PARALLEL_COUNT_DEF 100
-#define RESOURCE_QUEUE_SEG_RES_QUOTA_DEF 128
-#define RESOURCE_QUEUE_RES_UPPER_FACTOR_DEF 2
/* The queue is a valid leaf queue. */
#define RESOURCE_QUEUE_STATUS_VALID_LEAF 0x00000001
@@ -30,6 +23,7 @@
#define RESOURCE_QUEUE_STATUS_IS_VER1X 0x20000000
#define RESOURCE_QUEUE_DDL_ATTR_LENGTH_MAX 64
+#define RESOURCE_QUEUE_TBL_COLNAME_LENGTH_MAX 64
#define RESOURCE_QUEUE_DDL_POLICY_LENGTH_MAX 64
#define RESOURCE_ROLE_DDL_ATTR_LENGTH_MAX 64
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/requesthandler.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/requesthandler.c b/src/backend/resourcemanager/requesthandler.c
index bc3ec27..de8131f 100644
--- a/src/backend/resourcemanager/requesthandler.c
+++ b/src/backend/resourcemanager/requesthandler.c
@@ -72,7 +72,7 @@ bool handleRMRequestConnectionReg(void **arg)
else
{
/* No connection id resource. Return occupation in resource queue. */
- returnConnectionToQueue(conntrack, true);
+ returnConnectionToQueue(conntrack, false);
elog(LOG, "Resource manager can not accept more connections.");
response.Result = res;
response.ConnID = INVALID_CONNID;
@@ -150,7 +150,7 @@ bool handleRMRequestConnectionRegByOID(void **arg)
}
else {
/* No connection id resource. Return occupation in resource queue. */
- returnConnectionToQueue(conntrack, true);
+ returnConnectionToQueue(conntrack, false);
elog(LOG, "Resource manager can not accept more connections.");
response.Result = res;
response.ConnID = INVALID_CONNID;
@@ -194,9 +194,11 @@ bool handleRMRequestConnectionUnReg(void **arg)
elog(DEBUG5, "HAWQ RM :: Connection id %d try to unregister.",
request->ConnID);
- if ( (*conntrack)->ConnID == INVALID_CONNID ) {
+ if ( (*conntrack)->ConnID == INVALID_CONNID )
+ {
res = retrieveConnectionTrack((*conntrack), request->ConnID);
- if ( res != FUNC_RETURN_OK ) {
+ if ( res != FUNC_RETURN_OK )
+ {
elog(LOG, "Not valid resource context with id %d.", request->ConnID);
goto sendresponse;
}
@@ -213,15 +215,16 @@ bool handleRMRequestConnectionUnReg(void **arg)
elog(DEBUG5, "HAWQ RM :: Connection id %d unregisters connection.",
request->ConnID);
- if ( !canTransformConnectionTrackProgress((*conntrack), CONN_PP_ESTABLISHED) ) {
- elog(RMLOG, "HAWQ RM :: Wrong connection status for unregistering. "
+ if ( !canTransformConnectionTrackProgress((*conntrack), CONN_PP_ESTABLISHED) )
+ {
+ elog(DEBUG5, "HAWQ RM :: Wrong connection status for unregistering. "
"Current connection status is %d.",
(*conntrack)->Progress);
res = REQUESTHANDLER_WRONG_CONNSTAT;
goto sendresponse;
}
- returnConnectionToQueue(*conntrack, true);
+ returnConnectionToQueue(*conntrack, false);
elog(DEBUG3, "One connection is unregistered. ConnID=%d", (*conntrack)->ConnID);
@@ -299,7 +302,8 @@ bool handleRMRequestAcquireResource(void **arg)
* request list.
*/
if ( PQUEMGR->RootTrack != NULL &&
- PQUEMGR->RootTrack->ClusterSegNumberMax == 0 ) {
+ PQUEMGR->RootTrack->ClusterSegNumberMax == 0 )
+ {
return false;
}
@@ -314,9 +318,11 @@ bool handleRMRequestAcquireResource(void **arg)
request->ConnID,
request->SessionID);
- if ( (*conntrack)->ConnID == INVALID_CONNID ) {
+ if ( (*conntrack)->ConnID == INVALID_CONNID )
+ {
res = retrieveConnectionTrack((*conntrack), request->ConnID);
- if ( res != FUNC_RETURN_OK ) {
+ if ( res != FUNC_RETURN_OK )
+ {
elog(LOG, "Not valid resource context with id %d.", request->ConnID);
goto sendresponse;
}
@@ -326,21 +332,23 @@ bool handleRMRequestAcquireResource(void **arg)
(*conntrack)->Progress);
}
- request = (RPCRequestHeadAcquireResourceFromRM)
- ((*conntrack)->MessageBuff.Buffer);
- if ( (*conntrack)->Progress == CONN_PP_RESOURCE_QUEUE_ALLOC_DONE ) {
+ request = (RPCRequestHeadAcquireResourceFromRM)((*conntrack)->MessageBuff.Buffer);
+ if ( (*conntrack)->Progress == CONN_PP_RESOURCE_QUEUE_ALLOC_DONE )
+ {
elog(DEBUG5, "HAWQ RM :: The connection track already has allocated "
"resource. Send again. ConnID=%d",
request->ConnID);
goto sendagain;
}
- else if ( (*conntrack)->Progress == CONN_PP_RESOURCE_QUEUE_ALLOC_WAIT ) {
+ else if ( (*conntrack)->Progress == CONN_PP_RESOURCE_QUEUE_ALLOC_WAIT )
+ {
elog(DEBUG5, "HAWQ RM :: The connection track already accepted "
"acquire resource request. Ignore. ConnID=%d",
request->ConnID);
goto sendignore;
}
- else if ( (*conntrack)->Progress != CONN_PP_REGISTER_DONE ) {
+ else if ( (*conntrack)->Progress != CONN_PP_REGISTER_DONE )
+ {
elog(DEBUG5, "HAWQ RM :: Wrong connection status for acquiring resource. "
"Current connection status is %d.",
(*conntrack)->Progress);
@@ -397,7 +405,8 @@ sendresponse:
(*conntrack)->MessageMark2,
RESPONSE_QD_ACQUIRE_RESOURCE);
- if ( res == CONNTRACK_NO_CONNID ) {
+ if ( res == CONNTRACK_NO_CONNID )
+ {
transformConnectionTrackProgress((*conntrack), CONN_PP_TRANSFORM_ERROR);
}
@@ -440,10 +449,12 @@ bool handleRMRequestReturnResource(void **arg)
elog(DEBUG5, "HAWQ RM :: Connection id %d returns query resource.",
request->ConnID);
- if ( (*conntrack)->ConnID == INVALID_CONNID ) {
+ if ( (*conntrack)->ConnID == INVALID_CONNID )
+ {
res = retrieveConnectionTrack((*conntrack), request->ConnID);
- if ( res != FUNC_RETURN_OK ) {
- elog(LOG, "Not valid resource context with id %d.", request->ConnID);
+ if ( res != FUNC_RETURN_OK )
+ {
+ elog(WARNING, "Not valid resource context with id %d.", request->ConnID);
goto sendresponse;
}
elog(DEBUG5, "HAWQ RM :: Fetched existing connection track "
@@ -456,14 +467,16 @@ bool handleRMRequestReturnResource(void **arg)
request = (RPCRequestHeadReturnResource)
((*conntrack)->MessageBuff.Buffer);
elog(DEBUG5, "HAWQ RM :: Connection id %d returns query resource.",
- request->ConnID);
+ request->ConnID);
- if ( (*conntrack)->Progress == CONN_PP_REGISTER_DONE ) {
+ if ( (*conntrack)->Progress == CONN_PP_REGISTER_DONE )
+ {
elog(DEBUG5, "HAWQ RM :: The resource has been returned or has not been "
"acquired.");
goto sendresponse;
}
- else if ( (*conntrack)->Progress != CONN_PP_RESOURCE_QUEUE_ALLOC_DONE ) {
+ else if ( (*conntrack)->Progress != CONN_PP_RESOURCE_QUEUE_ALLOC_DONE )
+ {
elog(DEBUG5, "HAWQ RM :: Wrong connection status for acquiring resource. "
"Current connection status is %d.",
(*conntrack)->Progress);
@@ -478,6 +491,8 @@ bool handleRMRequestReturnResource(void **arg)
sendresponse:
{
+ elog(DEBUG3, "Return resource result %d.", res);
+
RPCResponseHeadReturnResourceData response;
response.Result = res;
response.Reserved = 0;
@@ -488,7 +503,8 @@ sendresponse:
(*conntrack)->MessageMark2,
RESPONSE_QD_RETURN_RESOURCE );
- if ( res == CONNTRACK_NO_CONNID ) {
+ if ( res == CONNTRACK_NO_CONNID )
+ {
transformConnectionTrackProgress((*conntrack), CONN_PP_TRANSFORM_ERROR);
}
@@ -797,14 +813,20 @@ bool handleRMRequestRefreshResource(void **arg)
uint32_t *connids = (uint32_t *)
(conntrack->MessageBuff.Buffer +
sizeof(RPCRequestHeadRefreshResourceHeartBeatData));
- for ( int i = 0 ; i < request->ConnIDCount ; ++i ) {
+
+ elog(DEBUG3, "Resource manager refreshes %d ConnIDs.", request->ConnIDCount);
+
+ for ( int i = 0 ; i < request->ConnIDCount ; ++i )
+ {
/* Find connection track identified by ConnID */
res = getInUseConnectionTrack(connids[i], &oldct);
- if ( res == FUNC_RETURN_OK ) {
+ if ( res == FUNC_RETURN_OK )
+ {
oldct->LastActTime = curmsec;
- elog(DEBUG5, "Refreshed resource of connection id %d", connids[i]);
+ elog(DEBUG3, "Refreshed resource of connection id %d", connids[i]);
}
- else {
+ else
+ {
elog(DEBUG3, "Can not find connection id %d for resource refreshing.",
connids[i]);
}
[3/4] incubator-hawq git commit: HAWQ-25. Add resource queue new ddl
statement implementation, refine partial GUC variable names,
use libyarn supporting kerberos.
Posted by yj...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/requesthandler_ddl.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/requesthandler_ddl.c b/src/backend/resourcemanager/requesthandler_ddl.c
index df038ea..9005a22 100644
--- a/src/backend/resourcemanager/requesthandler_ddl.c
+++ b/src/backend/resourcemanager/requesthandler_ddl.c
@@ -45,18 +45,21 @@ void freeUpdateActionList(MCTYPE context, List **actions);
* mapping with the definition of table pg_resqueue in pg_resqueue.h
*/
const char* PG_Resqueue_Column_Names[Natts_pg_resqueue] = {
- "rsqname",
- "rsq_parent",
- "rsq_active_stats_cluster",
- "rsq_memory_limit_cluster",
- "rsq_core_limit_cluster",
- "rsq_resource_upper_factor",
- "rsq_allocation_policy",
- "rsq_vseg_resource_quota",
- "rsq_vseg_upper_limit",
- "rsq_creation_time",
- "rsq_update_time",
- "rsq_status"
+ "name",
+ "parentoid",
+ "activestats",
+ "memorylimit",
+ "corelimit",
+ "resovercommit",
+ "allocpolicy",
+ "vsegresourcequota",
+ "nvsegupperlimit",
+ "nvseglowerlimit",
+ "nvsegupperlimitperseg",
+ "nvseglowerlimitperseg",
+ "creationtime",
+ "updatetime",
+ "status"
};
/**
@@ -72,7 +75,7 @@ bool handleRMDDLRequestManipulateResourceQueue(void **arg)
DynResourceQueueTrack todroptrack = NULL;
DynResourceQueueTrack toupdatetrack = NULL;
SelfMaintainBufferData responsebuff;
- char errorbuf[1024] = "";
+ static char errorbuf[1024] = "";
bool exist = false;
List *fineattr = NULL;
List *rsqattr = NULL;
@@ -80,8 +83,8 @@ bool handleRMDDLRequestManipulateResourceQueue(void **arg)
DynResourceQueue oldqueue = NULL;
/* Check context and retrieve the connection track based on connection id.*/
- RPCRequestHeadManipulateResQueue request =
- (RPCRequestHeadManipulateResQueue)((*conntrack)->MessageBuff.Buffer);
+ RPCRequestHeadManipulateResQueue request = (RPCRequestHeadManipulateResQueue)
+ ((*conntrack)->MessageBuff.Buffer);
elog(LOG, "Resource manager gets a request from ConnID %d to submit resource "
"queue DDL statement.",
@@ -155,8 +158,8 @@ bool handleRMDDLRequestManipulateResourceQueue(void **arg)
foreach(cell, rsqattr)
{
KVProperty attribute = lfirst(cell);
- elog(LOG, "Resource manager received DDL Request: %s=%s",
- attribute->Key.Str, attribute->Val.Str);
+ elog(DEBUG3, "Resource manager received DDL Request: %s=%s",
+ attribute->Key.Str, attribute->Val.Str);
}
/* Shallow parse the 'withlist' attributes. */
@@ -167,10 +170,7 @@ bool handleRMDDLRequestManipulateResourceQueue(void **arg)
if (res != FUNC_RETURN_OK)
{
ddlres = res;
- elog(WARNING, ERRORPOS_FORMAT
- "Can not recognize DDL attribute because %s",
- ERRREPORTPOS,
- errorbuf);
+ elog(WARNING, "Can not recognize DDL attribute because %s", errorbuf);
goto senderr;
}
@@ -202,6 +202,7 @@ bool handleRMDDLRequestManipulateResourceQueue(void **arg)
newqueue = rm_palloc0(PCONTEXT, sizeof(DynResourceQueueData));
res = parseResourceQueueAttributes(fineattr,
newqueue,
+ false,
errorbuf,
sizeof(errorbuf));
if (res != FUNC_RETURN_OK)
@@ -228,10 +229,7 @@ bool handleRMDDLRequestManipulateResourceQueue(void **arg)
}
newtrack = NULL;
- res = createQueueAndTrack(newqueue,
- &newtrack,
- errorbuf,
- sizeof(errorbuf));
+ res = createQueueAndTrack(newqueue, &newtrack, errorbuf, sizeof(errorbuf));
if (res != FUNC_RETURN_OK)
{
rm_pfree(PCONTEXT, newqueue);
@@ -267,6 +265,23 @@ bool handleRMDDLRequestManipulateResourceQueue(void **arg)
break;
case MANIPULATE_RESQUEUE_ALTER:
+ newqueue = rm_palloc0(PCONTEXT, sizeof(DynResourceQueueData));
+ res = parseResourceQueueAttributes(fineattr,
+ newqueue,
+ true,
+ errorbuf,
+ sizeof(errorbuf));
+ if (res != FUNC_RETURN_OK)
+ {
+ rm_pfree(PCONTEXT, newqueue);
+ ddlres = res;
+ elog(WARNING, "Resource manager can not alter resource queue "
+ "with its attributes because %s",
+ errorbuf);
+ goto senderr;
+ }
+ rm_pfree(PCONTEXT, newqueue);
+
toupdatetrack = getQueueTrackByQueueName((char *)(nameattr->Val.Str),
nameattr->Val.Len,
&exist);
@@ -428,10 +443,8 @@ bool handleRMDDLRequestManipulateResourceQueue(void **arg)
if (res != FUNC_RETURN_OK)
{
ddlres = res;
- elog(WARNING, ERRORPOS_FORMAT
- "Resource manager can not dropQueueAndTrack because %s",
- ERRREPORTPOS,
- errorbuf);
+ elog(WARNING, "Resource manager can not dropQueueAndTrack because %s",
+ errorbuf);
goto senderr;
}
@@ -471,58 +484,62 @@ bool handleRMDDLRequestManipulateResourceQueue(void **arg)
return true;
senderr:
- initializeSelfMaintainBuffer(&responsebuff, PCONTEXT);
- appendSelfMaintainBuffer(&responsebuff, (void *)&ddlres, sizeof(uint32_t));
- appendSelfMaintainBufferTill64bitAligned(&responsebuff);
-
- if (ddlres != FUNC_RETURN_OK) {
- appendSelfMaintainBuffer(&responsebuff, errorbuf, strlen(errorbuf)+1);
- }
-
- appendSelfMaintainBufferTill64bitAligned(&responsebuff);
-
- /* Build message saved in the connection track instance. */
- buildResponseIntoConnTrack((*conntrack),
- responsebuff.Buffer,
- responsebuff.Cursor+1,
- (*conntrack)->MessageMark1,
- (*conntrack)->MessageMark2,
- RESPONSE_QD_DDL_MANIPULATERESQUEUE);
- (*conntrack)->ResponseSent = false;
{
+ initializeSelfMaintainBuffer(&responsebuff, PCONTEXT);
+
+ RPCResponseHeadManipulateResQueueERRORData response;
+ response.Result.Result = ddlres;
+ response.Result.Reserved = 0;
+
+ appendSMBVar(&responsebuff, response.Result);
+ appendSMBStr(&responsebuff, errorbuf);
+ appendSelfMaintainBufferTill64bitAligned(&responsebuff);
+
+ /* Build message saved in the connection track instance. */
+ buildResponseIntoConnTrack((*conntrack),
+ responsebuff.Buffer,
+ responsebuff.Cursor + 1,
+ (*conntrack)->MessageMark1,
+ (*conntrack)->MessageMark2,
+ RESPONSE_QD_DDL_MANIPULATERESQUEUE);
+ (*conntrack)->ResponseSent = false;
MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, *conntrack);
MEMORY_CONTEXT_SWITCH_BACK
- }
- destroySelfMaintainBuffer(&responsebuff);
+ destroySelfMaintainBuffer(&responsebuff);
- /* Clean up temporary variable. */
- cleanPropertyList(PCONTEXT, &fineattr);
- cleanPropertyList(PCONTEXT, &rsqattr);
- return true;
+ /* Clean up temporary variable. */
+ cleanPropertyList(PCONTEXT, &fineattr);
+ cleanPropertyList(PCONTEXT, &rsqattr);
+ return true;
+ }
}
bool handleRMDDLRequestManipulateRole(void **arg)
{
RPCResponseHeadManipulateRoleData response;
- ConnectionTrack conntrack = (ConnectionTrack)(*arg);
- UserInfo user;
- int res = FUNC_RETURN_OK;
+ ConnectionTrack conntrack = (ConnectionTrack)(*arg);
+ UserInfo user = NULL;
+ int res = FUNC_RETURN_OK;
- RPCRequestHeadManipulateRole request =
- (RPCRequestHeadManipulateRole )conntrack->MessageBuff.Buffer;
+ RPCRequestHeadManipulateRole request = (RPCRequestHeadManipulateRole)
+ (conntrack->MessageBuff.Buffer);
switch(request->Action)
{
case MANIPULATE_ROLE_RESQUEUE_CREATE:
{
+ /*
+ * In case creating new role, resource manager expects no error, as
+ * in QD side, the validation was passed.
+ */
user = rm_palloc0(PCONTEXT, sizeof(UserInfoData));
user->OID = request->RoleOID;
user->QueueOID = request->QueueOID;
user->isSuperUser = request->isSuperUser;
strncpy(user->Name, request->Name, sizeof(user->Name)-1);
- res = createUser(user, NULL, 0);
+ createUser(user);
elog(LOG, "Resource manager handles request CREATE ROLE oid:%d, "
"queueID:%d, isSuper:%d, roleName:%s",
request->RoleOID,
@@ -533,14 +550,13 @@ bool handleRMDDLRequestManipulateRole(void **arg)
}
case MANIPULATE_ROLE_RESQUEUE_ALTER:
{
- res = dropUser((int64_t)request->RoleOID, request->Name);
- if ( res != FUNC_RETURN_OK )
- {
- elog(WARNING, "Resource manager cannot find user "INT64_FORMAT
- " to alter.",
- (int64_t)(request->RoleOID));
- goto exit;
- }
+ /*
+ * In case altering one role, the old one is deleted firstly.
+ * Resource manager expects the role always exists.
+ */
+ int64_t roleoid = request->RoleOID;
+ res = dropUser(roleoid, request->Name);
+ Assert(res == FUNC_RETURN_OK);
/* Create new user instance. */
user = (UserInfo)rm_palloc0(PCONTEXT, sizeof(UserInfoData));
@@ -548,7 +564,7 @@ bool handleRMDDLRequestManipulateRole(void **arg)
user->QueueOID = request->QueueOID;
user->isSuperUser = request->isSuperUser;
strncpy(user->Name, request->Name, sizeof(user->Name)-1);
- res = createUser(user, NULL, 0);
+ createUser(user);
elog(LOG, "Resource manager handles request ALTER ROLE oid:%d, "
"queueID:%d, isSuper:%d, roleName:%s",
request->RoleOID,
@@ -559,14 +575,10 @@ bool handleRMDDLRequestManipulateRole(void **arg)
}
case MANIPULATE_ROLE_RESQUEUE_DROP:
{
- res = dropUser((int64_t)request->RoleOID, request->Name);
- if ( res != FUNC_RETURN_OK )
- {
- elog(WARNING, "Resource manager cannot find user "INT64_FORMAT
- " to drop.",
- (int64_t)(request->RoleOID));
- goto exit;
- }
+ /* Resource manager expects the role always exists. */
+ int64_t roleoid = request->RoleOID;
+ res = dropUser(roleoid, request->Name);
+ Assert(res == FUNC_RETURN_OK);
elog(LOG, "Resource manager handles request drop role oid:%d, "
"roleName:%s",
request->RoleOID,
@@ -579,7 +591,6 @@ bool handleRMDDLRequestManipulateRole(void **arg)
}
}
-exit:
/* Build response. */
response.Result = res;
response.Reserved = 0;
@@ -821,18 +832,21 @@ int buildInsertActionForPGResqueue(DynResourceQueue queue,
List *rsqattr,
List **insvalues)
{
+ static char defaultActiveStats[] = DEFAULT_RESQUEUE_ACTIVESTATS;
+ static char defaultResOvercommit[] = DEFAULT_RESQUEUE_OVERCOMMIT;
+ static char defaultNVSegUpperLimit[] = DEFAULT_RESQUEUE_NVSEG_UPPER_LIMIT;
+ static char defaultNVSegLowerLimit[] = DEFAULT_RESQUEUE_NVSEG_LOWER_LIMIT;
+ static char defaultNVSegUpperLimitPerSeg[] = DEFAULT_RESQUEUE_NVSEG_UPPER_PERSEG_LIMIT;
+ static char defaultNVSegLowerLimitPerSeg[] = DEFAULT_RESQUEUE_NVSEG_LOWER_PERSEG_LIMIT;
+ static char defaultAllocPolicy[] = DEFAULT_RESQUEUE_ALLOCPOLICY;
+ static char defaultVSegResourceQuota[] = DEFAULT_RESQUEUE_VSEGRESOURCEQUOTA;
+ int res = FUNC_RETURN_OK;
+ PAIR newpair = NULL;
+
Assert( queue != NULL );
Assert( rsqattr != NULL );
Assert( insvalues != NULL );
- int res = FUNC_RETURN_OK;
- char defaultActiveStats[] = DEFAULT_RESQUEUE_ACTIVESTATS;
- char defaultUpperFactor[] = DEFAULT_RESQUEUE_UPPERFACTOR;
- char defaultVSegUpperFactor[] = DEFAULT_RESQUEUE_VSEG_UPPER_LIMIT;
- char defaultPolicy[] = DEFAULT_RESQUEUE_POLICY;
- char defaultSegQuota[] = DEFAULT_RESQUEUE_SEG_QUOTA;
- PAIR newpair = NULL;
-
/* Insert resource queue column value. */
newpair = createPAIR(PCONTEXT,
TYPCONVERT(void *, Anum_pg_resqueue_rsqname),
@@ -846,67 +860,101 @@ int buildInsertActionForPGResqueue(DynResourceQueue queue,
*insvalues = lappend(*insvalues, newpair);
- /* Default value for rsq_active_stats_cluster if not set */
SimpStringPtr colvalue = NULL;
- if (findPropertyValue(
- rsqattr,
- getRSQDDLAttributeName(RSQ_DDL_ATTR_ACTIVE_STATMENTS),
- &colvalue) != FUNC_RETURN_OK)
+ if (findPropertyValue(rsqattr,
+ getRSQDDLAttributeName(RSQ_DDL_ATTR_ACTIVE_STATMENTS),
+ &colvalue) != FUNC_RETURN_OK)
+ {
+ ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues,
+ defaultActiveStats,
+ Anum_pg_resqueue_activestats);
+ }
+
+ if (findPropertyValue(rsqattr,
+ getRSQDDLAttributeName(RSQ_DDL_ATTR_RESOURCE_OVERCOMMIT_FACTOR),
+ &colvalue) != FUNC_RETURN_OK)
+ {
+ ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues,
+ defaultResOvercommit,
+ Anum_pg_resqueue_resovercommit);
+ }
+
+ if (findPropertyValue(rsqattr,
+ getRSQDDLAttributeName(RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT),
+ &colvalue) != FUNC_RETURN_OK)
+ {
+ ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues,
+ defaultNVSegUpperLimit,
+ Anum_pg_resqueue_nvsegupperlimit);
+ }
+
+ if (findPropertyValue(rsqattr,
+ getRSQDDLAttributeName(RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT),
+ &colvalue) != FUNC_RETURN_OK)
{
- ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues, defaultActiveStats, Anum_pg_resqueue_rsq_active_stats_cluster);
+ ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues,
+ defaultNVSegLowerLimit,
+ Anum_pg_resqueue_nvseglowerlimit);
}
- /* Default value for rsq_resource_upper_factor if not set */
- if (findPropertyValue(
- rsqattr,
- getRSQDDLAttributeName(RSQ_DDL_ATTR_RESOURCE_UPPER_FACTOR),
- &colvalue) != FUNC_RETURN_OK)
+
+ if (findPropertyValue(rsqattr,
+ getRSQDDLAttributeName(RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT_PERSEG),
+ &colvalue) != FUNC_RETURN_OK)
{
- ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues, defaultUpperFactor, Anum_pg_resqueue_rsq_resource_upper_factor);
+ ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues,
+ defaultNVSegUpperLimitPerSeg,
+ Anum_pg_resqueue_nvsegupperlimitperseg);
}
/* Default value for rsq_vseg_upper_limit if not set */
- if (findPropertyValue(
- rsqattr,
- getRSQDDLAttributeName(RSQ_DDL_ATTR_VSEGMENT_UPPER_LIMIT),
- &colvalue) != FUNC_RETURN_OK)
+ if (findPropertyValue(rsqattr,
+ getRSQDDLAttributeName(RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT_PERSEG),
+ &colvalue) != FUNC_RETURN_OK)
{
- ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues, defaultVSegUpperFactor, Anum_pg_resqueue_rsq_vseg_upper_limit);
+ ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues,
+ defaultNVSegLowerLimitPerSeg,
+ Anum_pg_resqueue_nvseglowerlimitperseg);
}
/* Default value for rsq_allocation_policy if not set */
- if (findPropertyValue(
- rsqattr,
- getRSQDDLAttributeName(RSQ_DDL_ATTR_ALLOCATION_POLICY),
- &colvalue) != FUNC_RETURN_OK)
+ if (findPropertyValue(rsqattr,
+ getRSQDDLAttributeName(RSQ_DDL_ATTR_ALLOCATION_POLICY),
+ &colvalue) != FUNC_RETURN_OK)
{
- ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues, defaultPolicy, Anum_pg_resqueue_rsq_allocation_policy);
+ ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues,
+ defaultAllocPolicy,
+ Anum_pg_resqueue_allocpolicy);
}
/* Default value for rsq_vseg_resource_quota if not set */
- if (findPropertyValue(
- rsqattr,
- getRSQDDLAttributeName(RSQ_DDL_ATTR_VSEGMENT_RESOURCE_QUOTA),
- &colvalue) != FUNC_RETURN_OK)
+ if (findPropertyValue(rsqattr,
+ getRSQDDLAttributeName(RSQ_DDL_ATTR_VSEG_RESOURCE_QUOTA),
+ &colvalue) != FUNC_RETURN_OK)
{
- ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues, defaultSegQuota, Anum_pg_resqueue_rsq_vseg_resource_quota);
+ ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues,
+ defaultVSegResourceQuota,
+ Anum_pg_resqueue_vsegresourcequota);
}
- ADD_PG_RESQUEUE_COLVALUE_OID(insvalues, queue->ParentOID, Anum_pg_resqueue_rsq_parent);
+ ADD_PG_RESQUEUE_COLVALUE_OID(insvalues, queue->ParentOID, Anum_pg_resqueue_parentoid);
- ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(insvalues, rsqattr, RSQ_DDL_ATTR_ACTIVE_STATMENTS, Anum_pg_resqueue_rsq_active_stats_cluster);
- ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(insvalues, rsqattr, RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER, Anum_pg_resqueue_rsq_memory_limit_cluster);
- ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(insvalues, rsqattr, RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER, Anum_pg_resqueue_rsq_core_limit_cluster);
- ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(insvalues, rsqattr, RSQ_DDL_ATTR_RESOURCE_UPPER_FACTOR, Anum_pg_resqueue_rsq_resource_upper_factor);
- ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(insvalues, rsqattr, RSQ_DDL_ATTR_VSEGMENT_UPPER_LIMIT, Anum_pg_resqueue_rsq_vseg_upper_limit);
- ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(insvalues, rsqattr, RSQ_DDL_ATTR_ALLOCATION_POLICY, Anum_pg_resqueue_rsq_allocation_policy);
- ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(insvalues, rsqattr, RSQ_DDL_ATTR_VSEGMENT_RESOURCE_QUOTA, Anum_pg_resqueue_rsq_vseg_resource_quota);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(insvalues, rsqattr, RSQ_DDL_ATTR_ACTIVE_STATMENTS, Anum_pg_resqueue_activestats);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(insvalues, rsqattr, RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER, Anum_pg_resqueue_memorylimit);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(insvalues, rsqattr, RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER, Anum_pg_resqueue_corelimit);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(insvalues, rsqattr, RSQ_DDL_ATTR_RESOURCE_OVERCOMMIT_FACTOR, Anum_pg_resqueue_resovercommit);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(insvalues, rsqattr, RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT, Anum_pg_resqueue_nvsegupperlimit);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(insvalues, rsqattr, RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT, Anum_pg_resqueue_nvseglowerlimit);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(insvalues, rsqattr, RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT_PERSEG, Anum_pg_resqueue_nvsegupperlimitperseg);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(insvalues, rsqattr, RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT_PERSEG, Anum_pg_resqueue_nvseglowerlimitperseg);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(insvalues, rsqattr, RSQ_DDL_ATTR_ALLOCATION_POLICY, Anum_pg_resqueue_allocpolicy);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(insvalues, rsqattr, RSQ_DDL_ATTR_VSEG_RESOURCE_QUOTA, Anum_pg_resqueue_vsegresourcequota);
/* creation time and update time */
TimestampTz curtime = GetCurrentTimestamp();
const char *curtimestr = timestamptz_to_str(curtime);
- ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues, curtimestr, Anum_pg_resqueue_rsq_creation_time);
- ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues, curtimestr, Anum_pg_resqueue_rsq_update_time);
+ ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues, curtimestr, Anum_pg_resqueue_creationtime);
+ ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues, curtimestr, Anum_pg_resqueue_updatetime);
/* status */
char statusstr[256];
@@ -914,7 +962,7 @@ int buildInsertActionForPGResqueue(DynResourceQueue queue,
if ( RESQUEUE_IS_BRANCH(queue) )
strcat(statusstr, "branch");
- ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues, statusstr, Anum_pg_resqueue_rsq_status);
+ ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(insvalues, statusstr, Anum_pg_resqueue_status);
MEMORY_CONTEXT_SWITCH_BACK
return res;
@@ -926,18 +974,21 @@ int buildUpdateActionForPGResqueue(DynResourceQueue queue,
{
int res = FUNC_RETURN_OK;
/* Insert resource queue column value. */
- ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(updvalues, rsqattr, RSQ_DDL_ATTR_ACTIVE_STATMENTS, Anum_pg_resqueue_rsq_active_stats_cluster);
- ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(updvalues, rsqattr, RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER, Anum_pg_resqueue_rsq_memory_limit_cluster);
- ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(updvalues, rsqattr, RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER, Anum_pg_resqueue_rsq_core_limit_cluster);
- ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(updvalues, rsqattr, RSQ_DDL_ATTR_RESOURCE_UPPER_FACTOR, Anum_pg_resqueue_rsq_resource_upper_factor);
- ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(updvalues, rsqattr, RSQ_DDL_ATTR_ALLOCATION_POLICY, Anum_pg_resqueue_rsq_allocation_policy);
- ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(updvalues, rsqattr, RSQ_DDL_ATTR_VSEGMENT_RESOURCE_QUOTA, Anum_pg_resqueue_rsq_vseg_resource_quota);
- ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(updvalues, rsqattr, RSQ_DDL_ATTR_VSEGMENT_UPPER_LIMIT, Anum_pg_resqueue_rsq_vseg_upper_limit);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(updvalues, rsqattr, RSQ_DDL_ATTR_ACTIVE_STATMENTS, Anum_pg_resqueue_activestats);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(updvalues, rsqattr, RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER, Anum_pg_resqueue_memorylimit);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(updvalues, rsqattr, RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER, Anum_pg_resqueue_corelimit);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(updvalues, rsqattr, RSQ_DDL_ATTR_RESOURCE_OVERCOMMIT_FACTOR, Anum_pg_resqueue_resovercommit);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(updvalues, rsqattr, RSQ_DDL_ATTR_ALLOCATION_POLICY, Anum_pg_resqueue_allocpolicy);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(updvalues, rsqattr, RSQ_DDL_ATTR_VSEG_RESOURCE_QUOTA, Anum_pg_resqueue_vsegresourcequota);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(updvalues, rsqattr, RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT, Anum_pg_resqueue_nvsegupperlimit);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(updvalues, rsqattr, RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT, Anum_pg_resqueue_nvseglowerlimit);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(updvalues, rsqattr, RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT_PERSEG, Anum_pg_resqueue_nvsegupperlimitperseg);
+ ADD_PG_RESQUEUE_COLVALUE_INDDLATTR(updvalues, rsqattr, RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT_PERSEG, Anum_pg_resqueue_nvseglowerlimitperseg);
/* creation time and update time */
TimestampTz curtime = GetCurrentTimestamp();
const char *curtimestr = timestamptz_to_str(curtime);
- ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(updvalues, curtimestr, Anum_pg_resqueue_rsq_update_time);
+ ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(updvalues, curtimestr, Anum_pg_resqueue_updatetime);
/* status */
char statusstr[256];
@@ -945,7 +996,7 @@ int buildUpdateActionForPGResqueue(DynResourceQueue queue,
if ( RESQUEUE_IS_BRANCH(queue) )
strcat(statusstr, "branch");
- ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(updvalues, statusstr, Anum_pg_resqueue_rsq_status);
+ ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(updvalues, statusstr, Anum_pg_resqueue_status);
return res;
}
@@ -969,10 +1020,10 @@ int buildUpdateStatusActionForPGResqueue(DynResourceQueue queue,
TimestampTz curtime = GetCurrentTimestamp();
const char *curtimestr = timestamptz_to_str(curtime);
- ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(updvalues,curtimestr, Anum_pg_resqueue_rsq_update_time);
+ ADD_PG_RESQUEUE_COLVALUE_CONSTSTR(updvalues,curtimestr, Anum_pg_resqueue_updatetime);
/* status */
- ADD_PG_RESQUEUE_COLVALUE_INATTR(updvalues, rsqattr, RSQ_TBL_ATTR_STATUS, Anum_pg_resqueue_rsq_status);
+ ADD_PG_RESQUEUE_COLVALUE_INATTR(updvalues, rsqattr, RSQ_TBL_ATTR_STATUS, Anum_pg_resqueue_status);
return res;
}
@@ -1352,19 +1403,6 @@ int performDeleteActionForPGResqueue(char *queuename)
sql->data,
PQresultErrorMessage(result));
- /* MPP-6923: drop the extended attributes for this queue */
- PQclear(result);
- resetPQExpBuffer(sql);
- appendPQExpBuffer(sql,
- "DELETE FROM pg_resqueuecapability WHERE resqueueid = %d",
- queueid);
- result = PQexec(conn, sql->data);
- if (!result || PQresultStatus(result) != PGRES_COMMAND_OK)
- elog(WARNING, "Resource manager failed to run SQL: %s "
- "when delete a row from pg_resqueue, reason : %s",
- sql->data,
- PQresultErrorMessage(result));
-
PQclear(result);
result = PQexec(conn, "COMMIT");
if (!result || PQresultStatus(result) != PGRES_COMMAND_OK) {
@@ -1405,25 +1443,25 @@ Datum getDatumFromStringValuForPGResqueue(int colindex,
return DirectFunctionCall1(namein, CStringGetDatum(colvaluestr));
- case Anum_pg_resqueue_rsq_creation_time:
- case Anum_pg_resqueue_rsq_update_time:
+ case Anum_pg_resqueue_creationtime:
+ case Anum_pg_resqueue_updatetime:
return 0;
- case Anum_pg_resqueue_rsq_memory_limit_cluster:
- case Anum_pg_resqueue_rsq_core_limit_cluster:
- case Anum_pg_resqueue_rsq_allocation_policy:
- case Anum_pg_resqueue_rsq_vseg_resource_quota:
- case Anum_pg_resqueue_rsq_status:
+ case Anum_pg_resqueue_memorylimit:
+ case Anum_pg_resqueue_corelimit:
+ case Anum_pg_resqueue_allocpolicy:
+ case Anum_pg_resqueue_vsegresourcequota:
+ case Anum_pg_resqueue_status:
/* Set value as text format */
return DirectFunctionCall1(textin, CStringGetDatum(colvaluestr));
- case Anum_pg_resqueue_rsq_active_stats_cluster:
+ case Anum_pg_resqueue_activestats:
{
int32_t tmpvalue;
sscanf(colvaluestr, "%d", &tmpvalue);
return Int32GetDatum(tmpvalue);
}
- case Anum_pg_resqueue_rsq_parent:
+ case Anum_pg_resqueue_parentoid:
{
int64_t tmpoid;
Oid parentoid;
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c b/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c
index 0fe4412..ecb9e92 100644
--- a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c
+++ b/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c
@@ -11,11 +11,16 @@
#include "libyarn/LibYarnClientC.h"
+#include <krb5.h>
+#include "cdb/cdbfilesystemcredential.h"
+
/*
*------------------------------------------------------------------------------
* Internal functions
*------------------------------------------------------------------------------
*/
+char * ExtractPrincipalFromTicketCache(const char* cache);
+
int ResBrokerMainInternal(void);
int loadParameters(void);
@@ -66,6 +71,12 @@ void quitResBroker(SIGNAL_ARGS);
uint64_t ResBrokerStartTime;
+/* The user who submits hawq application to Hadoop Yarn,
+ * default is postgres, if Kerberos is enable, should be principal name.
+ * */
+char* YARNUser;
+bool YARNUserShouldFree;
+
SimpString YARNServer;
SimpString YARNPort;
SimpString YARNSchedulerServer;
@@ -286,6 +297,85 @@ int ResBrokerMainInternal(void)
return FUNC_RETURN_OK;
}
+/*
+ * Extract principal from cache
+ */
+char * ExtractPrincipalFromTicketCache(const char* cache)
+{
+ krb5_context cxt = NULL;
+ krb5_ccache ccache = NULL;
+ krb5_principal principal = NULL;
+ krb5_error_code ec = 0;
+ char *priName = NULL, *retval = NULL;
+ const char *errorMsg = NULL;
+
+ /*
+ * refresh kerberos ticket
+ */
+ if (!login()) {
+ elog(WARNING, "Cannot login kerberos.");
+ return NULL;
+ }
+
+ if (!cache) {
+ if (0 != setenv("KRB5CCNAME", cache, 1)) {
+ elog(WARNING, "Cannot set env parameter \"KRB5CCNAME\" when extract principal from cache:%s", cache);
+ return NULL;
+ }
+ }
+
+ do {
+ if (0 != (ec = krb5_init_context(&cxt))) {
+ break;
+ }
+
+ if (0 != (ec = krb5_cc_default(cxt, &ccache))) {
+ break;
+ }
+
+ if (0 != (ec = krb5_cc_get_principal(cxt, ccache, &principal))) {
+ break;
+ }
+
+ if (0 != (ec = krb5_unparse_name(cxt, principal, &priName))) {
+ break;
+ }
+ } while (0);
+
+ if (!ec) {
+ retval = strdup(priName);
+ } else {
+ if (cxt) {
+ errorMsg = krb5_get_error_message(cxt, ec);
+ } else {
+ errorMsg = "Cannot initialize kerberos context";
+ }
+ }
+
+ if (priName != NULL) {
+ krb5_free_unparsed_name(cxt, priName);
+ }
+
+ if (principal != NULL) {
+ krb5_free_principal(cxt, principal);
+ }
+
+ if (ccache != NULL) {
+ krb5_cc_close(cxt, ccache);
+ }
+
+ if (cxt != NULL) {
+ krb5_free_context(cxt);
+ }
+
+ if (errorMsg != NULL) {
+ elog(WARNING, "Fail to extract principal from cache, because : %s", errorMsg);
+ return NULL;
+ }
+
+ return retval;
+}
+
int loadParameters(void)
{
int res = FUNC_RETURN_OK;
@@ -296,6 +386,8 @@ int loadParameters(void)
initSimpleString(&YARNSchedulerPort, PCONTEXT);
initSimpleString(&YARNQueueName, PCONTEXT);
initSimpleString(&YARNAppName, PCONTEXT);
+ YARNUser = NULL;
+ YARNUserShouldFree = false;
/* Get server and port */
char *pcolon = NULL;
@@ -363,16 +455,31 @@ int loadParameters(void)
setSimpleStringNoLen(&YARNAppName, rm_grm_yarn_app_name);
+ /* If kerberos is enable, fetch the principal from ticket cache file. */
+ if (enable_secure_filesystem)
+ {
+ YARNUser = ExtractPrincipalFromTicketCache(krb5_ccname);
+ YARNUserShouldFree = true;
+ }
+
+ if (YARNUser == NULL)
+ {
+ YARNUser = "postgres";
+ YARNUserShouldFree = false;
+ }
+
elog(LOG, "YARN mode resource broker accepted YARN connection arguments : "
"YARN Server %s:%s "
"Scheduler server %s:%s "
- "Queue %s Application name %s",
+ "Queue %s Application name %s, "
+ "by user:%s",
YARNServer.Str,
YARNPort.Str,
YARNSchedulerServer.Str,
YARNSchedulerPort.Str,
YARNQueueName.Str,
- YARNAppName.Str);
+ YARNAppName.Str,
+ YARNUser);
exit:
if ( res != FUNC_RETURN_OK ) {
elog(LOG, "YARN mode resource broker failed to load YARN connection arguments.");
@@ -1139,7 +1246,8 @@ int RB2YARN_connectToYARN(void)
int yarnres = FUNCTION_SUCCEEDED;
/* Setup YARN client. */
- yarnres = newLibYarnClient(YARNServer.Str,
+ yarnres = newLibYarnClient(YARNUser,
+ YARNServer.Str,
YARNPort.Str,
YARNSchedulerServer.Str,
YARNSchedulerPort.Str,
@@ -1665,7 +1773,13 @@ int RB2YARN_disconnectFromYARN(void)
if ( YARNJobID != NULL ) {
free(YARNJobID);
}
+ if (YARNUser != NULL && YARNUserShouldFree )
+ {
+ free(YARNUser);
+ }
LIBYARNClient = NULL;
YARNJobID = NULL;
+ YARNUser = NULL;
+ YARNUserShouldFree = true;
return FUNCTION_SUCCEEDED;
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/resourcemanager.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcemanager.c b/src/backend/resourcemanager/resourcemanager.c
index 5e4fe0f..5577e83 100644
--- a/src/backend/resourcemanager/resourcemanager.c
+++ b/src/backend/resourcemanager/resourcemanager.c
@@ -145,7 +145,6 @@ bool cleanedAllGRMContainers(void);
colname##IsNull ? -1 : \
DatumGetInt64(colname##Datum)));
-
int loadUserPropertiesFromCatalog(List **users);
int loadQueuePropertiesFromCatalog(List **queues);
@@ -618,7 +617,9 @@ int MainHandlerLoop(void)
/* STEP 10. Dispatch resource to queries and send the messages out.*/
if ( PRESPOOL->Segments.NodeCount > 0 && PQUEMGR->RatioCount > 0 &&
PQUEMGR->toRunQueryDispatch &&
- PQUEMGR->ForcedReturnGRMContainerCount == 0 )
+ PQUEMGR->ForcedReturnGRMContainerCount == 0 &&
+ PRESPOOL->AddPendingContainerCount == 0 &&
+ PRESPOOL->SlavesHostCount > 0 )
{
dispatchResourceToQueries();
}
@@ -646,6 +647,12 @@ int MainHandlerLoop(void)
/* STEP 13. Notify segments to decrease resource. */
notifyToBeKickedGRMContainersToRMSEG();
+
+ /*
+ * STEP 14. Check slaves file if the content is not checked or is
+ * updated.
+ */
+ checkSlavesFile();
}
elog(LOG, "Resource manager main event handler exits.");
@@ -960,43 +967,55 @@ static void InitTemporaryDirs(DQueue tmpdirs_list, char *tmpdirs_string)
*/
int loadDynamicResourceManagerConfigure(void)
{
- elog(DEBUG5, "HAWQ RM :: Unix Domain Socket Port %d", rm_master_addr_domain_port);
- elog(DEBUG5, "HAWQ RM :: Socket Listening Port %d", rm_master_addr_port);
- elog(DEBUG5, "HAWQ RM :: Segment Socket Listening Port %d", rm_seg_addr_port);
+#ifdef ENABLE_DOMAINSERVER
+ elog(DEBUG3, "Resource manager loads Unix Domain Socket Port %d",
+ rm_master_addr_domain_port);
+#endif
+ elog(DEBUG3, "Resource manager loads Socket Listening Port %d",
+ rm_master_port);
+ elog(DEBUG3, "Resource manager loads Segment Socket Listening Port %d",
+ rm_segment_port);
/* Decide global resource manager mode. */
- if ( strcmp(rm_grm_server_type, HAWQDRM_CONFFILE_SVRTYPE_VAL_YARN) == 0 ) {
+ if ( strcmp(rm_global_rm_type, HAWQDRM_CONFFILE_SVRTYPE_VAL_YARN) == 0 )
+ {
DRMGlobalInstance->ImpType = YARN_LIBYARN;
}
- else if ( strcmp(rm_grm_server_type, HAWQDRM_CONFFILE_SVRTYPE_VAL_NONE) == 0 ) {
+ else if ( strcmp(rm_global_rm_type, HAWQDRM_CONFFILE_SVRTYPE_VAL_NONE) == 0 )
+ {
DRMGlobalInstance->ImpType = NONE_HAWQ2;
}
- else {
- elog(LOG, "Wrong global resource manager type set in %s.",
- HAWQDRM_CONFFILE_SERVER_TYPE);
+ else
+ {
+ elog(WARNING, "Wrong global resource manager type set in %s.",
+ HAWQDRM_CONFFILE_SERVER_TYPE);
return MAIN_CONF_UNSET_ROLE;
}
- elog(DEBUG5, "HAWQ RM :: Resource broker implement mode : %d", DRMGlobalInstance->ImpType);
+ elog(DEBUG3, "Resource manager loads resource broker implement mode : %d",
+ DRMGlobalInstance->ImpType);
SimpString segmem;
- if ( rm_seg_memory_use[0] == '\0' ) {
- elog(LOG, "%s is not set", HAWQDRM_CONFFILE_LIMIT_MEMORY_USE);
+ if ( rm_seg_memory_use[0] == '\0' )
+ {
+ elog(WARNING, "%s is not set", HAWQDRM_CONFFILE_LIMIT_MEMORY_USE);
return MAIN_CONF_UNSET_SEGMENT_MEMORY_USE;
}
+
setSimpleStringRefNoLen(&segmem, rm_seg_memory_use);
int res = SimpleStringToStorageSizeMB(&segmem,
&(DRMGlobalInstance->SegmentMemoryMB));
- if ( res != FUNC_RETURN_OK) {
- elog(LOG, "Can not understand the value '%s' of property %s.",
- rm_seg_memory_use,
- HAWQDRM_CONFFILE_LIMIT_MEMORY_USE);
+ if ( res != FUNC_RETURN_OK)
+ {
+ elog(WARNING, "Can not understand the value '%s' of property %s.",
+ rm_seg_memory_use,
+ HAWQDRM_CONFFILE_LIMIT_MEMORY_USE);
return MAIN_CONF_UNSET_SEGMENT_MEMORY_USE;
}
DRMGlobalInstance->SegmentCore = rm_seg_core_use;
- elog(DEBUG5, "HAWQ RM :: Accepted NONE mode resource management : "
- "each host has (%d MB,%lf).\n",
+ elog(DEBUG3, "HAWQ RM :: Accepted NONE mode resource management setting, "
+ "each host has (%d MB,%lf) resource capacity.\n",
DRMGlobalInstance->SegmentMemoryMB,
DRMGlobalInstance->SegmentCore);
@@ -1043,7 +1062,6 @@ int loadDynamicResourceManagerConfigure(void)
DRMGlobalInstance->ResourceEnforcerCleanupPeriod = rm_enforce_cleanup_period;
/****** Resource enforcement GUCs ends ******/
-
return FUNC_RETURN_OK;
}
@@ -1249,44 +1267,51 @@ cleanup:
*****************************************************************************/
int loadQueuePropertiesFromCatalog(List **queues)
{
- int libpqres = CONNECTION_OK;
- int ret = FUNC_RETURN_OK;
- PGconn *conn = NULL;
+ int libpqres = CONNECTION_OK;
+ int ret = FUNC_RETURN_OK;
+ PGconn *conn = NULL;
static char conninfo[1024];
- PQExpBuffer sql = NULL;
- PGresult* result = NULL;
- int ntups = 0;
- int i_oid = 0,
- i_name = 0,
- i_parent = 0,
- i_active_stats_cluster = 0,
- i_memory_limit_cluster = 0,
- i_core_limit_cluster = 0,
- i_resource_upper_factor = 0,
- i_allocation_policy = 0,
- i_vseg_resource_quota = 0,
- i_vseg_upper_limit = 0,
- i_creation_time = 0,
- i_update_time = 0,
- i_status = 0;
-
- Oid oid = 0,
- rsq_parent = 0;
-
- char *rsqname = NULL,
+ PQExpBuffer sql = NULL;
+ PGresult* result = NULL;
+ int ntups = 0;
+ int i_oid = 0,
+ i_name = 0,
+ i_parent = 0,
+ i_active_stats_cluster = 0,
+ i_memory_limit_cluster = 0,
+ i_core_limit_cluster = 0,
+ i_resource_overcommit = 0,
+ i_allocation_policy = 0,
+ i_vseg_resource_quota = 0,
+ i_nvseg_upper_limit = 0,
+ i_nvseg_lower_limit = 0,
+ i_nvseg_upper_limit_perseg = 0,
+ i_nvseg_lower_limit_perseg = 0,
+ i_creation_time = 0,
+ i_update_time = 0,
+ i_status = 0;
+
+ Oid oid = 0,
+ parentoid = 0;
+
+ char *name = NULL,
*parent = NULL,
- *rsq_memory_limit_cluster = NULL,
- *rsq_core_limit_cluster = NULL,
- *rsq_allocation_policy = NULL,
- *rsq_vseg_resource_quota = NULL,
- *rsq_status = NULL;
+ *memory_limit_cluster = NULL,
+ *core_limit_cluster = NULL,
+ *allocation_policy = NULL,
+ *vseg_resource_quota = NULL,
+ *status = NULL;
+
+ int active_stats_cluster = 0,
+ nvseg_upper_limit = 0,
+ nvseg_lower_limit = 0;
- int rsq_active_stats_cluster = 0,
- rsq_vseg_upper_limit = 0;
+ float nvseg_upper_limit_perseg = 0.0,
+ nvseg_lower_limit_perseg = 0.0,
+ resource_overcommit = 0.0;
- float rsq_resource_upper_factor = 0.0;
- int64 rsq_creation_time = 0,
- rsq_update_time = 0;
+ int64 creation_time = 0,
+ update_time = 0;
snprintf(conninfo, sizeof(conninfo),
"options='-c gp_session_role=UTILITY' "
@@ -1313,19 +1338,23 @@ int loadQueuePropertiesFromCatalog(List **queues)
"sql statement.");
goto cleanup;
}
+
appendPQExpBuffer(sql,"SELECT oid,"
"rsqname,"
- "rsq_parent,"
- "rsq_active_stats_cluster,"
- "rsq_memory_limit_cluster, "
- "rsq_core_limit_cluster, "
- "rsq_resource_upper_factor,"
- "rsq_allocation_policy, "
- "rsq_vseg_resource_quota, "
- "rsq_vseg_upper_limit, "
- "rsq_creation_time, "
- "rsq_update_time, "
- "rsq_status "
+ "parentoid,"
+ "activestats,"
+ "memorylimit, "
+ "corelimit, "
+ "resovercommit,"
+ "allocpolicy, "
+ "vsegresourcequota, "
+ "nvsegupperlimit, "
+ "nvseglowerlimit, "
+ "nvsegupperlimitperseg, "
+ "nvseglowerlimitperseg, "
+ "creationtime, "
+ "updatetime, "
+ "status "
"FROM pg_resqueue");
result = PQexec(conn, sql->data);
@@ -1342,45 +1371,51 @@ int loadQueuePropertiesFromCatalog(List **queues)
ntups = PQntuples(result);
- i_oid = PQfnumber(result, PG_RESQUEUE_COL_OID);
- i_name = PQfnumber(result, PG_RESQUEUE_COL_RSQNAME);
- i_parent = PQfnumber(result, PG_RESQUEUE_COL_PARENT);
- i_active_stats_cluster = PQfnumber(result, PG_RESQUEUE_COL_ACTIVE_STATS_CLUSTER);
- i_memory_limit_cluster = PQfnumber(result, PG_RESQUEUE_COL_MEMORY_LIMIT_CLUSTER);
- i_core_limit_cluster = PQfnumber(result, PG_RESQUEUE_COL_CORE_LIMIT_CLUSTER);
- i_resource_upper_factor = PQfnumber(result, PG_RESQUEUE_COL_RESOURCE_UPPER_FACTOR);
- i_allocation_policy = PQfnumber(result, PG_RESQUEUE_COL_ALLOCATION_POLICY);
- i_vseg_resource_quota = PQfnumber(result, PG_RESQUEUE_COL_VSEG_RESOURCE_QUOTA);
- i_vseg_upper_limit = PQfnumber(result, PG_RESQUEUE_COL_VSEG_UPPER_LIMIT);
- i_creation_time = PQfnumber(result, PG_RESQUEUE_COL_CREATION_TIME);
- i_update_time = PQfnumber(result, PG_RESQUEUE_COL_UPDATE_TIME);
- i_status = PQfnumber(result, PG_RESQUEUE_COL_STATUS);
+ i_oid = PQfnumber(result, PG_RESQUEUE_COL_OID);
+ i_name = PQfnumber(result, PG_RESQUEUE_COL_RSQNAME);
+ i_parent = PQfnumber(result, PG_RESQUEUE_COL_PARENTOID);
+ i_active_stats_cluster = PQfnumber(result, PG_RESQUEUE_COL_ACTIVESTATS);
+ i_memory_limit_cluster = PQfnumber(result, PG_RESQUEUE_COL_MEMORYLIMIT);
+ i_core_limit_cluster = PQfnumber(result, PG_RESQUEUE_COL_CORELIMIT);
+ i_resource_overcommit = PQfnumber(result, PG_RESQUEUE_COL_RESOVERCOMMIT);
+ i_allocation_policy = PQfnumber(result, PG_RESQUEUE_COL_ALLOCPOLICY);
+ i_vseg_resource_quota = PQfnumber(result, PG_RESQUEUE_COL_VSEGRESOURCEQUOTA);
+ i_nvseg_upper_limit = PQfnumber(result, PG_RESQUEUE_COL_NVSEGUPPERLIMIT);
+ i_nvseg_lower_limit = PQfnumber(result, PG_RESQUEUE_COL_NVSEGLOWERLIMIT);
+ i_nvseg_upper_limit_perseg = PQfnumber(result, PG_RESQUEUE_COL_NVSEGUPPERLIMITPERSEG);
+ i_nvseg_lower_limit_perseg = PQfnumber(result, PG_RESQUEUE_COL_NVSEGLOWERLIMITPERSEG);
+ i_creation_time = PQfnumber(result, PG_RESQUEUE_COL_CREATIONTIME);
+ i_update_time = PQfnumber(result, PG_RESQUEUE_COL_UPDATETIME);
+ i_status = PQfnumber(result, PG_RESQUEUE_COL_STATUS);
for (int i = 0; i < ntups; i++)
{
oid = (Oid)strtoul(PQgetvalue(result, i, i_oid), NULL, 10);
- rsqname = PQgetvalue(result, i, i_name);
+ name = PQgetvalue(result, i, i_name);
parent = PQgetvalue(result, i, i_parent);
if (parent == NULL || strlen(parent) == 0)
{
- rsq_parent = InvalidOid;
+ parentoid = InvalidOid;
}
else
{
- rsq_parent = (Oid)strtoul(parent, NULL, 10);
+ parentoid = (Oid)strtoul(parent, NULL, 10);
}
- rsq_parent = (Oid)strtoul(PQgetvalue(result, i, i_parent), NULL, 10);
- rsq_active_stats_cluster = atoi(PQgetvalue(result, i, i_active_stats_cluster));
- rsq_memory_limit_cluster = PQgetvalue(result, i, i_memory_limit_cluster);
- rsq_core_limit_cluster = PQgetvalue(result, i, i_core_limit_cluster);
- rsq_resource_upper_factor = atof(PQgetvalue(result, i, i_resource_upper_factor));
- rsq_allocation_policy = PQgetvalue(result, i, i_allocation_policy);
- rsq_vseg_resource_quota = PQgetvalue(result, i, i_vseg_resource_quota);
- rsq_vseg_upper_limit = atoi(PQgetvalue(result, i, i_vseg_upper_limit));
- rsq_creation_time = atol(PQgetvalue(result, i, i_creation_time));
- rsq_update_time = atol(PQgetvalue(result, i, i_update_time));
- rsq_status = PQgetvalue(result, i, i_status);
+ parentoid = (Oid)strtoul(PQgetvalue(result, i, i_parent), NULL, 10);
+ active_stats_cluster = atoi(PQgetvalue(result, i, i_active_stats_cluster));
+ memory_limit_cluster = PQgetvalue(result, i, i_memory_limit_cluster);
+ core_limit_cluster = PQgetvalue(result, i, i_core_limit_cluster);
+ resource_overcommit = atof(PQgetvalue(result, i, i_resource_overcommit));
+ allocation_policy = PQgetvalue(result, i, i_allocation_policy);
+ vseg_resource_quota = PQgetvalue(result, i, i_vseg_resource_quota);
+ nvseg_upper_limit = atoi(PQgetvalue(result, i, i_nvseg_upper_limit));
+ nvseg_lower_limit = atoi(PQgetvalue(result, i, i_nvseg_lower_limit));
+ nvseg_upper_limit_perseg = atof(PQgetvalue(result, i, i_nvseg_upper_limit_perseg));
+ nvseg_lower_limit_perseg = atof(PQgetvalue(result, i, i_nvseg_lower_limit_perseg));
+ creation_time = atol(PQgetvalue(result, i, i_creation_time));
+ update_time = atol(PQgetvalue(result, i, i_update_time));
+ status = PQgetvalue(result, i, i_status);
MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
*queues = lappend(*queues,
@@ -1397,7 +1432,7 @@ int loadQueuePropertiesFromCatalog(List **queues)
"queue",
getRSQTBLAttributeName(RSQ_TBL_ATTR_NAME),
&i,
- (Name)rsqname));
+ (Name)name));
*queues = lappend(*queues,
createPropertyOID(
@@ -1405,7 +1440,7 @@ int loadQueuePropertiesFromCatalog(List **queues)
"queue",
getRSQTBLAttributeName(RSQ_TBL_ATTR_PARENT),
&i,
- rsq_parent));
+ parentoid));
*queues = lappend(*queues,
createPropertyInt32(
@@ -1413,7 +1448,7 @@ int loadQueuePropertiesFromCatalog(List **queues)
"queue",
getRSQTBLAttributeName(RSQ_TBL_ATTR_ACTIVE_STATMENTS),
&i,
- rsq_active_stats_cluster));
+ active_stats_cluster));
*queues = lappend(*queues,
createPropertyString(
@@ -1421,7 +1456,7 @@ int loadQueuePropertiesFromCatalog(List **queues)
"queue",
getRSQTBLAttributeName(RSQ_TBL_ATTR_MEMORY_LIMIT_CLUSTER),
&i,
- rsq_memory_limit_cluster));
+ memory_limit_cluster));
*queues = lappend(*queues,
createPropertyString(
@@ -1429,7 +1464,7 @@ int loadQueuePropertiesFromCatalog(List **queues)
"queue",
getRSQTBLAttributeName(RSQ_TBL_ATTR_CORE_LIMIT_CLUSTER),
&i,
- rsq_core_limit_cluster));
+ core_limit_cluster));
*queues = lappend(*queues,
createPropertyString(
@@ -1437,31 +1472,55 @@ int loadQueuePropertiesFromCatalog(List **queues)
"queue",
getRSQTBLAttributeName(RSQ_TBL_ATTR_ALLOCATION_POLICY),
&i,
- rsq_allocation_policy));
+ allocation_policy));
*queues = lappend(*queues,
createPropertyFloat(
PCONTEXT,
"queue",
- getRSQTBLAttributeName(RSQ_DDL_ATTR_RESOURCE_UPPER_FACTOR),
+ getRSQTBLAttributeName(RSQ_DDL_ATTR_RESOURCE_OVERCOMMIT_FACTOR),
&i,
- rsq_resource_upper_factor));
+ resource_overcommit));
*queues = lappend(*queues,
createPropertyString(
PCONTEXT,
"queue",
- getRSQTBLAttributeName(RSQ_TBL_ATTR_VSEGMENT_RESOURCE_QUOTA),
+ getRSQTBLAttributeName(RSQ_TBL_ATTR_VSEG_RESOURCE_QUOTA),
&i,
- rsq_vseg_resource_quota));
+ vseg_resource_quota));
*queues = lappend(*queues,
createPropertyInt32(
PCONTEXT,
"queue",
- getRSQTBLAttributeName(RSQ_TBL_ATTR_VSEGMENT_UPPER_LIMIT),
+ getRSQTBLAttributeName(RSQ_TBL_ATTR_NVSEG_UPPER_LIMIT),
+ &i,
+ nvseg_upper_limit));
+
+ *queues = lappend(*queues,
+ createPropertyInt32(
+ PCONTEXT,
+ "queue",
+ getRSQTBLAttributeName(RSQ_TBL_ATTR_NVSEG_LOWER_LIMIT),
+ &i,
+ nvseg_lower_limit));
+
+ *queues = lappend(*queues,
+ createPropertyFloat(
+ PCONTEXT,
+ "queue",
+ getRSQTBLAttributeName(RSQ_TBL_ATTR_NVSEG_UPPER_LIMIT_PERSEG),
+ &i,
+ nvseg_upper_limit_perseg));
+
+ *queues = lappend(*queues,
+ createPropertyFloat(
+ PCONTEXT,
+ "queue",
+ getRSQTBLAttributeName(RSQ_TBL_ATTR_NVSEG_LOWER_LIMIT_PERSEG),
&i,
- rsq_vseg_upper_limit));
+ nvseg_lower_limit_perseg));
*queues = lappend(*queues,
createPropertyInt32(
@@ -1469,7 +1528,7 @@ int loadQueuePropertiesFromCatalog(List **queues)
"queue",
getRSQTBLAttributeName(RSQ_TBL_ATTR_CREATION_TIME),
&i,
- rsq_creation_time));
+ creation_time));
*queues = lappend(*queues,
createPropertyInt32(
@@ -1477,7 +1536,7 @@ int loadQueuePropertiesFromCatalog(List **queues)
"queue",
getRSQTBLAttributeName(RSQ_TBL_ATTR_UPDATE_TIME),
&i,
- rsq_update_time));
+ update_time));
*queues = lappend(*queues,
createPropertyString(
@@ -1485,7 +1544,7 @@ int loadQueuePropertiesFromCatalog(List **queues)
"queue",
getRSQTBLAttributeName(RSQ_TBL_ATTR_STATUS),
&i,
- rsq_status));
+ status));
MEMORY_CONTEXT_SWITCH_BACK
}
@@ -1558,12 +1617,12 @@ int addResourceQueueAndUserFromProperties(List *queueprops, List *userprops)
{
KVProperty value = lfirst(cell);
- elog(DEBUG3, "Loads queue property %s=%s", value->Key.Str, value->Val.Str);
+ elog(RMLOG, "Loads queue property %s=%s", value->Key.Str, value->Val.Str);
/* Split key string into (attribute, index) */
if ( SimpleStringStartWith(&(value->Key), "queue.") != FUNC_RETURN_OK )
{
- elog(DEBUG3, "Ignore property %s=%s", value->Key.Str, value->Val.Str);
+ elog(RMLOG, "Ignore property %s=%s", value->Key.Str, value->Val.Str);
continue;
}
@@ -1619,9 +1678,9 @@ int addResourceQueueAndUserFromProperties(List *queueprops, List *userprops)
currentindex = queueindex;
}
- elog(DEBUG3, "Resource manager loaded attribute for creating queue %s=%s",
- newprop->Key.Str,
- newprop->Val.Str);
+ elog(RMLOG, "Resource manager loaded attribute for creating queue %s=%s",
+ newprop->Key.Str,
+ newprop->Val.Str);
currentattrs = lappend(currentattrs, newprop);
MEMORY_CONTEXT_SWITCH_BACK
@@ -1641,12 +1700,12 @@ int addResourceQueueAndUserFromProperties(List *queueprops, List *userprops)
{
KVProperty value = lfirst(cell);
- elog(DEBUG3, "Loads user property %s=%s", value->Key.Str, value->Val.Str);
+ elog(RMLOG, "Loads user property %s=%s", value->Key.Str, value->Val.Str);
/* Split key string into (attribute, index) */
if ( SimpleStringStartWith(&(value->Key), "user.") != FUNC_RETURN_OK )
{
- elog(DEBUG3, "Ignore property %s=%s", value->Key.Str, value->Val.Str);
+ elog(RMLOG, "Ignore property %s=%s", value->Key.Str, value->Val.Str);
continue;
}
@@ -1698,9 +1757,9 @@ int addResourceQueueAndUserFromProperties(List *queueprops, List *userprops)
currentindex = userindex;
}
- elog(DEBUG3, "Resource manager loaded attribute for creating role %s=%s",
- newprop->Key.Str,
- newprop->Val.Str);
+ elog(RMLOG, "Resource manager loaded attribute for creating role %s=%s",
+ newprop->Key.Str,
+ newprop->Val.Str);
currentattrs = lappend(currentattrs, newprop);
MEMORY_CONTEXT_SWITCH_BACK
@@ -1726,7 +1785,7 @@ int addResourceQueueAndUserFromProperties(List *queueprops, List *userprops)
foreach(cell2, attrs)
{
KVProperty attrkv = lfirst(cell2);
- elog(DEBUG3, "To parse : %s=%s", attrkv->Key.Str, attrkv->Val.Str);
+ elog(RMLOG, "To parse : %s=%s", attrkv->Key.Str, attrkv->Val.Str);
}
DynResourceQueue newqueue = rm_palloc0(PCONTEXT,
@@ -1734,6 +1793,7 @@ int addResourceQueueAndUserFromProperties(List *queueprops, List *userprops)
res = parseResourceQueueAttributes(attrs,
newqueue,
+ false,
errorbuf,
sizeof(errorbuf));
if ( res != FUNC_RETURN_OK )
@@ -1748,7 +1808,7 @@ int addResourceQueueAndUserFromProperties(List *queueprops, List *userprops)
MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
rawrsqs = lappend(rawrsqs, newqueue);
MEMORY_CONTEXT_SWITCH_BACK
- DQUEUE_LOOP_END
+ }
/*
* STEP 2.2. Reorder the resource queue sequence to ensure that every time
@@ -1796,8 +1856,8 @@ int addResourceQueueAndUserFromProperties(List *queueprops, List *userprops)
orderchanged = false;
if ( toreordrsq != NULL )
{
- elog(DEBUG3, "Find one resource queue valid to continue loading %s.",
- toreordrsq->Name);
+ elog(RMLOG, "Find one resource queue valid to continue loading %s.",
+ toreordrsq->Name);
orderchanged = true;
MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
orderedrsqs = lappend(orderedrsqs, toreordrsq);
@@ -1842,23 +1902,29 @@ int addResourceQueueAndUserFromProperties(List *queueprops, List *userprops)
orderedrsqs = list_delete_first(orderedrsqs);
MEMORY_CONTEXT_SWITCH_BACK
+ elog(RMLOG, "Load queue %s.", partqueue->Name);
+
res = checkAndCompleteNewResourceQueueAttributes(partqueue,
errorbuf,
sizeof(errorbuf));
if ( res != FUNC_RETURN_OK )
{
+ elog(RMLOG, "res=%d error=%s, after check and complete queue %s.",
+ res,
+ errorbuf,
+ partqueue->Name);
+
rm_pfree(PCONTEXT, partqueue);
- elog( WARNING, "Resource manager can not complete resource queue's "
- "attributes because %s",
- errorbuf);
+ elog(WARNING, "Resource manager can not complete resource queue's "
+ "attributes because %s",
+ errorbuf);
continue;
}
+ elog(RMLOG, "Checked and completed queue %s.", partqueue->Name);
+
DynResourceQueueTrack newtrack = NULL;
- res = createQueueAndTrack(partqueue,
- &newtrack,
- errorbuf,
- sizeof(errorbuf));
+ res = createQueueAndTrack(partqueue, &newtrack, errorbuf, sizeof(errorbuf));
if ( res != FUNC_RETURN_OK )
{
@@ -1875,6 +1941,8 @@ int addResourceQueueAndUserFromProperties(List *queueprops, List *userprops)
continue;
}
+ elog(RMLOG, "Created queue %s.", partqueue->Name);
+
char buffer[1024];
generateQueueReport(partqueue->OID, buffer, sizeof(buffer));
elog(LOG, "Resource manager created resource queue instance : %s",
@@ -1912,15 +1980,7 @@ int addResourceQueueAndUserFromProperties(List *queueprops, List *userprops)
continue;
}
- res = createUser(newuser, errorbuf, sizeof(errorbuf));
- if ( res != FUNC_RETURN_OK )
- {
- elog(WARNING, "Can not create user %s because %s",
- newuser->Name,
- errorbuf);
- rm_pfree(PCONTEXT, newuser);
- continue;
- }
+ createUser(newuser);
char buffer[256];
generateUserReport(newuser->Name,
@@ -2125,10 +2185,12 @@ int initializeSocketServer(void)
char *allip = "0.0.0.0";
pgsocket RMListenSocket[HAWQRM_SERVER_PORT_COUNT];
- for ( int i = 0 ; i < HAWQRM_SERVER_PORT_COUNT ; ++i ) {
+ for ( int i = 0 ; i < HAWQRM_SERVER_PORT_COUNT ; ++i )
+ {
RMListenSocket[i] = PGINVALID_SOCKET;
}
+#ifdef ENABLE_DOMAINSERVER
/* Listen local unix domain socket port. */
netres = StreamServerPort(AF_UNIX,
NULL,
@@ -2142,7 +2204,8 @@ int initializeSocketServer(void)
* This condition is for double-checking the server is successfully
* created.
*/
- (netres == STATUS_OK && RMListenSocket[0] == PGINVALID_SOCKET) ) {
+ (netres == STATUS_OK && RMListenSocket[0] == PGINVALID_SOCKET) )
+ {
res = REQUESTHANDLER_FAIL_START_SOCKET_SERVER;
elog(LOG, "Resource manager cannot create UNIX domain socket server. Port=%d",
rm_master_addr_domain_port);
@@ -2152,15 +2215,23 @@ int initializeSocketServer(void)
/* Listen normal socket addresses. */
netres = StreamServerPort(AF_UNSPEC,
allip,
- rm_master_addr_port,
+ rm_master_port,
NULL,
&(RMListenSocket[1]),
HAWQRM_SERVER_PORT_COUNT-1);
-
- if ( netres != STATUS_OK ) {
+#else
+ netres = StreamServerPort(AF_UNSPEC,
+ allip,
+ rm_master_port,
+ NULL,
+ RMListenSocket,
+ HAWQRM_SERVER_PORT_COUNT);
+#endif
+ if ( netres != STATUS_OK )
+ {
res = REQUESTHANDLER_FAIL_START_SOCKET_SERVER;
elog(LOG, "Resource manager cannot create socket server. Port=%d",
- rm_master_addr_port);
+ rm_master_port);
return res;
}
@@ -2168,39 +2239,54 @@ int initializeSocketServer(void)
initializeAsyncComm();
int validfdcount = 0;
AsyncCommBuffer newbuffer = NULL;
- for ( int i = 0 ; i < HAWQRM_SERVER_PORT_COUNT ; ++i ) {
- if (RMListenSocket[i] != PGINVALID_SOCKET) {
+ for ( int i = 0 ; i < HAWQRM_SERVER_PORT_COUNT ; ++i )
+ {
+ if (RMListenSocket[i] != PGINVALID_SOCKET)
+ {
netres = registerFileDesc(RMListenSocket[i],
NULL,
ASYNCCOMM_READ,
&AsyncCommBufferHandlersMsgServer,
NULL,
&newbuffer);
- if ( netres != FUNC_RETURN_OK ) {
+ if ( netres != FUNC_RETURN_OK )
+ {
res = REQUESTHANDLER_FAIL_START_SOCKET_SERVER;
elog(WARNING, "Resource manager cannot track socket server.");
break;
}
validfdcount++;
-
InitHandler_Message(newbuffer);
}
}
- if ( res != FUNC_RETURN_OK ) {
- for ( int i = 0 ; i < HAWQRM_SERVER_PORT_COUNT ; ++i ) {
- if ( RMListenSocket[i] != PGINVALID_SOCKET ) close(RMListenSocket[i]);
+ if ( res != FUNC_RETURN_OK )
+ {
+ for ( int i = 0 ; i < HAWQRM_SERVER_PORT_COUNT ; ++i )
+ {
+ if ( RMListenSocket[i] != PGINVALID_SOCKET )
+ {
+ close(RMListenSocket[i]);
+ }
}
return res;
}
- elog(DEBUG5, "HAWQ RM :: Start accepting resource request. "
- "Listening unix domain socket port %d. "
- "Listening normal socket port %d. "
- "Total listened %d FDs.",
- rm_master_addr_domain_port,
- rm_master_addr_port,
- validfdcount);
+#ifdef ENABLE_DOMAINSERVER
+ elog(LOG, "Resource manager starts accepting resource request. "
+ "Listening unix domain socket port %d. "
+ "Listening normal socket port %d. "
+ "Total listened %d FDs.",
+ rm_master_addr_domain_port,
+ rm_master_port,
+ validfdcount);
+#else
+ elog(LOG, "Resource manager starts accepting resource request. "
+ "Listening normal socket port %d. "
+ "Total listened %d FDs.",
+ rm_master_port,
+ validfdcount);
+#endif
return res;
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/resourcemanager_RMSEG.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcemanager_RMSEG.c b/src/backend/resourcemanager/resourcemanager_RMSEG.c
index b62c14d..df589bb 100644
--- a/src/backend/resourcemanager/resourcemanager_RMSEG.c
+++ b/src/backend/resourcemanager/resourcemanager_RMSEG.c
@@ -73,7 +73,7 @@ int initializeSocketServer_RMSEG(void)
/* Listen normal socket addresses. */
netres = StreamServerPort( AF_UNSPEC,
allip,
- rm_seg_addr_port,
+ rm_segment_port,
NULL,
RMListenSocket,
HAWQRM_SERVER_PORT_COUNT);
@@ -87,7 +87,7 @@ int initializeSocketServer_RMSEG(void)
res = REQUESTHANDLER_FAIL_START_SOCKET_SERVER;
elog( LOG, "Can not create socket server. HostName=%s, Port=%d",
allip,
- rm_seg_addr_port);
+ rm_segment_port);
return res;
}
@@ -125,7 +125,7 @@ int initializeSocketServer_RMSEG(void)
"Listening normal socket port %s:%d. "
"Total listened %d FDs.",
allip,
- rm_seg_addr_port,
+ rm_segment_port,
validfdcount);
return res;
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/resourcepool.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcepool.c b/src/backend/resourcemanager/resourcepool.c
index 7e413df..004cd67 100644
--- a/src/backend/resourcemanager/resourcepool.c
+++ b/src/backend/resourcemanager/resourcepool.c
@@ -47,6 +47,8 @@ void getSegResResourceCountersByMemCoreCounters(SegResource resinfo,
VSegmentCounterInternal createVSegmentCounter(uint32_t hdfsnameindex,
SegResource segres);
+void refreshSlavesFileHostSize(FILE *fp);
+
/* Functions for BBST indices. */
int __DRM_NODERESPOOL_comp_ratioFree(void *arg, void *val1, void *val2);
int __DRM_NODERESPOOL_comp_ratioAlloc(void *arg, void *val1, void *val2);
@@ -331,6 +333,9 @@ void initializeResourcePoolManager(void)
PRESPOOL->allocateResFuncs[i] = NULL;
}
PRESPOOL->allocateResFuncs[0] = allocateResourceFromResourcePoolIOBytes;
+
+ PRESPOOL->SlavesFileTimestamp = 0;
+ PRESPOOL->SlavesHostCount = 0;
}
#define CONNECT_TIMEOUT 60
@@ -1838,7 +1843,7 @@ int allocateResourceFromResourcePoolIOBytes(int32_t nodecount,
* slice limit. Because we will gothrough all segments later
* if not enough segments are found in this loop.
*/
- if ( segresource->SliceWorkload + slicesize > rm_slice_num_per_seg_limit )
+ if ( segresource->SliceWorkload + slicesize > rm_nslice_perseg_limit )
{
elog(DEBUG3, "Segment %s contains %d slices working now, it can "
"not afford %d more slices.",
@@ -1968,8 +1973,7 @@ int allocateResourceFromResourcePoolIOBytes(int32_t nodecount,
else
{
- if ( !fixnodecount &&
- curres->SliceWorkload + slicesize > rm_slice_num_per_seg_limit )
+ if ( curres->SliceWorkload + slicesize > rm_nslice_perseg_limit )
{
elog(LOG, "Segment %s contains %d slices working now, "
"it can not afford %d more slices.",
@@ -3553,6 +3557,124 @@ int getSegmentGRMContainerSize(SegResource segres)
return segres->GRMContainerCount;
}
+void checkSlavesFile(void)
+{
+ static char *filename = NULL;
+
+ if ( filename == NULL )
+ {
+
+ char *gphome = getenv("GPHOME");
+ if ( gphome == NULL )
+ {
+ elog(WARNING, "The environment variable GPHOME is not set. "
+ "Resource manager can not find file slaves.");
+ return;
+ }
+
+ filename = rm_palloc0(PCONTEXT, strlen(gphome) + sizeof("/etc/slaves"));
+
+ sprintf(filename, "%s%s", gphome, "/etc/slaves");
+ }
+
+ elog(DEBUG3, "Resource manager reads slaves file %s.", filename);
+
+ /* Get file stat. */
+ struct stat filestat;
+ FILE *fp = fopen(filename, "r");
+ if ( fp == NULL )
+ {
+ elog(WARNING, "Fail to open slaves file %s. errno %d", filename, errno);
+ return;
+ }
+ int fd = fileno(fp);
+
+ int fres = fstat(fd, &filestat);
+ if ( fres != 0 )
+ {
+ fclose(fp);
+ elog(WARNING, "Fail to get slaves file stat %s. errno %d", filename, errno);
+ return;
+ }
+ int64_t filechangetime = filestat.st_mtime;
+
+ elog(DEBUG3, "Current file change time stamp " INT64_FORMAT, filechangetime);
+
+ if ( filechangetime != PRESPOOL->SlavesFileTimestamp )
+ {
+ refreshSlavesFileHostSize(fp);
+ PRESPOOL->SlavesFileTimestamp = filechangetime;
+ }
+
+ fclose(fp);
+}
+
+void refreshSlavesFileHostSize(FILE *fp)
+{
+ static char zero[1] = "";
+ int newcnt = 0;
+ bool haserror = false;
+ SelfMaintainBufferData smb;
+
+ elog(DEBUG3, "Refresh slaves file host size now.");
+
+ initializeSelfMaintainBuffer(&smb, PCONTEXT);
+ while( true )
+ {
+ char c = fgetc(fp);
+ if ( c == EOF )
+ {
+ if ( feof(fp) == 0 )
+ {
+ elog(WARNING, "Failed to read slaves file, ferror() gets %d",
+ ferror(fp));
+ haserror = true;
+ }
+
+ break;
+ }
+
+ if ( c == '\t' || c == ' ' || c == '\r' )
+ {
+ continue;
+ }
+
+ if ( c == '\n' )
+ {
+ if ( smb.Cursor + 1 > 0 )
+ {
+ appendSelfMaintainBuffer(&smb, zero, 1);
+ elog(DEBUG3, "Loaded slaves host %s", smb.Buffer);
+
+ resetSelfMaintainBuffer(&smb);
+ newcnt++;
+ }
+ }
+ else
+ {
+ /* Add this character into the buffer. */
+ appendSelfMaintainBuffer(&smb, &c, 1);
+ }
+ }
+
+ if ( smb.Cursor + 1 > 0 )
+ {
+ appendSelfMaintainBuffer(&smb, zero, 1);
+ elog(DEBUG3, "Loaded slaves host %s (last one)", smb.Buffer);
+ newcnt++;
+ }
+
+ destroySelfMaintainBuffer(&smb);
+
+ if ( !haserror )
+ {
+ elog(LOG, "Resource manager refreshed slaves host size from %d to %d.",
+ PRESPOOL->SlavesHostCount,
+ newcnt);
+ PRESPOOL->SlavesHostCount = newcnt;
+ }
+
+}
void getSegResResourceCountersByMemCoreCounters(SegResource resinfo,
int32_t *allocmem,
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a413a426/src/backend/resourcemanager/resqueuecommand.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resqueuecommand.c b/src/backend/resourcemanager/resqueuecommand.c
index 0f69b81..0247fef 100644
--- a/src/backend/resourcemanager/resqueuecommand.c
+++ b/src/backend/resourcemanager/resqueuecommand.c
@@ -56,7 +56,7 @@ void createResourceQueue(CreateQueueStmt *stmt)
{
ereport(ERROR,
(errcode(ERRCODE_RESERVED_NAME),
- errmsg("resource queue name \"%s\" is reserved",
+ errmsg("resource queue name %s is reserved",
stmt->queue),
errOmitLocation(true)));
}
@@ -73,7 +73,7 @@ void createResourceQueue(CreateQueueStmt *stmt)
{
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
- errmsg("resource queue \"%s\" already exists",
+ errmsg("resource queue %s already exists",
stmt->queue)));
}
@@ -86,7 +86,8 @@ void createResourceQueue(CreateQueueStmt *stmt)
*/
int resourceid = 0;
res = createNewResourceContext(&resourceid);
- if ( res != FUNC_RETURN_OK ) {
+ if ( res != FUNC_RETURN_OK )
+ {
Assert( res == COMM2RM_CLIENT_FULL_RESOURCECONTEXT );
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
@@ -99,7 +100,8 @@ void createResourceQueue(CreateQueueStmt *stmt)
GetUserId(),
errorbuf,
sizeof(errorbuf));
- if ( res != FUNC_RETURN_OK ) {
+ if ( res != FUNC_RETURN_OK )
+ {
releaseResourceContextWithErrorReport(resourceid);
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("%s", errorbuf)));
}
@@ -123,7 +125,7 @@ void createResourceQueue(CreateQueueStmt *stmt)
(errcode(IS_TO_RM_RPC_ERROR(res) ?
ERRCODE_INTERNAL_ERROR :
ERRCODE_INVALID_OBJECT_DEFINITION),
- errmsg("%s", errorbuf)));
+ errmsg("Can not apply CREATE RESOURCE QUEUE because %s", errorbuf)));
}
elog(LOG, "Complete applying CREATE RESOURCE QUEUE statement.");
}
@@ -144,9 +146,21 @@ void dropResourceQueue(DropQueueStmt *stmt)
/* Permission check - only superuser can create queues. */
if (!superuser())
+ {
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to create resource queues")));
+ }
+
+ /* Cannot DROP default and root queue */
+ if ( strcmp(stmt->queue, RESOURCE_QUEUE_DEFAULT_QUEUE_NAME) == 0 ||
+ strcmp(stmt->queue, RESOURCE_QUEUE_ROOT_QUEUE_NAME) == 0 )
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot drop system resource queue %s",
+ stmt->queue)));
+ }
/*
* Check the pg_resqueue relation to be certain the queue already
@@ -156,30 +170,19 @@ void dropResourceQueue(DropQueueStmt *stmt)
pcqCtx = caql_addrel(cqclr(&cqc), pg_resqueue_rel);
- tuple = caql_getfirst(
- pcqCtx,
- cql("SELECT * FROM pg_resqueue"
- " WHERE rsqname = :1 FOR UPDATE",
- CStringGetDatum(stmt->queue)));
+ tuple = caql_getfirst(pcqCtx,
+ cql("SELECT * FROM pg_resqueue WHERE rsqname = :1 FOR UPDATE",
+ CStringGetDatum(stmt->queue)));
if (!HeapTupleIsValid(tuple))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
- errmsg("resource queue \"%s\" does not exist",
+ errmsg("resource queue %s does not exist",
stmt->queue)));
- /*
- * Remember the Oid
- */
+ /* Remember the Oid */
queueid = HeapTupleGetOid(tuple);
- /* MPP-6926: cannot DROP default queue */
- if (queueid == DEFAULTRESQUEUE_OID || queueid == ROOTRESQUEUE_OID)
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("cannot drop system resource queue \"%s\"",
- stmt->queue)));
-
/*
* Check to see if any roles are in this queue.
*/
@@ -190,7 +193,7 @@ void dropResourceQueue(DropQueueStmt *stmt)
{
ereport(ERROR,
(errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
- errmsg("resource queue \"%s\" is used by at least one role",
+ errmsg("resource queue %s is used by at least one role",
stmt->queue)));
}
@@ -252,7 +255,7 @@ void dropResourceQueue(DropQueueStmt *stmt)
(errcode(IS_TO_RM_RPC_ERROR(res) ?
ERRCODE_INTERNAL_ERROR :
ERRCODE_INVALID_OBJECT_DEFINITION),
- errmsg("%s", errorbuf)));
+ errmsg("Can not apply DROP RESOURCE QUEUE because %s", errorbuf)));
}
elog(LOG, "Completed applying DROP RESOURCE QUEUE statement.");
@@ -272,9 +275,20 @@ void alterResourceQueue(AlterQueueStmt *stmt)
/* Permission check - only superuser can create queues. */
if (!superuser())
+ {
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to create resource queues")));
+ }
+
+ /* Cannot DROP default and root queue */
+ if ( strcmp(stmt->queue, RESOURCE_QUEUE_ROOT_QUEUE_NAME) == 0 )
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot alter system resource queue %s",
+ stmt->queue)));
+ }
/*
* MPP-7960: We cannot run ALTER RESOURCE QUEUE inside a user transaction
@@ -282,7 +296,8 @@ void alterResourceQueue(AlterQueueStmt *stmt)
* resulting in "leaked", unreachable queues.
*/
- if (Gp_role == GP_ROLE_DISPATCH) {
+ if (Gp_role == GP_ROLE_DISPATCH)
+ {
PreventTransactionChain((void *) stmt, "ALTER RESOURCE QUEUE");
}
@@ -301,7 +316,7 @@ void alterResourceQueue(AlterQueueStmt *stmt)
{
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
- errmsg("resource queue \"%s\" does not exist",
+ errmsg("resource queue %s does not exist",
stmt->queue)));
}
@@ -350,20 +365,20 @@ void alterResourceQueue(AlterQueueStmt *stmt)
(errcode(IS_TO_RM_RPC_ERROR(res) ?
ERRCODE_INTERNAL_ERROR :
ERRCODE_INVALID_OBJECT_DEFINITION),
- errmsg("%s", errorbuf)));
+ errmsg("Can not apply ALTER RESOURCE QUEUE because %s", errorbuf)));
}
elog(LOG, "Completed applying ALTER RESOURCE QUEUE statement.");
}
-#define VALIDATE_DDL_DUPLICATE_ATTRIBUTE(index, defel, targref) \
+#define VALID_DDL_DUP(index, defel, targref) \
if (strcmp((defel)->defname, RSQDDLAttrNames[(index)]) == 0) \
{ \
if ((targref) != NULL) \
{ \
ereport(ERROR, \
(errcode(ERRCODE_SYNTAX_ERROR), \
- errmsg("redundant option %s", \
+ errmsg("redundant attribute %s", \
RSQDDLAttrNames[(index)]))); \
} \
(targref) = (defel); \
@@ -372,13 +387,17 @@ void alterResourceQueue(AlterQueueStmt *stmt)
void validateDDLAttributeOptions(List *options)
{
- DefElem *dactivelimit = NULL;
- DefElem *dmemorylimit = NULL;
- DefElem *dcorelimit = NULL;
- DefElem *dvsegresquota = NULL;
- DefElem *dallocpolicy = NULL;
- DefElem *dresupperfactor = NULL;
- DefElem *dvsegupperlimit = NULL;
+ DefElem *dparent = NULL;
+ DefElem *dactivelimit = NULL;
+ DefElem *dmemorylimit = NULL;
+ DefElem *dcorelimit = NULL;
+ DefElem *dvsegresquota = NULL;
+ DefElem *dallocpolicy = NULL;
+ DefElem *dresovercommit = NULL;
+ DefElem *dnvsegupperlimit = NULL;
+ DefElem *dnvseglowerlimit = NULL;
+ DefElem *dnvsegupperlimitpseg = NULL;
+ DefElem *dnvseglowerlimitpseg = NULL;
Cost activelimit = INVALID_RES_LIMIT_THRESHOLD;
ListCell *option = NULL;
@@ -387,13 +406,17 @@ void validateDDLAttributeOptions(List *options)
foreach(option, options)
{
DefElem *defel = (DefElem *) lfirst(option);
- VALIDATE_DDL_DUPLICATE_ATTRIBUTE(RSQ_DDL_ATTR_ACTIVE_STATMENTS, defel, dactivelimit)
- VALIDATE_DDL_DUPLICATE_ATTRIBUTE(RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER, defel, dmemorylimit)
- VALIDATE_DDL_DUPLICATE_ATTRIBUTE(RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER, defel, dcorelimit)
- VALIDATE_DDL_DUPLICATE_ATTRIBUTE(RSQ_DDL_ATTR_VSEGMENT_RESOURCE_QUOTA, defel, dvsegresquota)
- VALIDATE_DDL_DUPLICATE_ATTRIBUTE(RSQ_DDL_ATTR_ALLOCATION_POLICY, defel, dallocpolicy)
- VALIDATE_DDL_DUPLICATE_ATTRIBUTE(RSQ_DDL_ATTR_RESOURCE_UPPER_FACTOR, defel, dresupperfactor)
- VALIDATE_DDL_DUPLICATE_ATTRIBUTE(RSQ_DDL_ATTR_VSEGMENT_UPPER_LIMIT, defel, dvsegupperlimit)
+ VALID_DDL_DUP(RSQ_DDL_ATTR_PARENT, defel, dparent)
+ VALID_DDL_DUP(RSQ_DDL_ATTR_ACTIVE_STATMENTS, defel, dactivelimit)
+ VALID_DDL_DUP(RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER, defel, dmemorylimit)
+ VALID_DDL_DUP(RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER, defel, dcorelimit)
+ VALID_DDL_DUP(RSQ_DDL_ATTR_VSEG_RESOURCE_QUOTA, defel, dvsegresquota)
+ VALID_DDL_DUP(RSQ_DDL_ATTR_ALLOCATION_POLICY, defel, dallocpolicy)
+ VALID_DDL_DUP(RSQ_DDL_ATTR_RESOURCE_OVERCOMMIT_FACTOR, defel, dresovercommit)
+ VALID_DDL_DUP(RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT, defel, dnvsegupperlimit)
+ VALID_DDL_DUP(RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT, defel, dnvseglowerlimit)
+ VALID_DDL_DUP(RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT_PERSEG, defel, dnvsegupperlimitpseg)
+ VALID_DDL_DUP(RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT_PERSEG, defel, dnvseglowerlimitpseg)
}
/* Perform range checks on the various thresholds.*/
@@ -440,31 +463,83 @@ void validateDDLAttributeOptions(List *options)
}
}
- /* The vsegment upper limit must be an integer and no less than -1. */
- if (dvsegupperlimit != NULL)
+ /*
+ * NVSEG_UPPER_LIMIT/NVSEG_LOWER_LIMIT has 0 as default value that means the
+ * setting is not effective, otherwise, it must be greater than 0.
+ */
+ int64_t nvsegupperlimit = -1;
+ int64_t nvseglowerlimit = -1;
+ if (dnvsegupperlimit != NULL)
{
- int64_t vsegupperlimit = defGetInt64(dvsegupperlimit);
- if (vsegupperlimit < DEFAULT_RESQUEUE_VSEG_UPPER_LIMIT_N)
+ nvsegupperlimit = defGetInt64(dnvsegupperlimit);
+ if (nvsegupperlimit < MINIMUM_RESQUEUE_NVSEG_UPPER_LIMIT_N)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("%s cannot be less than %s",
- RSQDDLAttrNames[RSQ_DDL_ATTR_VSEGMENT_UPPER_LIMIT],
- DEFAULT_RESQUEUE_VSEG_UPPER_LIMIT)));
+ RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT],
+ MINIMUM_RESQUEUE_NVSEG_UPPER_LIMIT)));
}
}
+ if (dnvseglowerlimit != NULL)
+ {
+ nvseglowerlimit = defGetInt64(dnvseglowerlimit);
+ if (nvseglowerlimit < MINIMUM_RESQUEUE_NVSEG_LOWER_LIMIT_N)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("%s cannot be less than %s",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT],
+ MINIMUM_RESQUEUE_NVSEG_LOWER_LIMIT)));
+ }
+ }
+
+ /*
+ * NVSEG_UPPER_LIMIT_PERSEG/NVSEG_LOWER_LIMIT_PERSEG has 0 as default value
+ * that means the setting is not effective, otherwise, it must be greater
+ * than 0.
+ */
+ double nvsegupperlimitpseg = -1.0;
+ double nvseglowerlimitpseg = -1.0;
+ if (dnvsegupperlimitpseg != NULL)
+ {
+ nvsegupperlimitpseg = defGetNumeric(dnvsegupperlimitpseg);
+ if (nvsegupperlimitpseg < MINIMUM_RESQUEUE_NVSEG_UPPER_PERSEG_LIMIT_N)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("%s cannot be less than %s",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT_PERSEG],
+ MINIMUM_RESQUEUE_NVSEG_UPPER_PERSEG_LIMIT)));
+ }
+ }
+
+ if (dnvseglowerlimitpseg != NULL)
+ {
+ nvseglowerlimitpseg = defGetNumeric(dnvseglowerlimitpseg);
+ if (nvseglowerlimitpseg < MINIMUM_RESQUEUE_NVSEG_LOWER_PERSEG_LIMIT_N)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("%s cannot be less than %s",
+ RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT_PERSEG],
+ MINIMUM_RESQUEUE_NVSEG_LOWER_PERSEG_LIMIT)));
+ }
+ }
+
+
/* The resource upper factor must be no less than 1. */
- if( dresupperfactor != NULL)
+ if( dresovercommit != NULL)
{
- double resupperfactor = defGetNumeric(dresupperfactor);
- if (resupperfactor < MINIMUM_RESQUEUE_UPPER_FACTOR_LIMIT_N)
+ double resovercommit = defGetNumeric(dresovercommit);
+ if (resovercommit < MINIMUM_RESQUEUE_OVERCOMMIT_N)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("%s cannot be less than %s",
- RSQDDLAttrNames[RSQ_DDL_ATTR_RESOURCE_UPPER_FACTOR],
- MINIMUM_RESQUEUE_UPPER_FACTOR_LIMIT)));
+ RSQDDLAttrNames[RSQ_DDL_ATTR_RESOURCE_OVERCOMMIT_FACTOR],
+ MINIMUM_RESQUEUE_OVERCOMMIT)));
}
}
}