You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2020/10/30 17:01:02 UTC
[hadoop] branch trunk updated: YARN-10458. Hive On Tez queries
fails upon submission to dynamically created pools. (Peter Bacsko via
wangda)
This is an automated email from the ASF dual-hosted git repository.
wangda pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new c47c9fd YARN-10458. Hive On Tez queries fails upon submission to dynamically created pools. (Peter Bacsko via wangda)
c47c9fd is described below
commit c47c9fd65d835fc53c9eef59a05b97335b04e320
Author: Wangda Tan <wa...@apache.org>
AuthorDate: Fri Oct 30 09:49:48 2020 -0700
YARN-10458. Hive On Tez queries fails upon submission to dynamically created pools. (Peter Bacsko via wangda)
Change-Id: I518dc925187ce55e9d35a37ba20878c0f4e37e5c
---
.../yarn/server/resourcemanager/RMAppManager.java | 4 +
.../scheduler/capacity/CapacityScheduler.java | 20 +++++
.../server/resourcemanager/TestAppManager.java | 94 +++++++++++++++++++++-
.../TestCapacitySchedulerAutoQueueCreation.java | 64 ++++++++++++++-
.../capacity/TestCapacitySchedulerQueueACLs.java | 42 ++++++++++
5 files changed, 219 insertions(+), 5 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index 1e62f25..fe18d82 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -927,6 +927,10 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
return usernameUsedForPlacement;
}
String queue = appPlacementContext.getQueue();
+ String parent = appPlacementContext.getParentQueue();
+ if (scheduler instanceof CapacityScheduler && parent != null) {
+ queue = parent + "." + queue;
+ }
if (callerUGI != null && scheduler
.checkAccess(callerUGI, QueueACL.SUBMIT_APPLICATIONS, queue)) {
usernameUsedForPlacement = userNameFromAppTag;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index a632bfa..259cd5c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -2290,6 +2290,21 @@ public class CapacityScheduler extends
public boolean checkAccess(UserGroupInformation callerUGI,
QueueACL acl, String queueName) {
CSQueue queue = getQueue(queueName);
+
+ if (queueName.startsWith("root.")) {
+ // can only check proper ACLs if the path is fully qualified
+ while (queue == null) {
+ int sepIndex = queueName.lastIndexOf(".");
+ String parentName = queueName.substring(0, sepIndex);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Queue {} does not exist, checking parent {}",
+ queueName, parentName);
+ }
+ queueName = parentName;
+ queue = queueManager.getQueue(queueName);
+ }
+ }
+
if (queue == null) {
LOG.debug("ACL not found for queue access-type {} for queue {}",
acl, queueName);
@@ -3307,4 +3322,9 @@ public class CapacityScheduler extends
public void setActivitiesManagerEnabled(boolean enabled) {
this.activitiesManagerEnabled = enabled;
}
+
+ @VisibleForTesting
+ public void setQueueManager(CapacitySchedulerQueueManager qm) {
+ this.queueManager = qm;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
index bc89917..e8b4105 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
@@ -84,11 +84,17 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.IOException;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
@@ -102,6 +108,7 @@ import java.util.concurrent.ConcurrentMap;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
@@ -120,12 +127,16 @@ import static org.mockito.Mockito.when;
*/
public class TestAppManager extends AppManagerTestBase{
+ @Rule
+ public UseCapacitySchedulerRule shouldUseCs = new UseCapacitySchedulerRule();
+
private static final Logger LOG =
LoggerFactory.getLogger(TestAppManager.class);
private static RMAppEventType appEventType = RMAppEventType.KILL;
private static String USER = "user_";
private static String USER0 = USER + 0;
+ private ResourceScheduler scheduler;
private static final String USER_ID_PREFIX = "userid=";
@@ -227,7 +238,13 @@ public class TestAppManager extends AppManagerTestBase{
rmContext = mockRMContext(1, now - 10);
rmContext
.setRMTimelineCollectorManager(mock(RMTimelineCollectorManager.class));
- ResourceScheduler scheduler = mockResourceScheduler();
+
+ if (shouldUseCs.useCapacityScheduler()) {
+ scheduler = mockResourceScheduler(CapacityScheduler.class);
+ } else {
+ scheduler = mockResourceScheduler();
+ }
+
((RMContextImpl)rmContext).setScheduler(scheduler);
Configuration conf = new Configuration();
@@ -880,7 +897,7 @@ public class TestAppManager extends AppManagerTestBase{
new int[]{ 1, 1, 1, 1 }};
for (int i = 0; i < globalMaxAppAttempts.length; ++i) {
for (int j = 0; j < individualMaxAppAttempts.length; ++j) {
- ResourceScheduler scheduler = mockResourceScheduler();
+ scheduler = mockResourceScheduler();
Configuration conf = new Configuration();
conf.setInt(YarnConfiguration.GLOBAL_RM_AM_MAX_ATTEMPTS,
globalMaxAppAttempts[i]);
@@ -1061,7 +1078,12 @@ public class TestAppManager extends AppManagerTestBase{
}
private static ResourceScheduler mockResourceScheduler() {
- ResourceScheduler scheduler = mock(ResourceScheduler.class);
+ return mockResourceScheduler(ResourceScheduler.class);
+ }
+
+ private static <T extends ResourceScheduler> ResourceScheduler
+ mockResourceScheduler(Class<T> schedulerClass) {
+ ResourceScheduler scheduler = mock(schedulerClass);
when(scheduler.getMinimumResourceCapability()).thenReturn(
Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
@@ -1299,6 +1321,51 @@ public class TestAppManager extends AppManagerTestBase{
Assert.assertEquals(expectedUser, userNameForPlacement);
}
+ @Test
+ @UseMockCapacityScheduler
+ public void testCheckAccessFullPathWithCapacityScheduler()
+ throws YarnException {
+ // make sure we only combine "parent + queue" if CS is selected
+ testCheckAccess("root.users", "hadoop");
+ }
+
+ @Test
+ @UseMockCapacityScheduler
+ public void testCheckAccessLeafQueueOnlyWithCapacityScheduler()
+ throws YarnException {
+ // make sure that an NPE is avoided if there's no parent defined
+ testCheckAccess(null, "hadoop");
+ }
+
+ private void testCheckAccess(String parent, String queue)
+ throws YarnException {
+ enableApplicationTagPlacement(true, "hadoop");
+ String userIdTag = USER_ID_PREFIX + "hadoop";
+ setApplicationTags("tag1", userIdTag, "tag2");
+ PlacementManager placementMgr = mock(PlacementManager.class);
+ ApplicationPlacementContext appContext;
+ String expectedQueue;
+ if (parent == null) {
+ appContext = new ApplicationPlacementContext(queue);
+ expectedQueue = queue;
+ } else {
+ appContext = new ApplicationPlacementContext(queue, parent);
+ expectedQueue = parent + "." + queue;
+ }
+
+ when(placementMgr.placeApplication(asContext, "hadoop"))
+ .thenReturn(appContext);
+ appMonitor.getUserNameForPlacement("hadoop", asContext, placementMgr);
+
+ ArgumentCaptor<String> queueNameCaptor =
+ ArgumentCaptor.forClass(String.class);
+ verify(scheduler).checkAccess(any(UserGroupInformation.class),
+ any(QueueACL.class), queueNameCaptor.capture());
+
+ assertEquals("Expected access check for queue",
+ expectedQueue, queueNameCaptor.getValue());
+ }
+
private void enableApplicationTagPlacement(boolean userHasAccessToQueue,
String... whiteListedUsers) {
Configuration conf = new Configuration();
@@ -1307,7 +1374,6 @@ public class TestAppManager extends AppManagerTestBase{
conf.setStrings(YarnConfiguration
.APPLICATION_TAG_BASED_PLACEMENT_USER_WHITELIST, whiteListedUsers);
((RMContextImpl) rmContext).setYarnConfiguration(conf);
- ResourceScheduler scheduler = mockResourceScheduler();
when(scheduler.checkAccess(any(UserGroupInformation.class),
eq(QueueACL.SUBMIT_APPLICATIONS), any(String.class)))
.thenReturn(userHasAccessToQueue);
@@ -1338,4 +1404,24 @@ public class TestAppManager extends AppManagerTestBase{
Collections.addAll(applicationTags, tags);
asContext.setApplicationTags(applicationTags);
}
+
+ private class UseCapacitySchedulerRule extends TestWatcher {
+ private boolean useCapacityScheduler;
+
+ @Override
+ protected void starting(Description d) {
+ useCapacityScheduler =
+ d.getAnnotation(UseMockCapacityScheduler.class) != null;
+ }
+
+ public boolean useCapacityScheduler() {
+ return useCapacityScheduler;
+ }
+ }
+
+ @Retention(RetentionPolicy.RUNTIME)
+ public @interface UseMockCapacityScheduler {
+ // mark test cases with this annotation when they require
+ // the scheduler type to be CapacityScheduler
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java
index caf1df4..596cca1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java
@@ -19,22 +19,27 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement
.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -84,7 +89,6 @@ import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager
import static org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.CURRENT_USER_MAPPING;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -839,4 +843,62 @@ public class TestCapacitySchedulerAutoQueueCreation
}
}
}
+
+ @Test
+ public void testDynamicAutoQueueCreationWithTags()
+ throws Exception {
+ MockRM rm = null;
+ try {
+ CapacitySchedulerConfiguration csConf
+ = new CapacitySchedulerConfiguration();
+ csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
+ new String[] {"a", "b"});
+ csConf.setCapacity("root.a", 90);
+ csConf.setCapacity("root.b", 10);
+ csConf.setAutoCreateChildQueueEnabled("root.a", true);
+ csConf.setAutoCreatedLeafQueueConfigCapacity("root.a", 50);
+ csConf.setAutoCreatedLeafQueueConfigMaxCapacity("root.a", 100);
+ csConf.setAcl("root.a", QueueACL.ADMINISTER_QUEUE, "*");
+ csConf.setAcl("root.a", QueueACL.SUBMIT_APPLICATIONS, "*");
+ csConf.setBoolean(YarnConfiguration
+ .APPLICATION_TAG_BASED_PLACEMENT_ENABLED, true);
+ csConf.setStrings(YarnConfiguration
+ .APPLICATION_TAG_BASED_PLACEMENT_USER_WHITELIST, "hadoop");
+ csConf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
+ "u:%user:root.a.%user");
+
+ RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+ mgr.init(csConf);
+ rm = new MockRM(csConf) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+ rm.start();
+ MockNM nm = rm.registerNode("127.0.0.1:1234", 16 * GB);
+
+ MockRMAppSubmissionData data =
+ MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
+ .withAppName("apptodynamicqueue")
+ .withUser("hadoop")
+ .withAcls(null)
+ .withUnmanagedAM(false)
+ .withApplicationTags(Sets.newHashSet("userid=testuser"))
+ .build();
+ RMApp app = MockRMAppSubmitter.submit(rm, data);
+ MockRM.launchAndRegisterAM(app, rm, nm);
+ nm.nodeHeartbeat(true);
+
+ CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+ CSQueue queue = cs.getQueue("root.a.testuser");
+ assertNotNull("Leaf queue has not been auto-created", queue);
+ assertEquals("Number of running applications", 1,
+ queue.getNumApplications());
+ } finally {
+ if (rm != null) {
+ rm.close();
+ }
+ }
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueueACLs.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueueACLs.java
index 9eeb9b4..bb5b790 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueueACLs.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueueACLs.java
@@ -17,15 +17,22 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.QueueACLsTestBase;
+import org.junit.Test;
public class TestCapacitySchedulerQueueACLs extends QueueACLsTestBase {
@Override
@@ -132,6 +139,7 @@ public class TestCapacitySchedulerQueueACLs extends QueueACLsTestBase {
.reinitialize(csConf, resourceManager.getRMContext());
}
+
private void setQueueCapacity(CapacitySchedulerConfiguration csConf,
float capacity, String queuePath) {
csConf.setCapacity(queuePath, capacity);
@@ -142,4 +150,38 @@ public class TestCapacitySchedulerQueueACLs extends QueueACLsTestBase {
csConf.setAcl(queuePath, QueueACL.ADMINISTER_QUEUE, queueAcl);
csConf.setAcl(queuePath, QueueACL.SUBMIT_APPLICATIONS, queueAcl);
}
+
+ @Test
+ public void testCheckAccessForUserWithOnlyLeafNameProvided() {
+ testCheckAccess(false, "dynamicQueue");
+ }
+
+ @Test
+ public void testCheckAccessForUserWithFullPathProvided() {
+ testCheckAccess(true, "root.users.dynamicQueue");
+ }
+
+ @Test
+ public void testCheckAccessForRootQueue() {
+ testCheckAccess(false, "root");
+ }
+
+ private void testCheckAccess(boolean expectedResult, String queueName) {
+ CapacitySchedulerQueueManager qm =
+ mock(CapacitySchedulerQueueManager.class);
+ CSQueue root = mock(ParentQueue.class);
+ CSQueue users = mock(ManagedParentQueue.class);
+ when(qm.getQueue("root")).thenReturn(root);
+ when(qm.getQueue("root.users")).thenReturn(users);
+ when(users.hasAccess(any(QueueACL.class),
+ any(UserGroupInformation.class))).thenReturn(true);
+ UserGroupInformation mockUGI = mock(UserGroupInformation.class);
+
+ CapacityScheduler cs =
+ (CapacityScheduler) resourceManager.getResourceScheduler();
+ cs.setQueueManager(qm);
+
+ assertEquals("checkAccess() failed", expectedResult,
+ cs.checkAccess(mockUGI, QueueACL.ADMINISTER_QUEUE, queueName));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org