You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/11/06 19:44:59 UTC
[10/10] hive git commit: HIVE-17907 : enable and apply resource plan commands in HS2 (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
HIVE-17907 : enable and apply resource plan commands in HS2 (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ddce801f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ddce801f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ddce801f
Branch: refs/heads/master
Commit: ddce801f224e7a0f51d15ae810250eec9b85df95
Parents: a8912d6
Author: sergey <se...@apache.org>
Authored: Mon Nov 6 11:41:33 2017 -0800
Committer: sergey <se...@apache.org>
Committed: Mon Nov 6 11:41:33 2017 -0800
----------------------------------------------------------------------
.../listener/DummyRawStoreFailEvent.java | 11 +-
.../hive/jdbc/TestTriggersWorkloadManager.java | 39 +-
.../upgrade/derby/046-HIVE-17566.derby.sql | 2 +-
.../upgrade/derby/hive-schema-3.0.0.derby.sql | 2 +-
.../upgrade/mssql/031-HIVE-17566.mssql.sql | 3 +-
.../upgrade/mssql/hive-schema-3.0.0.mssql.sql | 3 +-
.../upgrade/mysql/046-HIVE-17566.mysql.sql | 1 +
.../upgrade/mysql/hive-schema-3.0.0.mysql.sql | 1 +
.../upgrade/oracle/046-HIVE-17566.oracle.sql | 3 +-
.../upgrade/oracle/hive-schema-3.0.0.oracle.sql | 3 +-
.../postgres/045-HIVE-17566.postgres.sql | 3 +-
.../postgres/hive-schema-3.0.0.postgres.sql | 3 +-
.../hadoop/hive/metastore/HiveMetaStore.java | 25 +-
.../hive/metastore/HiveMetaStoreClient.java | 12 +-
.../hadoop/hive/metastore/IMetaStoreClient.java | 7 +-
.../DummyRawStoreControlledCommit.java | 12 +-
.../DummyRawStoreForJdoConnection.java | 11 +-
.../org/apache/hadoop/hive/ql/exec/DDLTask.java | 62 +-
.../hive/ql/exec/tez/UserPoolMapping.java | 26 +-
.../hive/ql/exec/tez/WorkloadManager.java | 258 +-
.../apache/hadoop/hive/ql/metadata/Hive.java | 15 +-
.../hive/ql/parse/DDLSemanticAnalyzer.java | 91 +-
.../apache/hadoop/hive/ql/parse/HiveParser.g | 6 +-
.../hive/ql/plan/AlterResourcePlanDesc.java | 9 +
.../org/apache/hadoop/hive/ql/plan/DDLWork.java | 1 +
.../hive/ql/exec/tez/TestWorkloadManager.java | 138 +-
.../clientpositive/llap/resourceplan.q.out | 2 +-
.../apache/hive/service/server/HiveServer2.java | 66 +-
.../hive/metastore/annotation/package-info.java | 8 +
.../gen/thrift/gen-cpp/ThriftHiveMetastore.cpp | 2545 ++++++-----
.../gen/thrift/gen-cpp/ThriftHiveMetastore.h | 134 +
.../ThriftHiveMetastore_server.skeleton.cpp | 5 +
.../gen/thrift/gen-cpp/hive_metastore_types.cpp | 1079 +++--
.../gen/thrift/gen-cpp/hive_metastore_types.h | 255 +-
.../hive/metastore/api/ThriftHiveMetastore.java | 4201 +++++++++++-------
.../api/WMAlterResourcePlanRequest.java | 113 +-
.../api/WMAlterResourcePlanResponse.java | 119 +-
.../metastore/api/WMAlterTriggerRequest.java | 2 +-
.../metastore/api/WMAlterTriggerResponse.java | 2 +-
.../api/WMCreateResourcePlanRequest.java | 2 +-
.../api/WMCreateResourcePlanResponse.java | 2 +-
.../metastore/api/WMCreateTriggerRequest.java | 2 +-
.../metastore/api/WMCreateTriggerResponse.java | 2 +-
.../api/WMDropResourcePlanRequest.java | 2 +-
.../api/WMDropResourcePlanResponse.java | 2 +-
.../metastore/api/WMDropTriggerRequest.java | 2 +-
.../metastore/api/WMDropTriggerResponse.java | 2 +-
.../hive/metastore/api/WMFullResourcePlan.java | 1033 +++++
.../api/WMGetActiveResourcePlanRequest.java | 283 ++
.../api/WMGetActiveResourcePlanResponse.java | 398 ++
.../api/WMGetAllResourcePlanRequest.java | 2 +-
.../api/WMGetAllResourcePlanResponse.java | 38 +-
.../metastore/api/WMGetResourcePlanRequest.java | 2 +-
.../api/WMGetResourcePlanResponse.java | 2 +-
.../api/WMGetTriggersForResourePlanRequest.java | 2 +-
.../WMGetTriggersForResourePlanResponse.java | 38 +-
.../hadoop/hive/metastore/api/WMMapping.java | 2 +-
.../hadoop/hive/metastore/api/WMPool.java | 258 +-
.../hive/metastore/api/WMPoolTrigger.java | 490 ++
.../hive/metastore/api/WMResourcePlan.java | 2 +-
.../hadoop/hive/metastore/api/WMTrigger.java | 2 +-
.../api/WMValidateResourcePlanRequest.java | 2 +-
.../api/WMValidateResourcePlanResponse.java | 2 +-
.../gen-php/metastore/ThriftHiveMetastore.php | 1553 ++++---
.../src/gen/thrift/gen-php/metastore/Types.php | 660 ++-
.../hive_metastore/ThriftHiveMetastore-remote | 7 +
.../hive_metastore/ThriftHiveMetastore.py | 1090 +++--
.../gen/thrift/gen-py/hive_metastore/ttypes.py | 466 +-
.../gen/thrift/gen-rb/hive_metastore_types.rb | 98 +-
.../gen/thrift/gen-rb/thrift_hive_metastore.rb | 61 +
.../hadoop/hive/metastore/ObjectStore.java | 152 +-
.../apache/hadoop/hive/metastore/RawStore.java | 7 +-
.../hive/metastore/cache/CachedStore.java | 12 +-
.../hadoop/hive/metastore/model/MWMPool.java | 12 +-
.../src/main/resources/package.jdo | 3 +
.../src/main/thrift/hive_metastore.thrift | 34 +-
76 files changed, 11218 insertions(+), 4787 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/ddce801f/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
index 5a627ce..996c005 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
@@ -18,6 +18,8 @@
package org.apache.hive.hcatalog.listener;
+import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
+
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
@@ -995,13 +997,18 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable {
}
@Override
- public void alterResourcePlan(String name, WMResourcePlan resourcePlan)
+ public WMFullResourcePlan alterResourcePlan(String name, WMResourcePlan resourcePlan, boolean canActivateDisabled)
throws AlreadyExistsException, NoSuchObjectException, InvalidOperationException,
MetaException {
- objectStore.alterResourcePlan(name, resourcePlan);
+ return objectStore.alterResourcePlan(name, resourcePlan, canActivateDisabled);
}
@Override
+ public WMFullResourcePlan getActiveResourcePlan() throws MetaException {
+ return objectStore.getActiveResourcePlan();
+ }
+
+ @Override
public boolean validateResourcePlan(String name)
throws NoSuchObjectException, InvalidObjectException, MetaException {
return objectStore.validateResourcePlan(name);
http://git-wip-us.apache.org/repos/asf/hive/blob/ddce801f/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
index 0ec7e85..012361a 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
@@ -16,6 +16,10 @@
package org.apache.hive.jdbc;
+import org.slf4j.Logger;
+
+import org.slf4j.LoggerFactory;
+
import com.google.common.collect.Lists;
import java.io.File;
import java.net.URL;
@@ -26,17 +30,20 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
+import org.apache.hadoop.hive.metastore.api.WMMapping;
+import org.apache.hadoop.hive.metastore.api.WMPool;
+import org.apache.hadoop.hive.metastore.api.WMPoolTrigger;
+import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
+import org.apache.hadoop.hive.metastore.api.WMTrigger;
import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager;
-import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpHivePool;
-import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpResourcePlan;
-import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpUserMapping;
-import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpUserMappingType;
import org.apache.hadoop.hive.ql.wm.Trigger;
import org.apache.hive.jdbc.miniHS2.MiniHS2;
import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
import org.junit.BeforeClass;
public class TestTriggersWorkloadManager extends TestTriggersTezSessionPoolManager {
+ private final static Logger LOG = LoggerFactory.getLogger(TestTriggersWorkloadManager.class);
@BeforeClass
public static void beforeTest() throws Exception {
@@ -74,10 +81,26 @@ public class TestTriggersWorkloadManager extends TestTriggersTezSessionPoolManag
@Override
protected void setupTriggers(final List<Trigger> triggers) throws Exception {
WorkloadManager wm = WorkloadManager.getInstance();
- TmpResourcePlan rp = new TmpResourcePlan(Lists.newArrayList(new TmpHivePool(
- "llap", null, 1, 1.0f, triggers)), Lists.newArrayList(
- new TmpUserMapping(TmpUserMappingType.DEFAULT, "", "llap", 1)));
+ WMPool pool = new WMPool("rp", "llap");
+ pool.setAllocFraction(1.0f);
+ pool.setQueryParallelism(1);
+ WMMapping mapping = new WMMapping("rp", "DEFAULT", "");
+ mapping.setPoolName("llap");
+ WMFullResourcePlan rp = new WMFullResourcePlan(
+ new WMResourcePlan("rp"), Lists.newArrayList(pool));
+ rp.addToMappings(mapping);
+ for (Trigger trigger : triggers) {
+ rp.addToTriggers(wmTriggerFromTrigger(trigger));
+ rp.addToPoolTriggers(new WMPoolTrigger("llap", trigger.getName()));
+ }
wm.updateResourcePlanAsync(rp).get(10, TimeUnit.SECONDS);
}
-
+
+ private WMTrigger wmTriggerFromTrigger(Trigger trigger) {
+ WMTrigger result = new WMTrigger("rp", trigger.getName());
+ result.setTriggerExpression(trigger.getExpression().toString()); // TODO: hmm
+ result.setActionExpression(trigger.getAction().toString()); // TODO: hmm
+ LOG.debug("Produced " + result + " from " + trigger);
+ return result;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/ddce801f/metastore/scripts/upgrade/derby/046-HIVE-17566.derby.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/derby/046-HIVE-17566.derby.sql b/metastore/scripts/upgrade/derby/046-HIVE-17566.derby.sql
index 16190dc..f9b0765 100644
--- a/metastore/scripts/upgrade/derby/046-HIVE-17566.derby.sql
+++ b/metastore/scripts/upgrade/derby/046-HIVE-17566.derby.sql
@@ -2,7 +2,7 @@ CREATE TABLE "APP"."WM_RESOURCEPLAN" (RP_ID BIGINT NOT NULL, NAME VARCHAR(128) N
CREATE UNIQUE INDEX "APP"."UNIQUE_WM_RESOURCEPLAN" ON "APP"."WM_RESOURCEPLAN" ("NAME");
ALTER TABLE "APP"."WM_RESOURCEPLAN" ADD CONSTRAINT "WM_RESOURCEPLAN_PK" PRIMARY KEY ("RP_ID");
-CREATE TABLE "APP"."WM_POOL" (POOL_ID BIGINT NOT NULL, RP_ID BIGINT NOT NULL, PATH VARCHAR(1024) NOT NULL, PARENT_POOL_ID BIGINT, ALLOC_FRACTION DOUBLE, QUERY_PARALLELISM INTEGER);
+CREATE TABLE "APP"."WM_POOL" (POOL_ID BIGINT NOT NULL, RP_ID BIGINT NOT NULL, PATH VARCHAR(1024) NOT NULL, PARENT_POOL_ID BIGINT, ALLOC_FRACTION DOUBLE, QUERY_PARALLELISM INTEGER, SCHEDULING_POLICY VARCHAR(1024));
CREATE UNIQUE INDEX "APP"."UNIQUE_WM_POOL" ON "APP"."WM_POOL" ("RP_ID", "PATH");
ALTER TABLE "APP"."WM_POOL" ADD CONSTRAINT "WM_POOL_PK" PRIMARY KEY ("POOL_ID");
ALTER TABLE "APP"."WM_POOL" ADD CONSTRAINT "WM_POOL_FK1" FOREIGN KEY ("RP_ID") REFERENCES "APP"."WM_RESOURCEPLAN" ("RP_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
http://git-wip-us.apache.org/repos/asf/hive/blob/ddce801f/metastore/scripts/upgrade/derby/hive-schema-3.0.0.derby.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/derby/hive-schema-3.0.0.derby.sql b/metastore/scripts/upgrade/derby/hive-schema-3.0.0.derby.sql
index 4c35380..054978e 100644
--- a/metastore/scripts/upgrade/derby/hive-schema-3.0.0.derby.sql
+++ b/metastore/scripts/upgrade/derby/hive-schema-3.0.0.derby.sql
@@ -112,7 +112,7 @@ CREATE TABLE "APP"."METASTORE_DB_PROPERTIES" ("PROPERTY_KEY" VARCHAR(255) NOT NU
CREATE TABLE "APP"."WM_RESOURCEPLAN" (RP_ID INTEGER NOT NULL, NAME VARCHAR(128) NOT NULL, QUERY_PARALLELISM INTEGER, STATUS VARCHAR(20) NOT NULL);
-CREATE TABLE "APP"."WM_POOL" (POOL_ID INTEGER NOT NULL, RP_ID INTEGER NOT NULL, PATH VARCHAR(1024) NOT NULL, PARENT_POOL_ID INTEGER, ALLOC_FRACTION DOUBLE, QUERY_PARALLELISM INTEGER);
+CREATE TABLE "APP"."WM_POOL" (POOL_ID INTEGER NOT NULL, RP_ID INTEGER NOT NULL, PATH VARCHAR(1024) NOT NULL, PARENT_POOL_ID INTEGER, ALLOC_FRACTION DOUBLE, QUERY_PARALLELISM INTEGER, SCHEDULING_POLICY VARCHAR(1024));
CREATE TABLE "APP"."WM_TRIGGER" (TRIGGER_ID INTEGER NOT NULL, RP_ID INTEGER NOT NULL, NAME VARCHAR(128) NOT NULL, TRIGGER_EXPRESSION VARCHAR(1024), ACTION_EXPRESSION VARCHAR(1024));
http://git-wip-us.apache.org/repos/asf/hive/blob/ddce801f/metastore/scripts/upgrade/mssql/031-HIVE-17566.mssql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mssql/031-HIVE-17566.mssql.sql b/metastore/scripts/upgrade/mssql/031-HIVE-17566.mssql.sql
index 0ee0f75..a13d976 100644
--- a/metastore/scripts/upgrade/mssql/031-HIVE-17566.mssql.sql
+++ b/metastore/scripts/upgrade/mssql/031-HIVE-17566.mssql.sql
@@ -18,7 +18,8 @@ CREATE TABLE WM_POOL
PATH nvarchar(1024) NOT NULL,
PARENT_POOL_ID bigint,
ALLOC_FRACTION DOUBLE,
- QUERY_PARALLELISM int
+ QUERY_PARALLELISM int,
+ SCHEDULING_POLICY nvarchar(1024)
);
ALTER TABLE WM_POOL ADD CONSTRAINT WM_POOL_PK PRIMARY KEY (POOL_ID);
http://git-wip-us.apache.org/repos/asf/hive/blob/ddce801f/metastore/scripts/upgrade/mssql/hive-schema-3.0.0.mssql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mssql/hive-schema-3.0.0.mssql.sql b/metastore/scripts/upgrade/mssql/hive-schema-3.0.0.mssql.sql
index 86f3e5c..1ceb723 100644
--- a/metastore/scripts/upgrade/mssql/hive-schema-3.0.0.mssql.sql
+++ b/metastore/scripts/upgrade/mssql/hive-schema-3.0.0.mssql.sql
@@ -614,7 +614,8 @@ CREATE TABLE WM_POOL
PATH nvarchar(1024) NOT NULL,
PARENT_POOL_ID bigint,
ALLOC_FRACTION DOUBLE,
- QUERY_PARALLELISM int
+ QUERY_PARALLELISM int,
+ SCHEDULING_POLICY nvarchar(1024)
);
ALTER TABLE WM_POOL ADD CONSTRAINT WM_POOL_PK PRIMARY KEY (POOL_ID);
http://git-wip-us.apache.org/repos/asf/hive/blob/ddce801f/metastore/scripts/upgrade/mysql/046-HIVE-17566.mysql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mysql/046-HIVE-17566.mysql.sql b/metastore/scripts/upgrade/mysql/046-HIVE-17566.mysql.sql
index 1affa7a..03ee4cf 100644
--- a/metastore/scripts/upgrade/mysql/046-HIVE-17566.mysql.sql
+++ b/metastore/scripts/upgrade/mysql/046-HIVE-17566.mysql.sql
@@ -15,6 +15,7 @@ CREATE TABLE IF NOT EXISTS WM_POOL
`PARENT_POOL_ID` bigint(20),
`ALLOC_FRACTION` DOUBLE,
`QUERY_PARALLELISM` int(11),
+ `SCHEDULING_POLICY` varchar(767),
PRIMARY KEY (`POOL_ID`),
KEY `UNIQUE_WM_POOL` (`RP_ID`, `PATH`),
CONSTRAINT `WM_POOL_FK1` FOREIGN KEY (`RP_ID`) REFERENCES `WM_RESOURCEPLAN` (`RP_ID`),
http://git-wip-us.apache.org/repos/asf/hive/blob/ddce801f/metastore/scripts/upgrade/mysql/hive-schema-3.0.0.mysql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mysql/hive-schema-3.0.0.mysql.sql b/metastore/scripts/upgrade/mysql/hive-schema-3.0.0.mysql.sql
index 2453bb9..0664854 100644
--- a/metastore/scripts/upgrade/mysql/hive-schema-3.0.0.mysql.sql
+++ b/metastore/scripts/upgrade/mysql/hive-schema-3.0.0.mysql.sql
@@ -866,6 +866,7 @@ CREATE TABLE IF NOT EXISTS WM_POOL
`PARENT_POOL_ID` bigint(20),
`ALLOC_FRACTION` DOUBLE,
`QUERY_PARALLELISM` int(11),
+ `SCHEDULING_POLICY` varchar(767),
PRIMARY KEY (`POOL_ID`),
KEY `UNIQUE_WM_POOL` (`RP_ID`, `PATH`),
CONSTRAINT `WM_POOL_FK1` FOREIGN KEY (`RP_ID`) REFERENCES `WM_RESOURCEPLAN` (`RP_ID`),
http://git-wip-us.apache.org/repos/asf/hive/blob/ddce801f/metastore/scripts/upgrade/oracle/046-HIVE-17566.oracle.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/oracle/046-HIVE-17566.oracle.sql b/metastore/scripts/upgrade/oracle/046-HIVE-17566.oracle.sql
index 0ee0f75..a13d976 100644
--- a/metastore/scripts/upgrade/oracle/046-HIVE-17566.oracle.sql
+++ b/metastore/scripts/upgrade/oracle/046-HIVE-17566.oracle.sql
@@ -18,7 +18,8 @@ CREATE TABLE WM_POOL
PATH nvarchar(1024) NOT NULL,
PARENT_POOL_ID bigint,
ALLOC_FRACTION DOUBLE,
- QUERY_PARALLELISM int
+ QUERY_PARALLELISM int,
+ SCHEDULING_POLICY nvarchar(1024)
);
ALTER TABLE WM_POOL ADD CONSTRAINT WM_POOL_PK PRIMARY KEY (POOL_ID);
http://git-wip-us.apache.org/repos/asf/hive/blob/ddce801f/metastore/scripts/upgrade/oracle/hive-schema-3.0.0.oracle.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/oracle/hive-schema-3.0.0.oracle.sql b/metastore/scripts/upgrade/oracle/hive-schema-3.0.0.oracle.sql
index 47bfc5a..a94985a 100644
--- a/metastore/scripts/upgrade/oracle/hive-schema-3.0.0.oracle.sql
+++ b/metastore/scripts/upgrade/oracle/hive-schema-3.0.0.oracle.sql
@@ -595,7 +595,8 @@ CREATE TABLE WM_POOL
PATH nvarchar(1024) NOT NULL,
PARENT_POOL_ID bigint,
ALLOC_FRACTION DOUBLE,
- QUERY_PARALLELISM int
+ QUERY_PARALLELISM int,
+ SCHEDULING_POLICY nvarchar(1024)
);
ALTER TABLE WM_POOL ADD CONSTRAINT WM_POOL_PK PRIMARY KEY (POOL_ID);
http://git-wip-us.apache.org/repos/asf/hive/blob/ddce801f/metastore/scripts/upgrade/postgres/045-HIVE-17566.postgres.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/postgres/045-HIVE-17566.postgres.sql b/metastore/scripts/upgrade/postgres/045-HIVE-17566.postgres.sql
index aa27a64..e80e612 100644
--- a/metastore/scripts/upgrade/postgres/045-HIVE-17566.postgres.sql
+++ b/metastore/scripts/upgrade/postgres/045-HIVE-17566.postgres.sql
@@ -18,7 +18,8 @@ CREATE TABLE "WM_POOL" (
"PATH" character varying(1024) NOT NULL,
"PARENT_POOL_ID" bigint,
"ALLOC_FRACTION" DOUBLE,
- "QUERY_PARALLELISM" integer
+ "QUERY_PARALLELISM" integer,
+ "SCHEDULING_POLICY" character varying(1024)
);
ALTER TABLE ONLY "WM_POOL"
http://git-wip-us.apache.org/repos/asf/hive/blob/ddce801f/metastore/scripts/upgrade/postgres/hive-schema-3.0.0.postgres.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/postgres/hive-schema-3.0.0.postgres.sql b/metastore/scripts/upgrade/postgres/hive-schema-3.0.0.postgres.sql
index 5c770e2..be1bb1e 100644
--- a/metastore/scripts/upgrade/postgres/hive-schema-3.0.0.postgres.sql
+++ b/metastore/scripts/upgrade/postgres/hive-schema-3.0.0.postgres.sql
@@ -632,7 +632,8 @@ CREATE TABLE "WM_POOL" (
"PATH" character varying(1024) NOT NULL,
"PARENT_POOL_ID" bigint,
"ALLOC_FRACTION" DOUBLE,
- "QUERY_PARALLELISM" integer
+ "QUERY_PARALLELISM" integer,
+ "SCHEDULING_POLICY" character varying(1024)
);
CREATE TABLE "WM_TRIGGER" (
http://git-wip-us.apache.org/repos/asf/hive/blob/ddce801f/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 8a55305..0fcb93e 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -7414,8 +7414,16 @@ public class HiveMetaStore extends ThriftHiveMetastore {
public WMAlterResourcePlanResponse alter_resource_plan(WMAlterResourcePlanRequest request)
throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
try {
- getMS().alterResourcePlan(request.getResourcePlanName(), request.getResourcePlan());
- return new WMAlterResourcePlanResponse();
+ WMAlterResourcePlanResponse response = new WMAlterResourcePlanResponse();
+ // This method will only return full resource plan when activating one,
+ // to give the caller the result atomically with the activation.
+ WMFullResourcePlan fullPlanAfterAlter = getMS().alterResourcePlan(
+ request.getResourcePlanName(), request.getResourcePlan(),
+ request.isIsEnableAndActivate());
+ if (fullPlanAfterAlter != null) {
+ response.setFullResourcePlan(fullPlanAfterAlter);
+ }
+ return response;
} catch (MetaException e) {
LOG.error("Exception while trying to alter resource plan", e);
throw e;
@@ -7423,6 +7431,19 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
@Override
+ public WMGetActiveResourcePlanResponse get_active_resource_plan(
+ WMGetActiveResourcePlanRequest request) throws MetaException, TException {
+ try {
+ WMGetActiveResourcePlanResponse response = new WMGetActiveResourcePlanResponse();
+ response.setResourcePlan(getMS().getActiveResourcePlan());
+ return response;
+ } catch (MetaException e) {
+ LOG.error("Exception while trying to get active resource plan", e);
+ throw e;
+ }
+ }
+
+ @Override
public WMValidateResourcePlanResponse validate_resource_plan(WMValidateResourcePlanRequest request)
throws NoSuchObjectException, MetaException, TException {
try {
http://git-wip-us.apache.org/repos/asf/hive/blob/ddce801f/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index dfb0a6d..b5a9b79 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -2668,12 +2668,20 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
}
@Override
- public void alterResourcePlan(String resourcePlanName, WMResourcePlan resourcePlan)
+ public WMFullResourcePlan alterResourcePlan(String resourcePlanName, WMResourcePlan resourcePlan,
+ boolean canActivateDisabled)
throws NoSuchObjectException, InvalidObjectException, MetaException, TException {
WMAlterResourcePlanRequest request = new WMAlterResourcePlanRequest();
request.setResourcePlanName(resourcePlanName);
request.setResourcePlan(resourcePlan);
- client.alter_resource_plan(request);
+ request.setIsEnableAndActivate(canActivateDisabled);
+ WMAlterResourcePlanResponse resp = client.alter_resource_plan(request);
+ return resp.isSetFullResourcePlan() ? resp.getFullResourcePlan() : null;
+ }
+
+ @Override
+ public WMFullResourcePlan getActiveResourcePlan() throws MetaException, TException {
+ return client.get_active_resource_plan(new WMGetActiveResourcePlanRequest()).getResourcePlan();
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/ddce801f/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index 5aa20c5..2cb255e 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.hive.metastore;
+import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
@@ -1780,9 +1782,12 @@ public interface IMetaStoreClient {
void dropResourcePlan(String resourcePlanName)
throws NoSuchObjectException, MetaException, TException;
- void alterResourcePlan(String resourcePlanName, WMResourcePlan resourcePlan)
+ WMFullResourcePlan alterResourcePlan(String resourcePlanName, WMResourcePlan resourcePlan,
+ boolean canActivateDisabled)
throws NoSuchObjectException, InvalidObjectException, MetaException, TException;
+ WMFullResourcePlan getActiveResourcePlan() throws MetaException, TException;
+
boolean validateResourcePlan(String resourcePlanName)
throws NoSuchObjectException, InvalidObjectException, MetaException, TException;
http://git-wip-us.apache.org/repos/asf/hive/blob/ddce801f/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
index 610b9fa..4df7c97 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hive.metastore;
+import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
+
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
@@ -955,10 +957,16 @@ public class DummyRawStoreControlledCommit implements RawStore, Configurable {
}
@Override
- public void alterResourcePlan(String name, WMResourcePlan resourcePlan)
+ public WMFullResourcePlan alterResourcePlan(String name, WMResourcePlan resourcePlan,
+ boolean canActivateDisabled)
throws AlreadyExistsException, NoSuchObjectException, InvalidOperationException,
MetaException {
- objectStore.alterResourcePlan(name, resourcePlan);
+ return objectStore.alterResourcePlan(name, resourcePlan, canActivateDisabled);
+ }
+
+ @Override
+ public WMFullResourcePlan getActiveResourcePlan() throws MetaException {
+ return objectStore.getActiveResourcePlan();
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/ddce801f/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
index 84b70d8..a41e5a0 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hive.metastore;
+import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
+
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
@@ -965,8 +967,15 @@ public class DummyRawStoreForJdoConnection implements RawStore {
}
@Override
- public void alterResourcePlan(String name, WMResourcePlan resourcePlan)
+ public WMFullResourcePlan alterResourcePlan(
+ String name, WMResourcePlan resourcePlan, boolean canActivateDisabled)
throws NoSuchObjectException, InvalidOperationException, MetaException {
+ return null;
+ }
+
+ @Override
+ public WMFullResourcePlan getActiveResourcePlan() throws MetaException {
+ return null;
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/ddce801f/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 591be49..826ae56 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -21,6 +21,15 @@ package org.apache.hadoop.hive.ql.exec;
import static org.apache.commons.lang.StringUtils.join;
import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE;
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.util.concurrent.FutureCallback;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import java.io.BufferedWriter;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
@@ -48,7 +57,6 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
-
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -86,8 +94,6 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.PrincipalType;
-import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
-import org.apache.hadoop.hive.metastore.api.WMTrigger;
import org.apache.hadoop.hive.metastore.api.RolePrincipalGrant;
import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
@@ -102,6 +108,10 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
import org.apache.hadoop.hive.metastore.api.SkewedInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.TxnInfo;
+import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
+import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
+import org.apache.hadoop.hive.metastore.api.WMResourcePlanStatus;
+import org.apache.hadoop.hive.metastore.api.WMTrigger;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.Context;
@@ -112,6 +122,7 @@ import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.exec.ArchiveUtils.PartSpecInfo;
import org.apache.hadoop.hive.ql.exec.FunctionInfo.FunctionResource;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager;
import org.apache.hadoop.hive.ql.hooks.LineageInfo.DataContainer;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
@@ -271,10 +282,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.stringtemplate.v4.ST;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
/**
* DDLTask implementation.
*
@@ -688,7 +695,6 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
}
WMResourcePlan resourcePlan = new WMResourcePlan();
-
if (desc.getNewName() != null) {
resourcePlan.setName(desc.getNewName());
} else {
@@ -699,12 +705,48 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
resourcePlan.setQueryParallelism(desc.getQueryParallelism());
}
+ boolean isActivate = false, isInTest = HiveConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST);
+ WorkloadManager wm = null;
if (desc.getStatus() != null) {
resourcePlan.setStatus(desc.getStatus());
+ isActivate = desc.getStatus() == WMResourcePlanStatus.ACTIVE;
+ if (isActivate) {
+ wm = WorkloadManager.getInstance();
+ if (wm == null && !isInTest) {
+ throw new HiveException("Resource plan can only be activated when WM is enabled");
+ }
+ }
}
- db.alterResourcePlan(desc.getRpName(), resourcePlan);
- return 0;
+ WMFullResourcePlan appliedRp = db.alterResourcePlan(
+ desc.getRpName(), resourcePlan, desc.isEnableActivate());
+ if (!isActivate || (wm == null && isInTest)) return 0;
+ assert wm != null;
+ if (appliedRp == null) {
+ throw new HiveException("Cannot get a resource plan to apply");
+ // TODO: shut down HS2?
+ }
+ final String name = (desc.getNewName() != null) ? desc.getNewName() : desc.getRpName();
+ LOG.info("Activating a new resource plan " + name + ": " + appliedRp);
+ // Note: as per our current constraints, the behavior of two parallel activates is
+ // undefined; although only one will succeed and the other will receive exception.
+ // We need proper (semi-)transactional modifications to support this without hacks.
+ ListenableFuture<Boolean> future = wm.updateResourcePlanAsync(appliedRp);
+ boolean isOk = false;
+ try {
+ // Note: we may add an async option in future. For now, let the task fail for the user.
+ future.get();
+ isOk = true;
+ LOG.info("Successfully activated resource plan " + name);
+ return 0;
+ } catch (InterruptedException | ExecutionException e) {
+ throw new HiveException(e);
+ } finally {
+ if (!isOk) {
+ LOG.error("Failed to activate resource plan " + name);
+ // TODO: shut down HS2?
+ }
+ }
}
private int dropResourcePlan(Hive db, DropResourcePlanDesc desc) throws HiveException {
http://git-wip-us.apache.org/repos/asf/hive/blob/ddce801f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
index 96dc7d3..9954c24 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
@@ -17,13 +17,17 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
+import org.apache.hadoop.hive.metastore.api.WMMapping;
+
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpUserMapping;
class UserPoolMapping {
+ public static enum MappingType {
+ USER, DEFAULT
+ }
+
private final static class Mapping {
public Mapping(String poolName, int priority) {
this.fullPoolName = poolName;
@@ -39,18 +43,18 @@ class UserPoolMapping {
private final Map<String, Mapping> userMappings = new HashMap<>();
private final String defaultPoolName;
- // TODO: add other types as needed
- public UserPoolMapping(List<TmpUserMapping> mappings) {
+ public UserPoolMapping(List<WMMapping> mappings) {
String defaultPoolName = null;
- for (TmpUserMapping mapping : mappings) {
- switch (mapping.getType()) {
+ for (WMMapping mapping : mappings) {
+ MappingType type = MappingType.valueOf(mapping.getEntityType().toUpperCase());
+ switch (type) {
case USER: {
- Mapping val = new Mapping(mapping.getPoolName(), mapping.getPriority());
- Mapping oldValue = userMappings.put(mapping.getName(), val);
+ Mapping val = new Mapping(mapping.getPoolName(), mapping.getOrdering());
+ Mapping oldValue = userMappings.put(mapping.getEntityName(), val);
if (oldValue != null) {
- throw new AssertionError("Duplicate mapping for user " + mapping.getName() + "; "
- + oldValue + " and " + val);
+ throw new AssertionError("Duplicate mapping for user " + mapping.getEntityName()
+ + "; " + oldValue + " and " + val);
}
break;
}
@@ -63,7 +67,7 @@ class UserPoolMapping {
defaultPoolName = poolName;
break;
}
- default: throw new AssertionError("Unknown type " + mapping.getType());
+ default: throw new AssertionError("Unknown type " + mapping.getEntityType());
}
}
this.defaultPoolName = defaultPoolName;
http://git-wip-us.apache.org/repos/asf/hive/blob/ddce801f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index 8612bb6..169991c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -17,6 +17,22 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
+import org.apache.hadoop.hive.ql.wm.ExpressionFactory;
+
+import org.apache.hadoop.hive.ql.wm.Trigger.Action;
+
+import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
+
+import org.apache.hadoop.hive.metastore.api.WMPoolTrigger;
+
+import org.apache.hadoop.hive.metastore.api.WMTrigger;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.hadoop.hive.metastore.api.WMPool;
+
+import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
+
import org.apache.hadoop.hive.ql.session.SessionState;
import com.google.common.annotations.VisibleForTesting;
@@ -63,7 +79,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
private static final Logger LOG = LoggerFactory.getLogger(WorkloadManager.class);
// TODO: this is a temporary setting that will go away, so it's not in HiveConf.
public static final String TEST_WM_CONFIG = "hive.test.workload.management";
- private static final char POOL_SEPARATOR = '/';
+ private static final char POOL_SEPARATOR = '.';
+ private static final String POOL_SEPARATOR_STR = "" + POOL_SEPARATOR;
private final HiveConf conf;
private final TezSessionPool<WmTezSession> tezAmPool;
@@ -109,9 +126,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
/** Used to schedule timeouts for some async operations. */
private final ScheduledExecutorService timeoutPool;
private final WmThreadSyncWork syncWork = new WmThreadSyncWork();
+ private ListenableFuture<Boolean> initRpFuture;
- @SuppressWarnings("rawtypes")
- private final FutureCallback FATAL_ERROR_CALLBACK = new FutureCallback() {
+ private static final FutureCallback<Object> FATAL_ERROR_CALLBACK = new FutureCallback<Object>() {
@Override
public void onSuccess(Object result) {
}
@@ -126,9 +143,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// TODO: this is temporary before HiveServerEnvironment is merged.
private static volatile WorkloadManager INSTANCE;
public static WorkloadManager getInstance() {
- WorkloadManager wm = INSTANCE;
- assert wm != null;
- return wm;
+ return INSTANCE;
}
public static boolean isInUse(Configuration conf) {
@@ -136,7 +151,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
/** Called once, when HS2 initializes. */
- public static WorkloadManager create(String yarnQueue, HiveConf conf, TmpResourcePlan plan) {
+ public static WorkloadManager create(String yarnQueue, HiveConf conf, WMFullResourcePlan plan) {
assert INSTANCE == null;
// We could derive the expected number of AMs to pass in.
LlapPluginEndpointClient amComm = new LlapPluginEndpointClientImpl(conf, null, -1);
@@ -146,10 +161,11 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
@VisibleForTesting
WorkloadManager(String yarnQueue, HiveConf conf,
- QueryAllocationManager qam, TmpResourcePlan plan) {
+ QueryAllocationManager qam, WMFullResourcePlan plan) {
this.yarnQueue = yarnQueue;
this.conf = conf;
- this.totalQueryParallelism = applyInitialResourcePlan(plan);
+ this.totalQueryParallelism = determineQueryParallelism(plan);
+ this.initRpFuture = this.updateResourcePlanAsync(plan);
LOG.info("Initializing with " + totalQueryParallelism + " total query parallelism");
this.amRegistryTimeoutMs = (int)HiveConf.getTimeVar(
@@ -201,41 +217,12 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
startTriggerValidator(conf);
}
- // TODO: remove and let the thread handle it via normal ways?
- private int applyInitialResourcePlan(TmpResourcePlan plan) {
- int totalQueryParallelism = 0;
- // Note: we assume here that plan has been validated beforehand, so we don't verify
- // that fractions or query parallelism add up.
- this.userPoolMapping = new UserPoolMapping(plan.getMappings());
- assert pools == null;
- pools = new HashMap<>();
- // Use recursion to update parents more conveniently; we don't expect a big tree.
- for (TmpHivePool pool : plan.getRootPools()) {
- totalQueryParallelism += addInitialHivePool(pool, null);
- }
- return totalQueryParallelism;
- }
-
- // TODO: remove and let the thread handle it via normal ways?
- private int addInitialHivePool(TmpHivePool pool, PoolState parent) {
- String fullName = pool.getName();
- int totalQueryParallelism = pool.getQueryParallelism();
- double fraction = pool.getResourceFraction();
- if (parent != null) {
- fullName = parent.fullName + POOL_SEPARATOR + fullName;
- fraction = parent.finalFraction * pool.getResourceFraction();
- parent.finalFractionRemaining -= fraction;
- }
- PoolState state = new PoolState(fullName, totalQueryParallelism, fraction);
- if (pool.getChildren() != null) {
- for (TmpHivePool child : pool.getChildren()) {
- totalQueryParallelism += addInitialHivePool(child, state);
- }
+ private int determineQueryParallelism(WMFullResourcePlan plan) {
+ int result = 0;
+ for (WMPool pool : plan.getPools()) {
+ result += pool.getQueryParallelism();
}
- state.setTriggers(pool.triggers);
- LOG.info("Adding Hive pool: " + state + " with triggers " + pool.triggers);
- pools.put(fullName, state);
- return totalQueryParallelism;
+ return result;
}
public void start() throws Exception {
@@ -245,6 +232,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
allocationManager.start();
wmThread.start();
+ initRpFuture.get(); // Wait for the initial resource plan to be applied.
}
public void stop() throws Exception {
@@ -277,7 +265,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
new IdentityHashMap<>();
private final LinkedList<GetRequest> getRequests = new LinkedList<>();
private final IdentityHashMap<WmTezSession, GetRequest> toReuse = new IdentityHashMap<>();
- private TmpResourcePlan resourcePlanToApply = null;
+ private WMFullResourcePlan resourcePlanToApply = null;
private boolean hasClusterStateChanged = false;
private SettableFuture<Boolean> testEvent, applyRpFuture;
}
@@ -606,13 +594,63 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
this.userPoolMapping = new UserPoolMapping(e.resourcePlanToApply.getMappings());
HashMap<String, PoolState> oldPools = pools;
pools = new HashMap<>();
- // Use recursion to update parents more conveniently; we don't expect a big tree.
- for (TmpHivePool pool : e.resourcePlanToApply.getRootPools()) {
- totalQueryParallelism += addHivePool(
- pool, null, oldPools, syncWork.toRestartInUse, poolsToRedistribute, e);
+
+ // For simplicity, to always have parents while storing pools in a flat structure, we'll
+ // first distribute them by levels, then add level by level.
+ ArrayList<List<WMPool>> poolsByLevel = new ArrayList<>();
+ for (WMPool pool : e.resourcePlanToApply.getPools()) {
+ String fullName = pool.getPoolPath();
+ int ix = StringUtils.countMatches(fullName, POOL_SEPARATOR_STR);
+ while (poolsByLevel.size() <= ix) {
+ poolsByLevel.add(new LinkedList<WMPool>()); // We expect all the levels to have items.
+ }
+ poolsByLevel.get(ix).add(pool);
+ }
+ for (int level = 0; level < poolsByLevel.size(); ++level) {
+ List<WMPool> poolsOnLevel = poolsByLevel.get(level);
+ for (WMPool pool : poolsOnLevel) {
+ String fullName = pool.getPoolPath();
+ int qp = pool.getQueryParallelism();
+ double fraction = pool.getAllocFraction();
+ if (level > 0) {
+ String parentName = fullName.substring(0, fullName.lastIndexOf(POOL_SEPARATOR));
+ PoolState parent = pools.get(parentName);
+ fraction = parent.finalFraction * fraction;
+ parent.finalFractionRemaining -= fraction;
+ }
+ PoolState state = oldPools == null ? null : oldPools.remove(fullName);
+ if (state == null) {
+ state = new PoolState(fullName, qp, fraction);
+ } else {
+ // This will also take care of the queries if query parallelism changed.
+ state.update(qp, fraction, syncWork.toRestartInUse, e);
+ poolsToRedistribute.add(fullName);
+ }
+ state.setTriggers(new LinkedList<Trigger>());
+ LOG.info("Adding Hive pool: " + state);
+ pools.put(fullName, state);
+ totalQueryParallelism += qp;
+ }
+ }
+ if (e.resourcePlanToApply.isSetTriggers() && e.resourcePlanToApply.isSetPoolTriggers()) {
+ Map<String, Trigger> triggers = new HashMap<>();
+ for (WMTrigger trigger : e.resourcePlanToApply.getTriggers()) {
+ // TODO: parse trigger.getActionExpression() correctly; right now the Action enum is invalid.
+ ExecutionTrigger execTrigger = new ExecutionTrigger(trigger.getTriggerName(),
+ ExpressionFactory.fromString(trigger.getTriggerExpression()), Action.KILL_QUERY);
+ triggers.put(trigger.getTriggerName(), execTrigger);
+ }
+ for (WMPoolTrigger poolTrigger : e.resourcePlanToApply.getPoolTriggers()) {
+ PoolState pool = pools.get(poolTrigger.getPool());
+ Trigger trigger = triggers.get(poolTrigger.getTrigger());
+ pool.triggers.add(trigger);
+ poolsToRedistribute.add(pool.fullName);
+ LOG.info("Adding pool " + pool.fullName + " trigger " + trigger);
+ }
}
+
if (oldPools != null && !oldPools.isEmpty()) {
- // Looks like some pools were removed; insert queued queries into the front of get reqs.
+ // Looks like some pools were removed; kill running queries, re-queue the queued ones.
for (PoolState oldPool : oldPools.values()) {
oldPool.destroy(syncWork.toRestartInUse, e.getRequests, e.toReuse);
}
@@ -762,39 +800,6 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
req.sessionToReuse = null;
}
- private int addHivePool(TmpHivePool pool, PoolState parent,
- HashMap<String, PoolState> oldPools, List<WmTezSession> toKill,
- HashSet<String> poolsToRedistribute, EventState e) {
- String fullName = pool.getName();
- int totalQueryParallelism = pool.getQueryParallelism();
- double fraction = pool.getResourceFraction();
- if (parent != null) {
- fullName = parent.fullName + POOL_SEPARATOR + fullName;
- fraction = parent.finalFraction * pool.getResourceFraction();
- parent.finalFractionRemaining -= fraction;
- }
- PoolState state = oldPools == null ? null : oldPools.remove(fullName);
- if (state == null) {
- state = new PoolState(fullName, totalQueryParallelism, fraction);
- } else {
- // This will also take care of the queries if query parallelism changed.
- state.update(totalQueryParallelism, fraction, toKill, e);
- poolsToRedistribute.add(fullName);
- }
- state.setTriggers(pool.triggers);
-
- if (pool.getChildren() != null) {
- for (TmpHivePool child : pool.getChildren()) {
- totalQueryParallelism += addHivePool(
- child, state, oldPools, toKill, poolsToRedistribute, e);
- }
- }
- LOG.info("Adding Hive pool: " + state + " with triggers " + pool.triggers);
- pools.put(fullName, state);
- return totalQueryParallelism;
- }
-
-
/**
* Checks if the session is still relevant for WM and if yes, removes it from its thread.
* @return true if the session was removed; false if the session was already processed by WM
@@ -822,7 +827,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// ===== EVENT METHODS
- public Future<Boolean> updateResourcePlanAsync(TmpResourcePlan plan) {
+ public ListenableFuture<Boolean> updateResourcePlanAsync(WMFullResourcePlan plan) {
SettableFuture<Boolean> applyRpFuture = SettableFuture.create();
currentLock.lock();
try {
@@ -1246,9 +1251,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
}
- @VisibleForTesting
- // will change in HIVE-17809
- public void setTriggers(final List<Trigger> triggers) {
+ public void setTriggers(final LinkedList<Trigger> triggers) {
this.triggers = triggers;
}
@@ -1431,89 +1434,6 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
}
-
-
- // TODO: temporary until real WM schema is created.
- public static class TmpHivePool {
- private final String name;
- private final List<TmpHivePool> children;
- private final int queryParallelism;
- private final double resourceFraction;
- private final List<Trigger> triggers;
-
- public TmpHivePool(String name,
- List<TmpHivePool> children, int queryParallelism, double resourceFraction) {
- this(name, children, queryParallelism, resourceFraction, new ArrayList<Trigger>());
- }
-
- public TmpHivePool(String name,
- List<TmpHivePool> children, int queryParallelism, double resourceFraction,
- List<Trigger> triggers) {
- this.name = name;
- this.children = children;
- this.queryParallelism = queryParallelism;
- this.resourceFraction = resourceFraction;
- this.triggers = triggers;
- }
-
- public String getName() {
- return name;
- }
- public List<TmpHivePool> getChildren() {
- return children;
- }
- public int getQueryParallelism() {
- return queryParallelism;
- }
- public double getResourceFraction() {
- return resourceFraction;
- }
- }
-
- public static enum TmpUserMappingType {
- USER, DEFAULT
- }
-
- public static class TmpUserMapping {
- private final TmpUserMappingType type;
- private final String name;
- private final String poolName;
- private final int priority;
- public TmpUserMapping(TmpUserMappingType type, String name, String poolName, int priority) {
- this.type = type;
- this.name = name;
- this.poolName = poolName;
- this.priority = priority;
- }
- public TmpUserMappingType getType() {
- return type;
- }
- public String getName() {
- return name;
- }
- public String getPoolName() {
- return poolName;
- }
- public int getPriority() {
- return priority;
- }
- }
-
- public static class TmpResourcePlan {
- private final List<TmpHivePool> rootPools;
- private final List<TmpUserMapping> mappings;
- public TmpResourcePlan(List<TmpHivePool> rootPools, List<TmpUserMapping> mappings) {
- this.rootPools = rootPools;
- this.mappings = mappings;
- }
- public List<TmpHivePool> getRootPools() {
- return rootPools;
- }
- public List<TmpUserMapping> getMappings() {
- return mappings;
- }
- }
-
@VisibleForTesting
TezSessionPool<WmTezSession> getTezAmPool() {
return tezAmPool;
http://git-wip-us.apache.org/repos/asf/hive/blob/ddce801f/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 70656fe..cf4df9b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -27,6 +27,8 @@ import static org.apache.hadoop.hive.serde.serdeConstants.MAPKEY_DELIM;
import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT;
import static org.apache.hadoop.hive.serde.serdeConstants.STRING_TYPE_NAME;
+import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
+
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
@@ -4734,9 +4736,18 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
}
- public void alterResourcePlan(String rpName, WMResourcePlan resourcePlan) throws HiveException {
+ public WMFullResourcePlan alterResourcePlan(String rpName, WMResourcePlan resourcePlan,
+ boolean canActivateDisabled) throws HiveException {
+ try {
+ return getMSC().alterResourcePlan(rpName, resourcePlan, canActivateDisabled);
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+ }
+
+ public WMFullResourcePlan getActiveResourcePlan() throws HiveException {
try {
- getMSC().alterResourcePlan(rpName, resourcePlan);
+ return getMSC().getActiveResourcePlan();
} catch (Exception e) {
throw new HiveException(e);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ddce801f/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index 6a2ff75..3415a23 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -880,49 +880,64 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
}
private void analyzeAlterResourcePlan(ASTNode ast) throws SemanticException {
- if (ast.getChildCount() == 0) {
- throw new SemanticException("Expected name in ALTER RESOURCE PLAN statement");
- }
- String rpName = unescapeIdentifier(ast.getChild(0).getText());
if (ast.getChildCount() < 2) {
throw new SemanticException("Invalid syntax for ALTER RESOURCE PLAN statement");
}
- AlterResourcePlanDesc desc;
- switch (ast.getChild(1).getType()) {
- case HiveParser.TOK_VALIDATE:
- desc = AlterResourcePlanDesc.createValidatePlan(rpName);
- break;
- case HiveParser.TOK_ACTIVATE:
- desc = AlterResourcePlanDesc.createChangeStatus(rpName, WMResourcePlanStatus.ACTIVE);
- break;
- case HiveParser.TOK_ENABLE:
- desc = AlterResourcePlanDesc.createChangeStatus(rpName, WMResourcePlanStatus.ENABLED);
- break;
- case HiveParser.TOK_DISABLE:
- desc = AlterResourcePlanDesc.createChangeStatus(rpName, WMResourcePlanStatus.DISABLED);
- break;
- case HiveParser.TOK_QUERY_PARALLELISM:
- if (ast.getChildCount() != 3) {
- throw new SemanticException(
- "Expected number for query parallelism in alter resource plan statment");
- }
- int queryParallelism = Integer.parseInt(ast.getChild(2).getText());
- desc = AlterResourcePlanDesc.createChangeParallelism(rpName, queryParallelism);
- break;
- case HiveParser.TOK_RENAME:
- if (ast.getChildCount() != 3) {
- throw new SemanticException(
- "Expected new name for rename in alter resource plan statment");
+ String rpName = unescapeIdentifier(ast.getChild(0).getText());
+ AlterResourcePlanDesc desc = null;
+ for (int i = 1; i < ast.getChildCount(); ++i) { // child 0 is the plan name; the rest are action tokens
+ Tree child = ast.getChild(i);
+ switch (child.getType()) {
+ case HiveParser.TOK_VALIDATE:
+ desc = AlterResourcePlanDesc.createValidatePlan(rpName);
+ break;
+ case HiveParser.TOK_ACTIVATE:
+ if (desc == null) {
+ desc = AlterResourcePlanDesc.createChangeStatus(rpName, WMResourcePlanStatus.ACTIVE);
+ } else if (desc.getStatus() == WMResourcePlanStatus.ENABLED) { // ENABLE came first: fold into one enable-and-activate request
+ desc.setIsEnableActivate(true);
+ desc.setStatus(WMResourcePlanStatus.ACTIVE);
+ } else {
+ throw new SemanticException("Invalid ALTER ACTIVATE command");
+ }
+ break;
+ case HiveParser.TOK_ENABLE:
+ if (desc == null) {
+ desc = AlterResourcePlanDesc.createChangeStatus(rpName, WMResourcePlanStatus.ENABLED);
+ } else if (desc.getStatus() == WMResourcePlanStatus.ACTIVE) { // ACTIVATE came first: status stays ACTIVE, just flag the combination
+ desc.setIsEnableActivate(true);
+ } else {
+ throw new SemanticException("Invalid ALTER ENABLE command");
+ }
+ break;
+ case HiveParser.TOK_DISABLE:
+ if (desc != null) {
+ throw new SemanticException("Invalid ALTER DISABLE command");
+ }
+ desc = AlterResourcePlanDesc.createChangeStatus(rpName, WMResourcePlanStatus.DISABLED);
+ break;
+ case HiveParser.TOK_QUERY_PARALLELISM:
+ if (ast.getChildCount() <= (i + 1)) {
+ throw new SemanticException(
+ "Expected number for query parallelism in alter resource plan statment");
+ }
+ int queryParallelism = Integer.parseInt(ast.getChild(++i).getText()); // ++i also consumes the value child
+ desc = AlterResourcePlanDesc.createChangeParallelism(rpName, queryParallelism);
+ break;
+ case HiveParser.TOK_RENAME:
+ if (ast.getChildCount() <= (i + 1)) {
+ throw new SemanticException(
+ "Expected new name for rename in alter resource plan statment");
+ }
+ String name = ast.getChild(++i).getText(); // ++i also consumes the new-name child
+ desc = AlterResourcePlanDesc.createRenamePlan(rpName, name);
+ break;
+ default:
+ throw new SemanticException("Unexpected token in alter resource plan statement: "
+ + child.getType()); // report the child actually being examined, not always child 1
}
- String name = ast.getChild(2).getText();
- desc = AlterResourcePlanDesc.createRenamePlan(rpName, name);
- break;
- default:
- throw new SemanticException("Unexpected token in alter resource plan statement: "
- + ast.getChild(1).getType());
}
- rootTasks.add(TaskFactory.get(
- new DDLWork(getInputs(), getOutputs(), desc), conf));
+ rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc), conf));
}
private void analyzeDropResourcePlan(ASTNode ast) throws SemanticException {
http://git-wip-us.apache.org/repos/asf/hive/blob/ddce801f/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
index 306559c..8708f2a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -1002,18 +1002,20 @@ createResourcePlanStatement
-> ^(TOK_CREATERESOURCEPLAN $name $parallelism?)
;
+activate : KW_ACTIVATE -> ^(TOK_ACTIVATE);
+enable : KW_ENABLE -> ^(TOK_ENABLE);
+
alterResourcePlanStatement
@init { pushMsg("alter resource plan statement", state); }
@after { popMsg(state); }
: KW_ALTER KW_RESOURCE KW_PLAN name=identifier (
(KW_VALIDATE -> ^(TOK_ALTER_RP $name TOK_VALIDATE))
- | (KW_ACTIVATE -> ^(TOK_ALTER_RP $name TOK_ACTIVATE))
- | (KW_ENABLE -> ^(TOK_ALTER_RP $name TOK_ENABLE))
| (KW_DISABLE -> ^(TOK_ALTER_RP $name TOK_DISABLE))
| (KW_SET KW_QUERY_PARALLELISM EQUAL parallelism=Number
-> ^(TOK_ALTER_RP $name TOK_QUERY_PARALLELISM $parallelism))
| (KW_RENAME KW_TO newName=identifier
-> ^(TOK_ALTER_RP $name TOK_RENAME $newName))
+ | ((activate enable? | enable activate?) -> ^(TOK_ALTER_RP $name activate? enable?)) // each keyword at most once; the rewrite always emits activate before enable
)
;
http://git-wip-us.apache.org/repos/asf/hive/blob/ddce801f/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterResourcePlanDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterResourcePlanDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterResourcePlanDesc.java
index f0658a6..04c9e44 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterResourcePlanDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterResourcePlanDesc.java
@@ -14,6 +14,7 @@ public class AlterResourcePlanDesc extends DDLDesc implements Serializable {
private Integer queryParallelism;
private WMResourcePlanStatus status;
private boolean validate;
+ private boolean isEnableActivate;
public AlterResourcePlanDesc() {}
@@ -88,4 +89,12 @@ public class AlterResourcePlanDesc extends DDLDesc implements Serializable {
public void setValidate(boolean validate) {
this.validate = validate;
}
+
+ public void setIsEnableActivate(boolean b) {
+ this.isEnableActivate = b;
+ }
+
+ public boolean isEnableActivate() {
+ return isEnableActivate;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ddce801f/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java
index 8152cfe..369f844 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.plan.Explain.Level;
public class DDLWork implements Serializable {
private static final long serialVersionUID = 1L;
+ // TODO: this can probably be replaced with much less code via dynamic dispatch and/or templates.
private PreInsertTableDesc preInsertTableDesc;
private InsertTableDesc insertTableDesc;
private CreateIndexDesc createIndexDesc;
http://git-wip-us.apache.org/repos/asf/hive/blob/ddce801f/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
index 94f42dd..0347e91 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
@@ -23,6 +23,14 @@ import static org.junit.Assert.*;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.*;
+import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
+
+import org.apache.hadoop.hive.metastore.api.WMMapping;
+
+import org.apache.hadoop.hive.metastore.api.WMPool;
+
+import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
+
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.collect.Lists;
@@ -34,10 +42,6 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpHivePool;
-import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpResourcePlan;
-import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpUserMapping;
-import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpUserMappingType;
import org.apache.tez.dag.api.TezConfiguration;
import org.junit.Test;
import org.slf4j.Logger;
@@ -102,6 +106,23 @@ public class TestWorkloadManager {
}
}
+ public static WMResourcePlan plan() {
+ return new WMResourcePlan("rp");
+ }
+
+ public static WMPool pool(String path, int qp, double alloc) {
+ WMPool pool = new WMPool("rp", path);
+ pool.setAllocFraction(alloc);
+ pool.setQueryParallelism(qp);
+ return pool;
+ }
+
+ public static WMMapping mapping(String user, String pool) {
+ WMMapping mapping = new WMMapping("rp", "USER", user);
+ mapping.setPoolName(pool);
+ return mapping;
+ }
+
public static class WorkloadManagerForTest extends WorkloadManager {
public WorkloadManagerForTest(String yarnQueue, HiveConf conf, int numSessions,
@@ -110,14 +131,17 @@ public class TestWorkloadManager {
}
public WorkloadManagerForTest(String yarnQueue, HiveConf conf,
- QueryAllocationManager qam, TmpResourcePlan plan) {
+ QueryAllocationManager qam, WMFullResourcePlan plan) {
super(yarnQueue, conf, qam, plan);
}
- private static TmpResourcePlan createDummyPlan(int numSessions) {
- return new TmpResourcePlan(
- Lists.newArrayList(new TmpHivePool("llap", null, numSessions, 1.0f)),
- Lists.newArrayList(new TmpUserMapping(TmpUserMappingType.DEFAULT, "", "llap", 0)));
+ private static WMFullResourcePlan createDummyPlan(int numSessions) {
+ WMMapping mapping = new WMMapping("rp", "DEFAULT", "");
+ mapping.setPoolName("llap");
+ WMFullResourcePlan plan = new WMFullResourcePlan();
+ plan.addToPools(pool("llap", numSessions, 1.0f));
+ plan.addToMappings(mapping);
+ return plan;
}
@Override
@@ -249,20 +273,15 @@ public class TestWorkloadManager {
qam.assertWasCalled();
}
- private static TmpUserMapping create(String user, String pool) {
- return new TmpUserMapping(TmpUserMappingType.USER, user, pool, 0);
- }
-
@Test(timeout = 10000)
public void testClusterFractions() throws Exception {
HiveConf conf = createConf();
MockQam qam = new MockQam();
- List<TmpHivePool> l2 = Lists.newArrayList(
- new TmpHivePool("p1", null, 1, 0.5f), new TmpHivePool("p2", null, 2, 0.3f));
- TmpResourcePlan plan = new TmpResourcePlan(Lists.newArrayList(
- new TmpHivePool("r1", l2, 1, 0.6f), new TmpHivePool("r2", null, 1, 0.4f)),
- Lists.newArrayList(create("p1", "r1/p1"), create("p2", "r1/p2"), create("r1", "r1"),
- create("r2", "r2")));
+ WMFullResourcePlan plan = new WMFullResourcePlan(plan(),
+ Lists.newArrayList(pool("r1", 1, 0.6f), pool("r2", 1, 0.4f),
+ pool("r1.p1", 1, 0.5f), pool("r1.p2", 2, 0.3f)));
+ plan.setMappings(Lists.newArrayList(mapping("p1", "r1.p1"),
+ mapping("p2", "r1.p2"), mapping("r1", "r1"), mapping("r2", "r2")));
WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
wm.start();
assertEquals(5, wm.getNumSessions());
@@ -289,9 +308,9 @@ public class TestWorkloadManager {
public void testQueueing() throws Exception {
final HiveConf conf = createConf();
MockQam qam = new MockQam();
- TmpResourcePlan plan = new TmpResourcePlan(Lists.newArrayList(
- new TmpHivePool("A", null, 2, 0.5f), new TmpHivePool("B", null, 2, 0.5f)),
- Lists.newArrayList(create("A", "A"), create("B", "B")));
+ WMFullResourcePlan plan = new WMFullResourcePlan(plan(), Lists.newArrayList(
+ pool("A", 2, 0.5f), pool("B", 2, 0.5f)));
+ plan.setMappings(Lists.newArrayList(mapping("A", "A"), mapping("B", "B")));
final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
wm.start();
WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, "A", conf),
@@ -397,9 +416,9 @@ public class TestWorkloadManager {
public void testReuseWithDifferentPool() throws Exception {
final HiveConf conf = createConf();
MockQam qam = new MockQam();
- TmpResourcePlan plan = new TmpResourcePlan(Lists.newArrayList(
- new TmpHivePool("A", null, 2, 0.6f), new TmpHivePool("B", null, 1, 0.4f)),
- Lists.newArrayList(create("A", "A"), create("B", "B")));
+ WMFullResourcePlan plan = new WMFullResourcePlan(plan(), Lists.newArrayList(
+ pool("A", 2, 0.6f), pool("B", 1, 0.4f)));
+ plan.setMappings(Lists.newArrayList(mapping("A", "A"), mapping("B", "B")));
final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
wm.start();
WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, "A", conf),
@@ -427,9 +446,9 @@ public class TestWorkloadManager {
public void testApplyPlanUserMapping() throws Exception {
final HiveConf conf = createConf();
MockQam qam = new MockQam();
- TmpResourcePlan plan = new TmpResourcePlan(Lists.newArrayList(
- new TmpHivePool("A", null, 1, 0.5f), new TmpHivePool("B", null, 1, 0.5f)),
- Lists.newArrayList(create("U", "A")));
+ WMFullResourcePlan plan = new WMFullResourcePlan(plan(), Lists.newArrayList(
+ pool("A", 1, 0.5f), pool("B", 1, 0.5f)));
+ plan.setMappings(Lists.newArrayList(mapping("U", "A")));
final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
wm.start();
@@ -446,9 +465,9 @@ public class TestWorkloadManager {
checkError(error);
// Now change the resource plan - change the mapping for the user.
- plan = new TmpResourcePlan(Lists.newArrayList(
- new TmpHivePool("A", null, 1, 0.6f), new TmpHivePool("B", null, 1, 0.4f)),
- Lists.newArrayList(create("U", "B")));
+ plan = new WMFullResourcePlan(plan(), Lists.newArrayList(
+ pool("A", 1, 0.6f), pool("B", 1, 0.4f)));
+ plan.setMappings(Lists.newArrayList(mapping("U", "B")));
wm.updateResourcePlanAsync(plan);
// The session will go to B with the new mapping; check it.
@@ -471,11 +490,11 @@ public class TestWorkloadManager {
public void testApplyPlanQpChanges() throws Exception {
final HiveConf conf = createConf();
MockQam qam = new MockQam();
- TmpResourcePlan plan = new TmpResourcePlan(Lists.newArrayList(
- new TmpHivePool("A", null, 1, 0.35f), new TmpHivePool("B", null, 2, 0.15f),
- new TmpHivePool("C", null, 2, 0.3f), new TmpHivePool("D", null, 1, 0.3f)),
- Lists.newArrayList(create("A", "A"), create("B", "B"),
- create("C", "C"), create("D", "D")));
+ WMFullResourcePlan plan = new WMFullResourcePlan(plan(), Lists.newArrayList(
+ pool("A", 1, 0.35f), pool("B", 2, 0.15f),
+ pool("C", 2, 0.3f), pool("D", 1, 0.3f)));
+ plan.setMappings(Lists.newArrayList(mapping("A", "A"), mapping("B", "B"),
+ mapping("C", "C"), mapping("D", "D")));
final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
wm.start();
@@ -503,10 +522,10 @@ public class TestWorkloadManager {
// the queue, and the query queued in A will be re-queued in B and started.
// The fractions will also all change.
// Total: 4/4 running.
- plan = new TmpResourcePlan(Lists.newArrayList(new TmpHivePool("B", null, 1, 0.3f),
- new TmpHivePool("C", null, 1, 0.2f), new TmpHivePool("D", null, 2, 0.5f)),
- Lists.newArrayList(create("A", "B"), create("B", "B"),
- create("C", "C"), create("D", "D")));
+ plan = new WMFullResourcePlan(plan(), Lists.newArrayList(pool("B", 1, 0.3f),
+ pool("C", 1, 0.2f), pool("D", 2, 0.5f)));
+ plan.setMappings(Lists.newArrayList(mapping("A", "B"), mapping("B", "B"),
+ mapping("C", "C"), mapping("D", "D")));
wm.updateResourcePlanAsync(plan);
wm.addTestEvent().get();
@@ -544,8 +563,9 @@ public class TestWorkloadManager {
public void testAmPoolInteractions() throws Exception {
final HiveConf conf = createConf();
MockQam qam = new MockQam();
- TmpResourcePlan plan = new TmpResourcePlan(Lists.newArrayList(
- new TmpHivePool("A", null, 1, 1.0f)), Lists.newArrayList(create("A", "A")));
+ WMFullResourcePlan plan = new WMFullResourcePlan(plan(), Lists.newArrayList(
+ pool("A", 1, 1.0f)));
+ plan.setMappings(Lists.newArrayList(mapping("A", "A")));
final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
wm.start();
// Take away the only session, as if it was expiring.
@@ -565,8 +585,9 @@ public class TestWorkloadManager {
assertEquals("A", sessionA1.get().getPoolName());
// Increase qp, check that the pool grows.
- plan = new TmpResourcePlan(Lists.newArrayList(
- new TmpHivePool("A", null, 4, 1.0f)), Lists.newArrayList(create("A", "A")));
+ plan = new WMFullResourcePlan(plan(), Lists.newArrayList(
+ pool("A", 4, 1.0f)));
+ plan.setMappings(Lists.newArrayList(mapping("A", "A")));
wm.updateResourcePlanAsync(plan);
WmTezSession oob2 = pool.getSession(),
oob3 = pool.getSession(),
@@ -575,8 +596,8 @@ public class TestWorkloadManager {
assertEquals(1, pool.getCurrentSize());
// Decrease qp, check that the pool shrinks incl. killing the unused and returned sessions.
- plan = new TmpResourcePlan(Lists.newArrayList(
- new TmpHivePool("A", null, 1, 1.0f)), Lists.newArrayList(create("A", "A")));
+ plan = new WMFullResourcePlan(plan(), Lists.newArrayList(pool("A", 1, 1.0f)));
+ plan.setMappings(Lists.newArrayList(mapping("A", "A")));
wm.updateResourcePlanAsync(plan);
wm.addTestEvent().get();
assertEquals(0, pool.getCurrentSize());
@@ -587,17 +608,17 @@ public class TestWorkloadManager {
assertEquals(1, pool.getCurrentSize());
// Decrease, then increase qp - sessions should not be killed on return.
- plan = new TmpResourcePlan(Lists.newArrayList(
- new TmpHivePool("A", null, 2, 1.0f)), Lists.newArrayList(create("A", "A")));
+ plan = new WMFullResourcePlan(plan(), Lists.newArrayList(pool("A", 2, 1.0f)));
+ plan.setMappings(Lists.newArrayList(mapping("A", "A")));
wm.updateResourcePlanAsync(plan);
oob2 = pool.getSession();
oob3 = pool.getSession();
assertEquals(0, pool.getCurrentSize());
- plan = new TmpResourcePlan(Lists.newArrayList(
- new TmpHivePool("A", null, 1, 1.0f)), Lists.newArrayList(create("A", "A")));
+ plan = new WMFullResourcePlan(plan(), Lists.newArrayList(pool("A", 1, 1.0f)));
+ plan.setMappings(Lists.newArrayList(mapping("A", "A")));
wm.updateResourcePlanAsync(plan);
- plan = new TmpResourcePlan(Lists.newArrayList(
- new TmpHivePool("A", null, 2, 1.0f)), Lists.newArrayList(create("A", "A")));
+ plan = new WMFullResourcePlan(plan(), Lists.newArrayList(pool("A", 2, 1.0f)));
+ plan.setMappings(Lists.newArrayList(mapping("A", "A")));
wm.updateResourcePlanAsync(plan);
wm.addTestEvent().get();
assertEquals(0, pool.getCurrentSize());
@@ -610,8 +631,9 @@ public class TestWorkloadManager {
public void testAsyncSessionInitFailures() throws Exception {
final HiveConf conf = createConf();
MockQam qam = new MockQam();
- TmpResourcePlan plan = new TmpResourcePlan(Lists.newArrayList(
- new TmpHivePool("A", null, 1, 1.0f)), Lists.newArrayList(create("A", "A")));
+ WMFullResourcePlan plan = new WMFullResourcePlan(plan(),
+ Lists.newArrayList(pool("A", 1, 1.0f)));
+ plan.setMappings(Lists.newArrayList(mapping("A", "A")));
final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
wm.start();
@@ -634,8 +656,8 @@ public class TestWorkloadManager {
assertEquals(0, pool.getCurrentSize());
// Change the resource plan, so that the session gets killed.
- plan = new TmpResourcePlan(Lists.newArrayList(
- new TmpHivePool("B", null, 1, 1.0f)), Lists.newArrayList(create("A", "B")));
+ plan = new WMFullResourcePlan(plan(), Lists.newArrayList(pool("B", 1, 1.0f)));
+ plan.setMappings(Lists.newArrayList(mapping("A", "B")));
wm.updateResourcePlanAsync(plan);
wm.addTestEvent().get();
blockedWait.set(true); // Meanwhile, the init succeeds!
@@ -679,8 +701,8 @@ public class TestWorkloadManager {
// The session is taken out of the pool, but is waiting for registration.
assertEquals(0, pool.getCurrentSize());
- plan = new TmpResourcePlan(Lists.newArrayList(
- new TmpHivePool("A", null, 1, 1.0f)), Lists.newArrayList(create("A", "A")));
+ plan = new WMFullResourcePlan(plan(), Lists.newArrayList(pool("A", 1, 1.0f)));
+ plan.setMappings(Lists.newArrayList(mapping("A", "A")));
wm.updateResourcePlanAsync(plan);
wm.addTestEvent().get();
failedWait.setException(new Exception("moo")); // Meanwhile, the init fails.
http://git-wip-us.apache.org/repos/asf/hive/blob/ddce801f/ql/src/test/results/clientpositive/llap/resourceplan.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/resourceplan.q.out b/ql/src/test/results/clientpositive/llap/resourceplan.q.out
index 5cdfc9d..2f314a6 100644
--- a/ql/src/test/results/clientpositive/llap/resourceplan.q.out
+++ b/ql/src/test/results/clientpositive/llap/resourceplan.q.out
@@ -3064,7 +3064,7 @@ plan_3 DISABLED 20
plan_2 DISABLED 10
PREHOOK: query: ALTER RESOURCE PLAN plan_3 ACTIVATE
PREHOOK: type: ALTER RESOURCEPLAN
-FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. InvalidOperationException(message:Cannot activate resource plan: plan_3 first enable it)
+FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. InvalidOperationException(message:Resource plan plan_3 is disabled and should be enabled before activation (or in the same command))
PREHOOK: query: SELECT * FROM SYS.WM_RESOURCEPLANS
PREHOOK: type: QUERY
PREHOOK: Input: sys@wm_resourceplans
http://git-wip-us.apache.org/repos/asf/hive/blob/ddce801f/service/src/java/org/apache/hive/service/server/HiveServer2.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 6b99015..300ba72 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -18,12 +18,15 @@
package org.apache.hive.service.server;
-import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.metastore.api.WMMapping;
+
+import org.apache.hadoop.hive.metastore.api.WMPool;
-import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpHivePool;
-import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpResourcePlan;
-import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpUserMapping;
-import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpUserMappingType;
+import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
+
+import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
+
+import com.google.common.collect.Lists;
import java.io.IOException;
import java.nio.charset.Charset;
@@ -185,23 +188,41 @@ public class HiveServer2 extends CompositeService {
LlapRegistryService.getClient(hiveConf);
}
+ Hive sessionHive = null;
+ try {
+ sessionHive = Hive.get(hiveConf);
+ } catch (HiveException e) {
+ throw new RuntimeException("Failed to get metastore connection", e);
+ }
+
// Initialize workload management.
String wmQueue = HiveConf.getVar(hiveConf, ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE);
if (wmQueue != null && !wmQueue.isEmpty()) {
- wm = WorkloadManager.create(wmQueue, hiveConf, new TmpResourcePlan(
- Lists.newArrayList(new TmpHivePool("llap", null, 1, 1.0f)),
- Lists.newArrayList(new TmpUserMapping(TmpUserMappingType.DEFAULT, "", "llap", 0))));
- } else {
- wm = null;
+ WMFullResourcePlan resourcePlan;
+ try {
+ resourcePlan = sessionHive.getActiveResourcePlan();
+ } catch (HiveException e) {
+ throw new RuntimeException(e);
+ }
+ if (resourcePlan == null) {
+ if (!HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_IN_TEST)) {
+ LOG.error("Cannot activate workload management - no active resource plan");
+ } else {
+ LOG.info("Creating a default resource plan for test");
+ resourcePlan = createTestResourcePlan();
+ }
+ }
+ if (resourcePlan != null) {
+ LOG.info("Initializing workload management");
+ wm = WorkloadManager.create(wmQueue, hiveConf, resourcePlan);
+ } else {
+ wm = null;
+ }
}
// Create views registry
- try {
- Hive sessionHive = Hive.get(hiveConf);
- HiveMaterializedViewsRegistry.get().init(sessionHive);
- } catch (HiveException e) {
- throw new RuntimeException("Failed to get metastore connection", e);
- }
+ HiveMaterializedViewsRegistry.get().init(sessionHive);
+
// Setup web UI
try {
int webUIPort =
@@ -274,6 +295,19 @@ public class HiveServer2 extends CompositeService {
});
}
+ private WMFullResourcePlan createTestResourcePlan() { // Fallback plan: called from the workload-management init code only when no active resource plan was returned and HIVE_IN_TEST is set.
+ WMFullResourcePlan resourcePlan;
+ WMPool pool = new WMPool("testDefault", "llap"); // Single pool "llap", matching the hard-coded TmpHivePool this commit removes from HiveServer2.
+ pool.setAllocFraction(1f); // Same fraction as the removed TmpHivePool("llap", null, 1, 1.0f).
+ pool.setQueryParallelism(1); // Same parallelism as the removed TmpHivePool("llap", null, 1, 1.0f).
+ resourcePlan = new WMFullResourcePlan(
+ new WMResourcePlan("testDefault"), Lists.newArrayList(pool)); // "testDefault" is used consistently for the plan, the pool and the mapping below.
+ WMMapping mapping = new WMMapping("testDefault", "DEFAULT", ""); // DEFAULT-type mapping with an empty name, as in the removed TmpUserMapping(DEFAULT, "", "llap", 0).
+ mapping.setPoolName("llap"); // Points the mapping at the single pool created above.
+ resourcePlan.addToMappings(mapping);
+ return resourcePlan;
+ }
+
public static boolean isHTTPTransportMode(HiveConf hiveConf) {
String transportMode = System.getenv("HIVE_SERVER2_TRANSPORT_MODE");
if (transportMode == null) {
http://git-wip-us.apache.org/repos/asf/hive/blob/ddce801f/standalone-metastore/src/gen/org/apache/hadoop/hive/metastore/annotation/package-info.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/gen/org/apache/hadoop/hive/metastore/annotation/package-info.java b/standalone-metastore/src/gen/org/apache/hadoop/hive/metastore/annotation/package-info.java
new file mode 100644
index 0000000..0c7625d
--- /dev/null
+++ b/standalone-metastore/src/gen/org/apache/hadoop/hive/metastore/annotation/package-info.java
@@ -0,0 +1,8 @@
+/*
+ * Generated by saveVersion.sh. NOTE(review): this is a newly added file under src/gen whose annotation carries build-machine values (user, date, workstation git URL) - confirm it is meant to be version-controlled.
+ */
+@MetastoreVersionAnnotation(version="3.0.0-SNAPSHOT", shortVersion="3.0.0",
+ revision="56f5e5957ea48404ae119721709d05600b59eeb1", branch="master",
+ user="sergey", date="Fri Nov 3 11:25:46 PDT 2017", url="git://HW10895.hsd1.wa.comcast.net/Users/sergey/git/hivegit2",
+ srcChecksum="3ab291282f3dc11dd9e652008ea13851")
+package org.apache.hadoop.hive.metastore.annotation;