You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ar...@apache.org on 2013/11/15 00:57:02 UTC
svn commit: r1542125 - in
/hadoop/common/branches/HDFS-2832/hadoop-yarn-project: ./ hadoop-yarn/
hadoop-yarn/dev-support/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/
hadoop-yarn/hadoop-yarn-common/src/main/resources/ hadoop-y...
Author: arp
Date: Thu Nov 14 23:56:56 2013
New Revision: 1542125
URL: http://svn.apache.org/r1542125
Log:
Merging r1541618 through r1542122 from trunk to branch HDFS-2832
Added:
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreOperationFailedEvent.java
- copied unchanged from r1542122, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreOperationFailedEvent.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreOperationFailedEventType.java
- copied unchanged from r1542122, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreOperationFailedEventType.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/StoreFencedException.java
- copied unchanged from r1542122, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/StoreFencedException.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java
- copied unchanged from r1542122, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java
- copied unchanged from r1542122, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SimpleGroupsMapping.java
- copied unchanged from r1542122, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SimpleGroupsMapping.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java
- copied unchanged from r1542122, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java
Modified:
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/pom.xml
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt Thu Nov 14 23:56:56 2013
@@ -40,6 +40,9 @@ Release 2.3.0 - UNRELEASED
YARN-311. RM/scheduler support for dynamic resource configuration.
(Junping Du via llu)
+ YARN-1392. Allow sophisticated app-to-queue placement policies in the Fair
+ Scheduler (Sandy Ryza)
+
IMPROVEMENTS
YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu)
@@ -94,6 +97,9 @@ Release 2.3.0 - UNRELEASED
YARN-1387. RMWebServices should use ClientRMService for filtering
applications (Karthik Kambatla via Sandy Ryza)
+ YARN-1222. Make improvements in ZKRMStateStore for fencing (Karthik
+ Kambatla via bikas)
+
OPTIMIZATIONS
BUG FIXES
@@ -127,6 +133,9 @@ Release 2.3.0 - UNRELEASED
YARN-1400. yarn.cmd uses HADOOP_RESOURCEMANAGER_OPTS. Should be
YARN_RESOURCEMANAGER_OPTS. (Raja Aluri via cnauroth)
+ YARN-1401. With zero sleep-delay-before-sigkill.ms, no signal is ever sent
+ (Gera Shegalov via Sandy Ryza)
+
Release 2.2.1 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml Thu Nov 14 23:56:56 2013
@@ -178,6 +178,12 @@
<Field name="minimumAllocation" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
+ <!-- Inconsistent sync warning - numRetries is only initialized once and never changed -->
+ <Match>
+ <Class name="org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore" />
+ <Field name="numRetries" />
+ <Bug pattern="IS2_INCONSISTENT_SYNC" />
+ </Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer"/>
<Field name="renewalTimer" />
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java Thu Nov 14 23:56:56 2013
@@ -193,8 +193,8 @@ public class HAUtil {
return addSuffix(prefix, getRMHAId(conf));
}
- private static String getConfValueForRMInstance(String prefix,
- Configuration conf) {
+ public static String getConfValueForRMInstance(String prefix,
+ Configuration conf) {
String confKey = getConfKeyForRMInstance(prefix, conf);
String retVal = conf.getTrimmed(confKey);
if (LOG.isTraceEnabled()) {
@@ -205,8 +205,8 @@ public class HAUtil {
return retVal;
}
- static String getConfValueForRMInstance(String prefix, String defaultValue,
- Configuration conf) {
+ public static String getConfValueForRMInstance(
+ String prefix, String defaultValue, Configuration conf) {
String value = getConfValueForRMInstance(prefix, conf);
return (value == null) ? defaultValue : value;
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Thu Nov 14 23:56:56 2013
@@ -328,6 +328,8 @@ public class YarnConfiguration extends C
ZK_STATE_STORE_PREFIX + "acl";
public static final String DEFAULT_ZK_RM_STATE_STORE_ACL =
"world:anyone:rwcda";
+ public static final String ZK_RM_STATE_STORE_ROOT_NODE_ACL =
+ ZK_STATE_STORE_PREFIX + "root-node.acl";
/** The maximum number of completed applications RM keeps. */
public static final String RM_MAX_COMPLETED_APPLICATIONS =
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml Thu Nov 14 23:56:56 2013
@@ -279,7 +279,11 @@
<description>Host:Port of the ZooKeeper server where RM state will
be stored. This must be supplied when using
org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
- as the value for yarn.resourcemanager.store.class</description>
+ as the value for yarn.resourcemanager.store.class. ZKRMStateStore
+ is implicitly fenced, meaning only a single ResourceManager is
+ able to use the store at any point in time. More details on this, along
+ with setting up appropriate ACLs, are discussed under the description for
+ yarn.resourcemanager.zk.state-store.root-node.acl.</description>
<name>yarn.resourcemanager.zk.state-store.address</name>
<!--value>127.0.0.1:2181</value-->
</property>
@@ -321,6 +325,31 @@
</property>
<property>
+ <description>
+ ACLs to be used for the root znode when using ZKRMStateStore in an HA
+ scenario for fencing.
+
+ ZKRMStateStore supports implicit fencing to allow a single
+ ResourceManager write-access to the store. For fencing, the
+ ResourceManagers in the cluster share read-write-admin privileges on the
+ root node, but the Active ResourceManager claims exclusive create-delete
+ permissions.
+
+ By default, when this property is not set, we use the ACLs from
+ yarn.resourcemanager.zk.state-store.acl for shared admin access and
+ rm-address:cluster-timestamp for username-based exclusive create-delete
+ access.
+
+ This property allows users to set ACLs of their choice instead of using
+ the default mechanism. For fencing to work, the ACLs should be
+ carefully set differently on each ResourceManager such that all the
+ ResourceManagers have shared admin access and the Active ResourceManager
+ takes over (exclusively) the create-delete access.
+ </description>
+ <name>yarn.resourcemanager.zk.state-store.root-node.acl</name>
+ </property>
+
+ <property>
<description>URI pointing to the location of the FileSystem path where
RM state will be stored. This must be supplied when using
org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java Thu Nov 14 23:56:56 2013
@@ -375,13 +375,19 @@ public class ContainerLaunch implements
LOG.debug("Sending signal to pid " + processId
+ " as user " + user
+ " for container " + containerIdStr);
+
+ final Signal signal = sleepDelayBeforeSigKill > 0
+ ? Signal.TERM
+ : Signal.KILL;
+
+ boolean result = exec.signalContainer(user, processId, signal);
+
+ LOG.debug("Sent signal " + signal + " to pid " + processId
+ + " as user " + user
+ + " for container " + containerIdStr
+ + ", result=" + (result? "success" : "failed"));
+
if (sleepDelayBeforeSigKill > 0) {
- boolean result = exec.signalContainer(user,
- processId, Signal.TERM);
- LOG.debug("Sent signal to pid " + processId
- + " as user " + user
- + " for container " + containerIdStr
- + ", result=" + (result? "success" : "failed"));
new DelayedProcessKiller(container, user,
processId, sleepDelayBeforeSigKill, Signal.KILL, exec).start();
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java Thu Nov 14 23:56:56 2013
@@ -97,7 +97,6 @@ public class TestContainerLaunch extends
conf.setClass(
YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR,
LinuxResourceCalculatorPlugin.class, ResourceCalculatorPlugin.class);
- conf.setLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, 1000);
super.setup();
}
@@ -590,8 +589,9 @@ public class TestContainerLaunch extends
AuxiliaryServiceHelper.getServiceDataFromEnv(serviceName, env));
}
- @Test
- public void testDelayedKill() throws Exception {
+ private void internalKillTest(boolean delayed) throws Exception {
+ conf.setLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
+ delayed ? 1000 : 0);
containerManager.start();
// ////// Construct the Container-id
@@ -675,7 +675,8 @@ public class TestContainerLaunch extends
BaseContainerManagerTest.waitForContainerState(containerManager, cId,
ContainerState.COMPLETE);
- // container stop sends a sigterm followed by a sigkill
+ // if delayed, container stop sends a sigterm followed by a sigkill;
+ // otherwise, a sigkill is sent immediately
GetContainerStatusesRequest gcsRequest =
GetContainerStatusesRequest.newInstance(containerIds);
@@ -690,7 +691,7 @@ public class TestContainerLaunch extends
// Windows, because the process is not notified when killed by winutils.
// There is no way for the process to trap and respond. Instead, we can
// verify that the job object with ID matching container ID no longer exists.
- if (Shell.WINDOWS) {
+ if (Shell.WINDOWS || !delayed) {
Assert.assertFalse("Process is still alive!",
DefaultContainerExecutor.containerIsAlive(cId.toString()));
} else {
@@ -713,6 +714,16 @@ public class TestContainerLaunch extends
}
}
+ @Test
+ public void testDelayedKill() throws Exception {
+ internalKillTest(true);
+ }
+
+ @Test
+ public void testImmediateKill() throws Exception {
+ internalKillTest(false);
+ }
+
@SuppressWarnings("rawtypes")
@Test
public void testCallFailureWithNullLocalizedResources() {
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java Thu Nov 14 23:56:56 2013
@@ -67,7 +67,9 @@ public class RMHAProtocolService extends
protected HAServiceState haState = HAServiceState.INITIALIZING;
private AccessControlList adminAcl;
private Server haAdminServer;
- private boolean haEnabled;
+
+ @InterfaceAudience.Private
+ boolean haEnabled;
public RMHAProtocolService(ResourceManager resourceManager) {
super("RMHAProtocolService");
@@ -174,7 +176,8 @@ public class RMHAProtocolService extends
}
}
- private synchronized void transitionToActive() throws Exception {
+ @InterfaceAudience.Private
+ synchronized void transitionToActive() throws Exception {
if (haState == HAServiceState.ACTIVE) {
LOG.info("Already in active state");
return;
@@ -205,7 +208,8 @@ public class RMHAProtocolService extends
}
}
- private synchronized void transitionToStandby(boolean initialize)
+ @InterfaceAudience.Private
+ synchronized void transitionToStandby(boolean initialize)
throws Exception {
if (haState == HAServiceState.STANDBY) {
LOG.info("Already in standby state");
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Thu Nov 14 23:56:56 2013
@@ -56,6 +56,8 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreOperationFailedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreOperationFailedEventType;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -163,6 +165,10 @@ public class ResourceManager extends Com
public ResourceManager() {
super("ResourceManager");
}
+
+ public RMHAProtocolService getHAService() {
+ return this.haService;
+ }
public RMContext getRMContext() {
return this.rmContext;
@@ -216,6 +222,11 @@ public class ResourceManager extends Com
return new SchedulerEventDispatcher(this.scheduler);
}
+ protected RMStateStoreOperationFailedEventDispatcher
+ createRMStateStoreOperationFailedEventDispatcher() {
+ return new RMStateStoreOperationFailedEventDispatcher(haService);
+ }
+
protected Dispatcher createDispatcher() {
return new AsyncDispatcher();
}
@@ -339,6 +350,8 @@ public class ResourceManager extends Com
try {
rmStore.init(conf);
rmStore.setRMDispatcher(rmDispatcher);
+ rmDispatcher.register(RMStateStoreOperationFailedEventType.class,
+ createRMStateStoreOperationFailedEventDispatcher());
} catch (Exception e) {
// the Exception from stateStore.init() needs to be handled for
// HA and we need to give up master status if we got fenced
@@ -633,6 +646,46 @@ public class ResourceManager extends Com
}
@Private
+ public static class RMStateStoreOperationFailedEventDispatcher implements
+ EventHandler<RMStateStoreOperationFailedEvent> {
+ private final RMHAProtocolService haService;
+
+ public RMStateStoreOperationFailedEventDispatcher(
+ RMHAProtocolService haService) {
+ this.haService = haService;
+ }
+
+ @Override
+ public void handle(RMStateStoreOperationFailedEvent event) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received a " +
+ RMStateStoreOperationFailedEvent.class.getName() + " of type " +
+ event.getType().name());
+ }
+ if (event.getType() == RMStateStoreOperationFailedEventType.FENCED) {
+ LOG.info("RMStateStore has been fenced");
+ synchronized(haService) {
+ if (haService.haEnabled) {
+ try {
+ // Transition to standby and reinit active services
+ LOG.info("Transitioning RM to Standby mode");
+ haService.transitionToStandby(true);
+ return;
+ } catch (Exception e) {
+ LOG.error("Failed to transition RM to Standby mode.");
+ }
+ }
+ }
+ }
+
+ LOG.error("Shutting down RM on receiving a " +
+ RMStateStoreOperationFailedEvent.class.getName() + " of type " +
+ event.getType().name());
+ ExitUtil.terminate(1, event.getCause());
+ }
+ }
+
+ @Private
public static final class ApplicationEventDispatcher implements
EventHandler<RMAppEvent> {
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Thu Nov 14 23:56:56 2013
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
+
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -388,9 +389,13 @@ public abstract class RMStateStore exten
*/
public synchronized void storeRMDelegationTokenAndSequenceNumber(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
- int latestSequenceNumber) throws Exception {
- storeRMDelegationTokenAndSequenceNumberState(rmDTIdentifier, renewDate,
- latestSequenceNumber);
+ int latestSequenceNumber) {
+ try {
+ storeRMDelegationTokenAndSequenceNumberState(rmDTIdentifier, renewDate,
+ latestSequenceNumber);
+ } catch (Exception e) {
+ notifyStoreOperationFailed(e);
+ }
}
/**
@@ -406,9 +411,12 @@ public abstract class RMStateStore exten
* RMDTSecretManager calls this to remove the state of a delegation token
*/
public synchronized void removeRMDelegationToken(
- RMDelegationTokenIdentifier rmDTIdentifier, int sequenceNumber)
- throws Exception {
- removeRMDelegationTokenState(rmDTIdentifier);
+ RMDelegationTokenIdentifier rmDTIdentifier, int sequenceNumber) {
+ try {
+ removeRMDelegationTokenState(rmDTIdentifier);
+ } catch (Exception e) {
+ notifyStoreOperationFailed(e);
+ }
}
/**
@@ -421,9 +429,12 @@ public abstract class RMStateStore exten
/**
* RMDTSecretManager calls this to store the state of a master key
*/
- public synchronized void storeRMDTMasterKey(DelegationKey delegationKey)
- throws Exception {
- storeRMDTMasterKeyState(delegationKey);
+ public synchronized void storeRMDTMasterKey(DelegationKey delegationKey) {
+ try {
+ storeRMDTMasterKeyState(delegationKey);
+ } catch (Exception e) {
+ notifyStoreOperationFailed(e);
+ }
}
/**
@@ -437,9 +448,12 @@ public abstract class RMStateStore exten
/**
* RMDTSecretManager call this to remove the state of a master key
*/
- public synchronized void removeRMDTMasterKey(DelegationKey delegationKey)
- throws Exception {
- removeRMDTMasterKeyState(delegationKey);
+ public synchronized void removeRMDTMasterKey(DelegationKey delegationKey) {
+ try {
+ removeRMDTMasterKeyState(delegationKey);
+ } catch (Exception e) {
+ notifyStoreOperationFailed(e);
+ }
}
/**
@@ -539,19 +553,15 @@ public abstract class RMStateStore exten
try {
if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
storeApplicationStateInternal(appId.toString(), appStateData);
+ notifyDoneStoringApplication(appId, storedException);
} else {
assert event.getType().equals(RMStateStoreEventType.UPDATE_APP);
updateApplicationStateInternal(appId.toString(), appStateData);
+ notifyDoneUpdatingApplication(appId, storedException);
}
} catch (Exception e) {
LOG.error("Error storing app: " + appId, e);
- storedException = e;
- } finally {
- if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
- notifyDoneStoringApplication(appId, storedException);
- } else {
- notifyDoneUpdatingApplication(appId, storedException);
- }
+ notifyStoreOperationFailed(e);
}
} else if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)
|| event.getType().equals(RMStateStoreEventType.UPDATE_APP_ATTEMPT)) {
@@ -589,24 +599,20 @@ public abstract class RMStateStore exten
if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) {
storeApplicationAttemptStateInternal(attemptState.getAttemptId()
.toString(), attemptStateData);
+ notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
+ storedException);
} else {
assert event.getType().equals(
RMStateStoreEventType.UPDATE_APP_ATTEMPT);
updateApplicationAttemptStateInternal(attemptState.getAttemptId()
.toString(), attemptStateData);
- }
- } catch (Exception e) {
- LOG
- .error("Error storing appAttempt: " + attemptState.getAttemptId(), e);
- storedException = e;
- } finally {
- if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) {
- notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
- storedException);
- } else {
notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(),
- storedException);
+ storedException);
}
+ } catch (Exception e) {
+ LOG.error(
+ "Error storing appAttempt: " + attemptState.getAttemptId(), e);
+ notifyStoreOperationFailed(e);
}
} else if (event.getType().equals(RMStateStoreEventType.REMOVE_APP)) {
ApplicationState appState =
@@ -616,11 +622,10 @@ public abstract class RMStateStore exten
LOG.info("Removing info for app: " + appId);
try {
removeApplicationState(appState);
+ notifyDoneRemovingApplcation(appId, removedException);
} catch (Exception e) {
LOG.error("Error removing app: " + appId, e);
- removedException = e;
- } finally {
- notifyDoneRemovingApplcation(appId, removedException);
+ notifyStoreOperationFailed(e);
}
} else {
LOG.error("Unknown RMStateStoreEvent type: " + event.getType());
@@ -629,6 +634,24 @@ public abstract class RMStateStore exten
@SuppressWarnings("unchecked")
/**
+ * In {@link #handleStoreEvent}, notifies the ResourceManager that a store
+ * op failed: FENCED for a {@link StoreFencedException}, otherwise FAILED.
+ * @param failureCause the exception due to which the operation failed
+ */
+ private void notifyStoreOperationFailed(Exception failureCause) {
+ RMStateStoreOperationFailedEventType type;
+ if (failureCause instanceof StoreFencedException) {
+ type = RMStateStoreOperationFailedEventType.FENCED;
+ } else {
+ type = RMStateStoreOperationFailedEventType.FAILED;
+ }
+
+ rmDispatcher.getEventHandler().handle(
+ new RMStateStoreOperationFailedEvent(type, failureCause));
+ }
+
+ @SuppressWarnings("unchecked")
+ /**
* In (@link handleStoreEvent}, this method is called to notify the
* application that new application is stored in state store
* @param appId id of the application that has been saved
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java Thu Nov 14 23:56:56 2013
@@ -23,7 +23,9 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -31,6 +33,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.delegation.DelegationKey;
@@ -38,11 +41,14 @@ import org.apache.hadoop.util.StringUtil
import org.apache.hadoop.util.ZKUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.RMHAServiceTarget;
+import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -53,11 +59,14 @@ import org.apache.zookeeper.Op;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event;
+import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
@Private
@Unstable
@@ -83,6 +92,55 @@ public class ZKRMStateStore extends RMSt
protected ZooKeeper zkClient;
private ZooKeeper oldZkClient;
+ /** Fencing: name of the lock znode; its path and create/delete ops follow. */
+ private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK";
+ private String fencingNodePath;
+ private Op createFencingNodePathOp;
+ private Op deleteFencingNodePathOp;
+ // Root node ACLs; assigned only when HA is enabled (from conf or generated)
+ @VisibleForTesting
+ List<ACL> zkRootNodeAcl;
+ private boolean useDefaultFencingScheme = false;
+ public static final int CREATE_DELETE_PERMS =
+ ZooDefs.Perms.CREATE | ZooDefs.Perms.DELETE;
+ private final String zkRootNodeAuthScheme =
+ new DigestAuthenticationProvider().getScheme();
+ // Digest credentials of this RM, filled in by constructZkRootNodeACL
+ private String zkRootNodeUsername;
+ private String zkRootNodePassword;
+
+ /**
+ * Given the {@link Configuration} and {@link ACL}s used (zkAcl) for
+ * ZooKeeper access, construct the {@link ACL}s for the store's root node.
+ * Every identity in zkAcl keeps its permissions minus create and delete
+ * (e.g. ALL becomes rwa), while the current RM gets exclusive create-delete
+ * access through a digest identity. As a side effect this sets
+ * zkRootNodeUsername and zkRootNodePassword, later passed to addAuthInfo.
+ * Call only when HA is enabled and the conf sets no root node ACL.
+ */
+ @VisibleForTesting
+ @Private
+ @Unstable
+ protected List<ACL> constructZkRootNodeACL(
+ Configuration conf, List<ACL> sourceACLs) throws NoSuchAlgorithmException {
+ List<ACL> rootNodeAcls = new ArrayList<ACL>(sourceACLs.size() + 1);
+ for (ACL acl : sourceACLs) {
+ rootNodeAcls.add(new ACL(
+ ZKUtil.removeSpecificPerms(acl.getPerms(), CREATE_DELETE_PERMS),
+ acl.getId()));
+ }
+
+ zkRootNodeUsername = HAUtil.getConfValueForRMInstance(
+ YarnConfiguration.RM_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_ADDRESS, conf);
+ zkRootNodePassword = Long.toString(ResourceManager.getClusterTimeStamp());
+ Id rmId = new Id(zkRootNodeAuthScheme,
+ DigestAuthenticationProvider.generateDigest(
+ zkRootNodeUsername + ":" + zkRootNodePassword));
+ rootNodeAcls.add(new ACL(CREATE_DELETE_PERMS, rmId));
+ return rootNodeAcls;
+ }
+
@Override
public synchronized void initInternal(Configuration conf) throws Exception {
zkHostPort = conf.get(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS);
@@ -116,6 +174,29 @@ public class ZKRMStateStore extends RMSt
zkRootNodePath = znodeWorkingPath + "/" + ROOT_ZNODE_NAME;
rmDTSecretManagerRoot = zkRootNodePath + "/" + RM_DT_SECRET_MANAGER_ROOT;
rmAppRoot = zkRootNodePath + "/" + RM_APP_ROOT;
+
+ /* Initialize fencing related paths, acls, and ops */
+ fencingNodePath = zkRootNodePath + "/" + FENCING_LOCK;
+ createFencingNodePathOp = Op.create(fencingNodePath, new byte[0], zkAcl,
+ CreateMode.PERSISTENT);
+ deleteFencingNodePathOp = Op.delete(fencingNodePath, -1);
+ if (HAUtil.isHAEnabled(conf)) {
+ String zkRootNodeAclConf = HAUtil.getConfValueForRMInstance
+ (YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf);
+ if (zkRootNodeAclConf != null) {
+ zkRootNodeAclConf = ZKUtil.resolveConfIndirection(zkRootNodeAclConf);
+ try {
+ zkRootNodeAcl = ZKUtil.parseACLs(zkRootNodeAclConf);
+ } catch (ZKUtil.BadAclFormatException bafe) {
+ LOG.error("Invalid format for " +
+ YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL);
+ throw bafe;
+ }
+ } else {
+ useDefaultFencingScheme = true;
+ zkRootNodeAcl = constructZkRootNodeACL(conf, zkAcl);
+ }
+ }
}
@Override
@@ -126,20 +207,76 @@ public class ZKRMStateStore extends RMSt
// ensure root dirs exist
createRootDir(znodeWorkingPath);
createRootDir(zkRootNodePath);
+ if (HAUtil.isHAEnabled(getConfig())){
+ fence();
+ }
createRootDir(rmDTSecretManagerRoot);
createRootDir(rmAppRoot);
}
- private void createRootDir(String rootPath) throws Exception {
+ private void createRootDir(final String rootPath) throws Exception {
+ // Not via doMulti: its fencing-node op needs the root znode to exist first
try {
- createWithRetries(rootPath, null, zkAcl, CreateMode.PERSISTENT);
+ new ZKAction<String>() {
+ @Override
+ public String run() throws KeeperException, InterruptedException {
+ return zkClient.create(rootPath, null, zkAcl, CreateMode.PERSISTENT);
+ }
+ }.runWithRetries();
} catch (KeeperException ke) {
- if (ke.code() != Code.NODEEXISTS) {
+ if (ke.code() == Code.NODEEXISTS) {
+ LOG.debug(rootPath + " znode already exists!");
+ } else {
throw ke;
}
}
}
+ private void logRootNodeAcls(String prefix) throws KeeperException,
+ InterruptedException {
+ Stat getStat = new Stat();
+ List<ACL> getAcls = zkClient.getACL(zkRootNodePath, getStat);
+ StringBuilder builder = new StringBuilder();
+ builder.append(prefix);
+ for (ACL acl : getAcls) {
+ builder.append(acl.toString());
+ }
+ builder.append(getStat.toString());
+ // fence() only calls this under isTraceEnabled(), so log at trace as well
+ LOG.trace(builder.toString());
+ }
+
+ private synchronized void fence() throws Exception {
+ if (LOG.isTraceEnabled()) {
+ logRootNodeAcls("Before fencing\n");
+ }
+ // Claim the root node: overwrite its ACL with zkRootNodeAcl (-1 = any version)
+ new ZKAction<Void>() {
+ @Override
+ public Void run() throws KeeperException, InterruptedException {
+ zkClient.setACL(zkRootNodePath, zkRootNodeAcl, -1);
+ return null;
+ }
+ }.runWithRetries();
+
+ // Then delete the fencing node; if it is already absent, just log that
+ new ZKAction<Void>() {
+ @Override
+ public Void run() throws KeeperException, InterruptedException {
+ try {
+ zkClient.multi(Collections.singletonList(deleteFencingNodePathOp));
+ } catch (KeeperException.NoNodeException nne) {
+ LOG.info("Fencing node " + fencingNodePath + " doesn't exist to delete");
+ }
+ return null;
+ }
+ }.runWithRetries();
+
+ if (LOG.isTraceEnabled()) {
+ logRootNodeAcls("After fencing\n");
+ }
+ }
+
private synchronized void closeZkClients() throws IOException {
if (zkClient != null) {
try {
@@ -176,7 +313,8 @@ public class ZKRMStateStore extends RMSt
private synchronized void loadRMDTSecretManagerState(RMState rmState)
throws Exception {
- List<String> childNodes = zkClient.getChildren(rmDTSecretManagerRoot, true);
+ List<String> childNodes =
+ getChildrenWithRetries(rmDTSecretManagerRoot, true);
for (String childNodeName : childNodes) {
if (childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
@@ -209,7 +347,7 @@ public class ZKRMStateStore extends RMSt
}
private synchronized void loadRMAppState(RMState rmState) throws Exception {
- List<String> childNodes = zkClient.getChildren(rmAppRoot, true);
+ List<String> childNodes = getChildrenWithRetries(rmAppRoot, true);
List<ApplicationAttemptState> attempts =
new ArrayList<ApplicationAttemptState>();
for (String childNodeName : childNodes) {
@@ -466,6 +604,8 @@ public class ZKRMStateStore extends RMSt
}
@VisibleForTesting
+ @Private
+ @Unstable
public synchronized void processWatchEvent(WatchedEvent event)
throws Exception {
Event.EventType eventType = event.getType();
@@ -506,65 +646,71 @@ public class ZKRMStateStore extends RMSt
}
@VisibleForTesting
+ @Private
+ @Unstable
String getNodePath(String root, String nodeName) {
return (root + "/" + nodeName);
}
- @VisibleForTesting
- public String createWithRetries(
- final String path, final byte[] data, final List<ACL> acl,
- final CreateMode mode) throws Exception {
- return new ZKAction<String>() {
- @Override
- public String run() throws KeeperException, InterruptedException {
- return zkClient.create(path, data, acl, mode);
- }
- }.runWithRetries();
- }
-
- private void deleteWithRetries(final String path, final int version)
- throws Exception {
+ /**
+ * Runs opList in a single ZooKeeper multi(), bracketed by creating and then
+ * deleting the fencing node, so the batch fails if this RM has been fenced.
+ */
+ private synchronized void doMultiWithRetries(
+ final List<Op> opList) throws Exception {
+ final List<Op> execOpList = new ArrayList<Op>(opList.size() + 2);
+ execOpList.add(createFencingNodePathOp);
+ execOpList.addAll(opList);
+ execOpList.add(deleteFencingNodePathOp);
new ZKAction<Void>() {
@Override
public Void run() throws KeeperException, InterruptedException {
- /**
- * Call exists() to leave a watch on the node denoted by path.
- * Delete node if exists. To pass the existence information to the
- * caller, call delete irrespective of whether node exists or not.
- */
- if (zkClient.exists(path, true) == null) {
- LOG.error("Trying to delete a path (" + path
- + ") that doesn't exist.");
- }
- zkClient.delete(path, version);
+ zkClient.multi(execOpList);
return null;
}
}.runWithRetries();
}
- private void doMultiWithRetries(final ArrayList<Op> opList) throws Exception {
- new ZKAction<Void>() {
- @Override
- public Void run() throws KeeperException, InterruptedException {
- zkClient.multi(opList);
- return null;
+ /**
+ * Helper method that creates fencing node, executes the passed operation,
+ * and deletes the fencing node.
+ */
+ private void doMultiWithRetries(final Op op) throws Exception {
+ doMultiWithRetries(Collections.singletonList(op));
+ }
+ /** Creates path in the same multi() as the fencing-node create/delete ops. */
+ @VisibleForTesting
+ @Private
+ @Unstable
+ public void createWithRetries(
+ final String path, final byte[] data, final List<ACL> acl,
+ final CreateMode mode) throws Exception {
+ doMultiWithRetries(Op.create(path, data, acl, mode));
+ }
+ /** Deletes path with the fencing ops; NoNodeException is caught, debug-logged. */
+ private void deleteWithRetries(final String path, final int version)
+ throws Exception {
+ try {
+ doMultiWithRetries(Op.delete(path, version));
+ } catch (KeeperException.NoNodeException nne) {
+ // We tried to delete a node that doesn't exist
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Attempted to delete a non-existing znode " + path);
}
- }.runWithRetries();
+ }
}
@VisibleForTesting
+ @Private
+ @Unstable
public void setDataWithRetries(final String path, final byte[] data,
final int version) throws Exception {
- new ZKAction<Void>() {
- @Override
- public Void run() throws KeeperException, InterruptedException {
- zkClient.setData(path, data, version);
- return null;
- }
- }.runWithRetries();
+ doMultiWithRetries(Op.setData(path, data, version));
}
@VisibleForTesting
+ @Private
+ @Unstable
public byte[] getDataWithRetries(final String path, final boolean watch)
throws Exception {
return new ZKAction<byte[]>() {
@@ -576,6 +722,16 @@ public class ZKRMStateStore extends RMSt
}.runWithRetries();
}
+ private List<String> getChildrenWithRetries(
+ final String path, final boolean watch) throws Exception {
+ ZKAction<List<String>> listChildren = new ZKAction<List<String>>() {
+ @Override List<String> run() throws KeeperException, InterruptedException {
+ return zkClient.getChildren(path, watch);
+ }
+ };
+ return listChildren.runWithRetries();
+ }
+
private abstract class ZKAction<T> {
// run() expects synchronization on ZKRMStateStore.this
abstract T run() throws KeeperException, InterruptedException;
@@ -596,11 +752,29 @@ public class ZKRMStateStore extends RMSt
}
}
+ /**
+ * Decides whether a failed ZooKeeper call should be attempted again.
+ * Only CONNECTIONLOSS and OPERATIONTIMEOUT count as transient; every
+ * other result code is treated as final.
+ * @param code result code taken from the KeeperException
+ */
+ private boolean shouldRetry(Code code) {
+ return code.equals(Code.CONNECTIONLOSS)
+ || code.equals(Code.OPERATIONTIMEOUT);
+ }
+
T runWithRetries() throws Exception {
int retry = 0;
while (true) {
try {
return runWithCheck();
+ } catch (KeeperException.NoAuthException nae) {
+ if (HAUtil.isHAEnabled(getConfig())) {
+ // NoAuthException possibly means that this store is fenced due to
+ // another RM becoming active. Even if not,
+ // it is safer to assume we have been fenced
+ throw new StoreFencedException();
+ }
} catch (KeeperException ke) {
if (shouldRetry(ke.code()) && ++retry < numRetries) {
continue;
@@ -611,17 +785,6 @@ public class ZKRMStateStore extends RMSt
}
}
- private static boolean shouldRetry(Code code) {
- switch (code) {
- case CONNECTIONLOSS:
- case OPERATIONTIMEOUT:
- return true;
- default:
- break;
- }
- return false;
- }
-
private synchronized void createConnection()
throws IOException, InterruptedException {
closeZkClients();
@@ -629,6 +792,10 @@ public class ZKRMStateStore extends RMSt
retries++) {
try {
zkClient = getNewZooKeeper();
+ if (useDefaultFencingScheme) {
+ zkClient.addAuthInfo(zkRootNodeAuthScheme,
+ (zkRootNodeUsername + ":" + zkRootNodePassword).getBytes());
+ }
} catch (IOException ioe) {
// Retry in case of network failures
LOG.info("Failed to connect to the ZooKeeper on attempt - " +
@@ -646,6 +813,8 @@ public class ZKRMStateStore extends RMSt
// protected to mock for testing
@VisibleForTesting
+ @Private
+ @Unstable
protected synchronized ZooKeeper getNewZooKeeper()
throws IOException, InterruptedException {
ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null);
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Thu Nov 14 23:56:56 2013
@@ -136,9 +136,6 @@ public class FairScheduler implements Re
// How often fair shares are re-calculated (ms)
protected long UPDATE_INTERVAL = 500;
- // Whether to use username in place of "default" queue name
- private volatile boolean userAsDefaultQueue = false;
-
private final static List<Container> EMPTY_CONTAINER_LIST =
new ArrayList<Container>();
@@ -640,6 +637,12 @@ public class FairScheduler implements Re
RMApp rmApp = rmContext.getRMApps().get(
applicationAttemptId.getApplicationId());
FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
+ if (queue == null) {
+ rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppAttemptRejectedEvent(applicationAttemptId,
+ "Application rejected by queue placement policy"));
+ return;
+ }
FSSchedulerApp schedulerApp =
new FSSchedulerApp(applicationAttemptId, user,
@@ -675,17 +678,16 @@ public class FairScheduler implements Re
@VisibleForTesting
FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) {
- // Potentially set queue to username if configured to do so
- if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME) &&
- userAsDefaultQueue) {
- queueName = user;
- }
-
- FSLeafQueue queue = queueMgr.getLeafQueue(queueName,
- conf.getAllowUndeclaredPools());
- if (queue == null) {
- // queue is not an existing or createable leaf queue
- queue = queueMgr.getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, false);
+ FSLeafQueue queue = null;
+ try {
+ QueuePlacementPolicy policy = queueMgr.getPlacementPolicy();
+ queueName = policy.assignAppToQueue(queueName, user);
+ if (queueName == null) {
+ return null;
+ }
+ queue = queueMgr.getLeafQueue(queueName, true);
+ } catch (IOException ex) {
+ LOG.error("Error assigning app to queue, rejecting", ex);
}
if (rmApp != null) {
@@ -1155,7 +1157,6 @@ public class FairScheduler implements Re
minimumAllocation = this.conf.getMinimumAllocation();
maximumAllocation = this.conf.getMaximumAllocation();
incrAllocation = this.conf.getIncrementAllocation();
- userAsDefaultQueue = this.conf.getUserAsDefaultQueue();
continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
continuousSchedulingSleepMs =
this.conf.getContinuousSchedulingSleepMs();
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Thu Nov 14 23:56:56 2013
@@ -25,6 +25,7 @@ import java.net.URLConnection;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -51,6 +52,8 @@ import org.w3c.dom.NodeList;
import org.w3c.dom.Text;
import org.xml.sax.SAXException;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* Maintains a list of queues as well as scheduling parameters for each queue,
* such as guaranteed share allocations, from the fair scheduler config file.
@@ -87,6 +90,8 @@ public class QueueManager {
private FSParentQueue rootQueue;
private volatile QueueManagerInfo info = new QueueManagerInfo();
+ @VisibleForTesting
+ volatile QueuePlacementPolicy placementPolicy;
private long lastReloadAttempt; // Last time we tried to reload the queues file
private long lastSuccessfulReload; // Last time we successfully reloaded queues
@@ -107,6 +112,8 @@ public class QueueManager {
queues.put(rootQueue.getName(), rootQueue);
this.allocFile = conf.getAllocationFile();
+ placementPolicy = new QueuePlacementPolicy(getSimplePlacementRules(),
+ new HashSet<String>(), conf);
reloadAllocs();
lastSuccessfulReload = scheduler.getClock().getTime();
@@ -115,6 +122,28 @@ public class QueueManager {
getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true);
}
+ public void updatePlacementPolicy(FairSchedulerConfiguration conf) {
+ // NOTE(review): no-op; placementPolicy is replaced on allocation-file reload
+ }
+
+ /**
+ * Builds the rule list used when no queuePlacementPolicy element is
+ * configured, from allow-undeclared-pools and user-as-default-queue.
+ */
+ private List<QueuePlacementRule> getSimplePlacementRules() {
+ boolean allowCreate = scheduler.getConf().getAllowUndeclaredPools();
+ boolean useUserQueue = scheduler.getConf().getUserAsDefaultQueue();
+ List<QueuePlacementRule> simpleRules = new ArrayList<QueuePlacementRule>();
+ simpleRules.add(new QueuePlacementRule.Specified().initialize(allowCreate, null));
+ if (useUserQueue) {
+ simpleRules.add(new QueuePlacementRule.User().initialize(allowCreate, null));
+ }
+ if (!(useUserQueue && allowCreate)) { // default rule unless creatable user queues
+ simpleRules.add(new QueuePlacementRule.Default().initialize(true, null));
+ }
+ return simpleRules;
+ }
+
/**
* Get a queue by name, creating it if the create param is true and is necessary.
* If the queue is not or can not be a leaf queue, i.e. it already exists as a
@@ -226,6 +255,10 @@ public class QueueManager {
return queues.containsKey(name);
}
}
+
+ public QueuePlacementPolicy getPlacementPolicy() {
+ return placementPolicy;
+ }
/**
* Reload allocations file if it hasn't been loaded in a while
@@ -290,6 +323,8 @@ public class QueueManager {
long fairSharePreemptionTimeout = Long.MAX_VALUE;
long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.getDefault();
+
+ QueuePlacementPolicy newPlacementPolicy = null;
// Remember all queue names so we can display them on web UI, etc.
List<String> queueNamesInAllocFile = new ArrayList<String>();
@@ -306,6 +341,7 @@ public class QueueManager {
"file: top-level element not <allocations>");
NodeList elements = root.getChildNodes();
List<Element> queueElements = new ArrayList<Element>();
+ Element placementPolicyElement = null;
for (int i = 0; i < elements.getLength(); i++) {
Node node = elements.item(i);
if (node instanceof Element) {
@@ -348,6 +384,8 @@ public class QueueManager {
String text = ((Text)element.getFirstChild()).getData().trim();
SchedulingPolicy.setDefault(text);
defaultSchedPolicy = SchedulingPolicy.getDefault();
+ } else if ("queuePlacementPolicy".equals(element.getTagName())) {
+ placementPolicyElement = element;
} else {
LOG.warn("Bad element in allocations file: " + element.getTagName());
}
@@ -369,6 +407,15 @@ public class QueueManager {
userMaxApps, queueWeights, queuePolicies, minSharePreemptionTimeouts,
queueAcls, queueNamesInAllocFile);
}
+
+ // Load placement policy and pass it configured queues
+ if (placementPolicyElement != null) {
+ newPlacementPolicy = QueuePlacementPolicy.fromXml(placementPolicyElement,
+ new HashSet<String>(queueNamesInAllocFile), scheduler.getConf());
+ } else {
+ newPlacementPolicy = new QueuePlacementPolicy(getSimplePlacementRules(),
+ new HashSet<String>(queueNamesInAllocFile), scheduler.getConf());
+ }
// Commit the reload; also create any queue defined in the alloc file
// if it does not already exist, so it can be displayed on the web UI.
@@ -377,6 +424,7 @@ public class QueueManager {
queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault,
queueMaxAppsDefault, defaultSchedPolicy, minSharePreemptionTimeouts,
queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout);
+ placementPolicy = newPlacementPolicy;
// Make sure all queues exist
for (String name: queueNamesInAllocFile) {
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java Thu Nov 14 23:56:56 2013
@@ -18,15 +18,32 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.zookeeper.ZooKeeper;
import org.junit.Test;
@@ -56,7 +73,7 @@ public class TestZKRMStateStore extends
public RMStateStore getRMStateStore() throws Exception {
String workingZnode = "/Test";
- YarnConfiguration conf = new YarnConfiguration();
+ Configuration conf = new YarnConfiguration();
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort);
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
this.client = createClient();
@@ -77,4 +94,81 @@ public class TestZKRMStateStore extends
testRMAppStateStore(zkTester);
testRMDTSecretManagerStateStore(zkTester);
}
+
+ private Configuration createHARMConf(
+ String rmIds, String rmId, int adminPort) {
+ Configuration haConf = new YarnConfiguration();
+ haConf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
+ haConf.set(YarnConfiguration.RM_HA_IDS, rmIds);
+ haConf.set(YarnConfiguration.RM_HA_ID, rmId);
+ haConf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+ haConf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
+ haConf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort);
+ haConf.set(YarnConfiguration.RM_HA_ADMIN_ADDRESS, "localhost:" + adminPort);
+ for (String addressKey : HAUtil.RPC_ADDRESS_CONF_KEYS) {
+ haConf.set(HAUtil.addSuffix(addressKey, rmId), "localhost:0"); // ephemeral port per RM
+ }
+ return haConf;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testFencing() throws Exception {
+ StateChangeRequestInfo req = new StateChangeRequestInfo(
+ HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+
+ Configuration conf1 = createHARMConf("rm1,rm2", "rm1", 1234);
+ ResourceManager rm1 = new ResourceManager();
+ rm1.init(conf1);
+ rm1.start();
+ rm1.getHAService().transitionToActive(req);
+ assertEquals("RM with ZKStore didn't start",
+ Service.STATE.STARTED, rm1.getServiceState());
+ assertEquals("RM should be Active",
+ HAServiceProtocol.HAServiceState.ACTIVE,
+ rm1.getHAService().getServiceStatus().getState());
+
+ Configuration conf2 = createHARMConf("rm1,rm2", "rm2", 5678);
+ ResourceManager rm2 = new ResourceManager();
+ rm2.init(conf2);
+ rm2.start();
+ rm2.getHAService().transitionToActive(req);
+ assertEquals("RM with ZKStore didn't start",
+ Service.STATE.STARTED, rm2.getServiceState());
+ assertEquals("RM should be Active",
+ HAServiceProtocol.HAServiceState.ACTIVE,
+ rm2.getHAService().getServiceStatus().getState());
+
+ // Submitting an application to RM1 to trigger a state store operation.
+ // RM1 should realize that it got fenced and is not the Active RM anymore.
+ Map mockMap = mock(Map.class);
+ ApplicationSubmissionContext asc =
+ ApplicationSubmissionContext.newInstance(
+ ApplicationId.newInstance(1000, 1),
+ "testApplication", // app Name
+ "default", // queue name
+ Priority.newInstance(0),
+ ContainerLaunchContext.newInstance(mockMap, mockMap,
+ new ArrayList<String>(), mockMap, mock(ByteBuffer.class),
+ mockMap),
+ false, // unmanaged AM
+ true, // cancelTokens
+ 1, // max app attempts
+ Resource.newInstance(1024, 1));
+ ClientRMService rmService = rm1.getClientRMService();
+ rmService.submitApplication(SubmitApplicationRequest.newInstance(asc));
+ // Poll (at most ~3s) until RM1 notices it was fenced and leaves ACTIVE
+ for (int i = 0; i < 30 && HAServiceProtocol.HAServiceState.ACTIVE ==
+ rm1.getHAService().getServiceStatus().getState(); i++) {
+ Thread.sleep(100);
+ }
+ assertEquals("RM should have been fenced",
+ HAServiceProtocol.HAServiceState.STANDBY,
+ rm1.getHAService().getServiceStatus().getState());
+ assertEquals("RM should be Active",
+ HAServiceProtocol.HAServiceState.ACTIVE,
+ rm2.getHAService().getServiceStatus().getState());
+ rm2.stop();
+ rm1.stop();
+ }
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Thu Nov 14 23:56:56 2013
@@ -44,7 +44,9 @@ import javax.xml.parsers.ParserConfigura
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.security.GroupMappingServiceProvider;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -94,6 +96,8 @@ import org.junit.Before;
import org.junit.Test;
import org.xml.sax.SAXException;
+import com.google.common.collect.Sets;
+
public class TestFairScheduler {
private class MockClock implements Clock {
@@ -616,6 +620,7 @@ public class TestFairScheduler {
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
scheduler.reinitialize(conf, resourceManager.getRMContext());
+ scheduler.getQueueManager().initialize();
AppAddedSchedulerEvent appAddedEvent2 = new AppAddedSchedulerEvent(
createAppAttemptId(2, 1), "default", "user2");
scheduler.handle(appAddedEvent2);
@@ -664,6 +669,46 @@ public class TestFairScheduler {
assertEquals(rmApp2.getQueue(), queue2.getName());
assertEquals("root.notdefault", rmApp2.getQueue());
}
+
+ @Test
+ public void testQueuePlacementWithPolicy() throws Exception {
+ Configuration conf = createConfiguration();
+ conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
+ SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
+ ApplicationAttemptId appId;
+ Map<ApplicationAttemptId, FSSchedulerApp> apps = scheduler.applications;
+
+ List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
+ rules.add(new QueuePlacementRule.Specified().initialize(true, null));
+ rules.add(new QueuePlacementRule.User().initialize(false, null));
+ rules.add(new QueuePlacementRule.PrimaryGroup().initialize(false, null));
+ rules.add(new QueuePlacementRule.Default().initialize(true, null));
+ Set<String> queues = Sets.newHashSet("root.user1", "root.user3group");
+ scheduler.getQueueManager().placementPolicy = new QueuePlacementPolicy(
+ rules, queues, conf);
+ appId = createSchedulingRequest(1024, "somequeue", "user1");
+ assertEquals("root.somequeue", apps.get(appId).getQueueName());
+ appId = createSchedulingRequest(1024, "default", "user1");
+ assertEquals("root.user1", apps.get(appId).getQueueName());
+ appId = createSchedulingRequest(1024, "default", "user3");
+ assertEquals("root.user3group", apps.get(appId).getQueueName());
+ appId = createSchedulingRequest(1024, "default", "otheruser");
+ assertEquals("root.default", apps.get(appId).getQueueName());
+
+ // test without specified as first rule
+ rules = new ArrayList<QueuePlacementRule>();
+ rules.add(new QueuePlacementRule.User().initialize(false, null));
+ rules.add(new QueuePlacementRule.Specified().initialize(true, null));
+ rules.add(new QueuePlacementRule.Default().initialize(true, null));
+ scheduler.getQueueManager().placementPolicy = new QueuePlacementPolicy(
+ rules, queues, conf);
+ appId = createSchedulingRequest(1024, "somequeue", "user1");
+ assertEquals("root.user1", apps.get(appId).getQueueName());
+ appId = createSchedulingRequest(1024, "somequeue", "otheruser");
+ assertEquals("root.somequeue", apps.get(appId).getQueueName());
+ appId = createSchedulingRequest(1024, "default", "otheruser");
+ assertEquals("root.default", apps.get(appId).getQueueName());
+ }
@Test
public void testFairShareWithMinAlloc() throws Exception {
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm Thu Nov 14 23:56:56 2013
@@ -101,6 +101,16 @@ Hadoop MapReduce Next Generation - Fair
Fair Scheduler. Among them is the use of custom policies governing
priority "boosting" over certain apps.
+* {Automatically placing applications in queues}
+
+ The Fair Scheduler allows administrators to configure policies that
+ automatically place submitted applications into appropriate queues. Placement
+ can depend on the user and groups of the submitter and the requested queue
+ passed by the application. A policy consists of a set of rules that are applied
+ sequentially to classify an incoming application. Each rule either places the
+ app into a queue, rejects it, or continues on to the next rule. Refer to the
+ allocation file format below for how to configure these policies.
+
* {Installation}
To use the Fair Scheduler first assign the appropriate scheduler class in
@@ -138,7 +148,8 @@ Properties that can be placed in yarn-si
* Whether to use the username associated with the allocation as the default
queue name, in the event that a queue name is not specified. If this is set
to "false" or unset, all jobs have a shared default queue, named "default".
- Defaults to true.
+ Defaults to true. If a queue placement policy is given in the allocations
+ file, this property is ignored.
* <<<yarn.scheduler.fair.preemption>>>
@@ -180,6 +191,16 @@ Properties that can be placed in yarn-si
opportunities to pass up. The default value of -1.0 means don't pass up any
scheduling opportunities.
+ * <<<yarn.scheduler.fair.allow-undeclared-pools>>>
+
+ * If this is true, new queues can be created at application submission time,
+ whether because they are specified as the application's queue by the
+ submitter or because they are placed there by the user-as-default-queue
+ property. If this is false, any time an app would be placed in a queue that
+ is not specified in the allocations file, it is placed in the "default" queue
+ instead. Defaults to true. If a queue placement policy is given in the
+ allocations file, this property is ignored.
+
Allocation file format
The allocation file must be in XML format. The format contains five types of
@@ -248,25 +269,29 @@ Allocation file format
policy for queues; overridden by the schedulingPolicy element in each queue
if specified. Defaults to "fair".
- An example allocation file is given here:
+ * <<A queuePlacementPolicy element>>, which contains a list of rule elements
+ that tell the scheduler how to place incoming apps into queues. Rules
+ are applied in the order that they are listed. Rules may take arguments. All
+ rules accept the "create" argument, which indicates whether the rule can create
+ a new queue. "Create" defaults to true; if set to false and the rule would
+ place the app in a queue that is not configured in the allocations file, we
+ continue on to the next rule. The last rule must be one that can never issue a
+ continue. Valid rules are:
+
+ * specified: the app is placed into the queue it requested. If the app
+ requested no queue, i.e. it specified "default", we continue.
+
+ * user: the app is placed into a queue with the name of the user who
+ submitted it.
-Queue Access Control Lists (ACLs)
+ * primaryGroup: the app is placed into a queue with the name of the
+ primary group of the user who submitted it.
- Queue Access Control Lists (ACLs) allow administrators to control who may
- take actions on particular queues. They are configured with the aclSubmitApps
- and aclAdministerApps properties, which can be set per queue. Currently the
- only supported administrative action is killing an application. Anybody who
- may administer a queue may also submit applications to it. These properties
- take values in a format like "user1,user2 group1,group2" or " group1,group2".
- An action on a queue will be permitted if its user or group is in the ACL of
- that queue or in the ACL of any of that queue's ancestors. So if queue2
- is inside queue1, and user1 is in queue1's ACL, and user2 is in queue2's
- ACL, then both users may submit to queue2.
-
- The root queue's ACLs are "*" by default which, because ACLs are passed down,
- means that everybody may submit to and kill applications from every queue.
- To start restricting access, change the root queue's ACLs to something other
- than "*".
+ * default: the app is placed into the queue named "default".
+
+ * reject: the app is rejected.
+
+ An example allocation file is given here:
---
<?xml version="1.0"?>
@@ -282,14 +307,41 @@ Queue Access Control Lists (ACLs)
<minResources>5000 mb,0vcores</minResources>
</queue>
</queue>
+
<user name="sample_user">
<maxRunningApps>30</maxRunningApps>
</user>
<userMaxAppsDefault>5</userMaxAppsDefault>
+
+ <queuePlacementPolicy>
+ <specified />
+ <primaryGroup create="false" />
+ <default />
+ </queuePlacementPolicy>
</allocations>
---
Note that for backwards compatibility with the original FairScheduler, "queue" elements can instead be named as "pool" elements.
+
+
+Queue Access Control Lists (ACLs)
+
+ Queue Access Control Lists (ACLs) allow administrators to control who may
+ take actions on particular queues. They are configured with the aclSubmitApps
+ and aclAdministerApps properties, which can be set per queue. Currently the
+ only supported administrative action is killing an application. Anybody who
+ may administer a queue may also submit applications to it. These properties
+ take values in a format like "user1,user2 group1,group2" or " group1,group2".
+ An action on a queue will be permitted if its user or group is in the ACL of
+ that queue or in the ACL of any of that queue's ancestors. So if queue2
+ is inside queue1, and user1 is in queue1's ACL, and user2 is in queue2's
+ ACL, then both users may submit to queue2.
+
+ The root queue's ACLs are "*" by default which, because ACLs are passed down,
+ means that everybody may submit to and kill applications from every queue.
+ To start restricting access, change the root queue's ACLs to something other
+ than "*".
+
* {Administration}
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/pom.xml?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/pom.xml (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/pom.xml Thu Nov 14 23:56:56 2013
@@ -119,6 +119,7 @@
<dependency>
<groupId>com.sun.jersey.jersey-test-framework</groupId>
<artifactId>jersey-test-framework-grizzly2</artifactId>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>