You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2020/10/30 17:01:02 UTC

[hadoop] branch trunk updated: YARN-10458. Hive On Tez queries fails upon submission to dynamically created pools. (Peter Bacsko via wangda)

This is an automated email from the ASF dual-hosted git repository.

wangda pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c47c9fd  YARN-10458. Hive On Tez queries fails upon submission to dynamically created pools. (Peter Bacsko via wangda)
c47c9fd is described below

commit c47c9fd65d835fc53c9eef59a05b97335b04e320
Author: Wangda Tan <wa...@apache.org>
AuthorDate: Fri Oct 30 09:49:48 2020 -0700

    YARN-10458. Hive On Tez queries fails upon submission to dynamically created pools. (Peter Bacsko via wangda)
    
    Change-Id: I518dc925187ce55e9d35a37ba20878c0f4e37e5c
---
 .../yarn/server/resourcemanager/RMAppManager.java  |  4 +
 .../scheduler/capacity/CapacityScheduler.java      | 20 +++++
 .../server/resourcemanager/TestAppManager.java     | 94 +++++++++++++++++++++-
 .../TestCapacitySchedulerAutoQueueCreation.java    | 64 ++++++++++++++-
 .../capacity/TestCapacitySchedulerQueueACLs.java   | 42 ++++++++++
 5 files changed, 219 insertions(+), 5 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index 1e62f25..fe18d82 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -927,6 +927,10 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
         return usernameUsedForPlacement;
       }
       String queue = appPlacementContext.getQueue();
+      String parent = appPlacementContext.getParentQueue();
+      if (scheduler instanceof CapacityScheduler && parent != null) {
+        queue = parent + "." + queue;
+      }
       if (callerUGI != null && scheduler
               .checkAccess(callerUGI, QueueACL.SUBMIT_APPLICATIONS, queue)) {
         usernameUsedForPlacement = userNameFromAppTag;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index a632bfa..259cd5c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -2290,6 +2290,21 @@ public class CapacityScheduler extends
   public boolean checkAccess(UserGroupInformation callerUGI,
       QueueACL acl, String queueName) {
     CSQueue queue = getQueue(queueName);
+
+    if (queueName.startsWith("root.")) {
+      // can only check proper ACLs if the path is fully qualified
+      while (queue == null) {
+        int sepIndex = queueName.lastIndexOf(".");
+        String parentName = queueName.substring(0, sepIndex);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Queue {} does not exist, checking parent {}",
+              queueName, parentName);
+        }
+        queueName = parentName;
+        queue = queueManager.getQueue(queueName);
+      }
+    }
+
     if (queue == null) {
       LOG.debug("ACL not found for queue access-type {} for queue {}",
           acl, queueName);
@@ -3307,4 +3322,9 @@ public class CapacityScheduler extends
   public void setActivitiesManagerEnabled(boolean enabled) {
     this.activitiesManagerEnabled = enabled;
   }
+
+  @VisibleForTesting
+  public void setQueueManager(CapacitySchedulerQueueManager qm) {
+    this.queueManager = qm; // lets tests inject a (mock) queue manager
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
index bc89917..e8b4105 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
@@ -84,11 +84,17 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.mockito.ArgumentCaptor;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -102,6 +108,7 @@ import java.util.concurrent.ConcurrentMap;
 
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
@@ -120,12 +127,16 @@ import static org.mockito.Mockito.when;
  */
 
 public class TestAppManager extends AppManagerTestBase{
+  @Rule
+  public UseCapacitySchedulerRule shouldUseCs = new UseCapacitySchedulerRule();
+
   private static final Logger LOG =
       LoggerFactory.getLogger(TestAppManager.class);
   private static RMAppEventType appEventType = RMAppEventType.KILL;
 
   private static String USER = "user_";
   private static String USER0 = USER + 0;
+  private ResourceScheduler scheduler;
 
   private static final String USER_ID_PREFIX = "userid=";
 
@@ -227,7 +238,13 @@ public class TestAppManager extends AppManagerTestBase{
     rmContext = mockRMContext(1, now - 10);
     rmContext
         .setRMTimelineCollectorManager(mock(RMTimelineCollectorManager.class));
-    ResourceScheduler scheduler = mockResourceScheduler();
+
+    if (shouldUseCs.useCapacityScheduler()) {
+      scheduler = mockResourceScheduler(CapacityScheduler.class);
+    } else {
+      scheduler = mockResourceScheduler();
+    }
+
     ((RMContextImpl)rmContext).setScheduler(scheduler);
 
     Configuration conf = new Configuration();
@@ -880,7 +897,7 @@ public class TestAppManager extends AppManagerTestBase{
         new int[]{ 1, 1, 1, 1 }};
     for (int i = 0; i < globalMaxAppAttempts.length; ++i) {
       for (int j = 0; j < individualMaxAppAttempts.length; ++j) {
-        ResourceScheduler scheduler = mockResourceScheduler();
+        scheduler = mockResourceScheduler();
         Configuration conf = new Configuration();
         conf.setInt(YarnConfiguration.GLOBAL_RM_AM_MAX_ATTEMPTS,
             globalMaxAppAttempts[i]);
@@ -1061,7 +1078,12 @@ public class TestAppManager extends AppManagerTestBase{
   }
 
   private static ResourceScheduler mockResourceScheduler() {
-    ResourceScheduler scheduler = mock(ResourceScheduler.class);
+    return mockResourceScheduler(ResourceScheduler.class); // base-type mock
+  }
+
+  private static <T extends ResourceScheduler> ResourceScheduler
+      mockResourceScheduler(Class<T> schedulerClass) {
+    ResourceScheduler scheduler = mock(schedulerClass);
     when(scheduler.getMinimumResourceCapability()).thenReturn(
         Resources.createResource(
             YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
@@ -1299,6 +1321,51 @@ public class TestAppManager extends AppManagerTestBase{
     Assert.assertEquals(expectedUser, userNameForPlacement);
   }
 
+  @Test
+  @UseMockCapacityScheduler
+  public void testCheckAccessFullPathWithCapacityScheduler()
+      throws YarnException {
+    // with CS selected, the ACL check must use the full "parent.queue" path
+    testCheckAccess("root.users", "hadoop");
+  }
+
+  @Test
+  @UseMockCapacityScheduler
+  public void testCheckAccessLeafQueueOnlyWithCapacityScheduler()
+      throws YarnException {
+    // make sure that an NPE is avoided if there's no parent defined
+    testCheckAccess(null, "hadoop");
+  }
+
+  private void testCheckAccess(String parent, String queue)
+      throws YarnException {
+    enableApplicationTagPlacement(true, "hadoop"); // ACL check stubbed: true
+    String userIdTag = USER_ID_PREFIX + "hadoop";
+    setApplicationTags("tag1", userIdTag, "tag2");
+    PlacementManager placementMgr = mock(PlacementManager.class);
+    ApplicationPlacementContext appContext;
+    String expectedQueue;
+    if (parent == null) { // leaf name only, no parent path
+      appContext = new ApplicationPlacementContext(queue);
+      expectedQueue = queue;
+    } else { // parent given: expect the dotted full path
+      appContext = new ApplicationPlacementContext(queue, parent);
+      expectedQueue = parent + "." + queue;
+    }
+
+    when(placementMgr.placeApplication(asContext, "hadoop"))
+            .thenReturn(appContext);
+    appMonitor.getUserNameForPlacement("hadoop", asContext, placementMgr);
+
+    ArgumentCaptor<String> queueNameCaptor = // grabs the queue name argument
+        ArgumentCaptor.forClass(String.class);
+    verify(scheduler).checkAccess(any(UserGroupInformation.class),
+        any(QueueACL.class), queueNameCaptor.capture());
+
+    assertEquals("Expected access check for queue",
+        expectedQueue, queueNameCaptor.getValue());
+  }
+
   private void enableApplicationTagPlacement(boolean userHasAccessToQueue,
                                              String... whiteListedUsers) {
     Configuration conf = new Configuration();
@@ -1307,7 +1374,6 @@ public class TestAppManager extends AppManagerTestBase{
     conf.setStrings(YarnConfiguration
             .APPLICATION_TAG_BASED_PLACEMENT_USER_WHITELIST, whiteListedUsers);
     ((RMContextImpl) rmContext).setYarnConfiguration(conf);
-    ResourceScheduler scheduler = mockResourceScheduler();
     when(scheduler.checkAccess(any(UserGroupInformation.class),
             eq(QueueACL.SUBMIT_APPLICATIONS), any(String.class)))
             .thenReturn(userHasAccessToQueue);
@@ -1338,4 +1404,24 @@ public class TestAppManager extends AppManagerTestBase{
     Collections.addAll(applicationTags, tags);
     asContext.setApplicationTags(applicationTags);
   }
+
+  private static class UseCapacitySchedulerRule extends TestWatcher {
+    private boolean useCapacityScheduler; // per-test flag, set in starting()
+
+    @Override
+    protected void starting(Description d) { // invoked as each test starts
+      useCapacityScheduler =
+          d.getAnnotation(UseMockCapacityScheduler.class) != null;
+    }
+
+    public boolean useCapacityScheduler() {
+      return useCapacityScheduler;
+    }
+  }
+
+  @Retention(RetentionPolicy.RUNTIME) // must be visible to the rule at runtime
+  public @interface UseMockCapacityScheduler {
+    // marks test cases that require the mocked scheduler
+    // to be of type CapacityScheduler
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java
index caf1df4..596cca1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java
@@ -19,22 +19,27 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.placement
     .ApplicationPlacementContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -84,7 +89,6 @@ import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager
 import static org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.CURRENT_USER_MAPPING;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON;
 
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -839,4 +843,62 @@ public class TestCapacitySchedulerAutoQueueCreation
       }
     }
   }
+
+  @Test
+  public void testDynamicAutoQueueCreationWithTags()
+      throws Exception {
+    MockRM rm = null;
+    try { // the finally block below always closes the RM
+      CapacitySchedulerConfiguration csConf
+          = new CapacitySchedulerConfiguration();
+      csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
+          new String[] {"a", "b"});
+      csConf.setCapacity("root.a", 90);
+      csConf.setCapacity("root.b", 10);
+      csConf.setAutoCreateChildQueueEnabled("root.a", true);
+      csConf.setAutoCreatedLeafQueueConfigCapacity("root.a", 50);
+      csConf.setAutoCreatedLeafQueueConfigMaxCapacity("root.a", 100);
+      csConf.setAcl("root.a", QueueACL.ADMINISTER_QUEUE, "*");
+      csConf.setAcl("root.a", QueueACL.SUBMIT_APPLICATIONS, "*");
+      csConf.setBoolean(YarnConfiguration
+          .APPLICATION_TAG_BASED_PLACEMENT_ENABLED, true);
+      csConf.setStrings(YarnConfiguration
+          .APPLICATION_TAG_BASED_PLACEMENT_USER_WHITELIST, "hadoop");
+      csConf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
+          "u:%user:root.a.%user"); // expected target: root.a.<user>
+
+      RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+      mgr.init(csConf);
+      rm = new MockRM(csConf) {
+        @Override
+        public RMNodeLabelsManager createNodeLabelManager() {
+          return mgr;
+        }
+      };
+      rm.start();
+      MockNM nm = rm.registerNode("127.0.0.1:1234", 16 * GB);
+
+      MockRMAppSubmissionData data =
+          MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
+          .withAppName("apptodynamicqueue")
+          .withUser("hadoop") // the whitelisted submitter
+          .withAcls(null)
+          .withUnmanagedAM(false)
+          .withApplicationTags(Sets.newHashSet("userid=testuser"))
+          .build();
+      RMApp app = MockRMAppSubmitter.submit(rm, data);
+      MockRM.launchAndRegisterAM(app, rm, nm);
+      nm.nodeHeartbeat(true);
+
+      CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+      CSQueue queue = cs.getQueue("root.a.testuser"); // user from the tag
+      assertNotNull("Leaf queue has not been auto-created", queue);
+      assertEquals("Number of running applications", 1,
+          queue.getNumApplications());
+    } finally {
+      if (rm != null) {
+        rm.close();
+      }
+    }
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueueACLs.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueueACLs.java
index 9eeb9b4..bb5b790 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueueACLs.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueueACLs.java
@@ -17,15 +17,22 @@
 */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.QueueACLsTestBase;
+import org.junit.Test;
 
 public class TestCapacitySchedulerQueueACLs extends QueueACLsTestBase {
   @Override
@@ -132,6 +139,7 @@ public class TestCapacitySchedulerQueueACLs extends QueueACLsTestBase {
         .reinitialize(csConf, resourceManager.getRMContext());
   }
 
+
   private void setQueueCapacity(CapacitySchedulerConfiguration csConf,
                float capacity, String queuePath) {
     csConf.setCapacity(queuePath, capacity);
@@ -142,4 +150,38 @@ public class TestCapacitySchedulerQueueACLs extends QueueACLsTestBase {
     csConf.setAcl(queuePath, QueueACL.ADMINISTER_QUEUE, queueAcl);
     csConf.setAcl(queuePath, QueueACL.SUBMIT_APPLICATIONS, queueAcl);
   }
+
+  @Test
+  public void testCheckAccessForUserWithOnlyLeafNameProvided() {
+    testCheckAccess(false, "dynamicQueue"); // not a "root."-qualified path
+  }
+
+  @Test
+  public void testCheckAccessForUserWithFullPathProvided() {
+    // expected to fall back to the ACLs of the stubbed "root.users" parent
+    testCheckAccess(true, "root.users.dynamicQueue");
+  }
+
+  @Test
+  public void testCheckAccessForRootQueue() {
+    testCheckAccess(false, "root"); // root mock has no hasAccess() stub
+  }
+
+  private void testCheckAccess(boolean expectedResult, String queueName) {
+    CapacitySchedulerQueueManager qm =
+        mock(CapacitySchedulerQueueManager.class);
+    CSQueue root = mock(ParentQueue.class); // hasAccess() left unstubbed
+    CSQueue users = mock(ManagedParentQueue.class); // stubbed to grant access
+    when(qm.getQueue("root")).thenReturn(root);
+    when(qm.getQueue("root.users")).thenReturn(users);
+    when(users.hasAccess(any(QueueACL.class),
+        any(UserGroupInformation.class))).thenReturn(true);
+    UserGroupInformation mockUGI = mock(UserGroupInformation.class);
+
+    CapacityScheduler cs =
+        (CapacityScheduler) resourceManager.getResourceScheduler();
+    cs.setQueueManager(qm); // only "root" and "root.users" are stubbed
+
+    assertEquals("checkAccess() failed", expectedResult,
+        cs.checkAccess(mockUGI, QueueACL.ADMINISTER_QUEUE, queueName));
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org