You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/11/14 21:28:59 UTC
hive git commit: HIVE-18002 : add group support for pool mappings
(Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/master f7f635d6e -> d2958495a
HIVE-18002 : add group support for pool mappings (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d2958495
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d2958495
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d2958495
Branch: refs/heads/master
Commit: d2958495ae59bdc3a91ed17155369af7fc5c7866
Parents: f7f635d
Author: sergey <se...@apache.org>
Authored: Tue Nov 14 13:05:29 2017 -0800
Committer: sergey <se...@apache.org>
Committed: Tue Nov 14 13:28:37 2017 -0800
----------------------------------------------------------------------
.../apache/hadoop/hive/ql/exec/tez/TezTask.java | 11 +-
.../hive/ql/exec/tez/UserPoolMapping.java | 73 +++++++---
.../hive/ql/exec/tez/TestWorkloadManager.java | 143 +++++++++++++------
3 files changed, 155 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/d2958495/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 3bb4f58..8087b01 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -159,8 +159,15 @@ public class TezTask extends Task<TezWork> {
LOG.warn("The session: " + session + " has not been opened");
}
Set<String> desiredCounters = new HashSet<>();
- session = WorkloadManagerFederation.getSession(session, conf,
- new MappingInput(ss.getUserName()), getWork().getLlapMode(), desiredCounters);
+ // We only need a username for UGI to use for groups; getGroups will fetch the groups
+ // based on Hadoop configuration, as documented at
+ // https://hadoop.apache.org/docs/r2.8.0/hadoop-project-dist/hadoop-common/GroupsMapping.html
+ String userName = ss.getUserName();
+ MappingInput mi = (userName == null) ? new MappingInput("anonymous", null)
+ : new MappingInput(ss.getUserName(),
+ UserGroupInformation.createRemoteUser(ss.getUserName()).getGroups());
+ session = WorkloadManagerFederation.getSession(
+ session, conf, mi, getWork().getLlapMode(), desiredCounters);
TriggerContext triggerContext = ctx.getTriggerContext();
triggerContext.setDesiredCounters(desiredCounters);
http://git-wip-us.apache.org/repos/asf/hive/blob/d2958495/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
index 50cf4da..851245c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
@@ -17,17 +17,26 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
-import org.apache.hadoop.hive.metastore.api.WMMapping;
-
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.hive.metastore.api.WMMapping;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.spark_project.guava.collect.Lists;
class UserPoolMapping {
+ @SuppressWarnings("unused")
+ private static final Logger LOG = LoggerFactory.getLogger(UserPoolMapping.class);
+
public static enum MappingType {
- USER, DEFAULT
+ USER, GROUP
}
+ private final Map<String, Mapping> userMappings = new HashMap<>(),
+ groupMappings = new HashMap<>();
+ private final String defaultPoolPath;
+
private final static class Mapping {
public Mapping(String poolName, int priority) {
this.fullPoolName = poolName;
@@ -45,35 +54,41 @@ class UserPoolMapping {
/** Contains all the information necessary to map a query to a pool. */
public static final class MappingInput {
- public final String userName;
- // TODO: we may add app name, group name, etc. later
+ private final String userName;
+ private final List<String> groups;
+ // TODO: we may add app name, etc. later
- public MappingInput(String userName) {
+ public MappingInput(String userName, List<String> groups) {
this.userName = userName;
+ this.groups = groups;
+ }
+
+ public List<String> getGroups() {
+ return groups == null ? Lists.<String>newArrayList() : groups;
+ }
+
+ private String getUserName() {
+ return userName;
}
@Override
public String toString() {
- return userName;
+ return getUserName() + "; groups " + groups;
}
}
- private final Map<String, Mapping> userMappings = new HashMap<>();
- private final String defaultPoolPath;
-
public UserPoolMapping(List<WMMapping> mappings, String defaultPoolPath) {
if (mappings != null) {
for (WMMapping mapping : mappings) {
MappingType type = MappingType.valueOf(mapping.getEntityType().toUpperCase());
switch (type) {
case USER: {
- Mapping val = new Mapping(mapping.getPoolName(), mapping.getOrdering());
- Mapping oldValue = userMappings.put(mapping.getEntityName(), val);
- if (oldValue != null) {
- throw new AssertionError("Duplicate mapping for user " + mapping.getEntityName()
- + "; " + oldValue + " and " + val);
- }
+ addMapping(mapping, userMappings, "user");
+ break;
+ }
+ case GROUP: {
+ addMapping(mapping, groupMappings, "group");
break;
}
default: throw new AssertionError("Unknown type " + type);
@@ -83,14 +98,26 @@ class UserPoolMapping {
this.defaultPoolPath = defaultPoolPath;
}
+ private static void addMapping(WMMapping mapping, Map<String, Mapping> map, String text) {
+ Mapping val = new Mapping(mapping.getPoolName(), mapping.getOrdering());
+ Mapping oldValue = map.put(mapping.getEntityName(), val);
+ if (oldValue != null) {
+ throw new AssertionError("Duplicate mapping for " + text + " " + mapping.getEntityName()
+ + "; " + oldValue + " and " + val);
+ }
+ }
+
public String mapSessionToPoolName(MappingInput input) {
- // For now, we only have user rules, so this is very simple.
- // In future we'd also look up groups (each groups the user is in initially; we may do it
- // the opposite way if the user is a member of many groups but there are not many rules),
- // whatever user supplies in connection string to HS2, etc.
- // If multiple rules match, we'd need to get the highest priority one.
- Mapping userMapping = userMappings.get(input.userName);
- if (userMapping != null) return userMapping.fullPoolName;
+ // For equal-priority rules, user rules come first because they are more specific (an arbitrary choice).
+ Mapping mapping = userMappings.get(input.getUserName());
+ for (String group : input.getGroups()) {
+ Mapping candidate = groupMappings.get(group);
+ if (candidate == null) continue;
+ if (mapping == null || candidate.priority < mapping.priority) {
+ mapping = candidate;
+ }
+ }
+ if (mapping != null) return mapping.fullPoolName;
return defaultPoolPath;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/d2958495/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
index e220837..5ba6639 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.hive.ql.exec.tez;
-import static org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -34,6 +33,8 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.SettableFuture;
import java.lang.Thread.State;
import java.util.List;
import java.util.Map;
@@ -41,7 +42,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -49,15 +49,13 @@ import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
import org.apache.hadoop.hive.metastore.api.WMMapping;
import org.apache.hadoop.hive.metastore.api.WMPool;
import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
+import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput;
import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
import org.apache.tez.dag.api.TezConfiguration;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.SettableFuture;
-
public class TestWorkloadManager {
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(TestWorkloadManager.class);
@@ -88,7 +86,7 @@ public class TestWorkloadManager {
cdl.countDown();
}
try {
- session.set((WmTezSession) wm.getSession(old, new MappingInput(userName), conf));
+ session.set((WmTezSession) wm.getSession(old, new MappingInput(userName, null), conf));
} catch (Throwable e) {
error.compareAndSet(null, e);
}
@@ -121,6 +119,10 @@ public class TestWorkloadManager {
return new WMResourcePlan("rp");
}
+ public static WMPool pool(String path) {
+ return pool(path, 4, 0.1f);
+ }
+
public static WMPool pool(String path, int qp, double alloc) {
WMPool pool = new WMPool("rp", path);
pool.setAllocFraction(alloc);
@@ -129,11 +131,20 @@ public class TestWorkloadManager {
}
public static WMMapping mapping(String user, String pool) {
- WMMapping mapping = new WMMapping("rp", "USER", user);
+ return mapping("USER", user, pool, 0);
+ }
+
+ public static WMMapping mapping(String type, String user, String pool, int ordering) {
+ WMMapping mapping = new WMMapping("rp", type, user);
mapping.setPoolName(pool);
+ mapping.setOrdering(ordering);
return mapping;
}
+ private List<String> groups(String... groups) {
+ return Lists.newArrayList(groups);
+ }
+
public static class WorkloadManagerForTest extends WorkloadManager {
public WorkloadManagerForTest(String yarnQueue, HiveConf conf, int numSessions,
@@ -202,17 +213,17 @@ public class TestWorkloadManager {
TezSessionState nonPool = mock(TezSessionState.class);
when(nonPool.getConf()).thenReturn(conf);
doNothing().when(nonPool).close(anyBoolean());
- TezSessionState session = wm.getSession(nonPool, new MappingInput("user"), conf);
+ TezSessionState session = wm.getSession(nonPool, new MappingInput("user", null), conf);
verify(nonPool).close(anyBoolean());
assertNotSame(nonPool, session);
session.returnToSessionManager();
TezSessionPoolSession diffPool = mock(TezSessionPoolSession.class);
when(diffPool.getConf()).thenReturn(conf);
doNothing().when(diffPool).returnToSessionManager();
- session = wm.getSession(diffPool, new MappingInput("user"), conf);
+ session = wm.getSession(diffPool, new MappingInput("user", null), conf);
verify(diffPool).returnToSessionManager();
assertNotSame(diffPool, session);
- TezSessionState session2 = wm.getSession(session, new MappingInput("user"), conf);
+ TezSessionState session2 = wm.getSession(session, new MappingInput("user", null), conf);
assertSame(session, session2);
}
@@ -224,11 +235,11 @@ public class TestWorkloadManager {
wm.start();
// The queue should be ignored.
conf.set(TezConfiguration.TEZ_QUEUE_NAME, "test2");
- TezSessionState session = wm.getSession(null, new MappingInput("user"), conf);
+ TezSessionState session = wm.getSession(null, new MappingInput("user", null), conf);
assertEquals("test", session.getQueueName());
assertEquals("test", conf.get(TezConfiguration.TEZ_QUEUE_NAME));
session.setQueueName("test2");
- session = wm.getSession(session, new MappingInput("user"), conf);
+ session = wm.getSession(session, new MappingInput("user", null), conf);
assertEquals("test", session.getQueueName());
}
@@ -243,7 +254,8 @@ public class TestWorkloadManager {
MockQam qam = new MockQam();
WorkloadManager wm = new WorkloadManagerForTest("test", conf, 1, qam);
wm.start();
- WmTezSession session = (WmTezSession) wm.getSession(null, new MappingInput("user"), conf);
+ WmTezSession session = (WmTezSession) wm.getSession(
+ null, new MappingInput("user", null), conf);
assertEquals(1.0, session.getClusterFraction(), EPSILON);
qam.assertWasCalled();
WmTezSession session2 = (WmTezSession) session.reopen(conf, null);
@@ -261,10 +273,10 @@ public class TestWorkloadManager {
MockQam qam = new MockQam();
WorkloadManager wm = new WorkloadManagerForTest("test", conf, 2, qam);
wm.start();
- WmTezSession session = (WmTezSession) wm.getSession(null, new MappingInput("user"), conf);
+ WmTezSession session = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf);
assertEquals(1.0, session.getClusterFraction(), EPSILON);
qam.assertWasCalled();
- WmTezSession session2 = (WmTezSession) wm.getSession(null, new MappingInput("user"), conf);
+ WmTezSession session2 = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf);
assertEquals(0.5, session.getClusterFraction(), EPSILON);
assertEquals(0.5, session2.getClusterFraction(), EPSILON);
qam.assertWasCalled();
@@ -275,7 +287,7 @@ public class TestWorkloadManager {
qam.assertWasCalled();
// We never lose pool session, so we should still be able to get.
- session = (WmTezSession) wm.getSession(null, new MappingInput("user"), conf);
+ session = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf);
session.returnToSessionManager();
assertEquals(1.0, session2.getClusterFraction(), EPSILON);
assertEquals(0.0, session.getClusterFraction(), EPSILON);
@@ -295,16 +307,21 @@ public class TestWorkloadManager {
wm.start();
assertEquals(5, wm.getNumSessions());
// Get all the 5 sessions; validate cluster fractions.
- WmTezSession session05of06 = (WmTezSession) wm.getSession(null, new MappingInput("p1"), conf);
+ WmTezSession session05of06 = (WmTezSession) wm.getSession(
+ null, new MappingInput("p1", null), conf);
assertEquals(0.3, session05of06.getClusterFraction(), EPSILON);
- WmTezSession session03of06 = (WmTezSession) wm.getSession(null, new MappingInput("p2"), conf);
+ WmTezSession session03of06 = (WmTezSession) wm.getSession(
+ null, new MappingInput("p2", null), conf);
assertEquals(0.18, session03of06.getClusterFraction(), EPSILON);
- WmTezSession session03of06_2 = (WmTezSession) wm.getSession(null, new MappingInput("p2"), conf);
+ WmTezSession session03of06_2 = (WmTezSession) wm.getSession(
+ null, new MappingInput("p2", null), conf);
assertEquals(0.09, session03of06.getClusterFraction(), EPSILON);
assertEquals(0.09, session03of06_2.getClusterFraction(), EPSILON);
- WmTezSession session02of06 = (WmTezSession) wm.getSession(null,new MappingInput("r1"), conf);
+ WmTezSession session02of06 = (WmTezSession) wm.getSession(
+ null,new MappingInput("r1", null), conf);
assertEquals(0.12, session02of06.getClusterFraction(), EPSILON);
- WmTezSession session04 = (WmTezSession) wm.getSession(null, new MappingInput("r2"), conf);
+ WmTezSession session04 = (WmTezSession) wm.getSession(
+ null, new MappingInput("r2", null), conf);
assertEquals(0.4, session04.getClusterFraction(), EPSILON);
session05of06.returnToSessionManager();
session03of06.returnToSessionManager();
@@ -313,6 +330,34 @@ public class TestWorkloadManager {
session04.returnToSessionManager();
}
+ @Test(timeout = 10000)
+ public void testMappings() throws Exception {
+ HiveConf conf = createConf();
+ MockQam qam = new MockQam();
+ WMFullResourcePlan plan = new WMFullResourcePlan(plan(),
+ Lists.newArrayList(pool("u0"), pool("g0"), pool("g1"), pool("u2")));
+ plan.setMappings(Lists.newArrayList(mapping("USER", "u0", "u0", 0),
+ mapping("GROUP", "g0", "g0", 0), mapping("GROUP", "g1", "g1", 1),
+ mapping("USER", "u2", "u2", 2)));
+ WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
+ wm.start();
+ // Test various combinations.
+ verifyMapping(wm, conf, new MappingInput("u0", groups("zzz")), "u0");
+ verifyMapping(wm, conf, new MappingInput("zzz", groups("g1")), "g1");
+ verifyMapping(wm, conf, new MappingInput("u0", groups("g1")), "u0");
+ // User takes precedence over groups unless ordered explicitly.
+ verifyMapping(wm, conf, new MappingInput("u0", groups("g0")), "u0");
+ verifyMapping(wm, conf, new MappingInput("u2", groups("g1")), "g1");
+ verifyMapping(wm, conf, new MappingInput("u2", groups("g0", "g1")), "g0");
+ }
+
+ private static void verifyMapping(
+ WorkloadManager wm, HiveConf conf, MappingInput mi, String result) throws Exception {
+ WmTezSession session = (WmTezSession) wm.getSession(null, mi, conf);
+ assertEquals(result, session.getPoolName());
+ session.returnToSessionManager();
+ }
+
@Test(timeout=10000)
public void testQueueing() throws Exception {
final HiveConf conf = createConf();
@@ -322,9 +367,9 @@ public class TestWorkloadManager {
plan.setMappings(Lists.newArrayList(mapping("A", "A"), mapping("B", "B")));
final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
wm.start();
- WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf),
- sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf),
- sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B"), conf);
+ WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf),
+ sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf),
+ sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf);
final AtomicReference<WmTezSession> sessionA3 = new AtomicReference<>(),
sessionA4 = new AtomicReference<>();
final AtomicReference<Throwable> error = new AtomicReference<>();
@@ -338,7 +383,7 @@ public class TestWorkloadManager {
assertNull(sessionA4.get());
checkError(error);
// While threads are blocked on A, we should still be able to get and return a B session.
- WmTezSession sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B"), conf);
+ WmTezSession sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf);
sessionB1.returnToSessionManager();
sessionB2.returnToSessionManager();
assertNull(sessionA3.get());
@@ -364,16 +409,20 @@ public class TestWorkloadManager {
MockQam qam = new MockQam();
final WorkloadManager wm = new WorkloadManagerForTest("test", conf, 2, qam);
wm.start();
- WmTezSession session1 = (WmTezSession) wm.getSession(null, new MappingInput("user"), conf);
+ WmTezSession session1 = (WmTezSession) wm.getSession(
+ null, new MappingInput("user", null), conf);
// First, try to reuse from the same pool - should "just work".
- WmTezSession session1a = (WmTezSession) wm.getSession(session1, new MappingInput("user"), conf);
+ WmTezSession session1a = (WmTezSession) wm.getSession(
+ session1, new MappingInput("user", null), conf);
assertSame(session1, session1a);
assertEquals(1.0, session1.getClusterFraction(), EPSILON);
// Should still be able to get the 2nd session.
- WmTezSession session2 = (WmTezSession) wm.getSession(null, new MappingInput("user"), conf);
+ WmTezSession session2 = (WmTezSession) wm.getSession(
+ null, new MappingInput("user", null), conf);
// Now try to reuse with no other sessions remaining. Should still work.
- WmTezSession session2a = (WmTezSession) wm.getSession(session2, new MappingInput("user"), conf);
+ WmTezSession session2a = (WmTezSession) wm.getSession(
+ session2, new MappingInput("user", null), conf);
assertSame(session2, session2a);
assertEquals(0.5, session1.getClusterFraction(), EPSILON);
assertEquals(0.5, session2.getClusterFraction(), EPSILON);
@@ -430,19 +479,19 @@ public class TestWorkloadManager {
plan.setMappings(Lists.newArrayList(mapping("A", "A"), mapping("B", "B")));
final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
wm.start();
- WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf),
- sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
+ WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf),
+ sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf);
assertEquals("A", sessionA1.getPoolName());
assertEquals(0.3f, sessionA1.getClusterFraction(), EPSILON);
assertEquals("A", sessionA2.getPoolName());
assertEquals(0.3f, sessionA2.getClusterFraction(), EPSILON);
- WmTezSession sessionB1 = (WmTezSession) wm.getSession(sessionA1, new MappingInput("B"), conf);
+ WmTezSession sessionB1 = (WmTezSession) wm.getSession(sessionA1, new MappingInput("B", null), conf);
assertSame(sessionA1, sessionB1);
assertEquals("B", sessionB1.getPoolName());
assertEquals(0.4f, sessionB1.getClusterFraction(), EPSILON);
assertEquals(0.6f, sessionA2.getClusterFraction(), EPSILON); // A1 removed from A.
// Make sure that we can still get a session from A.
- WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
+ WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf);
assertEquals("A", sessionA3.getPoolName());
assertEquals(0.3f, sessionA3.getClusterFraction(), EPSILON);
assertEquals(0.3f, sessionA3.getClusterFraction(), EPSILON);
@@ -462,7 +511,7 @@ public class TestWorkloadManager {
wm.start();
// One session will be running, the other will be queued in "A"
- WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("U"), conf);
+ WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("U", null), conf);
assertEquals("A", sessionA1.getPoolName());
assertEquals(0.5f, sessionA1.getClusterFraction(), EPSILON);
final AtomicReference<WmTezSession> sessionA2 = new AtomicReference<>();
@@ -487,7 +536,7 @@ public class TestWorkloadManager {
assertEquals(0.4f, sessionA2.get().getClusterFraction(), EPSILON);
// The new session will also go to B now.
sessionA2.get().returnToSessionManager();
- WmTezSession sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("U"), conf);
+ WmTezSession sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("U", null), conf);
assertEquals("B", sessionB1.getPoolName());
assertEquals(0.4f, sessionB1.getClusterFraction(), EPSILON);
sessionA1.returnToSessionManager();
@@ -511,11 +560,11 @@ public class TestWorkloadManager {
// A: 1/1 running, 1 queued; B: 2/2 running, C: 1/2 running, D: 1/1 running, 1 queued.
// Total: 5/6 running.
- WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf),
- sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B"), conf),
- sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B"), conf),
- sessionC1 = (WmTezSession) wm.getSession(null, new MappingInput("C"), conf),
- sessionD1 = (WmTezSession) wm.getSession(null, new MappingInput("D"), conf);
+ WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf),
+ sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf),
+ sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf),
+ sessionC1 = (WmTezSession) wm.getSession(null, new MappingInput("C", null), conf),
+ sessionD1 = (WmTezSession) wm.getSession(null, new MappingInput("D", null), conf);
final AtomicReference<WmTezSession> sessionA2 = new AtomicReference<>(),
sessionD2 = new AtomicReference<>();
final AtomicReference<Throwable> error = new AtomicReference<>();
@@ -651,7 +700,7 @@ public class TestWorkloadManager {
final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
wm.start();
- WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
+ WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf);
// [A: 1, B: 0]
Map<String, SessionTriggerProvider> allSessionProviders = wm.getAllSessionTriggerProviders();
@@ -675,7 +724,7 @@ public class TestWorkloadManager {
assertEquals(0.4f, sessionA1.getClusterFraction(), EPSILON);
assertEquals("B", sessionA1.getPoolName());
- WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
+ WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf);
// [A: 1, B: 1]
allSessionProviders = wm.getAllSessionTriggerProviders();
assertEquals(1, allSessionProviders.get("A").getSessions().size());
@@ -702,7 +751,7 @@ public class TestWorkloadManager {
assertEquals("B", sessionA2.getPoolName());
assertEquals("B", sessionA1.getPoolName());
- WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
+ WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf);
// [A: 1, B: 2]
allSessionProviders = wm.getAllSessionTriggerProviders();
assertEquals(1, allSessionProviders.get("A").getSessions().size());
@@ -742,7 +791,7 @@ public class TestWorkloadManager {
final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
wm.start();
- WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
+ WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf);
// [A: 1, B: 0, B.x: 0, B.y: 0, C: 0]
Map<String, SessionTriggerProvider> allSessionProviders = wm.getAllSessionTriggerProviders();
@@ -800,7 +849,7 @@ public class TestWorkloadManager {
assertTrue(allSessionProviders.get("B.x").getSessions().contains(sessionA1));
assertEquals("B.x", sessionA1.getPoolName());
- WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
+ WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf);
// [A: 1, B: 0, B.x: 1, B.y: 0, C: 0]
allSessionProviders = wm.getAllSessionTriggerProviders();
assertEquals(1, allSessionProviders.get("A").getSessions().size());
@@ -899,7 +948,7 @@ public class TestWorkloadManager {
failedWait.setException(new Exception("foo"));
theOnlySession.setWaitForAmRegistryFuture(failedWait);
try {
- TezSessionState r = wm.getSession(null, new MappingInput("A"), conf);
+ TezSessionState r = wm.getSession(null, new MappingInput("A", null), conf);
fail("Expected an error but got " + r);
} catch (Exception ex) {
// Expected.
@@ -950,7 +999,7 @@ public class TestWorkloadManager {
assertEquals(0f, oldSession.getClusterFraction(), EPSILON);
pool.returnSession(theOnlySession);
// Make sure we can actually get a session still - parallelism/etc. should not be affected.
- WmTezSession result = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
+ WmTezSession result = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf);
assertEquals(sessionPoolName, result.getPoolName());
assertEquals(1f, result.getClusterFraction(), EPSILON);
result.returnToSessionManager();