You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2017/09/22 00:58:50 UTC

[29/50] [abbrv] hadoop git commit: YARN-5531. UnmanagedAM pool manager for federating application across clusters. (Botong Huang via Subru).

YARN-5531. UnmanagedAM pool manager for federating application across clusters. (Botong Huang via Subru).

(cherry picked from commit 73bb2102ce4b82b3a3bed91319f7c8f067ddc3e8)
(cherry picked from commit 859aa1f9d621d07693825e610bdc0149f7a2770a)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9476d86c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9476d86c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9476d86c

Branch: refs/heads/branch-2
Commit: 9476d86ce869b51fc7524ae58dd53862bc2d7d72
Parents: 7f00f93
Author: Subru Krishnan <su...@apache.org>
Authored: Fri May 26 16:23:38 2017 -0700
Committer: Carlo Curino <cu...@apache.org>
Committed: Thu Sep 21 16:47:43 2017 -0700

----------------------------------------------------------------------
 .../apache/hadoop/yarn/util/AsyncCallback.java  |  35 ++
 .../failover/FederationProxyProviderUtil.java   | 114 ++--
 .../yarn/server/uam/UnmanagedAMPoolManager.java | 311 ++++++++++
 .../server/uam/UnmanagedApplicationManager.java | 607 +++++++++++++++++++
 .../hadoop/yarn/server/uam/package-info.java    |  18 +
 .../yarn/server/utils/AMRMClientUtils.java      | 189 ++++++
 .../server/utils/YarnServerSecurityUtils.java   |  41 +-
 .../yarn/server/MockResourceManagerFacade.java  |  10 +-
 .../uam/TestUnmanagedApplicationManager.java    | 335 ++++++++++
 .../amrmproxy/DefaultRequestInterceptor.java    |  30 +-
 .../ApplicationMasterService.java               |  12 +-
 .../TestApplicationMasterLauncher.java          |   6 +-
 12 files changed, 1590 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9476d86c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AsyncCallback.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AsyncCallback.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AsyncCallback.java
new file mode 100644
index 0000000..b4f75c9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AsyncCallback.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.util;
+
+/**
+ * Generic interface that can be used for calling back when a corresponding
+ * asynchronous operation completes.
+ *
+ * @param <T> parameter type for the callback
+ */
+public interface AsyncCallback<T> {
+  /**
+   * This method is called back when the corresponding asynchronous operation
+   * completes.
+   *
+   * @param response response of the callback
+   */
+  void callback(T response);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9476d86c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java
index 18f1338..3931f2b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java
@@ -19,22 +19,20 @@
 package org.apache.hadoop.yarn.server.federation.failover;
 
 import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.client.RMFailoverProxyProvider;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,10 +49,15 @@ public final class FederationProxyProviderUtil {
   public static final Logger LOG =
       LoggerFactory.getLogger(FederationProxyProviderUtil.class);
 
+  // Disable constructor
+  private FederationProxyProviderUtil() {
+  }
+
   /**
-   * Create a proxy for the specified protocol. For non-HA, this is a direct
-   * connection to the ResourceManager address. When HA is enabled, the proxy
-   * handles the failover between the ResourceManagers as well.
+   * Create a proxy for the specified protocol in the context of Federation. For
+   * non-HA, this is a direct connection to the ResourceManager address. When HA
+   * is enabled, the proxy handles the failover between the ResourceManagers as
+   * well.
    *
    * @param configuration Configuration to generate {@link ClientRMProxy}
    * @param protocol Protocol for the proxy
@@ -67,15 +70,16 @@ public final class FederationProxyProviderUtil {
   @Public
   @Unstable
   public static <T> T createRMProxy(Configuration configuration,
-      final Class<T> protocol, SubClusterId subClusterId,
-      UserGroupInformation user) throws IOException {
+      Class<T> protocol, SubClusterId subClusterId, UserGroupInformation user)
+      throws IOException {
     return createRMProxy(configuration, protocol, subClusterId, user, null);
   }
 
   /**
-   * Create a proxy for the specified protocol. For non-HA, this is a direct
-   * connection to the ResourceManager address. When HA is enabled, the proxy
-   * handles the failover between the ResourceManagers as well.
+   * Create a proxy for the specified protocol in the context of Federation. For
+   * non-HA, this is a direct connection to the ResourceManager address. When HA
+   * is enabled, the proxy handles the failover between the ResourceManagers as
+   * well.
    *
    * @param configuration Configuration to generate {@link ClientRMProxy}
    * @param protocol Protocol for the proxy
@@ -88,65 +92,35 @@ public final class FederationProxyProviderUtil {
    */
   @Public
   @Unstable
-  @SuppressWarnings("unchecked")
-  public static <T> T createRMProxy(final Configuration configuration,
+  public static <T> T createRMProxy(Configuration configuration,
       final Class<T> protocol, SubClusterId subClusterId,
-      UserGroupInformation user, final Token token) throws IOException {
-    try {
-      final YarnConfiguration conf = new YarnConfiguration(configuration);
-      updateConf(conf, subClusterId);
-      if (token != null) {
-        LOG.info(
-            "Creating RMProxy with a token: {} to subcluster: {}"
-                + " for protocol: {}",
-            token, subClusterId, protocol.getSimpleName());
-        user.addToken(token);
-        setAuthModeInConf(conf);
-      } else {
-        LOG.info("Creating RMProxy without a token to subcluster: {}"
-            + " for protocol: {}", subClusterId, protocol.getSimpleName());
-      }
-      final T proxyConnection = user.doAs(new PrivilegedExceptionAction<T>() {
-        @Override
-        public T run() throws Exception {
-          return ClientRMProxy.createRMProxy(conf, protocol);
-        }
-      });
-
-      return proxyConnection;
-    } catch (IOException e) {
-      String message =
-          "Error while creating of RM application master service proxy for"
-              + " appAttemptId: " + user;
-      LOG.info(message);
-      throw new YarnRuntimeException(message, e);
-    } catch (InterruptedException e) {
-      throw new YarnRuntimeException(e);
-    }
+      UserGroupInformation user, Token<? extends TokenIdentifier> token)
+      throws IOException {
+    final YarnConfiguration config = new YarnConfiguration(configuration);
+    updateConfForFederation(config, subClusterId.getId());
+    return AMRMClientUtils.createRMProxy(config, protocol, user, token);
   }
 
-  private static void setAuthModeInConf(Configuration conf) {
-    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
-        SaslRpcServer.AuthMethod.TOKEN.toString());
-  }
-
-  // updating the conf with the refreshed RM addresses as proxy creations
-  // are based out of conf
-  private static void updateConf(Configuration conf,
-      SubClusterId subClusterId) {
-    conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId());
-    // In a Federation setting, we will connect to not just the local cluster RM
-    // but also multiple external RMs. The membership information of all the RMs
-    // that are currently
-    // participating in Federation is available in the central
-    // FederationStateStore.
-    // So we will:
-    // 1. obtain the RM service addresses from FederationStateStore using the
-    // FederationRMFailoverProxyProvider.
-    // 2. disable traditional HA as that depends on local configuration lookup
-    // for RMs using indexes.
-    // 3. we will enable federation failover IF traditional HA is enabled so
-    // that the appropriate failover RetryPolicy is initialized.
+  /**
+   * Updating the conf with Federation as long as certain subclusterId.
+   *
+   * @param conf configuration
+   * @param subClusterId subclusterId for the conf
+   */
+  public static void updateConfForFederation(Configuration conf,
+      String subClusterId) {
+    conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId);
+    /*
+     * In a Federation setting, we will connect to not just the local cluster RM
+     * but also multiple external RMs. The membership information of all the RMs
+     * that are currently participating in Federation is available in the
+     * central FederationStateStore. So we will: 1. obtain the RM service
+     * addresses from FederationStateStore using the
+     * FederationRMFailoverProxyProvider. 2. disable traditional HA as that
+     * depends on local configuration lookup for RMs using indexes. 3. we will
+     * enable federation failover IF traditional HA is enabled so that the
+     * appropriate failover RetryPolicy is initialized.
+     */
     conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
     conf.setClass(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER,
         FederationRMFailoverProxyProvider.class, RMFailoverProxyProvider.class);
@@ -156,8 +130,4 @@ public final class FederationProxyProviderUtil {
     }
   }
 
-  // disable instantiation
-  private FederationProxyProviderUtil() {
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9476d86c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
new file mode 100644
index 0000000..08aee77
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.uam;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
+import org.apache.hadoop.yarn.util.AsyncCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A service that manages a pool of UAM managers in
+ * {@link UnmanagedApplicationManager}.
+ */
+@Public
+@Unstable
+public class UnmanagedAMPoolManager extends AbstractService {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(UnmanagedAMPoolManager.class);
+
+  // Map from uamId to UAM instances
+  private Map<String, UnmanagedApplicationManager> unmanagedAppMasterMap;
+
+  private Map<String, ApplicationAttemptId> attemptIdMap;
+
+  private ExecutorService threadpool;
+
+  public UnmanagedAMPoolManager(ExecutorService threadpool) {
+    super(UnmanagedAMPoolManager.class.getName());
+    this.threadpool = threadpool;
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    if (this.threadpool == null) {
+      this.threadpool = Executors.newCachedThreadPool();
+    }
+    this.unmanagedAppMasterMap = new ConcurrentHashMap<>();
+    this.attemptIdMap = new ConcurrentHashMap<>();
+    super.serviceStart();
+  }
+
+  /**
+   * Normally we should finish all applications before stop. If there are still
+   * UAMs running, force kill all of them. Do parallel kill because of
+   * performance reasons.
+   *
+   * TODO: move waiting for the kill to finish into a separate thread, without
+   * blocking the serviceStop.
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    ExecutorCompletionService<KillApplicationResponse> completionService =
+        new ExecutorCompletionService<>(this.threadpool);
+    if (this.unmanagedAppMasterMap.isEmpty()) {
+      return;
+    }
+
+    // Save a local copy of the key set so that it won't change with the map
+    Set<String> addressList =
+        new HashSet<>(this.unmanagedAppMasterMap.keySet());
+    LOG.warn("Abnormal shutdown of UAMPoolManager, still {} UAMs in map",
+        addressList.size());
+
+    for (final String uamId : addressList) {
+      completionService.submit(new Callable<KillApplicationResponse>() {
+        @Override
+        public KillApplicationResponse call() throws Exception {
+          try {
+            LOG.info("Force-killing UAM id " + uamId + " for application "
+                + attemptIdMap.get(uamId));
+            return unmanagedAppMasterMap.remove(uamId).forceKillApplication();
+          } catch (Exception e) {
+            LOG.error("Failed to kill unmanaged application master", e);
+            return null;
+          }
+        }
+      });
+    }
+
+    for (int i = 0; i < addressList.size(); ++i) {
+      try {
+        Future<KillApplicationResponse> future = completionService.take();
+        future.get();
+      } catch (Exception e) {
+        LOG.error("Failed to kill unmanaged application master", e);
+      }
+    }
+    this.attemptIdMap.clear();
+    super.serviceStop();
+  }
+
+  /**
+   * Create a new UAM and register the application, without specifying uamId and
+   * appId. We will ask for an appId from RM and use it as the uamId.
+   *
+   * @param registerRequest RegisterApplicationMasterRequest
+   * @param conf configuration for this UAM
+   * @param queueName queue of the application
+   * @param submitter submitter name of the UAM
+   * @param appNameSuffix application name suffix for the UAM
+   * @return uamId for the UAM
+   * @throws YarnException if registerApplicationMaster fails
+   * @throws IOException if registerApplicationMaster fails
+   */
+  public String createAndRegisterNewUAM(
+      RegisterApplicationMasterRequest registerRequest, Configuration conf,
+      String queueName, String submitter, String appNameSuffix)
+      throws YarnException, IOException {
+    ApplicationId appId = null;
+    ApplicationClientProtocol rmClient;
+    try {
+      UserGroupInformation appSubmitter =
+          UserGroupInformation.createRemoteUser(submitter);
+      rmClient = AMRMClientUtils.createRMProxy(conf,
+          ApplicationClientProtocol.class, appSubmitter, null);
+
+      // Get a new appId from RM
+      GetNewApplicationResponse response =
+          rmClient.getNewApplication(GetNewApplicationRequest.newInstance());
+      if (response == null) {
+        throw new YarnException("getNewApplication got null response");
+      }
+      appId = response.getApplicationId();
+      LOG.info("Received new application ID {} from RM", appId);
+    } finally {
+      rmClient = null;
+    }
+
+    createAndRegisterNewUAM(appId.toString(), registerRequest, conf, appId,
+        queueName, submitter, appNameSuffix);
+    return appId.toString();
+  }
+
+  /**
+   * Create a new UAM and register the application, using the provided uamId and
+   * appId.
+   *
+   * @param uamId identifier for the UAM
+   * @param registerRequest RegisterApplicationMasterRequest
+   * @param conf configuration for this UAM
+   * @param appId application id for the UAM
+   * @param queueName queue of the application
+   * @param submitter submitter name of the UAM
+   * @param appNameSuffix application name suffix for the UAM
+   * @return RegisterApplicationMasterResponse
+   * @throws YarnException if registerApplicationMaster fails
+   * @throws IOException if registerApplicationMaster fails
+   */
+  public RegisterApplicationMasterResponse createAndRegisterNewUAM(String uamId,
+      RegisterApplicationMasterRequest registerRequest, Configuration conf,
+      ApplicationId appId, String queueName, String submitter,
+      String appNameSuffix) throws YarnException, IOException {
+
+    if (this.unmanagedAppMasterMap.containsKey(uamId)) {
+      throw new YarnException("UAM " + uamId + " already exists");
+    }
+    UnmanagedApplicationManager uam =
+        createUAM(conf, appId, queueName, submitter, appNameSuffix);
+    // Put the UAM into map first before initializing it to avoid additional UAM
+    // for the same uamId being created concurrently
+    this.unmanagedAppMasterMap.put(uamId, uam);
+
+    RegisterApplicationMasterResponse response = null;
+    try {
+      LOG.info("Creating and registering UAM id {} for application {}", uamId,
+          appId);
+      response = uam.createAndRegisterApplicationMaster(registerRequest);
+    } catch (Exception e) {
+      // Add the map earlier and remove here if register failed because we want
+      // to make sure there is only one uam instance per uamId at any given time
+      this.unmanagedAppMasterMap.remove(uamId);
+      throw e;
+    }
+
+    this.attemptIdMap.put(uamId, uam.getAttemptId());
+    return response;
+  }
+
+  /**
+   * Creates the UAM instance. Pull out to make unit test easy.
+   *
+   * @param conf Configuration
+   * @param appId application id
+   * @param queueName queue of the application
+   * @param submitter submitter name of the application
+   * @param appNameSuffix application name suffix
+   * @return the UAM instance
+   */
+  @VisibleForTesting
+  protected UnmanagedApplicationManager createUAM(Configuration conf,
+      ApplicationId appId, String queueName, String submitter,
+      String appNameSuffix) {
+    return new UnmanagedApplicationManager(conf, appId, queueName, submitter,
+        appNameSuffix);
+  }
+
+  /**
+   * AllocateAsync to an UAM.
+   *
+   * @param uamId identifier for the UAM
+   * @param request AllocateRequest
+   * @param callback callback for response
+   * @throws YarnException if allocate fails
+   * @throws IOException if allocate fails
+   */
+  public void allocateAsync(String uamId, AllocateRequest request,
+      AsyncCallback<AllocateResponse> callback)
+      throws YarnException, IOException {
+    if (!this.unmanagedAppMasterMap.containsKey(uamId)) {
+      throw new YarnException("UAM " + uamId + " does not exist");
+    }
+    this.unmanagedAppMasterMap.get(uamId).allocateAsync(request, callback);
+  }
+
+  /**
+   * Finish an UAM/application.
+   *
+   * @param uamId identifier for the UAM
+   * @param request FinishApplicationMasterRequest
+   * @return FinishApplicationMasterResponse
+   * @throws YarnException if finishApplicationMaster call fails
+   * @throws IOException if finishApplicationMaster call fails
+   */
+  public FinishApplicationMasterResponse finishApplicationMaster(String uamId,
+      FinishApplicationMasterRequest request)
+      throws YarnException, IOException {
+    if (!this.unmanagedAppMasterMap.containsKey(uamId)) {
+      throw new YarnException("UAM " + uamId + " does not exist");
+    }
+    LOG.info("Finishing application for UAM id {} ", uamId);
+    FinishApplicationMasterResponse response =
+        this.unmanagedAppMasterMap.get(uamId).finishApplicationMaster(request);
+
+    if (response.getIsUnregistered()) {
+      // Only remove the UAM when the unregister finished
+      this.unmanagedAppMasterMap.remove(uamId);
+      this.attemptIdMap.remove(uamId);
+      LOG.info("UAM id {} is unregistered", uamId);
+    }
+    return response;
+  }
+
+  /**
+   * Get the id of all running UAMs.
+   *
+   * @return uamId set
+   */
+  public Set<String> getAllUAMIds() {
+    // Return a clone of the current id set for concurrency reasons, so that the
+    // returned map won't change with the actual map
+    return new HashSet<String>(this.unmanagedAppMasterMap.keySet());
+  }
+
+  /**
+   * Return whether an UAM exists.
+   *
+   * @param uamId identifier for the UAM
+   * @return UAM exists or not
+   */
+  public boolean hasUAMId(String uamId) {
+    return this.unmanagedAppMasterMap.containsKey(uamId);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9476d86c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
new file mode 100644
index 0000000..60a9a27
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
@@ -0,0 +1,607 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.uam;
+
+import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
+import org.apache.hadoop.yarn.util.AsyncCallback;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * UnmanagedApplicationManager is used to register unmanaged application and
+ * negotiate for resources from resource managers. An unmanagedAM is an AM that
+ * is not launched and managed by the RM. Allocate calls are handled
+ * asynchronously using {@link AsyncCallback}.
+ */
+@Public
+@Unstable
+public class UnmanagedApplicationManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(UnmanagedApplicationManager.class);
+  private static final long AM_STATE_WAIT_TIMEOUT_MS = 10000;
+  private static final String APP_NAME = "UnmanagedAM";
+  private static final String DEFAULT_QUEUE_CONFIG = "uam.default.queue.name";
+
+  private BlockingQueue<AsyncAllocateRequestInfo> requestQueue;
+  private AMRequestHandlerThread handlerThread;
+  private ApplicationMasterProtocol rmProxy;
+  private ApplicationId applicationId;
+  private ApplicationAttemptId attemptId;
+  private String submitter;
+  private String appNameSuffix;
+  private Configuration conf;
+  private String queueName;
+  private UserGroupInformation userUgi;
+  private RegisterApplicationMasterRequest registerRequest;
+  private int lastResponseId;
+  private ApplicationClientProtocol rmClient;
+  private long asyncApiPollIntervalMillis;
+  private RecordFactory recordFactory;
+
+  public UnmanagedApplicationManager(Configuration conf, ApplicationId appId,
+      String queueName, String submitter, String appNameSuffix) {
+    Preconditions.checkNotNull(conf, "Configuration cannot be null");
+    Preconditions.checkNotNull(appId, "ApplicationId cannot be null");
+    Preconditions.checkNotNull(submitter, "App submitter cannot be null");
+
+    this.conf = conf;
+    this.applicationId = appId;
+    this.queueName = queueName;
+    this.submitter = submitter;
+    this.appNameSuffix = appNameSuffix;
+    this.handlerThread = new AMRequestHandlerThread();
+    this.requestQueue = new LinkedBlockingQueue<>();
+    this.rmProxy = null;
+    this.registerRequest = null;
+    this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
+    this.asyncApiPollIntervalMillis = conf.getLong(
+        YarnConfiguration.
+            YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS,
+        YarnConfiguration.
+            DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
+  }
+
+  /**
+   * Registers this {@link UnmanagedApplicationManager} with the resource
+   * manager.
+   *
+   * @param request the register request
+   * @return the register response
+   * @throws YarnException if register fails
+   * @throws IOException if register fails
+   */
+  public RegisterApplicationMasterResponse createAndRegisterApplicationMaster(
+      RegisterApplicationMasterRequest request)
+      throws YarnException, IOException {
+    // This need to be done first in this method, because it is used as an
+    // indication that this method is called (and perhaps blocked due to RM
+    // connection and not finished yet)
+    this.registerRequest = request;
+
+    // attemptId will be available after this call
+    UnmanagedAMIdentifier identifier =
+        initializeUnmanagedAM(this.applicationId);
+
+    try {
+      this.userUgi = UserGroupInformation.createProxyUser(
+          identifier.getAttemptId().toString(),
+          UserGroupInformation.getCurrentUser());
+    } catch (IOException e) {
+      LOG.error("Exception while trying to get current user", e);
+      throw new YarnRuntimeException(e);
+    }
+
+    this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf,
+        this.userUgi, identifier.getToken());
+
+    LOG.info("Registering the Unmanaged application master {}", this.attemptId);
+    RegisterApplicationMasterResponse response =
+        this.rmProxy.registerApplicationMaster(this.registerRequest);
+
+    // Only when register succeed that we start the heartbeat thread
+    this.handlerThread.setUncaughtExceptionHandler(
+        new HeartBeatThreadUncaughtExceptionHandler());
+    this.handlerThread.setDaemon(true);
+    this.handlerThread.start();
+
+    this.lastResponseId = 0;
+    return response;
+  }
+
+  /**
+   * Unregisters from the resource manager and stops the request handler thread.
+   *
+   * @param request the finishApplicationMaster request
+   * @return the response
+   * @throws YarnException if finishAM call fails
+   * @throws IOException if finishAM call fails
+   */
+  public FinishApplicationMasterResponse finishApplicationMaster(
+      FinishApplicationMasterRequest request)
+      throws YarnException, IOException {
+
+    this.handlerThread.shutdown();
+
+    if (this.rmProxy == null) {
+      if (this.registerRequest != null) {
+        // This is possible if the async registerApplicationMaster is still
+        // blocked and retrying. Return a dummy response in this case.
+        LOG.warn("Unmanaged AM still not successfully launched/registered yet."
+            + " Stopping the UAM client thread anyways.");
+        return FinishApplicationMasterResponse.newInstance(false);
+      } else {
+        throw new YarnException("finishApplicationMaster should not "
+            + "be called before createAndRegister");
+      }
+    }
+    return AMRMClientUtils.finishAMWithReRegister(request, this.rmProxy,
+        this.registerRequest, this.attemptId);
+  }
+
+  /**
+   * Force kill the UAM.
+   *
+   * @return kill response
+   * @throws IOException if fails to create rmProxy
+   * @throws YarnException if force kill fails
+   */
+  public KillApplicationResponse forceKillApplication()
+      throws IOException, YarnException {
+    KillApplicationRequest request =
+        KillApplicationRequest.newInstance(this.attemptId.getApplicationId());
+
+    this.handlerThread.shutdown();
+
+    if (this.rmClient == null) {
+      this.rmClient = createRMProxy(ApplicationClientProtocol.class, this.conf,
+          UserGroupInformation.createRemoteUser(this.submitter), null);
+    }
+    return this.rmClient.forceKillApplication(request);
+  }
+
+  /**
+   * Sends the specified heart beat request to the resource manager and invokes
+   * the callback asynchronously with the response.
+   *
+   * @param request the allocate request
+   * @param callback the callback method for the request
+   * @throws YarnException if registerAM is not called yet
+   */
+  public void allocateAsync(AllocateRequest request,
+      AsyncCallback<AllocateResponse> callback) throws YarnException {
+    try {
+      this.requestQueue.put(new AsyncAllocateRequestInfo(request, callback));
+    } catch (InterruptedException ex) {
+      // Should not happen as we have MAX_INT queue length
+      LOG.debug("Interrupted while waiting to put on response queue", ex);
+    }
+    // Two possible cases why the UAM is not successfully registered yet:
+    // 1. registerApplicationMaster is not called at all. Should throw here.
+    // 2. registerApplicationMaster is called but hasn't successfully returned.
+    //
+    // In case 2, we have already save the allocate request above, so if the
+    // registration succeed later, no request is lost.
+    if (this.rmProxy == null) {
+      if (this.registerRequest != null) {
+        LOG.info("Unmanaged AM still not successfully launched/registered yet."
+            + " Saving the allocate request and send later.");
+      } else {
+        throw new YarnException(
+            "AllocateAsync should not be called before createAndRegister");
+      }
+    }
+  }
+
+  /**
+   * Returns the application attempt id of the UAM.
+   *
+   * @return attempt id of the UAM
+   */
+  public ApplicationAttemptId getAttemptId() {
+    return this.attemptId;
+  }
+
+  /**
+   * Returns RM proxy for the specified protocol type. Unit test cases can
+   * override this method and return mock proxy instances.
+   *
+   * @param protocol protocal of the proxy
+   * @param config configuration
+   * @param user ugi for the proxy connection
+   * @param token token for the connection
+   * @param <T> type of the proxy
+   * @return the proxy instance
+   * @throws IOException if fails to create the proxy
+   */
+  protected <T> T createRMProxy(Class<T> protocol, Configuration config,
+      UserGroupInformation user, Token<AMRMTokenIdentifier> token)
+      throws IOException {
+    return AMRMClientUtils.createRMProxy(config, protocol, user, token);
+  }
+
+  /**
+   * Launch and initialize an unmanaged AM. First, it creates a new application
+   * on the RM and negotiates a new attempt id. Then it waits for the RM
+   * application attempt state to reach YarnApplicationAttemptState.LAUNCHED
+   * after which it returns the AM-RM token and the attemptId.
+   *
+   * @param appId application id
+   * @return the UAM identifier
+   * @throws IOException if initialize fails
+   * @throws YarnException if initialize fails
+   */
+  protected UnmanagedAMIdentifier initializeUnmanagedAM(ApplicationId appId)
+      throws IOException, YarnException {
+    try {
+      UserGroupInformation appSubmitter =
+          UserGroupInformation.createRemoteUser(this.submitter);
+      this.rmClient = createRMProxy(ApplicationClientProtocol.class, this.conf,
+          appSubmitter, null);
+
+      // Submit the application
+      submitUnmanagedApp(appId);
+
+      // Monitor the application attempt to wait for launch state
+      ApplicationAttemptReport attemptReport = monitorCurrentAppAttempt(appId,
+          EnumSet.of(YarnApplicationState.ACCEPTED,
+              YarnApplicationState.RUNNING, YarnApplicationState.KILLED,
+              YarnApplicationState.FAILED, YarnApplicationState.FINISHED),
+          YarnApplicationAttemptState.LAUNCHED);
+      this.attemptId = attemptReport.getApplicationAttemptId();
+      return getUAMIdentifier();
+    } finally {
+      this.rmClient = null;
+    }
+  }
+
+  private void submitUnmanagedApp(ApplicationId appId)
+      throws YarnException, IOException {
+    SubmitApplicationRequest submitRequest =
+        this.recordFactory.newRecordInstance(SubmitApplicationRequest.class);
+
+    ApplicationSubmissionContext context = this.recordFactory
+        .newRecordInstance(ApplicationSubmissionContext.class);
+
+    context.setApplicationId(appId);
+    context.setApplicationName(APP_NAME + "-" + appNameSuffix);
+    if (StringUtils.isBlank(this.queueName)) {
+      context.setQueue(this.conf.get(DEFAULT_QUEUE_CONFIG,
+          YarnConfiguration.DEFAULT_QUEUE_NAME));
+    } else {
+      context.setQueue(this.queueName);
+    }
+
+    ContainerLaunchContext amContainer =
+        this.recordFactory.newRecordInstance(ContainerLaunchContext.class);
+    Resource resource = BuilderUtils.newResource(1024, 1);
+    context.setResource(resource);
+    context.setAMContainerSpec(amContainer);
+    submitRequest.setApplicationSubmissionContext(context);
+
+    context.setUnmanagedAM(true);
+
+    LOG.info("Submitting unmanaged application {}", appId);
+    this.rmClient.submitApplication(submitRequest);
+  }
+
+  /**
+   * Monitor the submitted application and attempt until it reaches certain
+   * states.
+   *
+   * @param appId Application Id of application to be monitored
+   * @param appStates acceptable application state
+   * @param attemptState acceptable application attempt state
+   * @return the application report
+   * @throws YarnException if getApplicationReport fails
+   * @throws IOException if getApplicationReport fails
+   */
+  private ApplicationAttemptReport monitorCurrentAppAttempt(ApplicationId appId,
+      Set<YarnApplicationState> appStates,
+      YarnApplicationAttemptState attemptState)
+      throws YarnException, IOException {
+
+    long startTime = System.currentTimeMillis();
+    ApplicationAttemptId appAttemptId = null;
+    while (true) {
+      if (appAttemptId == null) {
+        // Get application report for the appId we are interested in
+        ApplicationReport report = getApplicationReport(appId);
+        YarnApplicationState state = report.getYarnApplicationState();
+        if (appStates.contains(state)) {
+          if (state != YarnApplicationState.ACCEPTED) {
+            throw new YarnRuntimeException(
+                "Received non-accepted application state: " + state
+                    + ". Application " + appId + " not the first attempt?");
+          }
+          appAttemptId =
+              getApplicationReport(appId).getCurrentApplicationAttemptId();
+        } else {
+          LOG.info("Current application state of {} is {}, will retry later.",
+              appId, state);
+        }
+      }
+
+      if (appAttemptId != null) {
+        GetApplicationAttemptReportRequest req = this.recordFactory
+            .newRecordInstance(GetApplicationAttemptReportRequest.class);
+        req.setApplicationAttemptId(appAttemptId);
+        ApplicationAttemptReport attemptReport = this.rmClient
+            .getApplicationAttemptReport(req).getApplicationAttemptReport();
+        if (attemptState
+            .equals(attemptReport.getYarnApplicationAttemptState())) {
+          return attemptReport;
+        }
+        LOG.info("Current attempt state of " + appAttemptId + " is "
+            + attemptReport.getYarnApplicationAttemptState()
+            + ", waiting for current attempt to reach " + attemptState);
+      }
+
+      try {
+        Thread.sleep(this.asyncApiPollIntervalMillis);
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while waiting for current attempt of " + appId
+            + " to reach " + attemptState);
+      }
+
+      if (System.currentTimeMillis() - startTime > AM_STATE_WAIT_TIMEOUT_MS) {
+        throw new RuntimeException("Timeout for waiting current attempt of "
+            + appId + " to reach " + attemptState);
+      }
+    }
+  }
+
+  /**
+   * Gets the identifier of the unmanaged AM.
+   *
+   * @return the identifier of the unmanaged AM.
+   * @throws IOException if getApplicationReport fails
+   * @throws YarnException if getApplicationReport fails
+   */
+  protected UnmanagedAMIdentifier getUAMIdentifier()
+      throws IOException, YarnException {
+    Token<AMRMTokenIdentifier> token = null;
+    org.apache.hadoop.yarn.api.records.Token amrmToken =
+        getApplicationReport(this.attemptId.getApplicationId()).getAMRMToken();
+    if (amrmToken != null) {
+      token = ConverterUtils.convertFromYarn(amrmToken, (Text) null);
+    } else {
+      LOG.warn(
+          "AMRMToken not found in the application report for application: {}",
+          this.attemptId.getApplicationId());
+    }
+    return new UnmanagedAMIdentifier(this.attemptId, token);
+  }
+
+  private ApplicationReport getApplicationReport(ApplicationId appId)
+      throws YarnException, IOException {
+    GetApplicationReportRequest request =
+        this.recordFactory.newRecordInstance(GetApplicationReportRequest.class);
+    request.setApplicationId(appId);
+    return this.rmClient.getApplicationReport(request).getApplicationReport();
+  }
+
+  /**
+   * Data structure that encapsulates the application attempt identifier and the
+   * AMRMTokenIdentifier. Make it public because clients with HA need it.
+   */
+  public static class UnmanagedAMIdentifier {
+    private ApplicationAttemptId attemptId;
+    private Token<AMRMTokenIdentifier> token;
+
+    public UnmanagedAMIdentifier(ApplicationAttemptId attemptId,
+        Token<AMRMTokenIdentifier> token) {
+      this.attemptId = attemptId;
+      this.token = token;
+    }
+
+    public ApplicationAttemptId getAttemptId() {
+      return this.attemptId;
+    }
+
+    public Token<AMRMTokenIdentifier> getToken() {
+      return this.token;
+    }
+  }
+
+  /**
+   * Data structure that encapsulates AllocateRequest and AsyncCallback
+   * instance.
+   */
+  public static class AsyncAllocateRequestInfo {
+    private AllocateRequest request;
+    private AsyncCallback<AllocateResponse> callback;
+
+    public AsyncAllocateRequestInfo(AllocateRequest request,
+        AsyncCallback<AllocateResponse> callback) {
+      Preconditions.checkArgument(request != null,
+          "AllocateRequest cannot be null");
+      Preconditions.checkArgument(callback != null, "Callback cannot be null");
+
+      this.request = request;
+      this.callback = callback;
+    }
+
+    public AsyncCallback<AllocateResponse> getCallback() {
+      return this.callback;
+    }
+
+    public AllocateRequest getRequest() {
+      return this.request;
+    }
+  }
+
+  @VisibleForTesting
+  public int getRequestQueueSize() {
+    return this.requestQueue.size();
+  }
+
+  /**
+   * Extends Thread and provides an implementation that is used for processing
+   * the AM heart beat request asynchronously and sending back the response
+   * using the callback method registered with the system.
+   */
+  public class AMRequestHandlerThread extends Thread {
+
+    // Indication flag for the thread to keep running
+    private volatile boolean keepRunning;
+
+    public AMRequestHandlerThread() {
+      super("UnmanagedApplicationManager Heartbeat Handler Thread");
+      this.keepRunning = true;
+    }
+
+    /**
+     * Shutdown the thread.
+     */
+    public void shutdown() {
+      this.keepRunning = false;
+      this.interrupt();
+    }
+
+    @Override
+    public void run() {
+      while (keepRunning) {
+        AsyncAllocateRequestInfo requestInfo;
+        try {
+          requestInfo = requestQueue.take();
+          if (requestInfo == null) {
+            throw new YarnException(
+                "Null requestInfo taken from request queue");
+          }
+          if (!keepRunning) {
+            break;
+          }
+
+          // change the response id before forwarding the allocate request as we
+          // could have different values for each UAM
+          AllocateRequest request = requestInfo.getRequest();
+          if (request == null) {
+            throw new YarnException("Null allocateRequest from requestInfo");
+          }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Sending Heartbeat to Unmanaged AM. AskList:"
+                + ((request.getAskList() == null) ? " empty"
+                    : request.getAskList().size()));
+          }
+
+          request.setResponseId(lastResponseId);
+          AllocateResponse response = AMRMClientUtils.allocateWithReRegister(
+              request, rmProxy, registerRequest, attemptId);
+          if (response == null) {
+            throw new YarnException("Null allocateResponse from allocate");
+          }
+
+          lastResponseId = response.getResponseId();
+          // update token if RM has reissued/renewed
+          if (response.getAMRMToken() != null) {
+            LOG.debug("Received new AMRMToken");
+            YarnServerSecurityUtils.updateAMRMToken(response.getAMRMToken(),
+                userUgi, conf);
+          }
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Received Heartbeat reply from RM. Allocated Containers:"
+                + ((response.getAllocatedContainers() == null) ? " empty"
+                    : response.getAllocatedContainers().size()));
+          }
+
+          if (requestInfo.getCallback() == null) {
+            throw new YarnException("Null callback from requestInfo");
+          }
+          requestInfo.getCallback().callback(response);
+        } catch (InterruptedException ex) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Interrupted while waiting for queue", ex);
+          }
+        } catch (IOException ex) {
+          LOG.warn(
+              "IO Error occurred while processing heart beat for " + attemptId,
+              ex);
+        } catch (Throwable ex) {
+          LOG.warn(
+              "Error occurred while processing heart beat for " + attemptId,
+              ex);
+        }
+      }
+
+      LOG.info("UnmanagedApplicationManager has been stopped for {}. "
+          + "AMRequestHandlerThread thread is exiting", attemptId);
+    }
+  }
+
+  /**
+   * Uncaught exception handler for the background heartbeat thread.
+   */
+  protected class HeartBeatThreadUncaughtExceptionHandler
+      implements UncaughtExceptionHandler {
+    @Override
+    public void uncaughtException(Thread t, Throwable e) {
+      LOG.error("Heartbeat thread {} for application attempt {} crashed!",
+          t.getName(), attemptId, e);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9476d86c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/package-info.java
new file mode 100644
index 0000000..0e78094
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.uam;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9476d86c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java
new file mode 100644
index 0000000..7993bd8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.utils;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
+import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class for AMRMClient.
+ */
+@Private
+public final class AMRMClientUtils {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AMRMClientUtils.class);
+
+  public static final String APP_ALREADY_REGISTERED_MESSAGE =
+      "Application Master is already registered : ";
+
+  private AMRMClientUtils() {
+  }
+
+  /**
+   * Handle ApplicationNotRegistered exception and re-register.
+   *
+   * @param attemptId app attemptId
+   * @param rmProxy RM proxy instance
+   * @param registerRequest the AM re-register request
+   * @throws YarnException if re-register fails
+   */
+  public static void handleNotRegisteredExceptionAndReRegister(
+      ApplicationAttemptId attemptId, ApplicationMasterProtocol rmProxy,
+      RegisterApplicationMasterRequest registerRequest) throws YarnException {
+    LOG.info("App attempt {} not registered, most likely due to RM failover. "
+        + " Trying to re-register.", attemptId);
+    try {
+      rmProxy.registerApplicationMaster(registerRequest);
+    } catch (Exception e) {
+      if (e instanceof InvalidApplicationMasterRequestException
+          && e.getMessage().contains(APP_ALREADY_REGISTERED_MESSAGE)) {
+        LOG.info("Concurrent thread successfully registered, moving on.");
+      } else {
+        LOG.error("Error trying to re-register AM", e);
+        throw new YarnException(e);
+      }
+    }
+  }
+
+  /**
+   * Helper method for client calling ApplicationMasterProtocol.allocate that
+   * handles re-register if RM fails over.
+   *
+   * @param request allocate request
+   * @param rmProxy RM proxy
+   * @param registerRequest the register request for re-register
+   * @param attemptId application attempt id
+   * @return allocate response
+   * @throws YarnException if RM call fails
+   * @throws IOException if RM call fails
+   */
+  public static AllocateResponse allocateWithReRegister(AllocateRequest request,
+      ApplicationMasterProtocol rmProxy,
+      RegisterApplicationMasterRequest registerRequest,
+      ApplicationAttemptId attemptId) throws YarnException, IOException {
+    try {
+      return rmProxy.allocate(request);
+    } catch (ApplicationMasterNotRegisteredException e) {
+      handleNotRegisteredExceptionAndReRegister(attemptId, rmProxy,
+          registerRequest);
+      // reset responseId after re-register
+      request.setResponseId(0);
+      // retry allocate
+      return allocateWithReRegister(request, rmProxy, registerRequest,
+          attemptId);
+    }
+  }
+
+  /**
+   * Helper method for client calling
+   * ApplicationMasterProtocol.finishApplicationMaster that handles re-register
+   * if RM fails over.
+   *
+   * @param request finishApplicationMaster request
+   * @param rmProxy RM proxy
+   * @param registerRequest the register request for re-register
+   * @param attemptId application attempt id
+   * @return finishApplicationMaster response
+   * @throws YarnException if RM call fails
+   * @throws IOException if RM call fails
+   */
+  public static FinishApplicationMasterResponse finishAMWithReRegister(
+      FinishApplicationMasterRequest request, ApplicationMasterProtocol rmProxy,
+      RegisterApplicationMasterRequest registerRequest,
+      ApplicationAttemptId attemptId) throws YarnException, IOException {
+    try {
+      return rmProxy.finishApplicationMaster(request);
+    } catch (ApplicationMasterNotRegisteredException ex) {
+      handleNotRegisteredExceptionAndReRegister(attemptId, rmProxy,
+          registerRequest);
+      // retry finishAM after re-register
+      return finishAMWithReRegister(request, rmProxy, registerRequest,
+          attemptId);
+    }
+  }
+
+  /**
+   * Create a proxy for the specified protocol.
+   *
+   * @param configuration Configuration to generate {@link ClientRMProxy}
+   * @param protocol Protocol for the proxy
+   * @param user the user on whose behalf the proxy is being created
+   * @param token the auth token to use for connection
+   * @param <T> Type information of the proxy
+   * @return Proxy to the RM
+   * @throws IOException on failure
+   */
+  @Public
+  @Unstable
+  public static <T> T createRMProxy(final Configuration configuration,
+      final Class<T> protocol, UserGroupInformation user,
+      final Token<? extends TokenIdentifier> token) throws IOException {
+    try {
+      String rmClusterId = configuration.get(YarnConfiguration.RM_CLUSTER_ID,
+          YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
+      LOG.info("Creating RMProxy to RM {} for protocol {} for user {}",
+          rmClusterId, protocol.getSimpleName(), user);
+      if (token != null) {
+        token.setService(ClientRMProxy.getAMRMTokenService(configuration));
+        user.addToken(token);
+        setAuthModeInConf(configuration);
+      }
+      final T proxyConnection = user.doAs(new PrivilegedExceptionAction<T>() {
+        @Override
+        public T run() throws Exception {
+          return ClientRMProxy.createRMProxy(configuration, protocol);
+        }
+      });
+      return proxyConnection;
+
+    } catch (InterruptedException e) {
+      throw new YarnRuntimeException(e);
+    }
+  }
+
+  private static void setAuthModeInConf(Configuration conf) {
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        SaslRpcServer.AuthMethod.TOKEN.toString());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9476d86c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java
index 9af556e..e61798d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java
@@ -23,13 +23,16 @@ import java.nio.ByteBuffer;
 import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
@@ -42,8 +45,8 @@ import org.slf4j.LoggerFactory;
  */
 @Private
 public final class YarnServerSecurityUtils {
-  private static final Logger LOG = LoggerFactory
-      .getLogger(YarnServerSecurityUtils.class);
+  private static final Logger LOG =
+      LoggerFactory.getLogger(YarnServerSecurityUtils.class);
 
   private YarnServerSecurityUtils() {
   }
@@ -55,8 +58,7 @@ public final class YarnServerSecurityUtils {
    * @return the AMRMTokenIdentifier instance for the current user
    * @throws YarnException
    */
-  public static AMRMTokenIdentifier authorizeRequest()
-      throws YarnException {
+  public static AMRMTokenIdentifier authorizeRequest() throws YarnException {
 
     UserGroupInformation remoteUgi;
     try {
@@ -82,9 +84,8 @@ public final class YarnServerSecurityUtils {
       }
     } catch (IOException e) {
       tokenFound = false;
-      message =
-          "Got exception while looking for AMRMToken for user "
-              + remoteUgi.getUserName();
+      message = "Got exception while looking for AMRMToken for user "
+          + remoteUgi.getUserName();
     }
 
     if (!tokenFound) {
@@ -113,8 +114,29 @@ public final class YarnServerSecurityUtils {
   }
 
   /**
+   * Update the new AMRMToken into the ugi used for RM proxy.
+   *
+   * @param token the new AMRMToken sent by RM
+   * @param user ugi used for RM proxy
+   * @param conf configuration
+   */
+  public static void updateAMRMToken(
+      org.apache.hadoop.yarn.api.records.Token token, UserGroupInformation user,
+      Configuration conf) {
+    Token<AMRMTokenIdentifier> amrmToken = new Token<AMRMTokenIdentifier>(
+        token.getIdentifier().array(), token.getPassword().array(),
+        new Text(token.getKind()), new Text(token.getService()));
+    // Preserve the token service sent by the RM when adding the token
+    // to ensure we replace the previous token setup by the RM.
+    // Afterwards we can update the service address for the RPC layer.
+    user.addToken(amrmToken);
+    amrmToken.setService(ClientRMProxy.getAMRMTokenService(conf));
+  }
+
+  /**
    * Parses the container launch context and returns a Credential instance that
-   * contains all the tokens from the launch context. 
+   * contains all the tokens from the launch context.
+   *
    * @param launchContext
    * @return the credential instance
    * @throws IOException
@@ -130,8 +152,7 @@ public final class YarnServerSecurityUtils {
       buf.reset(tokens);
       credentials.readTokenStorageStream(buf);
       if (LOG.isDebugEnabled()) {
-        for (Token<? extends TokenIdentifier> tk : credentials
-            .getAllTokens()) {
+        for (Token<? extends TokenIdentifier> tk : credentials.getAllTokens()) {
           LOG.debug(tk.getService() + " = " + tk.toString());
         }
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9476d86c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
index bda41d4..c4a4002 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
@@ -116,7 +116,10 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
-import org.mortbay.log.Log;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Strings;
 
 /**
  * Mock Resource Manager facade implementation that exposes all the methods
@@ -165,8 +168,9 @@ public class MockResourceManagerFacade
     Log.info("Registering application attempt: " + amrmToken);
 
     synchronized (applicationContainerIdMap) {
-      Assert.assertFalse("The application id is already registered: "
-          + amrmToken, applicationContainerIdMap.containsKey(amrmToken));
+      Assert.assertFalse(
+          "The application id is already registered: " + amrmToken,
+          applicationContainerIdMap.containsKey(amrmToken));
       // Keep track of the containers that are returned to this application
       applicationContainerIdMap.put(amrmToken,
           new ArrayList<ContainerId>());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9476d86c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
new file mode 100644
index 0000000..9159cf7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.uam;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
+import org.apache.hadoop.yarn.util.AsyncCallback;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit test for UnmanagedApplicationManager.
+ */
+public class TestUnmanagedApplicationManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestUnmanagedApplicationManager.class);
+
+  private TestableUnmanagedApplicationManager uam;
+  private Configuration conf = new YarnConfiguration();
+  private CountingCallback callback;
+
+  private ApplicationAttemptId attemptId;
+
+  @Before
+  public void setup() {
+    conf.set(YarnConfiguration.RM_CLUSTER_ID, "subclusterId");
+    callback = new CountingCallback();
+
+    attemptId =
+        ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1);
+
+    uam = new TestableUnmanagedApplicationManager(conf,
+        attemptId.getApplicationId(), null, "submitter", "appNameSuffix");
+  }
+
+  protected void waitForCallBackCountAndCheckZeroPending(
+      CountingCallback callBack, int expectCallBackCount) {
+    synchronized (callBack) {
+      while (callBack.callBackCount != expectCallBackCount) {
+        try {
+          callBack.wait();
+        } catch (InterruptedException e) {
+        }
+      }
+      Assert.assertEquals(
+          "Non zero pending requests when number of allocate callbacks reaches "
+              + expectCallBackCount,
+          0, callBack.requestQueueSize);
+    }
+  }
+
+  @Test(timeout = 5000)
+  public void testBasicUsage()
+      throws YarnException, IOException, InterruptedException {
+
+    createAndRegisterApplicationMaster(
+        RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId);
+
+    allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback,
+        attemptId);
+
+    // Wait for outstanding async allocate callback
+    waitForCallBackCountAndCheckZeroPending(callback, 1);
+
+    finishApplicationMaster(
+        FinishApplicationMasterRequest.newInstance(null, null, null),
+        attemptId);
+  }
+
+  @Test(timeout = 5000)
+  public void testReRegister()
+      throws YarnException, IOException, InterruptedException {
+
+    createAndRegisterApplicationMaster(
+        RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId);
+
+    uam.setShouldReRegisterNext();
+
+    allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback,
+        attemptId);
+
+    // Wait for outstanding async allocate callback
+    waitForCallBackCountAndCheckZeroPending(callback, 1);
+
+    uam.setShouldReRegisterNext();
+
+    finishApplicationMaster(
+        FinishApplicationMasterRequest.newInstance(null, null, null),
+        attemptId);
+  }
+
+  /**
+   * If register is slow, async allocate requests in the meanwhile should not
+   * throw or be dropped.
+   */
+  @Test(timeout = 5000)
+  public void testSlowRegisterCall()
+      throws YarnException, IOException, InterruptedException {
+
+    // Register with wait() in RM in a separate thread
+    Thread registerAMThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          createAndRegisterApplicationMaster(
+              RegisterApplicationMasterRequest.newInstance(null, 1001, null),
+              attemptId);
+        } catch (Exception e) {
+          LOG.info("Register thread exception", e);
+        }
+      }
+    });
+
+    // Sync obj from mock RM
+    Object syncObj = MockResourceManagerFacade.getSyncObj();
+
+    // Wait for register call in the thread get into RM and then wake us
+    synchronized (syncObj) {
+      LOG.info("Starting register thread");
+      registerAMThread.start();
+      try {
+        LOG.info("Test main starts waiting");
+        syncObj.wait();
+        LOG.info("Test main wait finished");
+      } catch (Exception e) {
+        LOG.info("Test main wait interrupted", e);
+      }
+    }
+
+    // First allocate before register succeeds
+    allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback,
+        attemptId);
+
+    // Notify the register thread
+    synchronized (syncObj) {
+      syncObj.notifyAll();
+    }
+
+    LOG.info("Test main wait for register thread to finish");
+    registerAMThread.join();
+    LOG.info("Register thread finished");
+
+    // Second allocate, normal case
+    allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback,
+        attemptId);
+
+    // Both allocate before should respond
+    waitForCallBackCountAndCheckZeroPending(callback, 2);
+
+    finishApplicationMaster(
+        FinishApplicationMasterRequest.newInstance(null, null, null),
+        attemptId);
+
+    // Allocates after finishAM should be ignored
+    allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback,
+        attemptId);
+    allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback,
+        attemptId);
+
+    Assert.assertEquals(0, callback.requestQueueSize);
+
+    // A short wait just in case the allocates get executed
+    try {
+      Thread.sleep(100);
+    } catch (InterruptedException e) {
+    }
+
+    Assert.assertEquals(2, callback.callBackCount);
+  }
+
+  @Test(expected = Exception.class)
+  public void testAllocateWithoutRegister()
+      throws YarnException, IOException, InterruptedException {
+    allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback,
+        attemptId);
+  }
+
+  @Test(expected = Exception.class)
+  public void testFinishWithoutRegister()
+      throws YarnException, IOException, InterruptedException {
+    finishApplicationMaster(
+        FinishApplicationMasterRequest.newInstance(null, null, null),
+        attemptId);
+  }
+
+  @Test
+  public void testForceKill()
+      throws YarnException, IOException, InterruptedException {
+    createAndRegisterApplicationMaster(
+        RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId);
+    uam.forceKillApplication();
+
+    try {
+      uam.forceKillApplication();
+      Assert.fail("Should fail because application is already killed");
+    } catch (YarnException t) {
+    }
+  }
+
+  protected UserGroupInformation getUGIWithToken(
+      ApplicationAttemptId appAttemptId) {
+    UserGroupInformation ugi =
+        UserGroupInformation.createRemoteUser(appAttemptId.toString());
+    AMRMTokenIdentifier token = new AMRMTokenIdentifier(appAttemptId, 1);
+    ugi.addTokenIdentifier(token);
+    return ugi;
+  }
+
+  protected RegisterApplicationMasterResponse
+      createAndRegisterApplicationMaster(
+          final RegisterApplicationMasterRequest request,
+          ApplicationAttemptId appAttemptId)
+          throws YarnException, IOException, InterruptedException {
+    return getUGIWithToken(appAttemptId).doAs(
+        new PrivilegedExceptionAction<RegisterApplicationMasterResponse>() {
+          @Override
+          public RegisterApplicationMasterResponse run()
+              throws YarnException, IOException {
+            RegisterApplicationMasterResponse response =
+                uam.createAndRegisterApplicationMaster(request);
+            return response;
+          }
+        });
+  }
+
+  protected void allocateAsync(final AllocateRequest request,
+      final AsyncCallback<AllocateResponse> callBack,
+      ApplicationAttemptId appAttemptId)
+      throws YarnException, IOException, InterruptedException {
+    getUGIWithToken(appAttemptId).doAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws YarnException {
+        uam.allocateAsync(request, callBack);
+        return null;
+      }
+    });
+  }
+
+  protected FinishApplicationMasterResponse finishApplicationMaster(
+      final FinishApplicationMasterRequest request,
+      ApplicationAttemptId appAttemptId)
+      throws YarnException, IOException, InterruptedException {
+    return getUGIWithToken(appAttemptId)
+        .doAs(new PrivilegedExceptionAction<FinishApplicationMasterResponse>() {
+          @Override
+          public FinishApplicationMasterResponse run()
+              throws YarnException, IOException {
+            FinishApplicationMasterResponse response =
+                uam.finishApplicationMaster(request);
+            return response;
+          }
+        });
+  }
+
+  protected class CountingCallback implements AsyncCallback<AllocateResponse> {
+    private int callBackCount;
+    private int requestQueueSize;
+
+    @Override
+    public void callback(AllocateResponse response) {
+      synchronized (this) {
+        callBackCount++;
+        requestQueueSize = uam.getRequestQueueSize();
+        this.notifyAll();
+      }
+    }
+  }
+
+  /**
+   * Testable UnmanagedApplicationManager that talks to a mock RM.
+   */
+  public static class TestableUnmanagedApplicationManager
+      extends UnmanagedApplicationManager {
+
+    private MockResourceManagerFacade rmProxy;
+
+    public TestableUnmanagedApplicationManager(Configuration conf,
+        ApplicationId appId, String queueName, String submitter,
+        String appNameSuffix) {
+      super(conf, appId, queueName, submitter, appNameSuffix);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected <T> T createRMProxy(final Class<T> protocol, Configuration config,
+        UserGroupInformation user, Token<AMRMTokenIdentifier> token) {
+      if (rmProxy == null) {
+        rmProxy = new MockResourceManagerFacade(config, 0);
+      }
+      return (T) rmProxy;
+    }
+
+    public void setShouldReRegisterNext() {
+      if (rmProxy != null) {
+        rmProxy.setShouldReRegisterNext();
+      }
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9476d86c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
index 22fc8f6..3ba4d20 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -48,6 +47,7 @@ import org.apache.hadoop.yarn.server.api.ServerRMProxy;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
+import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -134,7 +134,8 @@ public final class DefaultRequestInterceptor extends
     }
     AllocateResponse allocateResponse = rmClient.allocate(request);
     if (allocateResponse.getAMRMToken() != null) {
-      updateAMRMToken(allocateResponse.getAMRMToken());
+      YarnServerSecurityUtils.updateAMRMToken(allocateResponse.getAMRMToken(),
+          this.user, getConf());
     }
 
     return allocateResponse;
@@ -170,7 +171,9 @@ public final class DefaultRequestInterceptor extends
           ((DistributedSchedulingAMProtocol)rmClient)
               .allocateForDistributedScheduling(request);
       if (allocateResponse.getAllocateResponse().getAMRMToken() != null) {
-        updateAMRMToken(allocateResponse.getAllocateResponse().getAMRMToken());
+        YarnServerSecurityUtils.updateAMRMToken(
+            allocateResponse.getAllocateResponse().getAMRMToken(), this.user,
+            getConf());
       }
       return allocateResponse;
     } else {
@@ -195,18 +198,6 @@ public final class DefaultRequestInterceptor extends
             + "Check if the interceptor pipeline configuration is correct");
   }
 
-  private void updateAMRMToken(Token token) throws IOException {
-    org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken =
-        new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(
-            token.getIdentifier().array(), token.getPassword().array(),
-            new Text(token.getKind()), new Text(token.getService()));
-    // Preserve the token service sent by the RM when adding the token
-    // to ensure we replace the previous token setup by the RM.
-    // Afterwards we can update the service address for the RPC layer.
-    user.addToken(amrmToken);
-    amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConf()));
-  }
-
   @VisibleForTesting
   public void setRMClient(final ApplicationMasterProtocol rmClient) {
     if (rmClient instanceof DistributedSchedulingAMProtocol) {
@@ -257,19 +248,12 @@ public final class DefaultRequestInterceptor extends
     for (org.apache.hadoop.security.token.Token<? extends TokenIdentifier> token : UserGroupInformation
         .getCurrentUser().getTokens()) {
       if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
-        token.setService(getAMRMTokenService(conf));
+        token.setService(ClientRMProxy.getAMRMTokenService(conf));
       }
     }
   }
 
   @InterfaceStability.Unstable
-  public static Text getAMRMTokenService(Configuration conf) {
-    return getTokenService(conf, YarnConfiguration.RM_SCHEDULER_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
-  }
-
-  @InterfaceStability.Unstable
   public static Text getTokenService(Configuration conf, String address,
       String defaultAddr, int defaultPort) {
     if (HAUtil.isHAEnabled(conf)) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9476d86c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index 147ba34..aa4d620 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -213,15 +213,13 @@ public class ApplicationMasterService extends AbstractService implements
     synchronized (lock) {
       AllocateResponse lastResponse = lock.getAllocateResponse();
       if (hasApplicationMasterRegistered(applicationAttemptId)) {
-        String message =
-            "Application Master is already registered : "
-                + appID;
+        String message = AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE + appID;
         LOG.warn(message);
         RMAuditLogger.logFailure(
-            this.rmContext.getRMApps()
-                .get(appID).getUser(),
-            AuditConstants.REGISTER_AM, "", "ApplicationMasterService", message,
-            appID, applicationAttemptId);
+          this.rmContext.getRMApps()
+            .get(appID).getUser(),
+          AuditConstants.REGISTER_AM, "", "ApplicationMasterService", message,
+          appID, applicationAttemptId);
         throw new InvalidApplicationMasterRequestException(message);
       }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9476d86c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
index 93ab8d0..1603c2d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
@@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMaste
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
@@ -346,9 +347,8 @@ public class TestApplicationMasterLauncher {
       am.registerAppAttempt(false);
       Assert.fail();
     } catch (Exception e) {
-      Assert.assertEquals("Application Master is already registered : "
-          + attempt.getAppAttemptId().getApplicationId(),
-        e.getMessage());
+      Assert.assertEquals(AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE
+          + attempt.getAppAttemptId().getApplicationId(), e.getMessage());
     }
 
     // Simulate an AM that was disconnected and app attempt was removed


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org