You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/01/10 00:02:11 UTC
hive git commit: HIVE-18004 : investigate deriving app name from JDBC
connection for pool mapping (Sergey Shelukhin, reviewed by Harish Jaiprakash,
Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/master d37b8026e -> c106f750e
HIVE-18004 : investigate deriving app name from JDBC connection for pool mapping (Sergey Shelukhin, reviewed by Harish Jaiprakash, Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c106f750
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c106f750
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c106f750
Branch: refs/heads/master
Commit: c106f750ec43839c4e9a033057f5e10f1a5c67f2
Parents: d37b802
Author: sergey <se...@apache.org>
Authored: Tue Jan 9 15:32:43 2018 -0800
Committer: sergey <se...@apache.org>
Committed: Tue Jan 9 16:02:07 2018 -0800
----------------------------------------------------------------------
.../org/apache/hive/jdbc/HiveConnection.java | 9 +-
jdbc/src/java/org/apache/hive/jdbc/Utils.java | 7 +
.../upgrade/derby/046-HIVE-17566.derby.sql | 2 +-
.../upgrade/derby/hive-schema-3.0.0.derby.sql | 2 +-
.../upgrade/mssql/031-HIVE-17566.mssql.sql | 2 +-
.../upgrade/mssql/hive-schema-3.0.0.mssql.sql | 2 +-
.../upgrade/mysql/046-HIVE-17566.mysql.sql | 2 +-
.../upgrade/mysql/hive-schema-3.0.0.mysql.sql | 2 +-
.../upgrade/oracle/046-HIVE-17566.oracle.sql | 2 +-
.../upgrade/oracle/hive-schema-3.0.0.oracle.sql | 2 +-
.../postgres/045-HIVE-17566.postgres.sql | 2 +-
.../postgres/hive-schema-3.0.0.postgres.sql | 2 +-
.../apache/hadoop/hive/ql/exec/tez/TezTask.java | 3 +-
.../hive/ql/exec/tez/UserPoolMapping.java | 42 +++---
.../org/apache/hadoop/hive/ql/parse/HiveLexer.g | 1 +
.../hadoop/hive/ql/parse/ResourcePlanParser.g | 6 +-
.../hive/ql/exec/tez/TestWorkloadManager.java | 149 +++++++++++--------
.../test/queries/clientpositive/resourceplan.q | 2 +
.../clientpositive/llap/resourceplan.q.out | 9 ++
.../org/apache/hive/service/cli/CLIService.java | 5 +
.../service/cli/EmbeddedCLIServiceClient.java | 5 +
.../apache/hive/service/cli/ICLIService.java | 2 +
.../hive/service/cli/session/HiveSession.java | 2 +
.../service/cli/session/HiveSessionImpl.java | 10 +-
.../thrift/RetryingThriftCLIServiceClient.java | 5 +
.../service/cli/thrift/ThriftCLIService.java | 14 +-
.../cli/thrift/ThriftCLIServiceClient.java | 14 ++
.../hadoop/hive/metastore/model/MWMMapping.java | 3 +-
.../src/main/resources/package.jdo | 2 +-
29 files changed, 205 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/c106f750/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
index 45acf13..b5d289e 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
@@ -138,7 +138,7 @@ public class HiveConnection implements java.sql.Connection {
private TProtocolVersion protocol;
private int fetchSize = HiveStatement.DEFAULT_FETCH_SIZE;
private String initFile = null;
- private String wmPool = null;
+ private String wmPool = null, wmApp = null;
private Properties clientInfo;
/**
@@ -180,6 +180,10 @@ public class HiveConnection implements java.sql.Connection {
initFile = sessConfMap.get(JdbcConnectionParams.INIT_FILE);
}
wmPool = sessConfMap.get(JdbcConnectionParams.WM_POOL);
+ for (String application : JdbcConnectionParams.APPLICATION) {
+ wmApp = sessConfMap.get(application);
+ if (wmApp != null) break;
+ }
// add supported protocols
supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1);
@@ -685,6 +689,9 @@ public class HiveConnection implements java.sql.Connection {
if (wmPool != null) {
openConf.put("set:hivevar:wmpool", wmPool);
}
+ if (wmApp != null) {
+ openConf.put("set:hivevar:wmapp", wmApp);
+ }
// set the session configuration
Map<String, String> sessVars = connParams.getSessionVars();
http://git-wip-us.apache.org/repos/asf/hive/blob/c106f750/jdbc/src/java/org/apache/hive/jdbc/Utils.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/Utils.java b/jdbc/src/java/org/apache/hive/jdbc/Utils.java
index bb13682..1c1f644 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/Utils.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/Utils.java
@@ -125,6 +125,13 @@ public class Utils {
static final String INIT_FILE = "initFile";
static final String WM_POOL = "wmPool";
+ // We support ways to specify application name modeled after some existing DBs, since
+ // there's no standard approach.
+ // MSSQL: applicationName https://docs.microsoft.com/en-us/sql/connect/jdbc/building-the-connection-url
+ // Postgres 9~: ApplicationName https://jdbc.postgresql.org/documentation/91/connect.html
+ // Note: various ODBC names used include "Application Name", "APP", etc. Add those?
+ static final String[] APPLICATION = new String[] { "applicationName", "ApplicationName" };
+
// --------------- Begin 2 way ssl options -------------------------
// Use two way ssl. This param will take effect only when ssl=true
static final String USE_TWO_WAY_SSL = "twoWay";
http://git-wip-us.apache.org/repos/asf/hive/blob/c106f750/metastore/scripts/upgrade/derby/046-HIVE-17566.derby.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/derby/046-HIVE-17566.derby.sql b/metastore/scripts/upgrade/derby/046-HIVE-17566.derby.sql
index 9ff733d..7ee62a8 100644
--- a/metastore/scripts/upgrade/derby/046-HIVE-17566.derby.sql
+++ b/metastore/scripts/upgrade/derby/046-HIVE-17566.derby.sql
@@ -18,7 +18,7 @@ ALTER TABLE "APP"."WM_POOL_TO_TRIGGER" ADD CONSTRAINT "WM_POOL_TO_TRIGGER_PK" PR
ALTER TABLE "APP"."WM_POOL_TO_TRIGGER" ADD CONSTRAINT "WM_POOL_TO_TRIGGER_FK1" FOREIGN KEY ("POOL_ID") REFERENCES "APP"."WM_POOL" ("POOL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
ALTER TABLE "APP"."WM_POOL_TO_TRIGGER" ADD CONSTRAINT "WM_POOL_TO_TRIGGER_FK2" FOREIGN KEY ("TRIGGER_ID") REFERENCES "APP"."WM_TRIGGER" ("TRIGGER_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
-CREATE TABLE "APP"."WM_MAPPING" (MAPPING_ID BIGINT NOT NULL, RP_ID BIGINT NOT NULL, ENTITY_TYPE VARCHAR(10) NOT NULL, ENTITY_NAME VARCHAR(128) NOT NULL, POOL_ID BIGINT, ORDERING INTEGER);
+CREATE TABLE "APP"."WM_MAPPING" (MAPPING_ID BIGINT NOT NULL, RP_ID BIGINT NOT NULL, ENTITY_TYPE VARCHAR(128) NOT NULL, ENTITY_NAME VARCHAR(128) NOT NULL, POOL_ID BIGINT, ORDERING INTEGER);
CREATE UNIQUE INDEX "APP"."UNIQUE_WM_MAPPING" ON "APP"."WM_MAPPING" ("RP_ID", "ENTITY_TYPE", "ENTITY_NAME");
ALTER TABLE "APP"."WM_MAPPING" ADD CONSTRAINT "WM_MAPPING_PK" PRIMARY KEY ("MAPPING_ID");
ALTER TABLE "APP"."WM_MAPPING" ADD CONSTRAINT "WM_MAPPING_FK1" FOREIGN KEY ("RP_ID") REFERENCES "APP"."WM_RESOURCEPLAN" ("RP_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
http://git-wip-us.apache.org/repos/asf/hive/blob/c106f750/metastore/scripts/upgrade/derby/hive-schema-3.0.0.derby.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/derby/hive-schema-3.0.0.derby.sql b/metastore/scripts/upgrade/derby/hive-schema-3.0.0.derby.sql
index ea75082..ce25c02 100644
--- a/metastore/scripts/upgrade/derby/hive-schema-3.0.0.derby.sql
+++ b/metastore/scripts/upgrade/derby/hive-schema-3.0.0.derby.sql
@@ -118,7 +118,7 @@ CREATE TABLE "APP"."WM_TRIGGER" (TRIGGER_ID BIGINT NOT NULL, RP_ID BIGINT NOT NU
CREATE TABLE "APP"."WM_POOL_TO_TRIGGER" (POOL_ID BIGINT NOT NULL, TRIGGER_ID BIGINT NOT NULL);
-CREATE TABLE "APP"."WM_MAPPING" (MAPPING_ID BIGINT NOT NULL, RP_ID BIGINT NOT NULL, ENTITY_TYPE VARCHAR(10) NOT NULL, ENTITY_NAME VARCHAR(128) NOT NULL, POOL_ID BIGINT, ORDERING INTEGER);
+CREATE TABLE "APP"."WM_MAPPING" (MAPPING_ID BIGINT NOT NULL, RP_ID BIGINT NOT NULL, ENTITY_TYPE VARCHAR(128) NOT NULL, ENTITY_NAME VARCHAR(128) NOT NULL, POOL_ID BIGINT, ORDERING INTEGER);
-- ----------------------------------------------
-- DML Statements
http://git-wip-us.apache.org/repos/asf/hive/blob/c106f750/metastore/scripts/upgrade/mssql/031-HIVE-17566.mssql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mssql/031-HIVE-17566.mssql.sql b/metastore/scripts/upgrade/mssql/031-HIVE-17566.mssql.sql
index 3fae382..3c5d593 100644
--- a/metastore/scripts/upgrade/mssql/031-HIVE-17566.mssql.sql
+++ b/metastore/scripts/upgrade/mssql/031-HIVE-17566.mssql.sql
@@ -64,7 +64,7 @@ CREATE TABLE WM_MAPPING
(
MAPPING_ID bigint NOT NULL,
RP_ID bigint NOT NULL,
- ENTITY_TYPE nvarchar(10) NOT NULL,
+ ENTITY_TYPE nvarchar(128) NOT NULL,
ENTITY_NAME nvarchar(128) NOT NULL,
POOL_ID bigint,
ORDERING int
http://git-wip-us.apache.org/repos/asf/hive/blob/c106f750/metastore/scripts/upgrade/mssql/hive-schema-3.0.0.mssql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mssql/hive-schema-3.0.0.mssql.sql b/metastore/scripts/upgrade/mssql/hive-schema-3.0.0.mssql.sql
index 402dfce..5ffcdbd 100644
--- a/metastore/scripts/upgrade/mssql/hive-schema-3.0.0.mssql.sql
+++ b/metastore/scripts/upgrade/mssql/hive-schema-3.0.0.mssql.sql
@@ -644,7 +644,7 @@ CREATE TABLE WM_MAPPING
(
MAPPING_ID bigint NOT NULL,
RP_ID bigint NOT NULL,
- ENTITY_TYPE nvarchar(10) NOT NULL,
+ ENTITY_TYPE nvarchar(128) NOT NULL,
ENTITY_NAME nvarchar(128) NOT NULL,
POOL_ID bigint,
ORDERING int
http://git-wip-us.apache.org/repos/asf/hive/blob/c106f750/metastore/scripts/upgrade/mysql/046-HIVE-17566.mysql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mysql/046-HIVE-17566.mysql.sql b/metastore/scripts/upgrade/mysql/046-HIVE-17566.mysql.sql
index ed8b302..465de62 100644
--- a/metastore/scripts/upgrade/mysql/046-HIVE-17566.mysql.sql
+++ b/metastore/scripts/upgrade/mysql/046-HIVE-17566.mysql.sql
@@ -49,7 +49,7 @@ CREATE TABLE IF NOT EXISTS WM_MAPPING
(
`MAPPING_ID` bigint(20) NOT NULL,
`RP_ID` bigint(20) NOT NULL,
- `ENTITY_TYPE` varchar(10) NOT NULL,
+ `ENTITY_TYPE` varchar(128) NOT NULL,
`ENTITY_NAME` varchar(128) NOT NULL,
`POOL_ID` bigint(20),
`ORDERING` int,
http://git-wip-us.apache.org/repos/asf/hive/blob/c106f750/metastore/scripts/upgrade/mysql/hive-schema-3.0.0.mysql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mysql/hive-schema-3.0.0.mysql.sql b/metastore/scripts/upgrade/mysql/hive-schema-3.0.0.mysql.sql
index f7312f0..dddf66a 100644
--- a/metastore/scripts/upgrade/mysql/hive-schema-3.0.0.mysql.sql
+++ b/metastore/scripts/upgrade/mysql/hive-schema-3.0.0.mysql.sql
@@ -900,7 +900,7 @@ CREATE TABLE IF NOT EXISTS WM_MAPPING
(
`MAPPING_ID` bigint(20) NOT NULL,
`RP_ID` bigint(20) NOT NULL,
- `ENTITY_TYPE` varchar(10) NOT NULL,
+ `ENTITY_TYPE` varchar(128) NOT NULL,
`ENTITY_NAME` varchar(128) NOT NULL,
`POOL_ID` bigint(20),
`ORDERING` int,
http://git-wip-us.apache.org/repos/asf/hive/blob/c106f750/metastore/scripts/upgrade/oracle/046-HIVE-17566.oracle.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/oracle/046-HIVE-17566.oracle.sql b/metastore/scripts/upgrade/oracle/046-HIVE-17566.oracle.sql
index 1b0ea94..6f782a1 100644
--- a/metastore/scripts/upgrade/oracle/046-HIVE-17566.oracle.sql
+++ b/metastore/scripts/upgrade/oracle/046-HIVE-17566.oracle.sql
@@ -63,7 +63,7 @@ CREATE TABLE WM_MAPPING
(
MAPPING_ID bigint NOT NULL,
RP_ID bigint NOT NULL,
- ENTITY_TYPE nvarchar(10) NOT NULL,
+ ENTITY_TYPE nvarchar(128) NOT NULL,
ENTITY_NAME nvarchar(128) NOT NULL,
POOL_ID bigint,
ORDERING int
http://git-wip-us.apache.org/repos/asf/hive/blob/c106f750/metastore/scripts/upgrade/oracle/hive-schema-3.0.0.oracle.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/oracle/hive-schema-3.0.0.oracle.sql b/metastore/scripts/upgrade/oracle/hive-schema-3.0.0.oracle.sql
index 1cfe768..98a1437 100644
--- a/metastore/scripts/upgrade/oracle/hive-schema-3.0.0.oracle.sql
+++ b/metastore/scripts/upgrade/oracle/hive-schema-3.0.0.oracle.sql
@@ -625,7 +625,7 @@ CREATE TABLE WM_MAPPING
(
MAPPING_ID bigint NOT NULL,
RP_ID bigint NOT NULL,
- ENTITY_TYPE nvarchar(10) NOT NULL,
+ ENTITY_TYPE nvarchar(128) NOT NULL,
ENTITY_NAME nvarchar(128) NOT NULL,
POOL_ID bigint,
ORDERING int
http://git-wip-us.apache.org/repos/asf/hive/blob/c106f750/metastore/scripts/upgrade/postgres/045-HIVE-17566.postgres.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/postgres/045-HIVE-17566.postgres.sql b/metastore/scripts/upgrade/postgres/045-HIVE-17566.postgres.sql
index 99a1a25..c3846cf 100644
--- a/metastore/scripts/upgrade/postgres/045-HIVE-17566.postgres.sql
+++ b/metastore/scripts/upgrade/postgres/045-HIVE-17566.postgres.sql
@@ -71,7 +71,7 @@ ALTER TABLE ONLY "WM_POOL_TO_TRIGGER"
CREATE TABLE "WM_MAPPING" (
"MAPPING_ID" bigint NOT NULL,
"RP_ID" bigint NOT NULL,
- "ENTITY_TYPE" character varying(10) NOT NULL,
+ "ENTITY_TYPE" character varying(128) NOT NULL,
"ENTITY_NAME" character varying(128) NOT NULL,
"POOL_ID" bigint,
"ORDERING" integer
http://git-wip-us.apache.org/repos/asf/hive/blob/c106f750/metastore/scripts/upgrade/postgres/hive-schema-3.0.0.postgres.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/postgres/hive-schema-3.0.0.postgres.sql b/metastore/scripts/upgrade/postgres/hive-schema-3.0.0.postgres.sql
index 2c2d3a1..abe7243 100644
--- a/metastore/scripts/upgrade/postgres/hive-schema-3.0.0.postgres.sql
+++ b/metastore/scripts/upgrade/postgres/hive-schema-3.0.0.postgres.sql
@@ -653,7 +653,7 @@ CREATE TABLE "WM_POOL_TO_TRIGGER" (
CREATE TABLE "WM_MAPPING" (
"MAPPING_ID" bigint NOT NULL,
"RP_ID" bigint NOT NULL,
- "ENTITY_TYPE" character varying(10) NOT NULL,
+ "ENTITY_TYPE" character varying(128) NOT NULL,
"ENTITY_NAME" character varying(128) NOT NULL,
"POOL_ID" bigint,
"ORDERING" integer
http://git-wip-us.apache.org/repos/asf/hive/blob/c106f750/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 38f80fd..ab3a71e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -162,7 +162,8 @@ public class TezTask extends Task<TezWork> {
} else {
groups = UserGroupInformation.createRemoteUser(ss.getUserName()).getGroups();
}
- MappingInput mi = new MappingInput(userName, groups, ss.getHiveVariables().get("wmpool"));
+ MappingInput mi = new MappingInput(userName, groups,
+ ss.getHiveVariables().get("wmpool"), ss.getHiveVariables().get("wmapp"));
WmContext wmContext = ctx.getWmContext();
// jobConf will hold all the configuration for hadoop, tez, and hive
http://git-wip-us.apache.org/repos/asf/hive/blob/c106f750/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
index 5919f3f..8f28b62 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
-import com.google.common.annotations.VisibleForTesting;
-
import java.util.Set;
import java.util.HashMap;
import java.util.List;
@@ -33,11 +31,12 @@ class UserPoolMapping {
private static final Logger LOG = LoggerFactory.getLogger(UserPoolMapping.class);
public static enum MappingType {
- USER, GROUP
+ USER, GROUP, APPLICATION
}
+ // TODO# create commands for app mappings
private final Map<String, Mapping> userMappings = new HashMap<>(),
- groupMappings = new HashMap<>();
+ groupMappings = new HashMap<>(), appMappings = new HashMap<>();
private final String defaultPoolPath;
private final static class Mapping {
@@ -57,27 +56,17 @@ class UserPoolMapping {
/** Contains all the information necessary to map a query to a pool. */
public static final class MappingInput {
- private final String userName, wmPool;
+ private final String userName, wmPool, appName;
private final List<String> groups;
// TODO: we may add app name, etc. later
- public MappingInput(String userName, List<String> groups, String wmPool) {
+ public MappingInput(String userName, List<String> groups, String wmPool, String appName) {
this.userName = userName;
this.groups = groups;
+ this.appName = appName;
this.wmPool = wmPool;
}
- // TODO: move these into tests when there are fewer conflicting patches pending.
- @VisibleForTesting
- public MappingInput(String userName) {
- this(userName, null);
- }
-
- @VisibleForTesting
- public MappingInput(String userName, List<String> groups) {
- this(userName, groups, null);
- }
-
public List<String> getGroups() {
return groups == null ? Lists.<String>newArrayList() : groups;
}
@@ -88,7 +77,12 @@ class UserPoolMapping {
@Override
public String toString() {
- return "{" + getUserName() + "; pool " + wmPool + "; groups " + groups + "}";
+ return "{" + userName + "; app " + appName
+ + "; pool " + wmPool + "; groups " + groups + "}";
+ }
+
+ public String getAppName() {
+ return appName;
}
}
@@ -106,6 +100,10 @@ class UserPoolMapping {
addMapping(mapping, groupMappings, "group");
break;
}
+ case APPLICATION: {
+ addMapping(mapping, appMappings, "application");
+ break;
+ }
default: throw new AssertionError("Unknown type " + type);
}
}
@@ -126,13 +124,19 @@ class UserPoolMapping {
if (allowAnyPool && input.wmPool != null) {
return (pools == null || pools.contains(input.wmPool)) ? input.wmPool : null;
}
- // For equal-priority rules, user rules come first because they are more specific (arbitrary).
+ // For equal-priority rules, user rules come first because they are more specific; then apps,
+ // then groups (this is arbitrary).
Mapping mapping = userMappings.get(input.getUserName());
boolean isExplicitMatch = false;
if (mapping != null) {
isExplicitMatch = isExplicitPoolMatch(input, mapping);
if (isExplicitMatch) return mapping.fullPoolName;
}
+ // We don't check explicit pool match for apps; both are specified on the jdbc string
+ // so it doesn't make sense to have both and make sure one matches the other.
+ if (mapping == null && input.getAppName() != null) {
+ mapping = appMappings.get(input.getAppName());
+ }
for (String group : input.getGroups()) {
Mapping candidate = groupMappings.get(group);
if (candidate == null) continue;
http://git-wip-us.apache.org/repos/asf/hive/blob/c106f750/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
index 01354af..ef2aa34 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
@@ -370,6 +370,7 @@ KW_WORKLOAD: 'WORKLOAD';
KW_MANAGEMENT: 'MANAGEMENT';
KW_ACTIVE: 'ACTIVE';
KW_UNMANAGED: 'UNMANAGED';
+KW_APPLICATION: 'APPLICATION';
// Operators
// NOTE: if you add a new function/operator, add it to sysFuncNames so that describe function _FUNC_ will work.
http://git-wip-us.apache.org/repos/asf/hive/blob/c106f750/ql/src/java/org/apache/hadoop/hive/ql/parse/ResourcePlanParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ResourcePlanParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/ResourcePlanParser.g
index 1b051f2..990ec52 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ResourcePlanParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ResourcePlanParser.g
@@ -234,7 +234,7 @@ dropPoolStatement
createMappingStatement
@init { gParent.pushMsg("create mapping statement", state); }
@after { gParent.popMsg(state); }
- : (KW_CREATE mappingType=(KW_USER | KW_GROUP)
+ : (KW_CREATE mappingType=(KW_USER | KW_GROUP | KW_APPLICATION)
KW_MAPPING name=StringLiteral
KW_IN rpName=identifier KW_TO poolPath
(KW_WITH KW_ORDER order=Number)?)
@@ -244,7 +244,7 @@ createMappingStatement
alterMappingStatement
@init { gParent.pushMsg("alter mapping statement", state); }
@after { gParent.popMsg(state); }
- : (KW_ALTER mappingType=(KW_USER | KW_GROUP) KW_MAPPING
+ : (KW_ALTER mappingType=(KW_USER | KW_GROUP | KW_APPLICATION) KW_MAPPING
KW_MAPPING name=StringLiteral
KW_IN rpName=identifier KW_TO poolPath
(KW_WITH KW_ORDER order=Number)?)
@@ -254,7 +254,7 @@ alterMappingStatement
dropMappingStatement
@init { gParent.pushMsg("drop mapping statement", state); }
@after { gParent.popMsg(state); }
- : KW_DROP mappingType=(KW_USER | KW_GROUP) KW_MAPPING
+ : KW_DROP mappingType=(KW_USER | KW_GROUP | KW_APPLICATION) KW_MAPPING
name=StringLiteral KW_IN rpName=identifier
-> ^(TOK_DROP_MAPPING $rpName $mappingType $name)
;
http://git-wip-us.apache.org/repos/asf/hive/blob/c106f750/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
index e90e227..8d185ba 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
@@ -32,8 +32,10 @@ import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.SettableFuture;
+
import java.lang.Thread.State;
import java.util.List;
import java.util.Map;
@@ -41,6 +43,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
+
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
@@ -85,7 +88,7 @@ public class TestWorkloadManager {
cdl.countDown();
}
try {
- session.set((WmTezSession) wm.getSession(old, new MappingInput(userName), conf));
+ session.set((WmTezSession) wm.getSession(old, mappingInput(userName), conf));
} catch (Throwable e) {
error.compareAndSet(null, e);
}
@@ -149,6 +152,18 @@ public class TestWorkloadManager {
return mapping;
}
+ public static MappingInput mappingInput(String userName) {
+ return new MappingInput(userName, null, null, null);
+ }
+
+ public static MappingInput mappingInput(String userName, List<String> groups) {
+ return new MappingInput(userName, groups, null, null);
+ }
+
+ public static MappingInput mappingInput(String userName, List<String> groups, String wmPool) {
+ return new MappingInput(userName, groups, wmPool, null);
+ }
+
private List<String> groups(String... groups) {
return Lists.newArrayList(groups);
}
@@ -242,17 +257,17 @@ public class TestWorkloadManager {
TezSessionState nonPool = mock(TezSessionState.class);
when(nonPool.getConf()).thenReturn(conf);
doNothing().when(nonPool).close(anyBoolean());
- TezSessionState session = wm.getSession(nonPool, new MappingInput("user"), conf);
+ TezSessionState session = wm.getSession(nonPool, mappingInput("user"), conf);
verify(nonPool).close(anyBoolean());
assertNotSame(nonPool, session);
session.returnToSessionManager();
TezSessionPoolSession diffPool = mock(TezSessionPoolSession.class);
when(diffPool.getConf()).thenReturn(conf);
doNothing().when(diffPool).returnToSessionManager();
- session = wm.getSession(diffPool, new MappingInput("user"), conf);
+ session = wm.getSession(diffPool, mappingInput("user"), conf);
verify(diffPool).returnToSessionManager();
assertNotSame(diffPool, session);
- TezSessionState session2 = wm.getSession(session, new MappingInput("user"), conf);
+ TezSessionState session2 = wm.getSession(session, mappingInput("user"), conf);
assertSame(session, session2);
}
@@ -264,11 +279,11 @@ public class TestWorkloadManager {
wm.start();
// The queue should be ignored.
conf.set(TezConfiguration.TEZ_QUEUE_NAME, "test2");
- TezSessionState session = wm.getSession(null, new MappingInput("user"), conf);
+ TezSessionState session = wm.getSession(null, mappingInput("user"), conf);
assertEquals("test", session.getQueueName());
assertEquals("test", conf.get(TezConfiguration.TEZ_QUEUE_NAME));
session.setQueueName("test2");
- session = wm.getSession(session, new MappingInput("user"), conf);
+ session = wm.getSession(session, mappingInput("user"), conf);
assertEquals("test", session.getQueueName());
}
@@ -282,7 +297,7 @@ public class TestWorkloadManager {
WorkloadManager wm = new WorkloadManagerForTest("test", conf, 1, qam);
wm.start();
WmTezSession session = (WmTezSession) wm.getSession(
- null, new MappingInput("user"), conf);
+ null, mappingInput("user"), conf);
assertEquals(1.0, session.getClusterFraction(), EPSILON);
qam.assertWasCalledAndReset();
WmTezSession session2 = (WmTezSession) session.reopen();
@@ -300,10 +315,10 @@ public class TestWorkloadManager {
MockQam qam = new MockQam();
WorkloadManager wm = new WorkloadManagerForTest("test", conf, 2, qam);
wm.start();
- WmTezSession session = (WmTezSession) wm.getSession(null, new MappingInput("user"), conf);
+ WmTezSession session = (WmTezSession) wm.getSession(null, mappingInput("user"), conf);
assertEquals(1.0, session.getClusterFraction(), EPSILON);
qam.assertWasCalledAndReset();
- WmTezSession session2 = (WmTezSession) wm.getSession(null, new MappingInput("user"), conf);
+ WmTezSession session2 = (WmTezSession) wm.getSession(null, mappingInput("user"), conf);
assertEquals(0.5, session.getClusterFraction(), EPSILON);
assertEquals(0.5, session2.getClusterFraction(), EPSILON);
qam.assertWasCalledAndReset();
@@ -314,7 +329,7 @@ public class TestWorkloadManager {
qam.assertWasCalledAndReset();
// We never lose pool session, so we should still be able to get.
- session = (WmTezSession) wm.getSession(null, new MappingInput("user"), conf);
+ session = (WmTezSession) wm.getSession(null, mappingInput("user"), conf);
session.returnToSessionManager();
assertEquals(1.0, session2.getClusterFraction(), EPSILON);
assertEquals(0.0, session.getClusterFraction(), EPSILON);
@@ -335,20 +350,20 @@ public class TestWorkloadManager {
assertEquals(5, wm.getNumSessions());
// Get all the 5 sessions; validate cluster fractions.
WmTezSession session05of06 = (WmTezSession) wm.getSession(
- null, new MappingInput("p1"), conf);
+ null, mappingInput("p1"), conf);
assertEquals(0.3, session05of06.getClusterFraction(), EPSILON);
WmTezSession session03of06 = (WmTezSession) wm.getSession(
- null, new MappingInput("p2"), conf);
+ null, mappingInput("p2"), conf);
assertEquals(0.18, session03of06.getClusterFraction(), EPSILON);
WmTezSession session03of06_2 = (WmTezSession) wm.getSession(
- null, new MappingInput("p2"), conf);
+ null, mappingInput("p2"), conf);
assertEquals(0.09, session03of06.getClusterFraction(), EPSILON);
assertEquals(0.09, session03of06_2.getClusterFraction(), EPSILON);
WmTezSession session02of06 = (WmTezSession) wm.getSession(
- null,new MappingInput("r1"), conf);
+ null,mappingInput("r1"), conf);
assertEquals(0.12, session02of06.getClusterFraction(), EPSILON);
WmTezSession session04 = (WmTezSession) wm.getSession(
- null, new MappingInput("r2"), conf);
+ null, mappingInput("r2"), conf);
assertEquals(0.4, session04.getClusterFraction(), EPSILON);
session05of06.returnToSessionManager();
session03of06.returnToSessionManager();
@@ -363,28 +378,30 @@ public class TestWorkloadManager {
conf.set(ConfVars.HIVE_SERVER2_WM_ALLOW_ANY_POOL_VIA_JDBC.varname, "false");
MockQam qam = new MockQam();
WMFullResourcePlan plan = new WMFullResourcePlan(plan(),
- Lists.newArrayList(pool("u0"), pool("g0"), pool("g1"), pool("u2")));
+ Lists.newArrayList(pool("u0"), pool("g0"), pool("g1"), pool("u2"), pool("a0")));
plan.setMappings(Lists.newArrayList(mapping("USER", "u0", "u0", 0),
- mapping("GROUP", "g0", "g0", 0), mapping("GROUP", "g1", "g1", 1),
- mapping("USER", "u2", "u2", 2)));
+ mapping("APPLICATION", "a0", "a0", 0), mapping("GROUP", "g0", "g0", 0),
+ mapping("GROUP", "g1", "g1", 1), mapping("USER", "u2", "u2", 2)));
WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
wm.start();
// Test various combinations.
- verifyMapping(wm, conf, new MappingInput("u0", groups("zzz")), "u0");
- verifyMapping(wm, conf, new MappingInput("zzz", groups("g1")), "g1");
- verifyMapping(wm, conf, new MappingInput("u0", groups("g1")), "u0");
+ verifyMapping(wm, conf, mappingInput("u0", groups("zzz")), "u0");
+ verifyMapping(wm, conf, new MappingInput("u0", null, null, "a0"), "u0");
+ verifyMapping(wm, conf, new MappingInput("zzz", groups("g0"), null, "a0"), "a0");
+ verifyMapping(wm, conf, mappingInput("zzz", groups("g1")), "g1");
+ verifyMapping(wm, conf, mappingInput("u0", groups("g1")), "u0");
// User takes precendence over groups unless ordered explicitly.
- verifyMapping(wm, conf, new MappingInput("u0", groups("g0")), "u0");
- verifyMapping(wm, conf, new MappingInput("u2", groups("g1")), "g1");
- verifyMapping(wm, conf, new MappingInput("u2", groups("g0", "g1")), "g0");
+ verifyMapping(wm, conf, mappingInput("u0", groups("g0")), "u0");
+ verifyMapping(wm, conf, mappingInput("u2", groups("g1")), "g1");
+ verifyMapping(wm, conf, mappingInput("u2", groups("g0", "g1")), "g0");
// Check explicit pool specifications - valid cases where priority is changed.
- verifyMapping(wm, conf, new MappingInput("u0", groups("g1"), "g1"), "g1");
- verifyMapping(wm, conf, new MappingInput("u2", groups("g1"), "u2"), "u2");
- verifyMapping(wm, conf, new MappingInput("zzz", groups("g0", "g1"), "g1"), "g1");
+ verifyMapping(wm, conf, mappingInput("u0", groups("g1"), "g1"), "g1");
+ verifyMapping(wm, conf, mappingInput("u2", groups("g1"), "u2"), "u2");
+ verifyMapping(wm, conf, mappingInput("zzz", groups("g0", "g1"), "g1"), "g1");
// Explicit pool specification - invalid - there's no mapping that matches.
try {
TezSessionState r = wm.getSession(
- null, new MappingInput("u0", groups("g0", "g1"), "u2"), conf);
+ null, mappingInput("u0", groups("g0", "g1"), "u2"), conf);
fail("Expected failure, but got " + r);
} catch (Exception ex) {
// Expected.
@@ -393,11 +410,11 @@ public class TestWorkloadManager {
conf.set(ConfVars.HIVE_SERVER2_WM_ALLOW_ANY_POOL_VIA_JDBC.varname, "true");
wm = new WorkloadManagerForTest("test", conf, qam, plan);
wm.start();
- verifyMapping(wm, conf, new MappingInput("u0", groups("g0", "g1"), "u2"), "u2");
+ verifyMapping(wm, conf, mappingInput("u0", groups("g0", "g1"), "u2"), "u2");
// The mapping that doesn't exist still shouldn't work.
try {
TezSessionState r = wm.getSession(
- null, new MappingInput("u0", groups("g0", "g1"), "zzz"), conf);
+ null, mappingInput("u0", groups("g0", "g1"), "zzz"), conf);
fail("Expected failure, but got " + r);
} catch (Exception ex) {
// Expected.
@@ -423,9 +440,9 @@ public class TestWorkloadManager {
plan.setMappings(Lists.newArrayList(mapping("A", "A"), mapping("B", "B")));
final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
wm.start();
- WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf),
- sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf),
- sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B"), conf);
+ WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, mappingInput("A"), conf),
+ sessionA2 = (WmTezSession) wm.getSession(null, mappingInput("A"), conf),
+ sessionB1 = (WmTezSession) wm.getSession(null, mappingInput("B"), conf);
final AtomicReference<WmTezSession> sessionA3 = new AtomicReference<>(),
sessionA4 = new AtomicReference<>();
final AtomicReference<Throwable> error = new AtomicReference<>();
@@ -439,7 +456,7 @@ public class TestWorkloadManager {
assertNull(sessionA4.get());
checkError(error);
// While threads are blocked on A, we should still be able to get and return a B session.
- WmTezSession sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B"), conf);
+ WmTezSession sessionB2 = (WmTezSession) wm.getSession(null, mappingInput("B"), conf);
sessionB1.returnToSessionManager();
sessionB2.returnToSessionManager();
assertNull(sessionA3.get());
@@ -467,8 +484,8 @@ public class TestWorkloadManager {
plan.getPlan().setDefaultPoolPath("A");
final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
wm.start();
- WmTezSession session1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf),
- session2 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
+ WmTezSession session1 = (WmTezSession) wm.getSession(null, mappingInput("A"), conf),
+ session2 = (WmTezSession) wm.getSession(null, mappingInput("A"), conf);
assertEquals(0.5, session1.getClusterFraction(), EPSILON);
assertEquals(0.5, session2.getClusterFraction(), EPSILON);
qam.assertWasCalledAndReset();
@@ -490,19 +507,19 @@ public class TestWorkloadManager {
final WorkloadManager wm = new WorkloadManagerForTest("test", conf, 2, qam);
wm.start();
WmTezSession session1 = (WmTezSession) wm.getSession(
- null, new MappingInput("user"), conf);
+ null, mappingInput("user"), conf);
// First, try to reuse from the same pool - should "just work".
WmTezSession session1a = (WmTezSession) wm.getSession(
- session1, new MappingInput("user"), conf);
+ session1, mappingInput("user"), conf);
assertSame(session1, session1a);
assertEquals(1.0, session1.getClusterFraction(), EPSILON);
// Should still be able to get the 2nd session.
WmTezSession session2 = (WmTezSession) wm.getSession(
- null, new MappingInput("user"), conf);
+ null, mappingInput("user"), conf);
// Now try to reuse with no other sessions remaining. Should still work.
WmTezSession session2a = (WmTezSession) wm.getSession(
- session2, new MappingInput("user"), conf);
+ session2, mappingInput("user"), conf);
assertSame(session2, session2a);
assertEquals(0.5, session1.getClusterFraction(), EPSILON);
assertEquals(0.5, session2.getClusterFraction(), EPSILON);
@@ -559,19 +576,19 @@ public class TestWorkloadManager {
plan.setMappings(Lists.newArrayList(mapping("A", "A"), mapping("B", "B")));
final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
wm.start();
- WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf),
- sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
+ WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, mappingInput("A"), conf),
+ sessionA2 = (WmTezSession) wm.getSession(null, mappingInput("A"), conf);
assertEquals("A", sessionA1.getPoolName());
assertEquals(0.3f, sessionA1.getClusterFraction(), EPSILON);
assertEquals("A", sessionA2.getPoolName());
assertEquals(0.3f, sessionA2.getClusterFraction(), EPSILON);
- WmTezSession sessionB1 = (WmTezSession) wm.getSession(sessionA1, new MappingInput("B"), conf);
+ WmTezSession sessionB1 = (WmTezSession) wm.getSession(sessionA1, mappingInput("B"), conf);
assertSame(sessionA1, sessionB1);
assertEquals("B", sessionB1.getPoolName());
assertEquals(0.4f, sessionB1.getClusterFraction(), EPSILON);
assertEquals(0.6f, sessionA2.getClusterFraction(), EPSILON); // A1 removed from A.
// Make sure that we can still get a session from A.
- WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
+ WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, mappingInput("A"), conf);
assertEquals("A", sessionA3.getPoolName());
assertEquals(0.3f, sessionA3.getClusterFraction(), EPSILON);
assertEquals(0.3f, sessionA3.getClusterFraction(), EPSILON);
@@ -591,7 +608,7 @@ public class TestWorkloadManager {
wm.start();
// One session will be running, the other will be queued in "A"
- WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("U"), conf);
+ WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, mappingInput("U"), conf);
assertEquals("A", sessionA1.getPoolName());
assertEquals(0.5f, sessionA1.getClusterFraction(), EPSILON);
final AtomicReference<WmTezSession> sessionA2 = new AtomicReference<>();
@@ -616,7 +633,7 @@ public class TestWorkloadManager {
assertEquals(0.4f, sessionA2.get().getClusterFraction(), EPSILON);
// The new session will also go to B now.
sessionA2.get().returnToSessionManager();
- WmTezSession sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("U"), conf);
+ WmTezSession sessionB1 = (WmTezSession) wm.getSession(null, mappingInput("U"), conf);
assertEquals("B", sessionB1.getPoolName());
assertEquals(0.4f, sessionB1.getClusterFraction(), EPSILON);
sessionA1.returnToSessionManager();
@@ -640,11 +657,11 @@ public class TestWorkloadManager {
// A: 1/1 running, 1 queued; B: 2/2 running, C: 1/2 running, D: 1/1 running, 1 queued.
// Total: 5/6 running.
- WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf),
- sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B"), conf),
- sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B"), conf),
- sessionC1 = (WmTezSession) wm.getSession(null, new MappingInput("C"), conf),
- sessionD1 = (WmTezSession) wm.getSession(null, new MappingInput("D"), conf);
+ WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, mappingInput("A"), conf),
+ sessionB1 = (WmTezSession) wm.getSession(null, mappingInput("B"), conf),
+ sessionB2 = (WmTezSession) wm.getSession(null, mappingInput("B"), conf),
+ sessionC1 = (WmTezSession) wm.getSession(null, mappingInput("C"), conf),
+ sessionD1 = (WmTezSession) wm.getSession(null, mappingInput("D"), conf);
final AtomicReference<WmTezSession> sessionA2 = new AtomicReference<>(),
sessionD2 = new AtomicReference<>();
final AtomicReference<Throwable> error = new AtomicReference<>();
@@ -716,8 +733,8 @@ public class TestWorkloadManager {
// 2 running.
WmTezSession sessionA1 = (WmTezSession) wm.getSession(
- null, new MappingInput("A", null), conf, null),
- sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null);
+ null, mappingInput("A", null), conf, null),
+ sessionA2 = (WmTezSession) wm.getSession(null, mappingInput("A", null), conf, null);
assertEquals(0.5f, sessionA1.getClusterFraction(), EPSILON);
assertEquals(0.5f, sessionA2.getClusterFraction(), EPSILON);
@@ -731,7 +748,7 @@ public class TestWorkloadManager {
// Add another session.
WmTezSession sessionA3 = (WmTezSession) wm.getSession(
- null, new MappingInput("A", null), conf, null);
+ null, mappingInput("A", null), conf, null);
assertEquals(0f, sessionA3.getClusterFraction(), EPSILON);
assertEquals("A", sessionA3.getPoolName());
@@ -743,7 +760,7 @@ public class TestWorkloadManager {
// Make sure reuse changes the FIFO order of the session.
WmTezSession sessionA4 = (WmTezSession) wm.getSession(
- sessionA2, new MappingInput("A", null), conf, null);
+ sessionA2, mappingInput("A", null), conf, null);
assertSame(sessionA2, sessionA4);
assertEquals(1f, sessionA3.getClusterFraction(), EPSILON);
assertEquals(0f, sessionA2.getClusterFraction(), EPSILON);
@@ -766,7 +783,7 @@ public class TestWorkloadManager {
// 1 running, 1 queued.
WmTezSession sessionA1 = (WmTezSession) wm.getSession(
- null, new MappingInput("A", null), conf, null);
+ null, mappingInput("A", null), conf, null);
final AtomicReference<WmTezSession> sessionA2 = new AtomicReference<>();
final AtomicReference<Throwable> error = new AtomicReference<>();
final CountDownLatch cdl1 = new CountDownLatch(1);
@@ -786,7 +803,7 @@ public class TestWorkloadManager {
assertEquals(0, tezAmPool.getCurrentSize());
try {
- TezSessionState r = wm.getSession(null, new MappingInput("A", null), conf, null);
+ TezSessionState r = wm.getSession(null, mappingInput("A", null), conf, null);
fail("Expected an error but got " + r);
} catch (WorkloadManager.NoPoolMappingException ex) {
// Ignore, this particular error is expected.
@@ -794,7 +811,7 @@ public class TestWorkloadManager {
// Apply the plan again - enable WM.
wm.updateResourcePlanAsync(plan).get();
- sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null);
+ sessionA1 = (WmTezSession) wm.getSession(null, mappingInput("A", null), conf, null);
assertEquals("A", sessionA1.getPoolName());
sessionA1.returnToSessionManager();
assertEquals(1, tezAmPool.getCurrentSize());
@@ -879,7 +896,7 @@ public class TestWorkloadManager {
final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
wm.start();
- WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
+ WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, mappingInput("A"), conf);
// [A: 1, B: 0]
Map<String, SessionTriggerProvider> allSessionProviders = wm.getAllSessionTriggerProviders();
@@ -903,7 +920,7 @@ public class TestWorkloadManager {
assertEquals(0.4f, sessionA1.getClusterFraction(), EPSILON);
assertEquals("B", sessionA1.getPoolName());
- WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
+ WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, mappingInput("A"), conf);
// [A: 1, B: 1]
allSessionProviders = wm.getAllSessionTriggerProviders();
assertEquals(1, allSessionProviders.get("A").getSessions().size());
@@ -930,7 +947,7 @@ public class TestWorkloadManager {
assertEquals("B", sessionA2.getPoolName());
assertEquals("B", sessionA1.getPoolName());
- WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
+ WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, mappingInput("A"), conf);
// [A: 1, B: 2]
allSessionProviders = wm.getAllSessionTriggerProviders();
assertEquals(1, allSessionProviders.get("A").getSessions().size());
@@ -970,7 +987,7 @@ public class TestWorkloadManager {
final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
wm.start();
- WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
+ WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, mappingInput("A"), conf);
// [A: 1, B: 0, B.x: 0, B.y: 0, C: 0]
Map<String, SessionTriggerProvider> allSessionProviders = wm.getAllSessionTriggerProviders();
@@ -1028,7 +1045,7 @@ public class TestWorkloadManager {
assertTrue(allSessionProviders.get("B.x").getSessions().contains(sessionA1));
assertEquals("B.x", sessionA1.getPoolName());
- WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
+ WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, mappingInput("A"), conf);
// [A: 1, B: 0, B.x: 1, B.y: 0, C: 0]
allSessionProviders = wm.getAllSessionTriggerProviders();
@@ -1127,7 +1144,7 @@ public class TestWorkloadManager {
SettableFuture<Boolean> failedWait = SettableFuture.create();
failedWait.setException(new Exception("foo"));
theOnlySession.setWaitForAmRegistryFuture(failedWait);
- TezSessionState retriedSession = wm.getSession(null, new MappingInput("A"), conf);
+ TezSessionState retriedSession = wm.getSession(null, mappingInput("A"), conf);
assertNotNull(retriedSession);
assertNotSame(theOnlySession, retriedSession); // Should have been replaced.
retriedSession.returnToSessionManager();
@@ -1137,7 +1154,7 @@ public class TestWorkloadManager {
theOnlySession.setWaitForAmRegistryFuture(failedWait);
wm.setNextWaitForAmRegistryFuture(failedWait); // Fail the retry.
try {
- TezSessionState r = wm.getSession(null, new MappingInput("A"), conf);
+ TezSessionState r = wm.getSession(null, mappingInput("A"), conf);
fail("Expected an error but got " + r);
} catch (Exception ex) {
// Expected.
@@ -1189,7 +1206,7 @@ public class TestWorkloadManager {
assertEquals(0f, oldSession.getClusterFraction(), EPSILON);
pool.returnSession(theOnlySession);
// Make sure we can actually get a session still - parallelism/etc. should not be affected.
- WmTezSession result = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
+ WmTezSession result = (WmTezSession) wm.getSession(null, mappingInput("A"), conf);
assertEquals(sessionPoolName, result.getPoolName());
assertEquals(1f, result.getClusterFraction(), EPSILON);
result.returnToSessionManager();
http://git-wip-us.apache.org/repos/asf/hive/blob/c106f750/ql/src/test/queries/clientpositive/resourceplan.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/resourceplan.q b/ql/src/test/queries/clientpositive/resourceplan.q
index ce684ed..9c4feb5 100644
--- a/ql/src/test/queries/clientpositive/resourceplan.q
+++ b/ql/src/test/queries/clientpositive/resourceplan.q
@@ -316,6 +316,7 @@ SELECT * FROM SYS.WM_POOLS_TO_TRIGGERS;
CREATE USER MAPPING "user1" IN plan_2 TO def;
CREATE USER MAPPING 'user2' IN plan_2 TO def WITH ORDER 1;
CREATE GROUP MAPPING "group1" IN plan_2 TO def.c1;
+CREATE APPLICATION MAPPING "app1" IN plan_2 TO def.c1;
CREATE GROUP MAPPING 'group2' IN plan_2 TO def.c2 WITH ORDER 1;
SELECT * FROM SYS.WM_MAPPINGS;
@@ -324,6 +325,7 @@ DROP POOL plan_2.def.c1;
DROP USER MAPPING "user2" in plan_2;
DROP GROUP MAPPING "group2" in plan_2;
+DROP APPLICATION MAPPING "app1" in plan_2;
SELECT * FROM SYS.WM_MAPPINGS;
CREATE RESOURCE PLAN plan_4;
http://git-wip-us.apache.org/repos/asf/hive/blob/c106f750/ql/src/test/results/clientpositive/llap/resourceplan.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/resourceplan.q.out b/ql/src/test/results/clientpositive/llap/resourceplan.q.out
index 29c01a8..e40994f 100644
--- a/ql/src/test/results/clientpositive/llap/resourceplan.q.out
+++ b/ql/src/test/results/clientpositive/llap/resourceplan.q.out
@@ -4036,6 +4036,10 @@ PREHOOK: query: CREATE GROUP MAPPING "group1" IN plan_2 TO def.c1
PREHOOK: type: CREATE MAPPING
POSTHOOK: query: CREATE GROUP MAPPING "group1" IN plan_2 TO def.c1
POSTHOOK: type: CREATE MAPPING
+PREHOOK: query: CREATE APPLICATION MAPPING "app1" IN plan_2 TO def.c1
+PREHOOK: type: CREATE MAPPING
+POSTHOOK: query: CREATE APPLICATION MAPPING "app1" IN plan_2 TO def.c1
+POSTHOOK: type: CREATE MAPPING
PREHOOK: query: CREATE GROUP MAPPING 'group2' IN plan_2 TO def.c2 WITH ORDER 1
PREHOOK: type: CREATE MAPPING
POSTHOOK: query: CREATE GROUP MAPPING 'group2' IN plan_2 TO def.c2 WITH ORDER 1
@@ -4048,6 +4052,7 @@ POSTHOOK: query: SELECT * FROM SYS.WM_MAPPINGS
POSTHOOK: type: QUERY
POSTHOOK: Input: sys@wm_mappings
#### A masked pattern was here ####
+plan_2 APPLICATION app1 def.c1 0
plan_2 GROUP group1 def.c1 0
plan_2 GROUP group2 def.c2 1
plan_2 USER user1 def 0
@@ -4063,6 +4068,10 @@ PREHOOK: query: DROP GROUP MAPPING "group2" in plan_2
PREHOOK: type: DROP MAPPING
POSTHOOK: query: DROP GROUP MAPPING "group2" in plan_2
POSTHOOK: type: DROP MAPPING
+PREHOOK: query: DROP APPLICATION MAPPING "app1" in plan_2
+PREHOOK: type: DROP MAPPING
+POSTHOOK: query: DROP APPLICATION MAPPING "app1" in plan_2
+POSTHOOK: type: DROP MAPPING
PREHOOK: query: SELECT * FROM SYS.WM_MAPPINGS
PREHOOK: type: QUERY
PREHOOK: Input: sys@wm_mappings
http://git-wip-us.apache.org/repos/asf/hive/blob/c106f750/service/src/java/org/apache/hive/service/cli/CLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/CLIService.java b/service/src/java/org/apache/hive/service/cli/CLIService.java
index 9b13ea7..82e2172 100644
--- a/service/src/java/org/apache/hive/service/cli/CLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/CLIService.java
@@ -593,6 +593,11 @@ public class CLIService extends CompositeService implements ICLIService {
}
@Override
+ public void setApplicationName(SessionHandle sh, String value) throws HiveSQLException {
+ sessionManager.getSession(sh).setApplicationName(value);
+ }
+
+ @Override
public void cancelDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory,
String tokenStr) throws HiveSQLException {
sessionManager.getSession(sessionHandle).
http://git-wip-us.apache.org/repos/asf/hive/blob/c106f750/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java b/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java
index 98125d3..056072f 100644
--- a/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java
+++ b/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java
@@ -232,4 +232,9 @@ public class EmbeddedCLIServiceClient extends CLIServiceClient {
public String getQueryId(TOperationHandle operationHandle) throws HiveSQLException {
return cliService.getQueryId(operationHandle);
}
+
+ @Override
+ public void setApplicationName(SessionHandle sh, String value) throws HiveSQLException {
+ cliService.setApplicationName(sh, value);
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/c106f750/service/src/java/org/apache/hive/service/cli/ICLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/ICLIService.java b/service/src/java/org/apache/hive/service/cli/ICLIService.java
index 8c993a5..4f3cc91 100644
--- a/service/src/java/org/apache/hive/service/cli/ICLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/ICLIService.java
@@ -112,4 +112,6 @@ public interface ICLIService {
String primaryCatalog, String primarySchema, String primaryTable,
String foreignCatalog, String foreignSchema, String foreignTable)
throws HiveSQLException;
+
+ void setApplicationName(SessionHandle sh, String value) throws HiveSQLException;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/c106f750/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSession.java b/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
index bd4d90d..08006c8 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
@@ -222,4 +222,6 @@ public interface HiveSession extends HiveSessionBase {
long getNoOperationTime();
Future<?> submitBackgroundOperation(Runnable work);
+
+ void setApplicationName(String value);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/c106f750/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 7fbcd13..b789e89 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -19,7 +19,6 @@
package org.apache.hive.service.cli.session;
import java.util.Collections;
-
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
@@ -33,6 +32,7 @@ import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;
+
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.common.cli.HiveFileProcessor;
@@ -1014,6 +1014,14 @@ public class HiveSessionImpl implements HiveSession {
}
}
+ @Override
+ public void setApplicationName(String value) {
+ String oldName = sessionState.getHiveVariables().put("wmapp", value);
+ if (oldName != null && !oldName.equals(value)) {
+ LOG.info("ApplicationName changed from " + oldName + " to " + value);
+ }
+ }
+
// From https://docs.microsoft.com/en-us/sql/t-sql/language-elements/reserved-keywords-transact-sql#odbc-reserved-keywords
private static final Set<String> ODBC_KEYWORDS = Collections.unmodifiableSet(new HashSet<>(
http://git-wip-us.apache.org/repos/asf/hive/blob/c106f750/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java b/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java
index 71e53b7..2d17ca5 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java
@@ -240,6 +240,11 @@ public class RetryingThriftCLIServiceClient implements InvocationHandler {
return cliService.getCrossReference(sessionHandle, primaryCatalog, primarySchema,
primaryTable, foreignCatalog, foreignSchema, foreignTable);
}
+
+ @Override
+ public void setApplicationName(SessionHandle sh, String value) throws HiveSQLException {
+ cliService.setApplicationName(sh, value);
+ }
}
protected RetryingThriftCLIServiceClient(HiveConf conf) {
http://git-wip-us.apache.org/repos/asf/hive/blob/c106f750/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
index eef7a25..87e28d1 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -349,22 +349,32 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
public TSetClientInfoResp SetClientInfo(TSetClientInfoReq req) throws TException {
// TODO: We don't do anything for now, just log this for debugging.
// We may be able to make use of this later, e.g. for workload management.
+ TSetClientInfoResp resp = null;
if (req.isSetConfiguration()) {
StringBuilder sb = null;
+ SessionHandle sh = null;
for (Map.Entry<String, String> e : req.getConfiguration().entrySet()) {
if (sb == null) {
- SessionHandle sh = new SessionHandle(req.getSessionHandle());
+ sh = new SessionHandle(req.getSessionHandle());
sb = new StringBuilder("Client information for ").append(sh).append(": ");
} else {
sb.append(", ");
}
sb.append(e.getKey()).append(" = ").append(e.getValue());
+ if ("ApplicationName".equals(e.getKey())) {
+ try {
+ cliService.setApplicationName(sh, e.getValue());
+ } catch (Exception ex) {
+ LOG.warn("Error setting application name", ex);
+ resp = new TSetClientInfoResp(HiveSQLException.toTStatus(ex));
+ }
+ }
}
if (sb != null) {
LOG.info("{}", sb);
}
}
- return new TSetClientInfoResp(OK_STATUS);
+ return resp == null ? new TSetClientInfoResp(OK_STATUS) : resp;
}
private String getIpAddress() {
http://git-wip-us.apache.org/repos/asf/hive/blob/c106f750/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
index d43f125..4e46863 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
@@ -82,6 +82,8 @@ import org.apache.hive.service.rpc.thrift.TOperationHandle;
import org.apache.hive.service.rpc.thrift.TProtocolVersion;
import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenReq;
import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenResp;
+import org.apache.hive.service.rpc.thrift.TSetClientInfoReq;
+import org.apache.hive.service.rpc.thrift.TSetClientInfoResp;
import org.apache.hive.service.rpc.thrift.TStatus;
import org.apache.hive.service.rpc.thrift.TStatusCode;
import org.apache.thrift.TException;
@@ -564,4 +566,16 @@ public class ThriftCLIServiceClient extends CLIServiceClient {
throw new HiveSQLException(e);
}
}
+
+ @Override
+ public void setApplicationName(SessionHandle sh, String value) throws HiveSQLException {
+ try {
+ TSetClientInfoReq req = new TSetClientInfoReq(sh.toTSessionHandle());
+ req.putToConfiguration("ApplicationName", value);
+ TSetClientInfoResp resp = cliService.SetClientInfo(req);
+ checkStatus(resp.getStatus());
+ } catch (TException e) {
+ throw new HiveSQLException(e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/c106f750/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MWMMapping.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MWMMapping.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MWMMapping.java
index ec0ac3f..276ed9a 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MWMMapping.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MWMMapping.java
@@ -27,7 +27,8 @@ public class MWMMapping {
public enum EntityType {
USER,
- GROUP
+ GROUP,
+ APPLICATION
}
public MWMMapping(MWMResourcePlan resourcePlan, EntityType entityType, String entityName,
http://git-wip-us.apache.org/repos/asf/hive/blob/c106f750/standalone-metastore/src/main/resources/package.jdo
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/resources/package.jdo b/standalone-metastore/src/main/resources/package.jdo
index e3ae67d..331e226 100644
--- a/standalone-metastore/src/main/resources/package.jdo
+++ b/standalone-metastore/src/main/resources/package.jdo
@@ -1183,7 +1183,7 @@
<column name="RP_ID" jdbc-type="integer" allows-null="false"/>
</field>
<field name="entityType">
- <column name="ENTITY_TYPE" jdbc-type="string" length="10" />
+ <column name="ENTITY_TYPE" jdbc-type="string" length="128" />
</field>
<field name="entityName">
<column name="ENTITY_NAME" jdbc-type="string" length="128" />