You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ju...@apache.org on 2017/04/17 22:31:09 UTC

reef git commit: [REEF-1776] Create YARN proxy user to register REEF Driver in Unmanaged AM mode

Repository: reef
Updated Branches:
  refs/heads/master e6c23d53f -> c5322db90


[REEF-1776] Create YARN proxy user to register REEF Driver in Unmanaged AM mode

Summary of changes:
   * Create `YarnProxyUser` class to encapsulate the credentials copying and execution in the proxy context
   * Implement generic API for such user as `UserCredentials` interface to minimize the exposure at the REEF client API side
   * Add `YarnProxyUser` implementation to proper configuration modules
   * Pass `UserCredentials` from REEF into the Driver context in `ReefOnReefDriver`
   * Minor refactoring and logging improvements

JIRA:
  [REEF-1776](https://issues.apache.org/jira/browse/REEF-1776)

Pull request:
  That closes #1290


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/c5322db9
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/c5322db9
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/c5322db9

Branch: refs/heads/master
Commit: c5322db90db7430455c70623b6cfe063a2ae6603
Parents: e6c23d5
Author: Sergiy Matusevych <mo...@apache.org>
Authored: Tue Apr 4 19:46:50 2017 -0700
Committer: Julia Wang <ju...@apache.org>
Committed: Mon Apr 17 14:53:04 2017 -0700

----------------------------------------------------------------------
 .../bridge/client/YarnJobSubmissionClient.java  |   7 +-
 .../org/apache/reef/client/DriverLauncher.java  |   9 +-
 .../reef/runtime/common/REEFEnvironment.java    |  27 +++-
 .../reef/runtime/common/UserCredentials.java    |  64 +++++++++
 .../client/defaults/DefaultUserCredentials.java |  75 ++++++++++
 .../examples/reefonreef/ReefOnReefDriver.java   |   6 +-
 .../yarn/client/YarnClientConfiguration.java    |   7 +-
 .../yarn/client/YarnJobSubmissionHandler.java   |   8 +-
 .../yarn/client/YarnSubmissionHelper.java       |   9 +-
 .../UnmanagedAmYarnClientConfiguration.java     |   4 +-
 .../UnmanagedAmYarnDriverConfiguration.java     |   2 +
 .../UnmanagedAmYarnJobSubmissionHandler.java    |   3 +-
 .../UnmanagedAmYarnSubmissionHelper.java        |   8 +-
 .../yarn/client/unmanaged/YarnProxyUser.java    | 139 +++++++++++++++++++
 .../yarn/driver/YarnContainerManager.java       |  35 +++--
 15 files changed, 381 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/c5322db9/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
index fde89cb..6455f75 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
@@ -37,6 +37,7 @@ import org.apache.reef.runtime.common.launch.parameters.DriverLaunchCommandPrefi
 import org.apache.reef.runtime.yarn.YarnClasspathProvider;
 import org.apache.reef.runtime.yarn.client.SecurityTokenProvider;
 import org.apache.reef.runtime.yarn.client.YarnSubmissionHelper;
+import org.apache.reef.runtime.yarn.client.unmanaged.YarnProxyUser;
 import org.apache.reef.runtime.yarn.client.uploader.JobFolder;
 import org.apache.reef.runtime.yarn.client.uploader.JobUploader;
 import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectoryPrefix;
@@ -71,6 +72,7 @@ public final class YarnJobSubmissionClient {
   private final REEFFileNames fileNames;
   private final YarnConfiguration yarnConfiguration;
   private final ClasspathProvider classpath;
+  private final YarnProxyUser yarnProxyUser;
   private final SecurityTokenProvider tokenProvider;
   private final YarnSubmissionParametersFileGenerator jobSubmissionParametersGenerator;
 
@@ -81,6 +83,7 @@ public final class YarnJobSubmissionClient {
                           final YarnConfiguration yarnConfiguration,
                           final REEFFileNames fileNames,
                           final ClasspathProvider classpath,
+                          final YarnProxyUser yarnProxyUser,
                           final SecurityTokenProvider tokenProvider,
                           final YarnSubmissionParametersFileGenerator jobSubmissionParametersGenerator) {
 
@@ -90,6 +93,7 @@ public final class YarnJobSubmissionClient {
     this.fileNames = fileNames;
     this.yarnConfiguration = yarnConfiguration;
     this.classpath = classpath;
+    this.yarnProxyUser = yarnProxyUser;
     this.tokenProvider = tokenProvider;
     this.jobSubmissionParametersGenerator = jobSubmissionParametersGenerator;
   }
@@ -115,7 +119,8 @@ public final class YarnJobSubmissionClient {
     // ------------------------------------------------------------------------
     // Get an application ID
     try (final YarnSubmissionHelper submissionHelper = new YarnSubmissionHelper(
-        yarnConfiguration, fileNames, classpath, tokenProvider, isUnmanaged, commandPrefixList)) {
+        this.yarnConfiguration, this.fileNames, this.classpath, this.yarnProxyUser,
+        this.tokenProvider, this.isUnmanaged, this.commandPrefixList)) {
 
       // ------------------------------------------------------------------------
       // Prepare the JAR

http://git-wip-us.apache.org/repos/asf/reef/blob/c5322db9/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverLauncher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverLauncher.java b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverLauncher.java
index 78ca13a..bc8d95f 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverLauncher.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverLauncher.java
@@ -21,6 +21,7 @@ package org.apache.reef.client;
 import org.apache.reef.annotations.Provided;
 import org.apache.reef.annotations.audience.ClientSide;
 import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.runtime.common.UserCredentials;
 import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.Tang;
 import org.apache.reef.tang.annotations.Unit;
@@ -59,6 +60,7 @@ public final class DriverLauncher implements AutoCloseable {
       .build();
 
   private final REEF reef;
+  private final UserCredentials user;
 
   private LauncherStatus status = LauncherStatus.INIT;
 
@@ -66,8 +68,13 @@ public final class DriverLauncher implements AutoCloseable {
   private RunningJob theJob;
 
   @Inject
-  private DriverLauncher(final REEF reef) {
+  private DriverLauncher(final REEF reef, final UserCredentials user) {
     this.reef = reef;
+    this.user = user;
+  }
+
+  public UserCredentials getUser() {
+    return this.user;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/reef/blob/c5322db9/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java
index 3d3ae67..3a1429a 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java
@@ -33,6 +33,7 @@ import org.apache.reef.wake.profiler.WakeProfiler;
 import org.apache.reef.wake.profiler.ProfilerState;
 import org.apache.reef.wake.time.Clock;
 
+import java.io.IOException;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -64,8 +65,22 @@ public final class REEFEnvironment implements Runnable, AutoCloseable {
    * Main part of the configuration is usually read from config file by REEFLauncher.
    * @throws InjectionException Thrown on configuration error.
    */
-  @SuppressWarnings("checkstyle:illegalcatch") // Catch throwable to feed it to error handler
   public static REEFEnvironment fromConfiguration(final Configuration... configurations) throws InjectionException {
+    return fromConfiguration(null, configurations);
+  }
+
+  /**
+   * Create a new REEF environment.
+   * @param hostUser User credentials to use when registering REEF app with the Resource Manager.
+   * This parameter may be required for Unmanaged AM mode. Can be null.
+   * @param configurations REEF component (Driver or Evaluator) configuration.
+   * If multiple configurations are provided, they will be merged before use.
+   * Main part of the configuration is usually read from config file by REEFLauncher.
+   * @throws InjectionException Thrown on configuration error.
+   */
+  @SuppressWarnings("checkstyle:illegalcatch") // Catch throwable to feed it to error handler
+  public static REEFEnvironment fromConfiguration(
+      final UserCredentials hostUser, final Configuration... configurations) throws InjectionException {
 
     final Configuration config = Configurations.merge(configurations);
 
@@ -86,6 +101,16 @@ public final class REEFEnvironment implements Runnable, AutoCloseable {
     final REEFErrorHandler errorHandler = injector.getInstance(REEFErrorHandler.class);
     final JobStatusHandler jobStatusHandler = injector.getInstance(JobStatusHandler.class);
 
+    if (hostUser != null) {
+      try {
+        injector.getInstance(UserCredentials.class).set("reef-proxy", hostUser);
+      } catch (final IOException ex) {
+        final String msg = "Cannot copy user credentials: " + hostUser;
+        LOG.log(Level.SEVERE, msg, ex);
+        throw new RuntimeException(msg, ex);
+      }
+    }
+
     try {
 
       final Clock clock = injector.getInstance(Clock.class);

http://git-wip-us.apache.org/repos/asf/reef/blob/c5322db9/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/UserCredentials.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/UserCredentials.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/UserCredentials.java
new file mode 100644
index 0000000..6bac1ef
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/UserCredentials.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common;
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.runtime.common.client.defaults.DefaultUserCredentials;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * A holder object for REEF user credentials.
+ * Implementations of this interface are used e.g. for Unmanaged AM applications on YARN.
+ */
+@Public
+@ClientSide
+@DriverSide
+@DefaultImplementation(DefaultUserCredentials.class)
+public interface UserCredentials {
+
+  /**
+   * Copy credentials from another existing user.
+   * This method can be called only once per instance.
+   * @param name name of the new user.
+   * @param other Credentials of another user.
+   * @throws IOException if unable to copy.
+   */
+  void set(final String name, final UserCredentials other) throws IOException;
+
+  /**
+   * Check if the user credentials had been set.
+   * @return true if set() method had been called successfully before, false otherwise.
+   */
+  boolean isSet();
+
+  /**
+   * Execute the privileged action as a given user.
+   * If user credentials are not set, execute the action outside the user context.
+   * @param action an action to run.
+   * @param <T> action return type.
+   * @return result of an action.
+   * @throws Exception whatever the action can throw.
+   */
+  <T> T doAs(final PrivilegedExceptionAction<T> action) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/c5322db9/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultUserCredentials.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultUserCredentials.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultUserCredentials.java
new file mode 100644
index 0000000..216ce97
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultUserCredentials.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client.defaults;
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.common.UserCredentials;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * A holder object for REEF user credentials.
+ */
+@Private
+@ClientSide
+@DriverSide
+public final class DefaultUserCredentials implements UserCredentials {
+
+  @Inject
+  private DefaultUserCredentials() { }
+
+  /**
+   * Copy credentials from another existing user.
+   * This method can be called only once per instance.
+   * This default implementation should never be called.
+   * @param name Name of the new user.
+   * @param other Credentials of another user.
+   * @throws RuntimeException always throws.
+   */
+  @Override
+  public void set(final String name, final UserCredentials other) throws IOException {
+    throw new RuntimeException("Not implemented! Attempt to set user " + name + " from: " + other);
+  }
+
+  /**
+   * Check if the user credentials had been set. Always returns false.
+   * @return always false.
+   */
+  @Override
+  public boolean isSet() {
+    return false;
+  }
+
+  /**
+   * Execute the privileged action as a given user.
+   * This implementation always executes the action outside the user context, simply by calling action.run().
+   * @param action an action to run.
+   * @param <T> action return type.
+   * @return result of an action.
+   * @throws Exception whatever the action can throw.
+   */
+  @Override
+  public <T> T doAs(final PrivilegedExceptionAction<T> action) throws Exception {
+    return action.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/c5322db9/lang/java/reef-examples/src/main/java/org/apache/reef/examples/reefonreef/ReefOnReefDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/reefonreef/ReefOnReefDriver.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/reefonreef/ReefOnReefDriver.java
index ae04436..a509399 100644
--- a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/reefonreef/ReefOnReefDriver.java
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/reefonreef/ReefOnReefDriver.java
@@ -85,9 +85,13 @@ final class ReefOnReefDriver implements EventHandler<StartTime> {
           .set(UnmanagedAmYarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY, DRIVER_ROOT_PATH)
           .build();
 
-      try (final REEFEnvironment reef = REEFEnvironment.fromConfiguration(yarnAmConfig, DRIVER_CONFIG)) {
+      try (final REEFEnvironment reef =
+          REEFEnvironment.fromConfiguration(client.getUser(), yarnAmConfig, DRIVER_CONFIG)) {
+
         reef.run();
+
         final ReefServiceProtos.JobStatusProto status = reef.getLastStatus();
+
         LOG.log(Level.INFO, "REEF-on-REEF inner job {0} completed: state {1}",
             new Object[] {innerApplicationId, status.getState()});
       }

http://git-wip-us.apache.org/repos/asf/reef/blob/c5322db9/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java
index 0e6733b..b1fe4f9 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java
@@ -22,6 +22,7 @@ import org.apache.reef.annotations.audience.ClientSide;
 import org.apache.reef.annotations.audience.Public;
 import org.apache.reef.client.parameters.DriverConfigurationProviders;
 import org.apache.reef.driver.parameters.DriverIsUnmanaged;
+import org.apache.reef.runtime.common.UserCredentials;
 import org.apache.reef.runtime.common.client.CommonRuntimeConfiguration;
 import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
 import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
@@ -30,6 +31,7 @@ import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
 import org.apache.reef.runtime.yarn.YarnClasspathProvider;
 import org.apache.reef.runtime.yarn.client.parameters.JobPriority;
 import org.apache.reef.runtime.yarn.client.parameters.JobQueue;
+import org.apache.reef.runtime.yarn.client.unmanaged.YarnProxyUser;
 import org.apache.reef.runtime.yarn.util.YarnConfigurationConstructor;
 import org.apache.reef.tang.ConfigurationProvider;
 import org.apache.reef.tang.formats.*;
@@ -58,15 +60,16 @@ public class YarnClientConfiguration extends ConfigurationModuleBuilder {
 
   public static final ConfigurationModule CONF = new YarnClientConfiguration()
           .merge(CommonRuntimeConfiguration.CONF)
-          // Bind YARN
+          // Bind YARN-specific classes
           .bindImplementation(JobSubmissionHandler.class, YarnJobSubmissionHandler.class)
           .bindImplementation(DriverConfigurationProvider.class, YarnDriverConfigurationProviderImpl.class)
+          .bindImplementation(RuntimeClasspathProvider.class, YarnClasspathProvider.class)
+          .bindImplementation(UserCredentials.class, YarnProxyUser.class)
           // Bind the parameters given by the user
           .bindNamedParameter(JobQueue.class, YARN_QUEUE_NAME)
           .bindNamedParameter(JobPriority.class, YARN_PRIORITY)
           .bindNamedParameter(JVMHeapSlack.class, JVM_HEAP_SLACK)
           .bindNamedParameter(DriverIsUnmanaged.class, UNMANAGED_DRIVER)
-          .bindImplementation(RuntimeClasspathProvider.class, YarnClasspathProvider.class)
           // Bind external constructors. Taken from  YarnExternalConstructors.registerClientConstructors
           .bindConstructor(org.apache.hadoop.yarn.conf.YarnConfiguration.class, YarnConfigurationConstructor.class)
           .bindSetEntry(DriverConfigurationProviders.class, DRIVER_CONFIGURATION_PROVIDERS)

http://git-wip-us.apache.org/repos/asf/reef/blob/c5322db9/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
index 9457f90..c79c668 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
@@ -34,6 +34,7 @@ import org.apache.reef.runtime.common.files.ClasspathProvider;
 import org.apache.reef.runtime.common.files.JobJarMaker;
 import org.apache.reef.runtime.common.files.REEFFileNames;
 import org.apache.reef.runtime.yarn.client.parameters.JobQueue;
+import org.apache.reef.runtime.yarn.client.unmanaged.YarnProxyUser;
 import org.apache.reef.runtime.yarn.client.uploader.JobFolder;
 import org.apache.reef.runtime.yarn.client.uploader.JobUploader;
 import org.apache.reef.tang.Configuration;
@@ -61,6 +62,7 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler {
   private final REEFFileNames fileNames;
   private final ClasspathProvider classpath;
   private final JobUploader uploader;
+  private final YarnProxyUser yarnProxyUser;
   private final SecurityTokenProvider tokenProvider;
   private final DriverConfigurationProvider driverConfigurationProvider;
 
@@ -75,6 +77,7 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler {
           final REEFFileNames fileNames,
           final ClasspathProvider classpath,
           final JobUploader uploader,
+          final YarnProxyUser yarnProxyUser,
           final SecurityTokenProvider tokenProvider,
           final DriverConfigurationProvider driverConfigurationProvider) throws IOException {
 
@@ -85,6 +88,7 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler {
     this.fileNames = fileNames;
     this.classpath = classpath;
     this.uploader = uploader;
+    this.yarnProxyUser = yarnProxyUser;
     this.tokenProvider = tokenProvider;
     this.driverConfigurationProvider = driverConfigurationProvider;
   }
@@ -100,8 +104,8 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler {
     LOG.log(Level.FINEST, "Submitting{0} job: {1}",
         new Object[] {this.isUnmanaged ? " UNMANAGED AM" : "", jobSubmissionEvent});
 
-    try (final YarnSubmissionHelper submissionHelper = new YarnSubmissionHelper(
-        this.yarnConfiguration, this.fileNames, this.classpath, this.tokenProvider, this.isUnmanaged)) {
+    try (final YarnSubmissionHelper submissionHelper = new YarnSubmissionHelper(this.yarnConfiguration,
+        this.fileNames, this.classpath, this.yarnProxyUser, this.tokenProvider, this.isUnmanaged)) {
 
       LOG.log(Level.FINE, "Assembling submission JAR for the Driver.");
       final Optional<String> userBoundJobSubmissionDirectory =

http://git-wip-us.apache.org/repos/asf/reef/blob/c5322db9/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
index 6df3ffc..019114d 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
@@ -19,6 +19,7 @@
 package org.apache.reef.runtime.yarn.client;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
@@ -32,6 +33,7 @@ import org.apache.reef.runtime.common.REEFLauncher;
 import org.apache.reef.runtime.common.files.ClasspathProvider;
 import org.apache.reef.runtime.common.files.REEFFileNames;
 import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
+import org.apache.reef.runtime.yarn.client.unmanaged.YarnProxyUser;
 import org.apache.reef.runtime.yarn.util.YarnTypes;
 
 import java.io.IOException;
@@ -52,6 +54,7 @@ public final class YarnSubmissionHelper implements AutoCloseable {
   private final ApplicationId applicationId;
   private final Map<String, LocalResource> resources = new HashMap<>();
   private final ClasspathProvider classpath;
+  private final YarnProxyUser yarnProxyUser;
   private final SecurityTokenProvider tokenProvider;
   private final boolean isUnmanaged;
   private final List<String> commandPrefixList;
@@ -64,11 +67,13 @@ public final class YarnSubmissionHelper implements AutoCloseable {
   public YarnSubmissionHelper(final YarnConfiguration yarnConfiguration,
                               final REEFFileNames fileNames,
                               final ClasspathProvider classpath,
+                              final YarnProxyUser yarnProxyUser,
                               final SecurityTokenProvider tokenProvider,
                               final boolean isUnmanaged,
                               final List<String> commandPrefixList) throws IOException, YarnException {
 
     this.classpath = classpath;
+    this.yarnProxyUser = yarnProxyUser;
     this.isUnmanaged = isUnmanaged;
 
     this.driverStdoutFilePath =
@@ -98,9 +103,10 @@ public final class YarnSubmissionHelper implements AutoCloseable {
   public YarnSubmissionHelper(final YarnConfiguration yarnConfiguration,
                               final REEFFileNames fileNames,
                               final ClasspathProvider classpath,
+                              final YarnProxyUser yarnProxyUser,
                               final SecurityTokenProvider tokenProvider,
                               final boolean isUnmanaged) throws IOException, YarnException {
-    this(yarnConfiguration, fileNames, classpath, tokenProvider, isUnmanaged, null);
+    this(yarnConfiguration, fileNames, classpath, yarnProxyUser, tokenProvider, isUnmanaged, null);
   }
 
   /**
@@ -287,6 +293,7 @@ public final class YarnSubmissionHelper implements AutoCloseable {
       // For Unmanaged AM mode, add a new app token to the
       // current process so it can talk to the RM as an AM.
       final Token<AMRMTokenIdentifier> token = this.yarnClient.getAMRMToken(this.applicationId);
+      this.yarnProxyUser.set("reef-proxy", UserGroupInformation.getCurrentUser(), token);
       this.tokenProvider.addTokens(UserCredentialSecurityTokenProvider.serializeToken(token));
     }
   }

http://git-wip-us.apache.org/repos/asf/reef/blob/c5322db9/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnClientConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnClientConfiguration.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnClientConfiguration.java
index 7862ad9..bb32120 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnClientConfiguration.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnClientConfiguration.java
@@ -22,6 +22,7 @@ import org.apache.reef.annotations.audience.ClientSide;
 import org.apache.reef.annotations.audience.Public;
 import org.apache.reef.client.parameters.DriverConfigurationProviders;
 import org.apache.reef.driver.parameters.DriverIsUnmanaged;
+import org.apache.reef.runtime.common.UserCredentials;
 import org.apache.reef.runtime.common.client.CommonRuntimeConfiguration;
 import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
 import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
@@ -60,11 +61,12 @@ public class UnmanagedAmYarnClientConfiguration extends ConfigurationModuleBuild
       // Bind YARN
       .bindImplementation(JobSubmissionHandler.class, UnmanagedAmYarnJobSubmissionHandler.class)
       .bindImplementation(DriverConfigurationProvider.class, YarnDriverConfigurationProviderImpl.class)
+      .bindImplementation(RuntimeClasspathProvider.class, YarnClasspathProvider.class)
+      .bindImplementation(UserCredentials.class, YarnProxyUser.class)
       // Bind the parameters given by the user
       .bindNamedParameter(JobQueue.class, YARN_QUEUE_NAME)
       .bindNamedParameter(JobPriority.class, YARN_PRIORITY)
       .bindNamedParameter(RootFolder.class, ROOT_FOLDER)
-      .bindImplementation(RuntimeClasspathProvider.class, YarnClasspathProvider.class)
       // Bind external constructors. Taken from  YarnExternalConstructors.registerClientConstructors
       .bindConstructor(org.apache.hadoop.yarn.conf.YarnConfiguration.class, YarnConfigurationConstructor.class)
       .bindSetEntry(DriverConfigurationProviders.class, DRIVER_CONFIGURATION_PROVIDERS)

http://git-wip-us.apache.org/repos/asf/reef/blob/c5322db9/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnDriverConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnDriverConfiguration.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnDriverConfiguration.java
index 2a57a97..c497d17 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnDriverConfiguration.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnDriverConfiguration.java
@@ -24,6 +24,7 @@ import org.apache.reef.annotations.audience.Public;
 import org.apache.reef.driver.parameters.DriverIsUnmanaged;
 import org.apache.reef.io.TempFileCreator;
 import org.apache.reef.io.WorkingDirectoryTempFileCreator;
+import org.apache.reef.runtime.common.UserCredentials;
 import org.apache.reef.runtime.common.driver.api.*;
 import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
 import org.apache.reef.runtime.common.driver.parameters.DefinedRuntimes;
@@ -85,6 +86,7 @@ public final class UnmanagedAmYarnDriverConfiguration extends ConfigurationModul
           .bindNamedParameter(JVMHeapSlack.class, JVM_HEAP_SLACK)
           .bindImplementation(RackNameFormatter.class, RACK_NAME_FORMATTER)
           .bindImplementation(RuntimeClasspathProvider.class, YarnClasspathProvider.class)
+          .bindImplementation(UserCredentials.class, YarnProxyUser.class)
           .bindNamedParameter(DefinedRuntimes.class, RuntimeIdentifier.RUNTIME_NAME)
           .build();
 

http://git-wip-us.apache.org/repos/asf/reef/blob/c5322db9/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnJobSubmissionHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnJobSubmissionHandler.java
index d75a2b5..73e2cd1 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnJobSubmissionHandler.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnJobSubmissionHandler.java
@@ -52,13 +52,14 @@ final class UnmanagedAmYarnJobSubmissionHandler implements JobSubmissionHandler
       @Parameter(JobQueue.class) final String defaultQueueName,
       final UnmanagedDriverFiles driverFiles,
       final YarnConfiguration yarnConfiguration,
+      final YarnProxyUser yarnProxyUser,
       final SecurityTokenProvider tokenProvider) throws IOException {
 
     this.defaultQueueName = defaultQueueName;
     this.driverFiles = driverFiles;
 
     try {
-      this.submissionHelper = new UnmanagedAmYarnSubmissionHelper(yarnConfiguration, tokenProvider);
+      this.submissionHelper = new UnmanagedAmYarnSubmissionHelper(yarnConfiguration, yarnProxyUser, tokenProvider);
     } catch (final IOException | YarnException ex) {
       LOG.log(Level.SEVERE, "Cannot create YARN client", ex);
       throw new RuntimeException("Cannot create YARN client", ex);

http://git-wip-us.apache.org/repos/asf/reef/blob/c5322db9/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnSubmissionHelper.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnSubmissionHelper.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnSubmissionHelper.java
index d25dbad..9142194 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnSubmissionHelper.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnSubmissionHelper.java
@@ -18,6 +18,7 @@
  */
 package org.apache.reef.runtime.yarn.client.unmanaged;
 
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.client.api.YarnClient;
@@ -42,14 +43,18 @@ final class UnmanagedAmYarnSubmissionHelper implements AutoCloseable {
   private static final Logger LOG = Logger.getLogger(UnmanagedAmYarnSubmissionHelper.class.getName());
 
   private final SecurityTokenProvider tokenProvider;
+  private final YarnProxyUser yarnProxyUser;
   private final YarnClient yarnClient;
   private final ApplicationSubmissionContext applicationSubmissionContext;
   private final ApplicationId applicationId;
 
-  UnmanagedAmYarnSubmissionHelper(final YarnConfiguration yarnConfiguration,
+  UnmanagedAmYarnSubmissionHelper(
+      final YarnConfiguration yarnConfiguration,
+      final YarnProxyUser yarnProxyUser,
       final SecurityTokenProvider tokenProvider) throws IOException, YarnException {
 
     this.tokenProvider = tokenProvider;
+    this.yarnProxyUser = yarnProxyUser;
 
     LOG.log(Level.FINE, "Initializing YARN Client");
     this.yarnClient = YarnClient.createYarnClient();
@@ -116,6 +121,7 @@ final class UnmanagedAmYarnSubmissionHelper implements AutoCloseable {
     this.yarnClient.submitApplication(this.applicationSubmissionContext);
 
     final Token<AMRMTokenIdentifier> token = this.yarnClient.getAMRMToken(this.applicationId);
+    this.yarnProxyUser.set("reef-uam-proxy", UserGroupInformation.getCurrentUser(), token);
     this.tokenProvider.addTokens(UserCredentialSecurityTokenProvider.serializeToken(token));
   }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/c5322db9/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/YarnProxyUser.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/YarnProxyUser.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/YarnProxyUser.java
new file mode 100644
index 0000000..1f653e2
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/YarnProxyUser.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.client.unmanaged;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.common.UserCredentials;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collection;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A holder class for the proxy UserGroupInformation object
+ * required for Unmanaged YARN Application Master in REEF-on-REEF or REEF-on-Spark mode.
+ */
+@Private
+@ClientSide
+@DriverSide
+public final class YarnProxyUser implements UserCredentials {
+
+  private static final Logger LOG = Logger.getLogger(YarnProxyUser.class.getName());
+
+  private UserGroupInformation proxyUGI = null;
+
+  @Inject
+  private YarnProxyUser() { }
+
+  /**
+   * Get the YARN proxy user information. If not set, return the (global) current user.
+   * @return Proxy user group information, if set; otherwise, return current YARN user.
+   * @throws IOException if proxy user is not set AND unable to obtain current YARN user information.
+   */
+  public UserGroupInformation get() throws IOException {
+
+    final UserGroupInformation effectiveUGI =
+        this.proxyUGI == null ? UserGroupInformation.getCurrentUser() : this.proxyUGI;
+
+    if (LOG.isLoggable(Level.FINEST)) {
+      LOG.log(Level.FINEST, "UGI: get: {0}", ugiToString("EFFECTIVE", effectiveUGI));
+    }
+
+    return effectiveUGI;
+  }
+
+  /**
+   * Check if the proxy user is set.
+   * @return true if proxy user set, false otherwise.
+   */
+  @Override
+  public boolean isSet() {
+    return this.proxyUGI != null;
+  }
+
+  /**
+   * Set YARN user. This method can be called only once per class instance.
+   * @param name Name of the new proxy user.
+   * @param hostUser User credentials to copy. Must be an instance of YarnProxyUser.
+   */
+  @Override
+  public void set(final String name, final UserCredentials hostUser) throws IOException {
+
+    assert this.proxyUGI == null;
+    assert hostUser instanceof YarnProxyUser;
+
+    LOG.log(Level.FINE, "UGI: user {0} copy from: {1}", new Object[] {name, hostUser});
+
+    final UserGroupInformation hostUGI = ((YarnProxyUser) hostUser).get();
+    final Collection<Token<? extends TokenIdentifier>> tokens = hostUGI.getCredentials().getAllTokens();
+
+    this.set(name, hostUGI, tokens.toArray(new Token[tokens.size()]));
+  }
+
+  /**
+   * Create YARN proxy user and add security tokens to its credentials.
+   * This method can be called only once per class instance.
+   * @param proxyName Name of the new proxy user.
+   * @param hostUGI YARN user to impersonate the proxy.
+   * @param tokens Security tokens to add to the new proxy user's credentials.
+   */
+  @SafeVarargs
+  public final void set(final String proxyName,
+      final UserGroupInformation hostUGI, final Token<? extends TokenIdentifier>... tokens) {
+
+    assert this.proxyUGI == null;
+    this.proxyUGI = UserGroupInformation.createProxyUser(proxyName, hostUGI);
+
+    for (final Token<? extends TokenIdentifier> token : tokens) {
+      this.proxyUGI.addToken(token);
+    }
+
+    LOG.log(Level.FINE, "UGI: user {0} set to: {1}", new Object[] {proxyName, this});
+  }
+
+  /**
+   * Execute the privileged action as a given user.
+   * If user credentials are not set, execute the action outside the user context.
+   * @param action an action to run.
+   * @param <T> action return type.
+   * @return result of an action.
+   * @throws Exception whatever the action can throw.
+   */
+  public <T> T doAs(final PrivilegedExceptionAction<T> action) throws Exception {
+    LOG.log(Level.FINE, "{0} execute {1}", new Object[] {this, action});
+    return this.proxyUGI == null ? action.run() : this.proxyUGI.doAs(action);
+  }
+
+  @Override
+  public String toString() {
+    return this.proxyUGI == null ? "UGI: { CURRENT user: null }" : ugiToString("PROXY", this.proxyUGI);
+  }
+
+  private static String ugiToString(final String prefix, final UserGroupInformation ugi) {
+    return String.format("UGI: { %s user: %s tokens: %s }", prefix, ugi, ugi.getCredentials().getAllTokens());
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/c5322db9/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
index f323018..58df83a 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
@@ -21,6 +21,7 @@ package org.apache.reef.runtime.yarn.driver;
 import com.google.protobuf.ByteString;
 import org.apache.commons.collections.ListUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
+
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.*;
@@ -29,7 +30,9 @@ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
 import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.driver.ProgressProvider;
 import org.apache.reef.proto.ReefServiceProtos;
 import org.apache.reef.runtime.common.driver.DriverStatusManager;
@@ -39,6 +42,7 @@ import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl;
 import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl;
 import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl;
 import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.yarn.client.unmanaged.YarnProxyUser;
 import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory;
 import org.apache.reef.runtime.yarn.driver.parameters.YarnHeartbeatPeriod;
 import org.apache.reef.tang.InjectionFuture;
@@ -49,14 +53,16 @@ import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
 import javax.inject.Inject;
 import java.io.*;
 import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-final class YarnContainerManager
-    implements AMRMClientAsync.CallbackHandler, NMClientAsync.CallbackHandler {
+@Private
+@DriverSide
+final class YarnContainerManager implements AMRMClientAsync.CallbackHandler, NMClientAsync.CallbackHandler {
 
   private static final Logger LOG = Logger.getLogger(YarnContainerManager.class.getName());
 
@@ -74,6 +80,7 @@ final class YarnContainerManager
 
   private final YarnConfiguration yarnConf;
   private final AMRMClientAsync<AMRMClient.ContainerRequest> resourceManager;
+  private final YarnProxyUser yarnProxyUser;
   private final NMClientAsync nodeManager;
   private final REEFEventHandlers reefEventHandlers;
   private final Containers containers;
@@ -92,6 +99,7 @@ final class YarnContainerManager
       @Parameter(YarnHeartbeatPeriod.class) final int yarnRMHeartbeatPeriod,
       @Parameter(JobSubmissionDirectory.class) final String jobSubmissionDirectory,
       final YarnConfiguration yarnConf,
+      final YarnProxyUser yarnProxyUser,
       final REEFEventHandlers reefEventHandlers,
       final Containers containers,
       final ApplicationMasterRegistration registration,
@@ -109,6 +117,7 @@ final class YarnContainerManager
     this.registration = registration;
     this.containerRequestCounter = containerRequestCounter;
     this.yarnConf = yarnConf;
+    this.yarnProxyUser = yarnProxyUser;
     this.rackNameFormatter = rackNameFormatter;
     this.trackingUrl = trackingURLProvider.getTrackingUrl();
 
@@ -119,7 +128,8 @@ final class YarnContainerManager
     this.reefFileNames = reefFileNames;
     this.progressProvider = progressProvider;
 
-    LOG.log(Level.FINEST, "Instantiated YarnContainerManager: {0}", this.registration);
+    LOG.log(Level.FINEST, "Instantiated YarnContainerManager: {0} {1}",
+        new Object[] {this.registration, this.yarnProxyUser});
   }
 
   /**
@@ -308,16 +318,21 @@ final class YarnContainerManager
 
     LOG.log(Level.FINEST, "YARN registration: begin");
 
-    this.resourceManager.init(this.yarnConf);
-    this.resourceManager.start();
-
     this.nodeManager.init(this.yarnConf);
     this.nodeManager.start();
 
-    LOG.log(Level.FINEST, "YARN registration: registered with RM and NM");
-
     try {
 
+      this.yarnProxyUser.doAs(
+          new PrivilegedExceptionAction<Object>() {
+            @Override
+            public Object run() throws Exception {
+              resourceManager.init(yarnConf);
+              resourceManager.start();
+              return null;
+            }
+          });
+
       LOG.log(Level.FINE, "YARN registration: register AM at \"{0}:{1}\" tracking URL \"{2}\"",
           new Object[] {AM_REGISTRATION_HOST, AM_REGISTRATION_PORT, this.trackingUrl});
 
@@ -333,7 +348,7 @@ final class YarnContainerManager
         out.writeBytes(this.trackingUrl + '\n');
       }
 
-    } catch (final YarnException | IOException e) {
+    } catch (final Exception e) {
       LOG.log(Level.WARNING, "Unable to register application master.", e);
       onRuntimeError(e);
     }