Posted to yarn-commits@hadoop.apache.org by ar...@apache.org on 2013/11/15 00:57:02 UTC

svn commit: r1542125 - in /hadoop/common/branches/HDFS-2832/hadoop-yarn-project: ./ hadoop-yarn/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/ hadoop-yarn/hadoop-yarn-common/src/main/resources/ hadoop-y...

Author: arp
Date: Thu Nov 14 23:56:56 2013
New Revision: 1542125

URL: http://svn.apache.org/r1542125
Log:
Merging r1541618 through r1542122 from trunk to branch HDFS-2832

Added:
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreOperationFailedEvent.java
      - copied unchanged from r1542122, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreOperationFailedEvent.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreOperationFailedEventType.java
      - copied unchanged from r1542122, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreOperationFailedEventType.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/StoreFencedException.java
      - copied unchanged from r1542122, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/StoreFencedException.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java
      - copied unchanged from r1542122, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java
      - copied unchanged from r1542122, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SimpleGroupsMapping.java
      - copied unchanged from r1542122, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SimpleGroupsMapping.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java
      - copied unchanged from r1542122, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java
Modified:
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/pom.xml

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt Thu Nov 14 23:56:56 2013
@@ -40,6 +40,9 @@ Release 2.3.0 - UNRELEASED
     YARN-311. RM/scheduler support for dynamic resource configuration.
     (Junping Du via llu)
 
+    YARN-1392. Allow sophisticated app-to-queue placement policies in the Fair
+    Scheduler (Sandy Ryza)
+
   IMPROVEMENTS
 
     YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu)
@@ -94,6 +97,9 @@ Release 2.3.0 - UNRELEASED
     YARN-1387. RMWebServices should use ClientRMService for filtering
     applications (Karthik Kambatla via Sandy Ryza)
 
+    YARN-1222. Make improvements in ZKRMStateStore for fencing (Karthik
+    Kambatla via bikas)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -127,6 +133,9 @@ Release 2.3.0 - UNRELEASED
     YARN-1400. yarn.cmd uses HADOOP_RESOURCEMANAGER_OPTS. Should be
     YARN_RESOURCEMANAGER_OPTS. (Raja Aluri via cnauroth)
 
+    YARN-1401. With zero sleep-delay-before-sigkill.ms, no signal is ever sent
+    (Gera Shegalov via Sandy Ryza)
+
 Release 2.2.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml Thu Nov 14 23:56:56 2013
@@ -178,6 +178,12 @@
     <Field name="minimumAllocation" />
     <Bug pattern="IS2_INCONSISTENT_SYNC" />
   </Match>
+  <!-- Inconsistent sync warning - numRetries is only initialized once and never changed -->
+  <Match>
+    <Class name="org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore" />
+    <Field name="numRetries" />
+    <Bug pattern="IS2_INCONSISTENT_SYNC" />
+  </Match>
   <Match>
     <Class name="org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer"/>
     <Field name="renewalTimer" />

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java Thu Nov 14 23:56:56 2013
@@ -193,8 +193,8 @@ public class HAUtil {
     return addSuffix(prefix, getRMHAId(conf));
   }
 
-  private static String getConfValueForRMInstance(String prefix,
-                                                  Configuration conf) {
+  public static String getConfValueForRMInstance(String prefix,
+                                                 Configuration conf) {
     String confKey = getConfKeyForRMInstance(prefix, conf);
     String retVal = conf.getTrimmed(confKey);
     if (LOG.isTraceEnabled()) {
@@ -205,8 +205,8 @@ public class HAUtil {
     return retVal;
   }
 
-  static String getConfValueForRMInstance(String prefix, String defaultValue,
-                                          Configuration conf) {
+  public static String getConfValueForRMInstance(
+      String prefix, String defaultValue, Configuration conf) {
     String value = getConfValueForRMInstance(prefix, conf);
     return (value == null) ? defaultValue : value;
   }

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Thu Nov 14 23:56:56 2013
@@ -328,6 +328,8 @@ public class YarnConfiguration extends C
       ZK_STATE_STORE_PREFIX + "acl";
   public static final String DEFAULT_ZK_RM_STATE_STORE_ACL =
       "world:anyone:rwcda";
+  public static final String ZK_RM_STATE_STORE_ROOT_NODE_ACL =
+      ZK_STATE_STORE_PREFIX + "root-node.acl";
 
   /** The maximum number of completed applications RM keeps. */ 
   public static final String RM_MAX_COMPLETED_APPLICATIONS =

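For reference, the new ZK_RM_STATE_STORE_ROOT_NODE_ACL key added above can be set like any other YARN configuration value. A minimal sketch (the class and the ACL string below are illustrative placeholders, not part of this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class RootNodeAclConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Comma-separated scheme:id:perms entries; the digest entry is a
        // hypothetical stand-in for the Active RM's create-delete identity.
        conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL,
            "world:anyone:rwa,digest:rm1:somedigest:cd");
      }
    }
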
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml Thu Nov 14 23:56:56 2013
@@ -279,7 +279,11 @@
     <description>Host:Port of the ZooKeeper server where RM state will 
     be stored. This must be supplied when using
     org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
-    as the value for yarn.resourcemanager.store.class</description>
+    as the value for yarn.resourcemanager.store.class. ZKRMStateStore
+    is implicitly fenced, meaning only a single ResourceManager can
+    use the store at any point in time. More details on this, along
+    with instructions on setting up appropriate ACLs, are discussed under
+    the description for
+    yarn.resourcemanager.zk.state-store.root-node.acl.</description>
     <name>yarn.resourcemanager.zk.state-store.address</name>
     <!--value>127.0.0.1:2181</value-->
   </property>
@@ -321,6 +325,31 @@
   </property>
 
   <property>
+    <description>
+      ACLs to be used for the root znode when using ZKRMStateStore in an HA
+      scenario for fencing.
+
+      ZKRMStateStore supports implicit fencing to allow a single
+      ResourceManager write-access to the store. For fencing, the
+      ResourceManagers in the cluster share read-write-admin privileges on the
+      root node, but the Active ResourceManager claims exclusive create-delete
+      permissions.
+
+      By default, when this property is not set, we use the ACLs from
+      yarn.resourcemanager.zk.state-store.acl for shared admin access and
+      rm-address:cluster-timestamp for username-based exclusive create-delete
+      access.
+
+      This property allows users to set ACLs of their choice instead of using
+      the default mechanism. For fencing to work, the ACLs should be
+      carefully set differently on each ResourceManager such that all the
+      ResourceManagers have shared admin access and the Active ResourceManager
+      takes over (exclusively) the create-delete access.
+    </description>
+    <name>yarn.resourcemanager.zk.state-store.root-node.acl</name>
+  </property>
+
+  <property>
     <description>URI pointing to the location of the FileSystem path where
     RM state will be stored. This must be supplied when using
     org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore

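The root-node ACL scheme described above splits into two parts: shared read-write-admin for all ResourceManagers, and exclusive create-delete for the Active one. A hedged sketch of such an ACL list using ZooKeeper's types (the identities are hypothetical; the real store derives the digest from rm-address:cluster-timestamp):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.data.ACL;
    import org.apache.zookeeper.data.Id;

    public class FencingAclSketch {
      public static List<ACL> rootNodeAcls() {
        List<ACL> acls = new ArrayList<ACL>();
        // Shared read-write-admin for every ResourceManager in the cluster.
        int rwa = ZooDefs.Perms.READ | ZooDefs.Perms.WRITE | ZooDefs.Perms.ADMIN;
        acls.add(new ACL(rwa, new Id("world", "anyone")));
        // Exclusive create-delete for the Active RM only.
        int cd = ZooDefs.Perms.CREATE | ZooDefs.Perms.DELETE;
        acls.add(new ACL(cd, new Id("digest", "activeRmUser:activeRmDigest")));
        return acls;
      }
    }
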
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java Thu Nov 14 23:56:56 2013
@@ -375,13 +375,19 @@ public class ContainerLaunch implements 
         LOG.debug("Sending signal to pid " + processId
             + " as user " + user
             + " for container " + containerIdStr);
+
+        final Signal signal = sleepDelayBeforeSigKill > 0
+          ? Signal.TERM
+          : Signal.KILL;
+
+        boolean result = exec.signalContainer(user, processId, signal);
+
+        LOG.debug("Sent signal " + signal + " to pid " + processId
+          + " as user " + user
+          + " for container " + containerIdStr
+          + ", result=" + (result? "success" : "failed"));
+
         if (sleepDelayBeforeSigKill > 0) {
-          boolean result = exec.signalContainer(user,
-              processId, Signal.TERM);
-          LOG.debug("Sent signal to pid " + processId
-              + " as user " + user
-              + " for container " + containerIdStr
-              + ", result=" + (result? "success" : "failed"));
           new DelayedProcessKiller(container, user,
               processId, sleepDelayBeforeSigKill, Signal.KILL, exec).start();
         }

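The ContainerLaunch hunk above is the heart of the YARN-1401 fix: the signal is chosen before the delay check, so a zero sleep-delay-before-sigkill.ms now sends KILL immediately instead of sending nothing. A standalone sketch of the selection (the enum and method are illustrative, not RM code):

    public class SignalSelectionSketch {
      enum Signal { TERM, KILL }

      // Positive delay: send TERM now, schedule KILL for later.
      // Zero delay: send KILL immediately (previously no signal was sent).
      static Signal chooseSignal(long sleepDelayBeforeSigKillMs) {
        return sleepDelayBeforeSigKillMs > 0 ? Signal.TERM : Signal.KILL;
      }

      public static void main(String[] args) {
        System.out.println(chooseSignal(1000)); // TERM
        System.out.println(chooseSignal(0));    // KILL
      }
    }
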
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java Thu Nov 14 23:56:56 2013
@@ -97,7 +97,6 @@ public class TestContainerLaunch extends
     conf.setClass(
         YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR,
         LinuxResourceCalculatorPlugin.class, ResourceCalculatorPlugin.class);
-    conf.setLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, 1000);
     super.setup();
   }
 
@@ -590,8 +589,9 @@ public class TestContainerLaunch extends
         AuxiliaryServiceHelper.getServiceDataFromEnv(serviceName, env));
   }
 
-  @Test
-  public void testDelayedKill() throws Exception {
+  private void internalKillTest(boolean delayed) throws Exception {
+    conf.setLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
+      delayed ? 1000 : 0);
     containerManager.start();
 
     // ////// Construct the Container-id
@@ -675,7 +675,8 @@ public class TestContainerLaunch extends
     BaseContainerManagerTest.waitForContainerState(containerManager, cId,
         ContainerState.COMPLETE);
 
-    // container stop sends a sigterm followed by a sigkill
+    // if delayed, container stop sends a sigterm followed by a sigkill;
+    // otherwise a sigkill is sent immediately
     GetContainerStatusesRequest gcsRequest =
         GetContainerStatusesRequest.newInstance(containerIds);
     
@@ -690,7 +691,7 @@ public class TestContainerLaunch extends
     // Windows, because the process is not notified when killed by winutils.
     // There is no way for the process to trap and respond.  Instead, we can
     // verify that the job object with ID matching container ID no longer exists.
-    if (Shell.WINDOWS) {
+    if (Shell.WINDOWS || !delayed) {
       Assert.assertFalse("Process is still alive!",
         DefaultContainerExecutor.containerIsAlive(cId.toString()));
     } else {
@@ -713,6 +714,16 @@ public class TestContainerLaunch extends
     }
   }
 
+  @Test
+  public void testDelayedKill() throws Exception {
+    internalKillTest(true);
+  }
+
+  @Test
+  public void testImmediateKill() throws Exception {
+    internalKillTest(false);
+  }
+
   @SuppressWarnings("rawtypes")
   @Test
   public void testCallFailureWithNullLocalizedResources() {

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java Thu Nov 14 23:56:56 2013
@@ -67,7 +67,9 @@ public class RMHAProtocolService extends
   protected HAServiceState haState = HAServiceState.INITIALIZING;
   private AccessControlList adminAcl;
   private Server haAdminServer;
-  private boolean haEnabled;
+
+  @InterfaceAudience.Private
+  boolean haEnabled;
 
   public RMHAProtocolService(ResourceManager resourceManager)  {
     super("RMHAProtocolService");
@@ -174,7 +176,8 @@ public class RMHAProtocolService extends
     }
   }
 
-  private synchronized void transitionToActive() throws Exception {
+  @InterfaceAudience.Private
+  synchronized void transitionToActive() throws Exception {
     if (haState == HAServiceState.ACTIVE) {
       LOG.info("Already in active state");
       return;
@@ -205,7 +208,8 @@ public class RMHAProtocolService extends
     }
   }
 
-  private synchronized void transitionToStandby(boolean initialize)
+  @InterfaceAudience.Private
+  synchronized void transitionToStandby(boolean initialize)
       throws Exception {
     if (haState == HAServiceState.STANDBY) {
       LOG.info("Already in standby state");

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Thu Nov 14 23:56:56 2013
@@ -56,6 +56,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreOperationFailedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreOperationFailedEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -163,6 +165,10 @@ public class ResourceManager extends Com
   public ResourceManager() {
     super("ResourceManager");
   }
+
+  public RMHAProtocolService getHAService() {
+    return this.haService;
+  }
   
   public RMContext getRMContext() {
     return this.rmContext;
@@ -216,6 +222,11 @@ public class ResourceManager extends Com
     return new SchedulerEventDispatcher(this.scheduler);
   }
 
+  protected RMStateStoreOperationFailedEventDispatcher
+  createRMStateStoreOperationFailedEventDispatcher() {
+    return new RMStateStoreOperationFailedEventDispatcher(haService);
+  }
+
   protected Dispatcher createDispatcher() {
     return new AsyncDispatcher();
   }
@@ -339,6 +350,8 @@ public class ResourceManager extends Com
       try {
         rmStore.init(conf);
         rmStore.setRMDispatcher(rmDispatcher);
+        rmDispatcher.register(RMStateStoreOperationFailedEventType.class,
+            createRMStateStoreOperationFailedEventDispatcher());
       } catch (Exception e) {
         // the Exception from stateStore.init() needs to be handled for
         // HA and we need to give up master status if we got fenced
@@ -633,6 +646,46 @@ public class ResourceManager extends Com
   }
 
   @Private
+  public static class RMStateStoreOperationFailedEventDispatcher implements
+      EventHandler<RMStateStoreOperationFailedEvent> {
+    private final RMHAProtocolService haService;
+
+    public RMStateStoreOperationFailedEventDispatcher(
+        RMHAProtocolService haService) {
+      this.haService = haService;
+    }
+
+    @Override
+    public void handle(RMStateStoreOperationFailedEvent event) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Received a " +
+            RMStateStoreOperationFailedEvent.class.getName() + " of type " +
+            event.getType().name());
+      }
+      if (event.getType() == RMStateStoreOperationFailedEventType.FENCED) {
+        LOG.info("RMStateStore has been fenced");
+        synchronized(haService) {
+          if (haService.haEnabled) {
+            try {
+              // Transition to standby and reinit active services
+              LOG.info("Transitioning RM to Standby mode");
+              haService.transitionToStandby(true);
+              return;
+            } catch (Exception e) {
+              LOG.error("Failed to transition RM to Standby mode.");
+            }
+          }
+        }
+      }
+
+      LOG.error("Shutting down RM on receiving a " +
+          RMStateStoreOperationFailedEvent.class.getName() + " of type " +
+          event.getType().name());
+      ExitUtil.terminate(1, event.getCause());
+    }
+  }
+
+  @Private
   public static final class ApplicationEventDispatcher implements
       EventHandler<RMAppEvent> {
 

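The dispatcher above encodes one decision: a FENCED store event is survivable only when HA is enabled, in which case the RM re-initializes as standby; any other failure terminates the process. A condensed sketch of that predicate (types are stand-ins for the classes added in this commit):

    public class FencedHandlingSketch {
      enum FailedEventType { FENCED, FAILED }

      // Mirrors RMStateStoreOperationFailedEventDispatcher.handle(): only a
      // fencing event in an HA deployment lets the RM drop to standby;
      // everything else ends in ExitUtil.terminate().
      static boolean shouldTransitionToStandby(FailedEventType type,
          boolean haEnabled) {
        return type == FailedEventType.FENCED && haEnabled;
      }
    }
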
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Thu Nov 14 23:56:56 2013
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
+
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -388,9 +389,13 @@ public abstract class RMStateStore exten
    */
   public synchronized void storeRMDelegationTokenAndSequenceNumber(
       RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
-      int latestSequenceNumber) throws Exception {
-    storeRMDelegationTokenAndSequenceNumberState(rmDTIdentifier, renewDate,
-      latestSequenceNumber);
+      int latestSequenceNumber) {
+    try {
+      storeRMDelegationTokenAndSequenceNumberState(rmDTIdentifier, renewDate,
+          latestSequenceNumber);
+    } catch (Exception e) {
+      notifyStoreOperationFailed(e);
+    }
   }
 
   /**
@@ -406,9 +411,12 @@ public abstract class RMStateStore exten
    * RMDTSecretManager call this to remove the state of a delegation token
    */
   public synchronized void removeRMDelegationToken(
-      RMDelegationTokenIdentifier rmDTIdentifier, int sequenceNumber)
-      throws Exception {
-    removeRMDelegationTokenState(rmDTIdentifier);
+      RMDelegationTokenIdentifier rmDTIdentifier, int sequenceNumber) {
+    try {
+      removeRMDelegationTokenState(rmDTIdentifier);
+    } catch (Exception e) {
+      notifyStoreOperationFailed(e);
+    }
   }
 
   /**
@@ -421,9 +429,12 @@ public abstract class RMStateStore exten
   /**
    * RMDTSecretManager call this to store the state of a master key
    */
-  public synchronized void storeRMDTMasterKey(DelegationKey delegationKey)
-      throws Exception {
-    storeRMDTMasterKeyState(delegationKey);
+  public synchronized void storeRMDTMasterKey(DelegationKey delegationKey) {
+    try {
+      storeRMDTMasterKeyState(delegationKey);
+    } catch (Exception e) {
+      notifyStoreOperationFailed(e);
+    }
   }
 
   /**
@@ -437,9 +448,12 @@ public abstract class RMStateStore exten
   /**
    * RMDTSecretManager call this to remove the state of a master key
    */
-  public synchronized void removeRMDTMasterKey(DelegationKey delegationKey)
-      throws Exception {
-    removeRMDTMasterKeyState(delegationKey);
+  public synchronized void removeRMDTMasterKey(DelegationKey delegationKey) {
+    try {
+      removeRMDTMasterKeyState(delegationKey);
+    } catch (Exception e) {
+      notifyStoreOperationFailed(e);
+    }
   }
 
   /**
@@ -539,19 +553,15 @@ public abstract class RMStateStore exten
       try {
         if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
           storeApplicationStateInternal(appId.toString(), appStateData);
+          notifyDoneStoringApplication(appId, storedException);
         } else {
           assert event.getType().equals(RMStateStoreEventType.UPDATE_APP);
           updateApplicationStateInternal(appId.toString(), appStateData);
+          notifyDoneUpdatingApplication(appId, storedException);
         }
       } catch (Exception e) {
         LOG.error("Error storing app: " + appId, e);
-        storedException = e;
-      } finally {
-        if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
-          notifyDoneStoringApplication(appId, storedException);
-        } else {
-          notifyDoneUpdatingApplication(appId, storedException);
-        }
+        notifyStoreOperationFailed(e);
       }
     } else if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)
         || event.getType().equals(RMStateStoreEventType.UPDATE_APP_ATTEMPT)) {
@@ -589,24 +599,20 @@ public abstract class RMStateStore exten
         if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) {
           storeApplicationAttemptStateInternal(attemptState.getAttemptId()
             .toString(), attemptStateData);
+          notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
+              storedException);
         } else {
           assert event.getType().equals(
             RMStateStoreEventType.UPDATE_APP_ATTEMPT);
           updateApplicationAttemptStateInternal(attemptState.getAttemptId()
             .toString(), attemptStateData);
-        }
-      } catch (Exception e) {
-        LOG
-          .error("Error storing appAttempt: " + attemptState.getAttemptId(), e);
-        storedException = e;
-      } finally {
-        if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) {
-          notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
-            storedException);
-        } else {
           notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(),
-            storedException);
+              storedException);
         }
+      } catch (Exception e) {
+        LOG.error(
+            "Error storing appAttempt: " + attemptState.getAttemptId(), e);
+        notifyStoreOperationFailed(e);
       }
     } else if (event.getType().equals(RMStateStoreEventType.REMOVE_APP)) {
       ApplicationState appState =
@@ -616,11 +622,10 @@ public abstract class RMStateStore exten
       LOG.info("Removing info for app: " + appId);
       try {
         removeApplicationState(appState);
+        notifyDoneRemovingApplcation(appId, removedException);
       } catch (Exception e) {
         LOG.error("Error removing app: " + appId, e);
-        removedException = e;
-      } finally {
-        notifyDoneRemovingApplcation(appId, removedException);
+        notifyStoreOperationFailed(e);
       }
     } else {
       LOG.error("Unknown RMStateStoreEvent type: " + event.getType());
@@ -629,6 +634,24 @@ public abstract class RMStateStore exten
 
   @SuppressWarnings("unchecked")
   /**
+   * In {@link handleStoreEvent}, this method is called to notify the
+   * ResourceManager that the store operation has failed.
+   * @param failureCause the exception due to which the operation failed
+   */
+  private void notifyStoreOperationFailed(Exception failureCause) {
+    RMStateStoreOperationFailedEventType type;
+    if (failureCause instanceof StoreFencedException) {
+      type = RMStateStoreOperationFailedEventType.FENCED;
+    } else {
+      type = RMStateStoreOperationFailedEventType.FAILED;
+    }
+
+    rmDispatcher.getEventHandler().handle(
+        new RMStateStoreOperationFailedEvent(type, failureCause));
+  }
+
+  @SuppressWarnings("unchecked")
+  /**
   * In {@link handleStoreEvent}, this method is called to notify the
   * application that a new application is stored in the state store
    * @param appId id of the application that has been saved

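With this change the RMStateStore methods stop propagating exceptions to callers; every failure flows through notifyStoreOperationFailed, which classifies it into one of the two new event types. A compact sketch of the classification (exception and enum names mirror the commit; the standalone shape is illustrative):

    public class FailureClassificationSketch {
      static class StoreFencedException extends Exception {}
      enum FailedEventType { FENCED, FAILED }

      // A StoreFencedException means another RM took over the store, so it
      // gets its own event type; all other failures are generic FAILED.
      static FailedEventType classify(Exception failureCause) {
        return (failureCause instanceof StoreFencedException)
            ? FailedEventType.FENCED : FailedEventType.FAILED;
      }
    }
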
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java Thu Nov 14 23:56:56 2013
@@ -23,7 +23,9 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -31,6 +33,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
@@ -38,11 +41,14 @@ import org.apache.hadoop.util.StringUtil
 import org.apache.hadoop.util.ZKUtil;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.RMHAServiceTarget;
+import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -53,11 +59,14 @@ import org.apache.zookeeper.Op;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event;
+import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.data.Stat;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
 
 @Private
 @Unstable
@@ -83,6 +92,55 @@ public class ZKRMStateStore extends RMSt
   protected ZooKeeper zkClient;
   private ZooKeeper oldZkClient;
 
+  /** Fencing related variables */
+  private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK";
+  private String fencingNodePath;
+  private Op createFencingNodePathOp;
+  private Op deleteFencingNodePathOp;
+
+  @VisibleForTesting
+  List<ACL> zkRootNodeAcl;
+  private boolean useDefaultFencingScheme = false;
+  public static final int CREATE_DELETE_PERMS =
+      ZooDefs.Perms.CREATE | ZooDefs.Perms.DELETE;
+  private final String zkRootNodeAuthScheme =
+      new DigestAuthenticationProvider().getScheme();
+
+  private String zkRootNodeUsername;
+  private String zkRootNodePassword;
+
+  /**
+   * Given the {@link Configuration} and {@link ACL}s used (zkAcl) for
+   * ZooKeeper access, construct the {@link ACL}s for the store's root node.
+   * In the constructed {@link ACL}, all the users allowed by zkAcl are given
+   * rwa access, while the current RM has exclusive create-delete access.
+   *
+   * To be called only when HA is enabled and the configuration doesn't set ACL
+   * for the root node.
+   */
+  @VisibleForTesting
+  @Private
+  @Unstable
+  protected List<ACL> constructZkRootNodeACL(
+      Configuration conf, List<ACL> sourceACLs) throws NoSuchAlgorithmException {
+    List<ACL> zkRootNodeAcl = new ArrayList<ACL>();
+    for (ACL acl : sourceACLs) {
+      zkRootNodeAcl.add(new ACL(
+          ZKUtil.removeSpecificPerms(acl.getPerms(), CREATE_DELETE_PERMS),
+          acl.getId()));
+    }
+
+    zkRootNodeUsername = HAUtil.getConfValueForRMInstance(
+        YarnConfiguration.RM_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_ADDRESS, conf);
+    zkRootNodePassword = Long.toString(ResourceManager.getClusterTimeStamp());
+    Id rmId = new Id(zkRootNodeAuthScheme,
+        DigestAuthenticationProvider.generateDigest(
+            zkRootNodeUsername + ":" + zkRootNodePassword));
+    zkRootNodeAcl.add(new ACL(CREATE_DELETE_PERMS, rmId));
+    return zkRootNodeAcl;
+  }
+
   @Override
   public synchronized void initInternal(Configuration conf) throws Exception {
     zkHostPort = conf.get(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS);
@@ -116,6 +174,29 @@ public class ZKRMStateStore extends RMSt
     zkRootNodePath = znodeWorkingPath + "/" + ROOT_ZNODE_NAME;
     rmDTSecretManagerRoot = zkRootNodePath + "/" + RM_DT_SECRET_MANAGER_ROOT;
     rmAppRoot = zkRootNodePath + "/" + RM_APP_ROOT;
+
+    /* Initialize fencing related paths, acls, and ops */
+    fencingNodePath = zkRootNodePath + "/" + FENCING_LOCK;
+    createFencingNodePathOp = Op.create(fencingNodePath, new byte[0], zkAcl,
+        CreateMode.PERSISTENT);
+    deleteFencingNodePathOp = Op.delete(fencingNodePath, -1);
+    if (HAUtil.isHAEnabled(conf)) {
+      String zkRootNodeAclConf = HAUtil.getConfValueForRMInstance
+          (YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf);
+      if (zkRootNodeAclConf != null) {
+        zkRootNodeAclConf = ZKUtil.resolveConfIndirection(zkRootNodeAclConf);
+        try {
+          zkRootNodeAcl = ZKUtil.parseACLs(zkRootNodeAclConf);
+        } catch (ZKUtil.BadAclFormatException bafe) {
+          LOG.error("Invalid format for " +
+              YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL);
+          throw bafe;
+        }
+      } else {
+        useDefaultFencingScheme = true;
+        zkRootNodeAcl = constructZkRootNodeACL(conf, zkAcl);
+      }
+    }
   }
 
   @Override
@@ -126,20 +207,76 @@ public class ZKRMStateStore extends RMSt
     // ensure root dirs exist
     createRootDir(znodeWorkingPath);
     createRootDir(zkRootNodePath);
+    if (HAUtil.isHAEnabled(getConfig())){
+      fence();
+    }
     createRootDir(rmDTSecretManagerRoot);
     createRootDir(rmAppRoot);
   }
 
-  private void createRootDir(String rootPath) throws Exception {
+  private void createRootDir(final String rootPath) throws Exception {
+    // For root dirs, we shouldn't use the doMulti helper methods
     try {
-      createWithRetries(rootPath, null, zkAcl, CreateMode.PERSISTENT);
+      new ZKAction<String>() {
+        @Override
+        public String run() throws KeeperException, InterruptedException {
+          return zkClient.create(rootPath, null, zkAcl, CreateMode.PERSISTENT);
+        }
+      }.runWithRetries();
     } catch (KeeperException ke) {
-      if (ke.code() != Code.NODEEXISTS) {
+      if (ke.code() == Code.NODEEXISTS) {
+        LOG.debug(rootPath + " znode already exists!");
+      } else {
         throw ke;
       }
     }
   }
 
+  private void logRootNodeAcls(String prefix) throws KeeperException,
+      InterruptedException {
+    Stat getStat = new Stat();
+    List<ACL> getAcls = zkClient.getACL(zkRootNodePath, getStat);
+
+    StringBuilder builder = new StringBuilder();
+    builder.append(prefix);
+    for (ACL acl : getAcls) {
+      builder.append(acl.toString());
+    }
+    builder.append(getStat.toString());
+    LOG.debug(builder.toString());
+  }
+
+  private synchronized void fence() throws Exception {
+    if (LOG.isTraceEnabled()) {
+      logRootNodeAcls("Before fencing\n");
+    }
+
+    new ZKAction<Void>() {
+      @Override
+      public Void run() throws KeeperException, InterruptedException {
+        zkClient.setACL(zkRootNodePath, zkRootNodeAcl, -1);
+        return null;
+      }
+    }.runWithRetries();
+
+    // delete fencingnodepath
+    new ZKAction<Void>() {
+      @Override
+      public Void run() throws KeeperException, InterruptedException {
+        try {
+          zkClient.multi(Collections.singletonList(deleteFencingNodePathOp));
+        } catch (KeeperException.NoNodeException nne) {
+          LOG.info("Fencing node " + fencingNodePath + " doesn't exist; nothing to delete");
+        }
+        return null;
+      }
+    }.runWithRetries();
+
+    if (LOG.isTraceEnabled()) {
+      logRootNodeAcls("After fencing\n");
+    }
+  }
+
   private synchronized void closeZkClients() throws IOException {
     if (zkClient != null) {
       try {
@@ -176,7 +313,8 @@ public class ZKRMStateStore extends RMSt
 
   private synchronized void loadRMDTSecretManagerState(RMState rmState)
       throws Exception {
-    List<String> childNodes = zkClient.getChildren(rmDTSecretManagerRoot, true);
+    List<String> childNodes =
+        getChildrenWithRetries(rmDTSecretManagerRoot, true);
 
     for (String childNodeName : childNodes) {
       if (childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
@@ -209,7 +347,7 @@ public class ZKRMStateStore extends RMSt
   }
 
   private synchronized void loadRMAppState(RMState rmState) throws Exception {
-    List<String> childNodes = zkClient.getChildren(rmAppRoot, true);
+    List<String> childNodes = getChildrenWithRetries(rmAppRoot, true);
     List<ApplicationAttemptState> attempts =
         new ArrayList<ApplicationAttemptState>();
     for (String childNodeName : childNodes) {
@@ -466,6 +604,8 @@ public class ZKRMStateStore extends RMSt
   }
 
   @VisibleForTesting
+  @Private
+  @Unstable
   public synchronized void processWatchEvent(WatchedEvent event)
       throws Exception {
     Event.EventType eventType = event.getType();
@@ -506,65 +646,71 @@ public class ZKRMStateStore extends RMSt
   }
 
   @VisibleForTesting
+  @Private
+  @Unstable
   String getNodePath(String root, String nodeName) {
     return (root + "/" + nodeName);
   }
 
-  @VisibleForTesting
-  public String createWithRetries(
-      final String path, final byte[] data, final List<ACL> acl,
-      final CreateMode mode) throws Exception {
-    return new ZKAction<String>() {
-      @Override
-      public String run() throws KeeperException, InterruptedException {
-        return zkClient.create(path, data, acl, mode);
-      }
-    }.runWithRetries();
-  }
-
-  private void deleteWithRetries(final String path, final int version)
-      throws Exception {
+  /**
+   * Helper method that creates fencing node, executes the passed operations,
+   * and deletes the fencing node.
+   */
+  private synchronized void doMultiWithRetries(
+      final List<Op> opList) throws Exception {
+    final List<Op> execOpList = new ArrayList<Op>(opList.size() + 2);
+    execOpList.add(createFencingNodePathOp);
+    execOpList.addAll(opList);
+    execOpList.add(deleteFencingNodePathOp);
     new ZKAction<Void>() {
       @Override
       public Void run() throws KeeperException, InterruptedException {
-        /**
-         * Call exists() to leave a watch on the node denoted by path.
-         * Delete node if exists. To pass the existence information to the
-         * caller, call delete irrespective of whether node exists or not.
-         */
-        if (zkClient.exists(path, true) == null) {
-          LOG.error("Trying to delete a path (" + path
-              + ") that doesn't exist.");
-        }
-        zkClient.delete(path, version);
+        zkClient.multi(execOpList);
         return null;
       }
     }.runWithRetries();
   }
 
-  private void doMultiWithRetries(final ArrayList<Op> opList) throws Exception {
-    new ZKAction<Void>() {
-      @Override
-      public Void run() throws KeeperException, InterruptedException {
-        zkClient.multi(opList);
-        return null;
+  /**
+   * Helper method that creates fencing node, executes the passed operation,
+   * and deletes the fencing node.
+   */
+  private void doMultiWithRetries(final Op op) throws Exception {
+    doMultiWithRetries(Collections.singletonList(op));
+  }
+
+  @VisibleForTesting
+  @Private
+  @Unstable
+  public void createWithRetries(
+      final String path, final byte[] data, final List<ACL> acl,
+      final CreateMode mode) throws Exception {
+    doMultiWithRetries(Op.create(path, data, acl, mode));
+  }
+
+  private void deleteWithRetries(final String path, final int version)
+      throws Exception {
+    try {
+      doMultiWithRetries(Op.delete(path, version));
+    } catch (KeeperException.NoNodeException nne) {
+      // We tried to delete a node that doesn't exist
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Attempted to delete a non-existing znode " + path);
       }
-    }.runWithRetries();
+    }
   }
 
   @VisibleForTesting
+  @Private
+  @Unstable
   public void setDataWithRetries(final String path, final byte[] data,
                                  final int version) throws Exception {
-    new ZKAction<Void>() {
-      @Override
-      public Void run() throws KeeperException, InterruptedException {
-        zkClient.setData(path, data, version);
-        return null;
-      }
-    }.runWithRetries();
+    doMultiWithRetries(Op.setData(path, data, version));
   }
 
   @VisibleForTesting
+  @Private
+  @Unstable
   public byte[] getDataWithRetries(final String path, final boolean watch)
       throws Exception {
     return new ZKAction<byte[]>() {
@@ -576,6 +722,16 @@ public class ZKRMStateStore extends RMSt
     }.runWithRetries();
   }
 
+  private List<String> getChildrenWithRetries(
+      final String path, final boolean watch) throws Exception {
+    return new ZKAction<List<String>>() {
+      @Override
+      List<String> run() throws KeeperException, InterruptedException {
+        return zkClient.getChildren(path, watch);
+      }
+    }.runWithRetries();
+  }
+
   private abstract class ZKAction<T> {
     // run() expects synchronization on ZKRMStateStore.this
     abstract T run() throws KeeperException, InterruptedException;
@@ -596,11 +752,29 @@ public class ZKRMStateStore extends RMSt
       }
     }
 
+    private boolean shouldRetry(Code code) {
+      switch (code) {
+        case CONNECTIONLOSS:
+        case OPERATIONTIMEOUT:
+          return true;
+        default:
+          break;
+      }
+      return false;
+    }
+
     T runWithRetries() throws Exception {
       int retry = 0;
       while (true) {
         try {
           return runWithCheck();
+        } catch (KeeperException.NoAuthException nae) {
+          if (HAUtil.isHAEnabled(getConfig())) {
+            // NoAuthException possibly means that this store is fenced due to
+            // another RM becoming active. Even if not,
+            // it is safer to assume we have been fenced
+            throw new StoreFencedException();
+          }
         } catch (KeeperException ke) {
           if (shouldRetry(ke.code()) && ++retry < numRetries) {
             continue;
@@ -611,17 +785,6 @@ public class ZKRMStateStore extends RMSt
     }
   }
 
-  private static boolean shouldRetry(Code code) {
-    switch (code) {
-      case CONNECTIONLOSS:
-      case OPERATIONTIMEOUT:
-        return true;
-      default:
-        break;
-    }
-    return false;
-  }
-
   private synchronized void createConnection()
       throws IOException, InterruptedException {
     closeZkClients();
@@ -629,6 +792,10 @@ public class ZKRMStateStore extends RMSt
         retries++) {
       try {
         zkClient = getNewZooKeeper();
+        if (useDefaultFencingScheme) {
+          zkClient.addAuthInfo(zkRootNodeAuthScheme,
+              (zkRootNodeUsername + ":" + zkRootNodePassword).getBytes());
+        }
       } catch (IOException ioe) {
         // Retry in case of network failures
         LOG.info("Failed to connect to the ZooKeeper on attempt - " +
@@ -646,6 +813,8 @@ public class ZKRMStateStore extends RMSt
 
   // protected to mock for testing
   @VisibleForTesting
+  @Private
+  @Unstable
   protected synchronized ZooKeeper getNewZooKeeper()
       throws IOException, InterruptedException {
     ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null);

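The fencing mechanism in ZKRMStateStore above rests on ZooKeeper's atomic multi(): every mutating operation is bracketed by a create and a delete of a fencing znode under the root node. Because only the Active RM's identity holds create-delete permission there, a fenced RM's whole batch fails (typically with NoAuth, which the retry loop converts to StoreFencedException). A hedged sketch of the bracketing, using the stock ZooKeeper client API (OPEN_ACL_UNSAFE stands in for the store's configured zkAcl):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.Op;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;

    public class FencedMultiSketch {
      // Bracket the real ops with create/delete of a fencing znode; the
      // batch is atomic, so a fenced RM cannot apply any of the inner ops.
      static void fencedMulti(ZooKeeper zk, String fencingPath, List<Op> ops)
          throws Exception {
        List<Op> execOps = new ArrayList<Op>(ops.size() + 2);
        execOps.add(Op.create(fencingPath, new byte[0],
            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
        execOps.addAll(ops);
        execOps.add(Op.delete(fencingPath, -1));
        zk.multi(execOps);
      }
    }
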
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Thu Nov 14 23:56:56 2013
@@ -136,9 +136,6 @@ public class FairScheduler implements Re
   // How often fair shares are re-calculated (ms)
   protected long UPDATE_INTERVAL = 500;
 
-  // Whether to use username in place of "default" queue name
-  private volatile boolean userAsDefaultQueue = false;
-
   private final static List<Container> EMPTY_CONTAINER_LIST =
       new ArrayList<Container>();
 
@@ -640,6 +637,12 @@ public class FairScheduler implements Re
     RMApp rmApp = rmContext.getRMApps().get(
         applicationAttemptId.getApplicationId());
     FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
+    if (queue == null) {
+      rmContext.getDispatcher().getEventHandler().handle(
+          new RMAppAttemptRejectedEvent(applicationAttemptId,
+              "Application rejected by queue placement policy"));
+      return;
+    }
 
     FSSchedulerApp schedulerApp =
         new FSSchedulerApp(applicationAttemptId, user,
@@ -675,17 +678,16 @@ public class FairScheduler implements Re
   
   @VisibleForTesting
   FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) {
-    // Potentially set queue to username if configured to do so
-    if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME) &&
-        userAsDefaultQueue) {
-      queueName = user;
-    }
-    
-    FSLeafQueue queue = queueMgr.getLeafQueue(queueName,
-        conf.getAllowUndeclaredPools());
-    if (queue == null) {
-      // queue is not an existing or createable leaf queue
-      queue = queueMgr.getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, false);
+    FSLeafQueue queue = null;
+    try {
+      QueuePlacementPolicy policy = queueMgr.getPlacementPolicy();
+      queueName = policy.assignAppToQueue(queueName, user);
+      if (queueName == null) {
+        return null;
+      }
+      queue = queueMgr.getLeafQueue(queueName, true);
+    } catch (IOException ex) {
+      LOG.error("Error assigning app to queue, rejecting", ex);
     }
     
     if (rmApp != null) {
@@ -1155,7 +1157,6 @@ public class FairScheduler implements Re
     minimumAllocation = this.conf.getMinimumAllocation();
     maximumAllocation = this.conf.getMaximumAllocation();
     incrAllocation = this.conf.getIncrementAllocation();
-    userAsDefaultQueue = this.conf.getUserAsDefaultQueue();
     continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
     continuousSchedulingSleepMs =
             this.conf.getContinuousSchedulingSleepMs();

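The FairScheduler change above delegates queue selection to the new QueuePlacementPolicy: a null result from assignAppToQueue (or an IOException) now rejects the application instead of silently falling back to the default queue. A minimal sketch of that contract (the interface is a stand-in trimmed to the single call this diff uses):

    import java.io.IOException;

    public class PlacementFlowSketch {
      interface PlacementPolicy {
        // Returns the final queue name, or null if the app should be rejected.
        String assignAppToQueue(String requestedQueue, String user)
            throws IOException;
      }

      static String placeOrReject(PlacementPolicy policy, String requested,
          String user) {
        try {
          return policy.assignAppToQueue(requested, user); // null => reject
        } catch (IOException e) {
          return null; // errors are treated as rejection, as in the diff
        }
      }
    }
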
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Thu Nov 14 23:56:56 2013
@@ -25,6 +25,7 @@ import java.net.URLConnection;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -51,6 +52,8 @@ import org.w3c.dom.NodeList;
 import org.w3c.dom.Text;
 import org.xml.sax.SAXException;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Maintains a list of queues as well as scheduling parameters for each queue,
  * such as guaranteed share allocations, from the fair scheduler config file.
@@ -87,6 +90,8 @@ public class QueueManager {
   private FSParentQueue rootQueue;
 
   private volatile QueueManagerInfo info = new QueueManagerInfo();
+  @VisibleForTesting
+  volatile QueuePlacementPolicy placementPolicy;
   
   private long lastReloadAttempt; // Last time we tried to reload the queues file
   private long lastSuccessfulReload; // Last time we successfully reloaded queues
@@ -107,6 +112,8 @@ public class QueueManager {
     queues.put(rootQueue.getName(), rootQueue);
     
     this.allocFile = conf.getAllocationFile();
+    placementPolicy = new QueuePlacementPolicy(getSimplePlacementRules(),
+        new HashSet<String>(), conf);
     
     reloadAllocs();
     lastSuccessfulReload = scheduler.getClock().getTime();
@@ -115,6 +122,28 @@ public class QueueManager {
     getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true);
   }
   
+  public void updatePlacementPolicy(FairSchedulerConfiguration conf) {
+    // Intentionally a no-op for now: reloadAllocs() below rebuilds the
+    // placement policy from the allocation file and reassigns placementPolicy.
+  }
+  
+  /**
+   * Construct simple queue placement policy from allow-undeclared-pools and
+   * user-as-default-queue.
+   */
+  private List<QueuePlacementRule> getSimplePlacementRules() {
+    boolean create = scheduler.getConf().getAllowUndeclaredPools();
+    boolean userAsDefaultQueue = scheduler.getConf().getUserAsDefaultQueue();
+    List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
+    rules.add(new QueuePlacementRule.Specified().initialize(create, null));
+    if (userAsDefaultQueue) {
+      rules.add(new QueuePlacementRule.User().initialize(create, null));
+    }
+    if (!userAsDefaultQueue || !create) {
+      rules.add(new QueuePlacementRule.Default().initialize(true, null));
+    }
+    return rules;
+  }
+  
   /**
    * Get a queue by name, creating it if the create param is true and is necessary.
   * If the queue is not or cannot be a leaf queue, i.e. it already exists as a
@@ -226,6 +255,10 @@ public class QueueManager {
       return queues.containsKey(name);
     }
   }
+  
+  public QueuePlacementPolicy getPlacementPolicy() {
+    return placementPolicy;
+  }
 
   /**
    * Reload allocations file if it hasn't been loaded in a while
@@ -290,6 +323,8 @@ public class QueueManager {
     long fairSharePreemptionTimeout = Long.MAX_VALUE;
     long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
     SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.getDefault();
+    
+    QueuePlacementPolicy newPlacementPolicy = null;
 
     // Remember all queue names so we can display them on web UI, etc.
     List<String> queueNamesInAllocFile = new ArrayList<String>();
@@ -306,6 +341,7 @@ public class QueueManager {
           "file: top-level element not <allocations>");
     NodeList elements = root.getChildNodes();
     List<Element> queueElements = new ArrayList<Element>();
+    Element placementPolicyElement = null;
     for (int i = 0; i < elements.getLength(); i++) {
       Node node = elements.item(i);
       if (node instanceof Element) {
@@ -348,6 +384,8 @@ public class QueueManager {
           String text = ((Text)element.getFirstChild()).getData().trim();
           SchedulingPolicy.setDefault(text);
           defaultSchedPolicy = SchedulingPolicy.getDefault();
+        } else if ("queuePlacementPolicy".equals(element.getTagName())) {
+          placementPolicyElement = element;
         } else {
           LOG.warn("Bad element in allocations file: " + element.getTagName());
         }
@@ -369,6 +407,15 @@ public class QueueManager {
           userMaxApps, queueWeights, queuePolicies, minSharePreemptionTimeouts,
           queueAcls, queueNamesInAllocFile);
     }
+    
+    // Load placement policy and pass it configured queues
+    if (placementPolicyElement != null) {
+      newPlacementPolicy = QueuePlacementPolicy.fromXml(placementPolicyElement,
+          new HashSet<String>(queueNamesInAllocFile), scheduler.getConf());
+    } else {
+      newPlacementPolicy = new QueuePlacementPolicy(getSimplePlacementRules(),
+          new HashSet<String>(queueNamesInAllocFile), scheduler.getConf());
+    }
 
     // Commit the reload; also create any queue defined in the alloc file
     // if it does not already exist, so it can be displayed on the web UI.
@@ -377,6 +424,7 @@ public class QueueManager {
           queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault,
           queueMaxAppsDefault, defaultSchedPolicy, minSharePreemptionTimeouts,
           queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout);
+      placementPolicy = newPlacementPolicy;
       
       // Make sure all queues exist
       for (String name: queueNamesInAllocFile) {

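For reference, with the default flags (allow-undeclared-pools=true and
user-as-default-queue=true) the rule list built by getSimplePlacementRules()
above corresponds to the following allocation-file policy, written in the
element-per-rule form documented in FairScheduler.apt.vm later in this diff
(shown for illustration only):

---
<queuePlacementPolicy>
  <specified />
  <user />
</queuePlacementPolicy>
---

With user-as-default-queue=false (or allow-undeclared-pools=false) the list
additionally ends in a <default /> rule, matching the branch above that
appends QueuePlacementRule.Default as a guaranteed terminal rule.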
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java Thu Nov 14 23:56:56 2013
@@ -18,15 +18,32 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.recovery;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.zookeeper.ZooKeeper;
 import org.junit.Test;
 
@@ -56,7 +73,7 @@ public class TestZKRMStateStore extends 
 
     public RMStateStore getRMStateStore() throws Exception {
       String workingZnode = "/Test";
-      YarnConfiguration conf = new YarnConfiguration();
+      Configuration conf = new YarnConfiguration();
       conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort);
       conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
       this.client = createClient();
@@ -77,4 +94,81 @@ public class TestZKRMStateStore extends 
     testRMAppStateStore(zkTester);
     testRMDTSecretManagerStateStore(zkTester);
   }
+
+  private Configuration createHARMConf(
+      String rmIds, String rmId, int adminPort) {
+    Configuration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
+    conf.set(YarnConfiguration.RM_HA_IDS, rmIds);
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
+    conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort);
+    conf.set(YarnConfiguration.RM_HA_ID, rmId);
+    for (String rpcAddress : HAUtil.RPC_ADDRESS_CONF_KEYS) {
+      conf.set(HAUtil.addSuffix(rpcAddress, rmId), "localhost:0");
+    }
+    conf.set(YarnConfiguration.RM_HA_ADMIN_ADDRESS, "localhost:" + adminPort);
+    return conf;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testFencing() throws Exception {
+    StateChangeRequestInfo req = new StateChangeRequestInfo(
+        HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+
+    Configuration conf1 = createHARMConf("rm1,rm2", "rm1", 1234);
+    ResourceManager rm1 = new ResourceManager();
+    rm1.init(conf1);
+    rm1.start();
+    rm1.getHAService().transitionToActive(req);
+    assertEquals("RM with ZKStore didn't start",
+        Service.STATE.STARTED, rm1.getServiceState());
+    assertEquals("RM should be Active",
+        HAServiceProtocol.HAServiceState.ACTIVE,
+        rm1.getHAService().getServiceStatus().getState());
+
+    Configuration conf2 = createHARMConf("rm1,rm2", "rm2", 5678);
+    ResourceManager rm2 = new ResourceManager();
+    rm2.init(conf2);
+    rm2.start();
+    rm2.getHAService().transitionToActive(req);
+    assertEquals("RM with ZKStore didn't start",
+        Service.STATE.STARTED, rm2.getServiceState());
+    assertEquals("RM should be Active",
+        HAServiceProtocol.HAServiceState.ACTIVE,
+        rm2.getHAService().getServiceStatus().getState());
+
+    // Submitting an application to RM1 to trigger a state store operation.
+    // RM1 should realize that it got fenced and is not the Active RM anymore.
+    Map mockMap = mock(Map.class);
+    ApplicationSubmissionContext asc =
+        ApplicationSubmissionContext.newInstance(
+            ApplicationId.newInstance(1000, 1),
+            "testApplication", // app Name
+            "default", // queue name
+            Priority.newInstance(0),
+            ContainerLaunchContext.newInstance(mockMap, mockMap,
+                new ArrayList<String>(), mockMap, mock(ByteBuffer.class),
+                mockMap),
+            false, // unmanaged AM
+            true, // cancelTokens
+            1, // max app attempts
+            Resource.newInstance(1024, 1));
+    ClientRMService rmService = rm1.getClientRMService();
+    rmService.submitApplication(SubmitApplicationRequest.newInstance(asc));
+
+    // Poll for up to three seconds, waiting for rm1 to notice it was fenced.
+    for (int i = 0; i < 30; i++) {
+      if (rm1.getHAService().getServiceStatus().getState()
+          != HAServiceProtocol.HAServiceState.ACTIVE) {
+        break;
+      }
+      Thread.sleep(100);
+    }
+    assertEquals("RM should have been fenced",
+        HAServiceProtocol.HAServiceState.STANDBY,
+        rm1.getHAService().getServiceStatus().getState());
+    assertEquals("RM should be Active",
+        HAServiceProtocol.HAServiceState.ACTIVE,
+        rm2.getHAService().getServiceStatus().getState());
+  }
 }

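The fixed poll loop in testFencing can also be read as a small wait helper; a
sketch under the same assumptions (hypothetical method, reusing only the
rm.getHAService() accessors the test already calls):

---
  // Poll until the given RM drops out of ACTIVE, or the timeout elapses.
  private static boolean waitForFencing(ResourceManager rm, long timeoutMs)
      throws Exception {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadline) {
      if (rm.getHAService().getServiceStatus().getState()
          != HAServiceProtocol.HAServiceState.ACTIVE) {
        return true; // the state store fenced this RM
      }
      Thread.sleep(100);
    }
    return false;
  }
---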
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Thu Nov 14 23:56:56 2013
@@ -44,7 +44,9 @@ import javax.xml.parsers.ParserConfigura
 import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.security.GroupMappingServiceProvider;
 import org.apache.hadoop.yarn.MockApps;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -94,6 +96,8 @@ import org.junit.Before;
 import org.junit.Test;
 import org.xml.sax.SAXException;
 
+import com.google.common.collect.Sets;
+
 public class TestFairScheduler {
 
   private class MockClock implements Clock {
@@ -616,6 +620,7 @@ public class TestFairScheduler {
 
     conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
     scheduler.reinitialize(conf, resourceManager.getRMContext());
+    scheduler.getQueueManager().initialize();
     AppAddedSchedulerEvent appAddedEvent2 = new AppAddedSchedulerEvent(
         createAppAttemptId(2, 1), "default", "user2");
     scheduler.handle(appAddedEvent2);
@@ -664,6 +669,46 @@ public class TestFairScheduler {
     assertEquals(rmApp2.getQueue(), queue2.getName());
     assertEquals("root.notdefault", rmApp2.getQueue());
   }
+  
+  @Test
+  public void testQueuePlacementWithPolicy() throws Exception {
+    Configuration conf = createConfiguration();
+    conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
+        SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
+    ApplicationAttemptId appId;
+    Map<ApplicationAttemptId, FSSchedulerApp> apps = scheduler.applications;
+
+    List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
+    rules.add(new QueuePlacementRule.Specified().initialize(true, null));
+    rules.add(new QueuePlacementRule.User().initialize(false, null));
+    rules.add(new QueuePlacementRule.PrimaryGroup().initialize(false, null));
+    rules.add(new QueuePlacementRule.Default().initialize(true, null));
+    Set<String> queues = Sets.newHashSet("root.user1", "root.user3group");
+    scheduler.getQueueManager().placementPolicy = new QueuePlacementPolicy(
+        rules, queues, conf);
+    appId = createSchedulingRequest(1024, "somequeue", "user1");
+    assertEquals("root.somequeue", apps.get(appId).getQueueName());
+    appId = createSchedulingRequest(1024, "default", "user1");
+    assertEquals("root.user1", apps.get(appId).getQueueName());
+    appId = createSchedulingRequest(1024, "default", "user3");
+    assertEquals("root.user3group", apps.get(appId).getQueueName());
+    appId = createSchedulingRequest(1024, "default", "otheruser");
+    assertEquals("root.default", apps.get(appId).getQueueName());
+    
+    // test without specified as first rule
+    rules = new ArrayList<QueuePlacementRule>();
+    rules.add(new QueuePlacementRule.User().initialize(false, null));
+    rules.add(new QueuePlacementRule.Specified().initialize(true, null));
+    rules.add(new QueuePlacementRule.Default().initialize(true, null));
+    scheduler.getQueueManager().placementPolicy = new QueuePlacementPolicy(
+        rules, queues, conf);
+    appId = createSchedulingRequest(1024, "somequeue", "user1");
+    assertEquals("root.user1", apps.get(appId).getQueueName());
+    appId = createSchedulingRequest(1024, "somequeue", "otheruser");
+    assertEquals("root.somequeue", apps.get(appId).getQueueName());
+    appId = createSchedulingRequest(1024, "default", "otheruser");
+    assertEquals("root.default", apps.get(appId).getQueueName());
+  }
 
   @Test
   public void testFairShareWithMinAlloc() throws Exception {

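testQueuePlacementWithPolicy relies on SimpleGroupsMapping resolving each
user's primary group to "<user>group", as implied by the root.user3group
assertion above. A hypothetical stand-in with that behavior (not the shipped
test class) would look like:

---
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.security.GroupMappingServiceProvider;

// Maps every user to the single group "<user>group", so the
// primaryGroup placement rule is easy to assert against in tests.
public class IllustrativeGroupsMapping implements GroupMappingServiceProvider {
  @Override
  public List<String> getGroups(String user) {
    return Arrays.asList(user + "group");
  }

  @Override
  public void cacheGroupsRefresh() {
    // nothing is cached
  }

  @Override
  public void cacheGroupsAdd(List<String> groups) {
    // nothing is cached
  }
}
---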
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm Thu Nov 14 23:56:56 2013
@@ -101,6 +101,16 @@ Hadoop MapReduce Next Generation - Fair 
  Fair Scheduler. Among them is the use of custom policies governing
  priority “boosting” over certain apps.
 
+* {Automatically placing applications in queues}
+
+  The Fair Scheduler allows administrators to configure policies that
+  automatically place submitted applications into appropriate queues. Placement
+  can depend on the user and groups of the submitter and the requested queue
+  passed by the application. A policy consists of an ordered list of rules
+  applied in turn to classify an incoming application. Each rule either places
+  the app into a queue, rejects it, or continues on to the next rule. Refer to
+  the allocation file format below for how to configure these policies.
+
 * {Installation}
 
   To use the Fair Scheduler first assign the appropriate scheduler class in 
@@ -138,7 +148,8 @@ Properties that can be placed in yarn-si
     * Whether to use the username associated with the allocation as the default 
       queue name, in the event that a queue name is not specified. If this is set 
       to "false" or unset, all jobs have a shared default queue, named "default".
-      Defaults to true.
+      Defaults to true.  If a queue placement policy is given in the allocations
+      file, this property is ignored.
 
  * <<<yarn.scheduler.fair.preemption>>>
 
@@ -180,6 +191,16 @@ Properties that can be placed in yarn-si
       opportunities to pass up. The default value of -1.0 means don't pass up any
       scheduling opportunities.
 
+ * <<<yarn.scheduler.fair.allow-undeclared-pools>>>
+
+    * If this is true, new queues can be created at application submission time,
+      whether because they are specified as the application's queue by the
+      submitter or because they are placed there by the user-as-default-queue
+      property. If this is false, any time an app would be placed in a queue that
+      is not specified in the allocations file, it is placed in the "default" queue
+      instead. Defaults to true. If a queue placement policy is given in the
+      allocations file, this property is ignored.
+
 Allocation file format
 
  The allocation file must be in XML format. The format contains the following types of
@@ -248,25 +269,29 @@ Allocation file format
   policy for queues; overridden by the schedulingPolicy element in each queue
    if specified. Defaults to "fair".
 
-  An example allocation file is given here:
+ * <<A queuePlacementPolicy element>>, which contains a list of rule elements
+   that tell the scheduler how to place incoming apps into queues. Rules
+   are applied in the order that they are listed. Rules may take arguments. All
+   rules accept the "create" argument, which indicates whether the rule can create
+   a new queue. "Create" defaults to true; if set to false and the rule would
+   place the app in a queue that is not configured in the allocations file, we
+   continue on to the next rule. The last rule must be one that always places
+   or rejects the app, and so can never continue.  Valid rules are:
+
+     * specified: the app is placed into the queue it requested.  If the app
+       requested no queue, i.e. it specified "default", we continue.
+
+     * user: the app is placed into a queue with the name of the user who
+       submitted it.
 
-Queue Access Control Lists (ACLs)
+     * primaryGroup: the app is placed into a queue with the name of the
+       primary group of the user who submitted it.
 
-  Queue Access Control Lists (ACLs) allow administrators to control who may
-  take actions on particular queues. They are configured with the aclSubmitApps
-  and aclAdministerApps properties, which can be set per queue. Currently the
-  only supported administrative action is killing an application. Anybody who
-  may administer a queue may also submit applications to it. These properties
-  take values in a format like "user1,user2 group1,group2" or " group1,group2".
-  An action on a queue will be permitted if its user or group is in the ACL of
-  that queue or in the ACL of any of that queue's ancestors. So if queue2
-  is inside queue1, and user1 is in queue1's ACL, and user2 is in queue2's
-  ACL, then both users may submit to queue2.
-  
-  The root queue's ACLs are "*" by default which, because ACLs are passed down,
-  means that everybody may submit to and kill applications from every queue.
-  To start restricting access, change the root queue's ACLs to something other
-  than "*". 
+     * default: the app is placed into the queue named "default".
+
+     * reject: the app is rejected.
+
+  An example allocation file is given here:
 
 ---
 <?xml version="1.0"?>
@@ -282,14 +307,41 @@ Queue Access Control Lists (ACLs)
       <minResources>5000 mb,0vcores</minResources>
     </queue>
   </queue>
+  
   <user name="sample_user">
     <maxRunningApps>30</maxRunningApps>
   </user>
   <userMaxAppsDefault>5</userMaxAppsDefault>
+  
+  <queuePlacementPolicy>
+    <specified />
+    <primarygroup create="false" />
+    <default />
+  </queuePlacementPolicy>
 </allocations>
 ---
 
   Note that for backwards compatibility with the original FairScheduler, "queue" elements can instead be named as "pool" elements.
+
+
+Queue Access Control Lists (ACLs)
+
+  Queue Access Control Lists (ACLs) allow administrators to control who may
+  take actions on particular queues. They are configured with the aclSubmitApps
+  and aclAdministerApps properties, which can be set per queue. Currently the
+  only supported administrative action is killing an application. Anybody who
+  may administer a queue may also submit applications to it. These properties
+  take values in a format like "user1,user2 group1,group2" or " group1,group2".
+  An action on a queue will be permitted if its user or group is in the ACL of
+  that queue or in the ACL of any of that queue's ancestors. So if queue2
+  is inside queue1, and user1 is in queue1's ACL, and user2 is in queue2's
+  ACL, then both users may submit to queue2.
+  
+  The root queue's ACLs are "*" by default which, because ACLs are passed down,
+  means that everybody may submit to and kill applications from every queue.
+  To start restricting access, change the root queue's ACLs to something other
+  than "*". 
+
   
 * {Administration}
 

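Putting the two yarn-site.xml properties discussed above together: a cluster
that wants per-user queues but no queues beyond those declared in the
allocations file could set the following (illustrative values; both properties
are ignored once a queuePlacementPolicy is given in the allocations file):

---
<property>
  <name>yarn.scheduler.fair.user-as-default-queue</name>
  <value>true</value>
</property>
<property>
  <name>yarn.scheduler.fair.allow-undeclared-pools</name>
  <value>false</value>
</property>
---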
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/pom.xml?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/pom.xml (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/pom.xml Thu Nov 14 23:56:56 2013
@@ -119,6 +119,7 @@
     <dependency>
       <groupId>com.sun.jersey.jersey-test-framework</groupId>
       <artifactId>jersey-test-framework-grizzly2</artifactId>
+      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>com.sun.jersey</groupId>