You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/11/14 21:28:59 UTC

hive git commit: HIVE-18002 : add group support for pool mappings (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Repository: hive
Updated Branches:
  refs/heads/master f7f635d6e -> d2958495a


HIVE-18002 : add group support for pool mappings (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d2958495
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d2958495
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d2958495

Branch: refs/heads/master
Commit: d2958495ae59bdc3a91ed17155369af7fc5c7866
Parents: f7f635d
Author: sergey <se...@apache.org>
Authored: Tue Nov 14 13:05:29 2017 -0800
Committer: sergey <se...@apache.org>
Committed: Tue Nov 14 13:28:37 2017 -0800

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/exec/tez/TezTask.java |  11 +-
 .../hive/ql/exec/tez/UserPoolMapping.java       |  73 +++++++---
 .../hive/ql/exec/tez/TestWorkloadManager.java   | 143 +++++++++++++------
 3 files changed, 155 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d2958495/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 3bb4f58..8087b01 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -159,8 +159,15 @@ public class TezTask extends Task<TezWork> {
         LOG.warn("The session: " + session + " has not been opened");
       }
       Set<String> desiredCounters = new HashSet<>();
-      session = WorkloadManagerFederation.getSession(session, conf,
-          new MappingInput(ss.getUserName()), getWork().getLlapMode(), desiredCounters);
+      // We only need a username for UGI to use for groups; getGroups will fetch the groups
+      // based on Hadoop configuration, as documented at
+      // https://hadoop.apache.org/docs/r2.8.0/hadoop-project-dist/hadoop-common/GroupsMapping.html
+      String userName = ss.getUserName();
+      MappingInput mi = (userName == null) ? new MappingInput("anonymous", null)
+        : new MappingInput(ss.getUserName(),
+            UserGroupInformation.createRemoteUser(ss.getUserName()).getGroups());
+      session = WorkloadManagerFederation.getSession(
+          session, conf, mi, getWork().getLlapMode(), desiredCounters);
 
       TriggerContext triggerContext = ctx.getTriggerContext();
       triggerContext.setDesiredCounters(desiredCounters);

http://git-wip-us.apache.org/repos/asf/hive/blob/d2958495/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
index 50cf4da..851245c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
@@ -17,17 +17,26 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import org.apache.hadoop.hive.metastore.api.WMMapping;
-
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.hadoop.hive.metastore.api.WMMapping;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.spark_project.guava.collect.Lists;
 
 class UserPoolMapping {
+  @SuppressWarnings("unused")
+  private static final Logger LOG = LoggerFactory.getLogger(UserPoolMapping.class);
+
   public static enum MappingType {
-    USER, DEFAULT
+    USER, GROUP
   }
 
+  private final Map<String, Mapping> userMappings = new HashMap<>(),
+      groupMappings = new HashMap<>();
+  private final String defaultPoolPath;
+
   private final static class Mapping {
     public Mapping(String poolName, int priority) {
       this.fullPoolName = poolName;
@@ -45,35 +54,41 @@ class UserPoolMapping {
 
   /** Contains all the information necessary to map a query to a pool. */
   public static final class MappingInput {
-    public final String userName;
-    // TODO: we may add app name, group name, etc. later
+    private final String userName;
+    private final List<String> groups;
+    // TODO: we may add app name, etc. later
 
-    public MappingInput(String userName) {
+    public MappingInput(String userName, List<String> groups) {
       this.userName = userName;
+      this.groups = groups;
+    }
+
+    public List<String> getGroups() {
+      return groups == null ? Lists.<String>newArrayList() : groups;
+    }
+
+    private String getUserName() {
+      return userName;
     }
 
     @Override
     public String toString() {
-      return userName;
+      return getUserName() + "; groups " + groups;
     }
   }
 
 
-  private final Map<String, Mapping> userMappings = new HashMap<>();
-  private final String defaultPoolPath;
-
   public UserPoolMapping(List<WMMapping> mappings, String defaultPoolPath) {
     if (mappings != null) {
       for (WMMapping mapping : mappings) {
         MappingType type = MappingType.valueOf(mapping.getEntityType().toUpperCase());
         switch (type) {
         case USER: {
-          Mapping val = new Mapping(mapping.getPoolName(), mapping.getOrdering());
-          Mapping oldValue = userMappings.put(mapping.getEntityName(), val);
-          if (oldValue != null) {
-            throw new AssertionError("Duplicate mapping for user " + mapping.getEntityName()
-                + "; " + oldValue + " and " + val);
-          }
+          addMapping(mapping, userMappings, "user");
+          break;
+        }
+        case GROUP: {
+          addMapping(mapping, groupMappings, "group");
           break;
         }
         default: throw new AssertionError("Unknown type " + type);
@@ -83,14 +98,26 @@ class UserPoolMapping {
     this.defaultPoolPath = defaultPoolPath;
   }
 
+  private static void addMapping(WMMapping mapping, Map<String, Mapping> map, String text) {
+    Mapping val = new Mapping(mapping.getPoolName(), mapping.getOrdering());
+    Mapping oldValue = map.put(mapping.getEntityName(), val);
+    if (oldValue != null) {
+      throw new AssertionError("Duplicate mapping for " + text + " " + mapping.getEntityName()
+          + "; " + oldValue + " and " + val);
+    }
+  }
+
   public String mapSessionToPoolName(MappingInput input) {
-    // For now, we only have user rules, so this is very simple.
-    // In future we'd also look up groups (each groups the user is in initially; we may do it
-    // the opposite way if the user is a member of many groups but there are not many rules),
-    // whatever user supplies in connection string to HS2, etc.
-    // If multiple rules match, we'd need to get the highest priority one.
-    Mapping userMapping = userMappings.get(input.userName);
-    if (userMapping != null) return userMapping.fullPoolName;
+    // On equal priority a user rule beats a group rule: it is more specific (arbitrary choice).
+    Mapping mapping = userMappings.get(input.getUserName());
+    for (String group : input.getGroups()) {
+      Mapping candidate = groupMappings.get(group);
+      if (candidate == null) continue;
+      if (mapping == null || candidate.priority < mapping.priority) {
+        mapping = candidate;
+      }
+    }
+    if (mapping != null) return mapping.fullPoolName;
     return defaultPoolPath;
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/d2958495/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
index e220837..5ba6639 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 
-import static org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -34,6 +33,8 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.SettableFuture;
 import java.lang.Thread.State;
 import java.util.List;
 import java.util.Map;
@@ -41,7 +42,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -49,15 +49,13 @@ import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
 import org.apache.hadoop.hive.metastore.api.WMMapping;
 import org.apache.hadoop.hive.metastore.api.WMPool;
 import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
+import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput;
 import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.SettableFuture;
-
 public class TestWorkloadManager {
   @SuppressWarnings("unused")
   private static final Logger LOG = LoggerFactory.getLogger(TestWorkloadManager.class);
@@ -88,7 +86,7 @@ public class TestWorkloadManager {
         cdl.countDown();
       }
       try {
-       session.set((WmTezSession) wm.getSession(old, new MappingInput(userName), conf));
+       session.set((WmTezSession) wm.getSession(old, new MappingInput(userName, null), conf));
       } catch (Throwable e) {
         error.compareAndSet(null, e);
       }
@@ -121,6 +119,10 @@ public class TestWorkloadManager {
     return new WMResourcePlan("rp");
   }
 
+  public static WMPool pool(String path) {
+    return pool(path, 4, 0.1f);
+  }
+
   public static WMPool pool(String path, int qp, double alloc) {
     WMPool pool = new WMPool("rp", path);
     pool.setAllocFraction(alloc);
@@ -129,11 +131,20 @@ public class TestWorkloadManager {
   }
 
   public static WMMapping mapping(String user, String pool) {
-    WMMapping mapping = new WMMapping("rp", "USER", user);
+    return mapping("USER", user, pool, 0);
+  }
+
+  public static WMMapping mapping(String type, String user, String pool, int ordering) {
+    WMMapping mapping = new WMMapping("rp", type, user);
     mapping.setPoolName(pool);
+    mapping.setOrdering(ordering);
     return mapping;
   }
 
+  private List<String> groups(String... groups) {
+    return Lists.newArrayList(groups);
+  }
+
   public static class WorkloadManagerForTest extends WorkloadManager {
 
     public WorkloadManagerForTest(String yarnQueue, HiveConf conf, int numSessions,
@@ -202,17 +213,17 @@ public class TestWorkloadManager {
     TezSessionState nonPool = mock(TezSessionState.class);
     when(nonPool.getConf()).thenReturn(conf);
     doNothing().when(nonPool).close(anyBoolean());
-    TezSessionState session = wm.getSession(nonPool, new MappingInput("user"), conf);
+    TezSessionState session = wm.getSession(nonPool, new MappingInput("user", null), conf);
     verify(nonPool).close(anyBoolean());
     assertNotSame(nonPool, session);
     session.returnToSessionManager();
     TezSessionPoolSession diffPool = mock(TezSessionPoolSession.class);
     when(diffPool.getConf()).thenReturn(conf);
     doNothing().when(diffPool).returnToSessionManager();
-    session = wm.getSession(diffPool, new MappingInput("user"), conf);
+    session = wm.getSession(diffPool, new MappingInput("user", null), conf);
     verify(diffPool).returnToSessionManager();
     assertNotSame(diffPool, session);
-    TezSessionState session2 = wm.getSession(session, new MappingInput("user"), conf);
+    TezSessionState session2 = wm.getSession(session, new MappingInput("user", null), conf);
     assertSame(session, session2);
   }
 
@@ -224,11 +235,11 @@ public class TestWorkloadManager {
     wm.start();
     // The queue should be ignored.
     conf.set(TezConfiguration.TEZ_QUEUE_NAME, "test2");
-    TezSessionState session = wm.getSession(null, new MappingInput("user"), conf);
+    TezSessionState session = wm.getSession(null, new MappingInput("user", null), conf);
     assertEquals("test", session.getQueueName());
     assertEquals("test", conf.get(TezConfiguration.TEZ_QUEUE_NAME));
     session.setQueueName("test2");
-    session = wm.getSession(session, new MappingInput("user"), conf);
+    session = wm.getSession(session, new MappingInput("user", null), conf);
     assertEquals("test", session.getQueueName());
   }
 
@@ -243,7 +254,8 @@ public class TestWorkloadManager {
     MockQam qam = new MockQam();
     WorkloadManager wm = new WorkloadManagerForTest("test", conf, 1, qam);
     wm.start();
-    WmTezSession session = (WmTezSession) wm.getSession(null, new MappingInput("user"), conf);
+    WmTezSession session = (WmTezSession) wm.getSession(
+        null, new MappingInput("user", null), conf);
     assertEquals(1.0, session.getClusterFraction(), EPSILON);
     qam.assertWasCalled();
     WmTezSession session2 = (WmTezSession) session.reopen(conf, null);
@@ -261,10 +273,10 @@ public class TestWorkloadManager {
     MockQam qam = new MockQam();
     WorkloadManager wm = new WorkloadManagerForTest("test", conf, 2, qam);
     wm.start();
-    WmTezSession session = (WmTezSession) wm.getSession(null, new MappingInput("user"), conf);
+    WmTezSession session = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf);
     assertEquals(1.0, session.getClusterFraction(), EPSILON);
     qam.assertWasCalled();
-    WmTezSession session2 = (WmTezSession) wm.getSession(null, new MappingInput("user"), conf);
+    WmTezSession session2 = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf);
     assertEquals(0.5, session.getClusterFraction(), EPSILON);
     assertEquals(0.5, session2.getClusterFraction(), EPSILON);
     qam.assertWasCalled();
@@ -275,7 +287,7 @@ public class TestWorkloadManager {
     qam.assertWasCalled();
 
     // We never lose pool session, so we should still be able to get.
-    session = (WmTezSession) wm.getSession(null, new MappingInput("user"), conf);
+    session = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf);
     session.returnToSessionManager();
     assertEquals(1.0, session2.getClusterFraction(), EPSILON);
     assertEquals(0.0, session.getClusterFraction(), EPSILON);
@@ -295,16 +307,21 @@ public class TestWorkloadManager {
     wm.start();
     assertEquals(5, wm.getNumSessions());
     // Get all the 5 sessions; validate cluster fractions.
-    WmTezSession session05of06 = (WmTezSession) wm.getSession(null, new MappingInput("p1"), conf);
+    WmTezSession session05of06 = (WmTezSession) wm.getSession(
+        null, new MappingInput("p1", null), conf);
     assertEquals(0.3, session05of06.getClusterFraction(), EPSILON);
-    WmTezSession session03of06 = (WmTezSession) wm.getSession(null, new MappingInput("p2"), conf);
+    WmTezSession session03of06 = (WmTezSession) wm.getSession(
+        null, new MappingInput("p2", null), conf);
     assertEquals(0.18, session03of06.getClusterFraction(), EPSILON);
-    WmTezSession session03of06_2 = (WmTezSession) wm.getSession(null, new MappingInput("p2"), conf);
+    WmTezSession session03of06_2 = (WmTezSession) wm.getSession(
+        null, new MappingInput("p2", null), conf);
     assertEquals(0.09, session03of06.getClusterFraction(), EPSILON);
     assertEquals(0.09, session03of06_2.getClusterFraction(), EPSILON);
-    WmTezSession session02of06 = (WmTezSession) wm.getSession(null,new MappingInput("r1"), conf);
+    WmTezSession session02of06 = (WmTezSession) wm.getSession(
+        null,new MappingInput("r1", null), conf);
     assertEquals(0.12, session02of06.getClusterFraction(), EPSILON);
-    WmTezSession session04 = (WmTezSession) wm.getSession(null, new MappingInput("r2"), conf);
+    WmTezSession session04 = (WmTezSession) wm.getSession(
+        null, new MappingInput("r2", null), conf);
     assertEquals(0.4, session04.getClusterFraction(), EPSILON);
     session05of06.returnToSessionManager();
     session03of06.returnToSessionManager();
@@ -313,6 +330,34 @@ public class TestWorkloadManager {
     session04.returnToSessionManager();
   }
 
+  @Test(timeout = 10000)
+  public void testMappings() throws Exception {
+    HiveConf conf = createConf();
+    MockQam qam = new MockQam();
+    WMFullResourcePlan plan = new WMFullResourcePlan(plan(),
+        Lists.newArrayList(pool("u0"), pool("g0"), pool("g1"), pool("u2")));
+    plan.setMappings(Lists.newArrayList(mapping("USER", "u0", "u0", 0),
+        mapping("GROUP", "g0", "g0", 0), mapping("GROUP", "g1", "g1", 1),
+        mapping("USER", "u2", "u2", 2)));
+    WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
+    wm.start();
+    // Test various combinations.
+    verifyMapping(wm, conf, new MappingInput("u0", groups("zzz")), "u0");
+    verifyMapping(wm, conf, new MappingInput("zzz", groups("g1")), "g1");
+    verifyMapping(wm, conf, new MappingInput("u0", groups("g1")), "u0");
+    // User takes precedence over groups unless ordered explicitly.
+    verifyMapping(wm, conf, new MappingInput("u0", groups("g0")), "u0");
+    verifyMapping(wm, conf, new MappingInput("u2", groups("g1")), "g1");
+    verifyMapping(wm, conf, new MappingInput("u2", groups("g0", "g1")), "g0");
+  }
+
+  private static void verifyMapping(
+      WorkloadManager wm, HiveConf conf, MappingInput mi, String result) throws Exception {
+    WmTezSession session = (WmTezSession) wm.getSession(null, mi, conf);
+    assertEquals(result, session.getPoolName());
+    session.returnToSessionManager();
+  }
+
   @Test(timeout=10000)
   public void testQueueing() throws Exception {
     final HiveConf conf = createConf();
@@ -322,9 +367,9 @@ public class TestWorkloadManager {
     plan.setMappings(Lists.newArrayList(mapping("A", "A"), mapping("B", "B")));
     final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
     wm.start();
-    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf),
-        sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf),
-        sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B"), conf);
+    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf),
+        sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf),
+        sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf);
     final AtomicReference<WmTezSession> sessionA3 = new AtomicReference<>(),
         sessionA4 = new AtomicReference<>();
     final AtomicReference<Throwable> error = new AtomicReference<>();
@@ -338,7 +383,7 @@ public class TestWorkloadManager {
     assertNull(sessionA4.get());
     checkError(error);
     // While threads are blocked on A, we should still be able to get and return a B session.
-    WmTezSession sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B"), conf);
+    WmTezSession sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf);
     sessionB1.returnToSessionManager();
     sessionB2.returnToSessionManager();
     assertNull(sessionA3.get());
@@ -364,16 +409,20 @@ public class TestWorkloadManager {
     MockQam qam = new MockQam();
     final WorkloadManager wm = new WorkloadManagerForTest("test", conf, 2, qam);
     wm.start();
-    WmTezSession session1 = (WmTezSession) wm.getSession(null, new MappingInput("user"), conf);
+    WmTezSession session1 = (WmTezSession) wm.getSession(
+        null, new MappingInput("user", null), conf);
     // First, try to reuse from the same pool - should "just work".
-    WmTezSession session1a = (WmTezSession) wm.getSession(session1, new MappingInput("user"), conf);
+    WmTezSession session1a = (WmTezSession) wm.getSession(
+        session1, new MappingInput("user", null), conf);
     assertSame(session1, session1a);
     assertEquals(1.0, session1.getClusterFraction(), EPSILON);
     // Should still be able to get the 2nd session.
-    WmTezSession session2 = (WmTezSession) wm.getSession(null, new MappingInput("user"), conf);
+    WmTezSession session2 = (WmTezSession) wm.getSession(
+        null, new MappingInput("user", null), conf);
 
     // Now try to reuse with no other sessions remaining. Should still work.
-    WmTezSession session2a = (WmTezSession) wm.getSession(session2, new MappingInput("user"), conf);
+    WmTezSession session2a = (WmTezSession) wm.getSession(
+        session2, new MappingInput("user", null), conf);
     assertSame(session2, session2a);
     assertEquals(0.5, session1.getClusterFraction(), EPSILON);
     assertEquals(0.5, session2.getClusterFraction(), EPSILON);
@@ -430,19 +479,19 @@ public class TestWorkloadManager {
     plan.setMappings(Lists.newArrayList(mapping("A", "A"), mapping("B", "B")));
     final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
     wm.start();
-    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf),
-        sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
+    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf),
+        sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf);
     assertEquals("A", sessionA1.getPoolName());
     assertEquals(0.3f, sessionA1.getClusterFraction(), EPSILON);
     assertEquals("A", sessionA2.getPoolName());
     assertEquals(0.3f, sessionA2.getClusterFraction(), EPSILON);
-    WmTezSession sessionB1 = (WmTezSession) wm.getSession(sessionA1, new MappingInput("B"), conf);
+    WmTezSession sessionB1 = (WmTezSession) wm.getSession(sessionA1, new MappingInput("B", null), conf);
     assertSame(sessionA1, sessionB1);
     assertEquals("B", sessionB1.getPoolName());
     assertEquals(0.4f, sessionB1.getClusterFraction(), EPSILON);
     assertEquals(0.6f, sessionA2.getClusterFraction(), EPSILON); // A1 removed from A.
     // Make sure that we can still get a session from A.
-    WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
+    WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf);
     assertEquals("A", sessionA3.getPoolName());
     assertEquals(0.3f, sessionA3.getClusterFraction(), EPSILON);
     assertEquals(0.3f, sessionA3.getClusterFraction(), EPSILON);
@@ -462,7 +511,7 @@ public class TestWorkloadManager {
     wm.start();
  
     // One session will be running, the other will be queued in "A"
-    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("U"), conf);
+    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("U", null), conf);
     assertEquals("A", sessionA1.getPoolName());
     assertEquals(0.5f, sessionA1.getClusterFraction(), EPSILON);
     final AtomicReference<WmTezSession> sessionA2 = new AtomicReference<>();
@@ -487,7 +536,7 @@ public class TestWorkloadManager {
     assertEquals(0.4f, sessionA2.get().getClusterFraction(), EPSILON);
     // The new session will also go to B now.
     sessionA2.get().returnToSessionManager();
-    WmTezSession sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("U"), conf);
+    WmTezSession sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("U", null), conf);
     assertEquals("B", sessionB1.getPoolName());
     assertEquals(0.4f, sessionB1.getClusterFraction(), EPSILON);
     sessionA1.returnToSessionManager();
@@ -511,11 +560,11 @@ public class TestWorkloadManager {
  
     // A: 1/1 running, 1 queued; B: 2/2 running, C: 1/2 running, D: 1/1 running, 1 queued.
     // Total: 5/6 running.
-    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf),
-        sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B"), conf),
-        sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B"), conf),
-        sessionC1 = (WmTezSession) wm.getSession(null, new MappingInput("C"), conf),
-        sessionD1 = (WmTezSession) wm.getSession(null, new MappingInput("D"), conf);
+    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf),
+        sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf),
+        sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf),
+        sessionC1 = (WmTezSession) wm.getSession(null, new MappingInput("C", null), conf),
+        sessionD1 = (WmTezSession) wm.getSession(null, new MappingInput("D", null), conf);
     final AtomicReference<WmTezSession> sessionA2 = new AtomicReference<>(),
         sessionD2 = new AtomicReference<>();
     final AtomicReference<Throwable> error = new AtomicReference<>();
@@ -651,7 +700,7 @@ public class TestWorkloadManager {
     final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
     wm.start();
 
-    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
+    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf);
 
     // [A: 1, B: 0]
     Map<String, SessionTriggerProvider> allSessionProviders = wm.getAllSessionTriggerProviders();
@@ -675,7 +724,7 @@ public class TestWorkloadManager {
     assertEquals(0.4f, sessionA1.getClusterFraction(), EPSILON);
     assertEquals("B", sessionA1.getPoolName());
 
-    WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
+    WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf);
     // [A: 1, B: 1]
     allSessionProviders = wm.getAllSessionTriggerProviders();
     assertEquals(1, allSessionProviders.get("A").getSessions().size());
@@ -702,7 +751,7 @@ public class TestWorkloadManager {
     assertEquals("B", sessionA2.getPoolName());
     assertEquals("B", sessionA1.getPoolName());
 
-    WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
+    WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf);
     // [A: 1, B: 2]
     allSessionProviders = wm.getAllSessionTriggerProviders();
     assertEquals(1, allSessionProviders.get("A").getSessions().size());
@@ -742,7 +791,7 @@ public class TestWorkloadManager {
     final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
     wm.start();
 
-    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
+    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf);
 
     // [A: 1, B: 0, B.x: 0, B.y: 0, C: 0]
     Map<String, SessionTriggerProvider> allSessionProviders = wm.getAllSessionTriggerProviders();
@@ -800,7 +849,7 @@ public class TestWorkloadManager {
     assertTrue(allSessionProviders.get("B.x").getSessions().contains(sessionA1));
     assertEquals("B.x", sessionA1.getPoolName());
 
-    WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
+    WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf);
     // [A: 1, B: 0, B.x: 1, B.y: 0, C: 0]
     allSessionProviders = wm.getAllSessionTriggerProviders();
     assertEquals(1, allSessionProviders.get("A").getSessions().size());
@@ -899,7 +948,7 @@ public class TestWorkloadManager {
     failedWait.setException(new Exception("foo"));
     theOnlySession.setWaitForAmRegistryFuture(failedWait);
     try {
-      TezSessionState r = wm.getSession(null, new MappingInput("A"), conf);
+      TezSessionState r = wm.getSession(null, new MappingInput("A", null), conf);
       fail("Expected an error but got " + r);
     } catch (Exception ex) {
       // Expected.
@@ -950,7 +999,7 @@ public class TestWorkloadManager {
     assertEquals(0f, oldSession.getClusterFraction(), EPSILON);
     pool.returnSession(theOnlySession);
     // Make sure we can actually get a session still - parallelism/etc. should not be affected.
-    WmTezSession result = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
+    WmTezSession result = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf);
     assertEquals(sessionPoolName, result.getPoolName());
     assertEquals(1f, result.getClusterFraction(), EPSILON);
     result.returnToSessionManager();