Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2015/05/15 01:29:02 UTC

[1/2] hadoop git commit: Fixing MR intermediate spills. Contributed by Arun Suresh.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 53fe4eff0 -> 9a2a9553e


Fixing MR intermediate spills. Contributed by Arun Suresh.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6b710a42
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6b710a42
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6b710a42

Branch: refs/heads/trunk
Commit: 6b710a42e00acca405e085724c89cda016cf7442
Parents: 53fe4ef
Author: Vinod Kumar Vavilapalli <vi...@apache.org>
Authored: Thu May 14 16:07:56 2015 -0700
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Thu May 14 16:07:56 2015 -0700

----------------------------------------------------------------------
 .../hadoop/mapred/LocalContainerLauncher.java   | 10 +++++++
 .../hadoop/mapred/TaskAttemptListenerImpl.java  | 17 +++++++++--
 .../org/apache/hadoop/mapred/YarnChild.java     | 18 ++++++++++++
 .../hadoop/mapreduce/v2/app/MRAppMaster.java    | 24 +++++++++++++++-
 .../java/org/apache/hadoop/mapred/Task.java     | 25 ++++++++++++++++
 .../apache/hadoop/mapreduce/CryptoUtils.java    | 17 ++++++-----
 .../apache/hadoop/mapreduce/JobSubmitter.java   | 16 ++++-------
 .../hadoop/mapreduce/security/TokenCache.java   | 10 +++++++
 .../mapreduce/task/reduce/LocalFetcher.java     |  6 ++--
 .../src/site/markdown/EncryptedShuffle.md       |  8 ++++++
 .../mapreduce/task/reduce/TestMerger.java       |  2 +-
 .../TestMRIntermediateDataEncryption.java       | 30 ++++++++++++++------
 .../apache/hadoop/mapred/TestMapProgress.java   | 14 +++++----
 13 files changed, 156 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b710a42/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
index 52b3497..9d8b4a5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
@@ -83,6 +83,7 @@ public class LocalContainerLauncher extends AbstractService implements
   private final ClassLoader jobClassLoader;
   private ExecutorService taskRunner;
   private Thread eventHandler;
+  private byte[] encryptedSpillKey = new byte[] {0};
   private BlockingQueue<ContainerLauncherEvent> eventQueue =
       new LinkedBlockingQueue<ContainerLauncherEvent>();
 
@@ -176,6 +177,11 @@ public class LocalContainerLauncher extends AbstractService implements
     }
   }
 
+  public void setEncryptedSpillKey(byte[] encryptedSpillKey) {
+    if (encryptedSpillKey != null) {
+      this.encryptedSpillKey = encryptedSpillKey;
+    }
+  }
 
   /*
    * Uber-AM lifecycle/ordering ("normal" case):
@@ -382,6 +388,10 @@ public class LocalContainerLauncher extends AbstractService implements
         // map to handle)
         conf.setBoolean("mapreduce.task.uberized", true);
 
+        // Check and handle Encrypted spill key
+        task.setEncryptedSpillKey(encryptedSpillKey);
+        YarnChild.setEncryptedSpillKeyIfRequired(task);
+
         // META-FIXME: do we want the extra sanity-checking (doneWithMaps,
         // etc.), or just assume/hope the state machine(s) and uber-AM work
         // as expected?
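
The launcher above defaults encryptedSpillKey to new byte[] {0}, a one-byte
placeholder that setEncryptedSpillKeyIfRequired (see YarnChild below) tells
apart from a real key by its length. A minimal sketch of just that sentinel
check (class name and sample values are illustrative only):

    public class SpillKeySentinelSketch {
      // new byte[] {0} is the "no key" placeholder; any real generated key is longer.
      static boolean hasRealKey(byte[] encryptedSpillKey) {
        return encryptedSpillKey != null && encryptedSpillKey.length > 1;
      }

      public static void main(String[] args) {
        System.out.println(hasRealKey(new byte[] {0})); // false: placeholder
        System.out.println(hasRealKey(new byte[16]));   // true: e.g. a 128-bit key
      }
    }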

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b710a42/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
index c8f2427..49a00c5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
@@ -84,20 +84,30 @@ public class TaskAttemptListenerImpl extends CompositeService
     jvmIDToActiveAttemptMap
       = new ConcurrentHashMap<WrappedJvmID, org.apache.hadoop.mapred.Task>();
   private Set<WrappedJvmID> launchedJVMs = Collections
-      .newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>()); 
-  
+      .newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>());
+
   private JobTokenSecretManager jobTokenSecretManager = null;
   private AMPreemptionPolicy preemptionPolicy;
-  
+  private byte[] encryptedSpillKey;
+
   public TaskAttemptListenerImpl(AppContext context,
       JobTokenSecretManager jobTokenSecretManager,
       RMHeartbeatHandler rmHeartbeatHandler,
       AMPreemptionPolicy preemptionPolicy) {
+    this(context, jobTokenSecretManager, rmHeartbeatHandler,
+            preemptionPolicy, null);
+  }
+
+  public TaskAttemptListenerImpl(AppContext context,
+      JobTokenSecretManager jobTokenSecretManager,
+      RMHeartbeatHandler rmHeartbeatHandler,
+      AMPreemptionPolicy preemptionPolicy, byte[] secretShuffleKey) {
     super(TaskAttemptListenerImpl.class.getName());
     this.context = context;
     this.jobTokenSecretManager = jobTokenSecretManager;
     this.rmHeartbeatHandler = rmHeartbeatHandler;
     this.preemptionPolicy = preemptionPolicy;
+    this.encryptedSpillKey = secretShuffleKey;
   }
 
   @Override
@@ -484,6 +494,7 @@ public class TaskAttemptListenerImpl extends CompositeService
             jvmIDToActiveAttemptMap.remove(wJvmID);
         launchedJVMs.remove(wJvmID);
         LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID());
+        task.setEncryptedSpillKey(encryptedSpillKey);
         jvmTask = new JvmTask(task, false);
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b710a42/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
index 28c0a5b..ea9733c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
@@ -165,6 +165,7 @@ class YarnChild {
         @Override
         public Object run() throws Exception {
           // use job-specified working directory
+          setEncryptedSpillKeyIfRequired(taskFinal);
           FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
           taskFinal.run(job, umbilical); // run the task
           return null;
@@ -224,6 +225,23 @@ class YarnChild {
   }
 
   /**
+   * Utility method to check if the Encrypted Spill Key needs to be set into
+   * the credentials of the user running the Map / Reduce Task
+   * @param task The Map / Reduce task to set the Encrypted Spill information in
+   * @throws Exception
+   */
+  public static void setEncryptedSpillKeyIfRequired(Task task) throws
+          Exception {
+    if ((task != null) && (task.getEncryptedSpillKey() != null) && (task
+            .getEncryptedSpillKey().length > 1)) {
+      Credentials creds =
+              UserGroupInformation.getCurrentUser().getCredentials();
+      TokenCache.setEncryptedSpillKey(task.getEncryptedSpillKey(), creds);
+      UserGroupInformation.getCurrentUser().addCredentials(creds);
+    }
+  }
+
+  /**
    * Configure mapred-local dirs. This config is used by the task for finding
    * out an output directory.
    * @throws IOException 
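
The hunk above installs the spill key into the credentials of the user running
the task. A minimal standalone sketch of that Credentials round-trip; the alias
matches the TokenCache constant added in this commit, while the class name and
sample key bytes are made up:

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.security.UserGroupInformation;

    public class SpillKeyCredentialsSketch {
      // Same alias as TokenCache.ENC_SPILL_KEY in this commit.
      private static final Text SPILL_KEY_ALIAS = new Text("MapReduceEncryptedSpillKey");

      public static void main(String[] args) throws Exception {
        byte[] key = new byte[] {1, 2, 3, 4};  // stand-in for a generated key
        Credentials creds = UserGroupInformation.getCurrentUser().getCredentials();
        creds.addSecretKey(SPILL_KEY_ALIAS, key);  // what TokenCache.setEncryptedSpillKey does
        UserGroupInformation.getCurrentUser().addCredentials(creds);

        // Later, e.g. from CryptoUtils.getEncryptionKey(), the key is read back:
        byte[] fetched = UserGroupInformation.getCurrentUser()
            .getCredentials().getSecretKey(SPILL_KEY_ALIAS);
        System.out.println("recovered " + fetched.length + " key bytes");
      }
    }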

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b710a42/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index 95baaa3..752b30c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.app;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.security.NoSuchAlgorithmException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -47,6 +48,7 @@ import org.apache.hadoop.mapred.LocalContainerLauncher;
 import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
 import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapreduce.CryptoUtils;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -148,6 +150,8 @@ import org.apache.log4j.LogManager;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import javax.crypto.KeyGenerator;
+
 /**
  * The Map-Reduce Application Master.
  * The state machine is encapsulated in the implementation of Job interface.
@@ -175,6 +179,7 @@ public class MRAppMaster extends CompositeService {
    * Priority of the MRAppMaster shutdown hook.
    */
   public static final int SHUTDOWN_HOOK_PRIORITY = 30;
+  public static final String INTERMEDIATE_DATA_ENCRYPTION_ALGO = "HmacSHA1";
 
   private Clock clock;
   private final long startTime;
@@ -206,6 +211,7 @@ public class MRAppMaster extends CompositeService {
   private JobHistoryEventHandler jobHistoryEventHandler;
   private SpeculatorEventDispatcher speculatorEventDispatcher;
   private AMPreemptionPolicy preemptionPolicy;
+  private byte[] encryptedSpillKey;
 
   // After a task attempt completes from TaskUmbilicalProtocol's point of view,
   // it will be transitioned to finishing state.
@@ -704,8 +710,22 @@ public class MRAppMaster extends CompositeService {
     try {
       this.currentUser = UserGroupInformation.getCurrentUser();
       this.jobCredentials = ((JobConf)conf).getCredentials();
+      if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
+        int keyLen = conf.getInt(
+                MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
+                MRJobConfig
+                        .DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS);
+        KeyGenerator keyGen =
+                KeyGenerator.getInstance(INTERMEDIATE_DATA_ENCRYPTION_ALGO);
+        keyGen.init(keyLen);
+        encryptedSpillKey = keyGen.generateKey().getEncoded();
+      } else {
+        encryptedSpillKey = new byte[] {0};
+      }
     } catch (IOException e) {
       throw new YarnRuntimeException(e);
+    } catch (NoSuchAlgorithmException e) {
+      throw new YarnRuntimeException(e);
     }
   }
 
@@ -762,7 +782,7 @@ public class MRAppMaster extends CompositeService {
       AMPreemptionPolicy preemptionPolicy) {
     TaskAttemptListener lis =
         new TaskAttemptListenerImpl(context, jobTokenSecretManager,
-            getRMHeartbeatHandler(), preemptionPolicy);
+            getRMHeartbeatHandler(), preemptionPolicy, encryptedSpillKey);
     return lis;
   }
 
@@ -929,6 +949,8 @@ public class MRAppMaster extends CompositeService {
       if (job.isUber()) {
         this.containerLauncher = new LocalContainerLauncher(context,
             (TaskUmbilicalProtocol) taskAttemptListener, jobClassLoader);
+        ((LocalContainerLauncher) this.containerLauncher)
+                .setEncryptedSpillKey(encryptedSpillKey);
       } else {
         this.containerLauncher = new ContainerLauncherImpl(context);
       }
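
The serviceInit change above generates the spill key in the AM with a plain JCE
KeyGenerator. A self-contained sketch of that generation step, assuming the
128-bit MRJobConfig default key size (the class name is illustrative):

    import javax.crypto.KeyGenerator;

    public class SpillKeyGenSketch {
      public static void main(String[] args) throws Exception {
        int keyLen = 128; // bits; assumed MRJobConfig default
        KeyGenerator keyGen = KeyGenerator.getInstance("HmacSHA1"); // same algo as above
        keyGen.init(keyLen);
        byte[] encryptedSpillKey = keyGen.generateKey().getEncoded();
        System.out.println("generated a " + (encryptedSpillKey.length * 8)
            + "-bit spill key");
      }
    }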

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b710a42/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
index 80881bc..c07d517 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
@@ -149,6 +149,8 @@ abstract public class Task implements Writable, Configurable {
   private String user;                            // user running the job
   private TaskAttemptID taskId;                   // unique, includes job id
   private int partition;                          // id within job
+  private byte[] encryptedSpillKey = new byte[] {0};  // Key Used to encrypt
+  // intermediate spills
   TaskStatus taskStatus;                          // current status of the task
   protected JobStatus.State jobRunStateForCleanup;
   protected boolean jobCleanup = false;
@@ -263,6 +265,24 @@ abstract public class Task implements Writable, Configurable {
   }
 
   /**
+   * Get Encrypted spill key
+   * @return encrypted spill key
+   */
+  public byte[] getEncryptedSpillKey() {
+    return encryptedSpillKey;
+  }
+
+  /**
+   * Set Encrypted spill key
+   * @param encryptedSpillKey key
+   */
+  public void setEncryptedSpillKey(byte[] encryptedSpillKey) {
+    if (encryptedSpillKey != null) {
+      this.encryptedSpillKey = encryptedSpillKey;
+    }
+  }
+
+  /**
    * Get the job token secret
    * @return the token secret
    */
@@ -492,6 +512,8 @@ abstract public class Task implements Writable, Configurable {
     out.writeBoolean(writeSkipRecs);
     out.writeBoolean(taskCleanup);
     Text.writeString(out, user);
+    out.writeInt(encryptedSpillKey.length);
+    out.write(encryptedSpillKey);
     extraData.write(out);
   }
   
@@ -517,6 +539,9 @@ abstract public class Task implements Writable, Configurable {
       setPhase(TaskStatus.Phase.CLEANUP);
     }
     user = StringInterner.weakIntern(Text.readString(in));
+    int len = in.readInt();
+    encryptedSpillKey = new byte[len];
+    in.readFully(encryptedSpillKey);
     extraData.readFields(in);
   }
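
The write()/readFields() changes above ship the key with the Task using the
usual length-prefixed Writable idiom. A minimal round-trip of just that idiom
(plain java.io, no Hadoop types needed):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class LengthPrefixedKeySketch {
      public static void main(String[] args) throws IOException {
        byte[] key = {4, 5, 6};
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(key.length);  // mirrors Task.write(): length first...
        out.write(key);            // ...then the raw key bytes
        out.flush();

        DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        byte[] roundTripped = new byte[in.readInt()];  // mirrors Task.readFields()
        in.readFully(roundTripped);
        System.out.println("round-tripped " + roundTripped.length + " key bytes");
      }
    }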
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b710a42/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java
index ef06176..c4130b1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.crypto.CryptoFSDataInputStream;
 import org.apache.hadoop.fs.crypto.CryptoFSDataOutputStream;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.LimitInputStream;
@@ -50,7 +49,7 @@ public class CryptoUtils {
 
   private static final Log LOG = LogFactory.getLog(CryptoUtils.class);
 
-  public static boolean isShuffleEncrypted(Configuration conf) {
+  public static boolean isEncryptedSpillEnabled(Configuration conf) {
     return conf.getBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA,
         MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA);
   }
@@ -64,7 +63,7 @@ public class CryptoUtils {
    */
   public static byte[] createIV(Configuration conf) throws IOException {
     CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf);
-    if (isShuffleEncrypted(conf)) {
+    if (isEncryptedSpillEnabled(conf)) {
       byte[] iv = new byte[cryptoCodec.getCipherSuite().getAlgorithmBlockSize()];
       cryptoCodec.generateSecureRandom(iv);
       return iv;
@@ -75,13 +74,13 @@ public class CryptoUtils {
 
   public static int cryptoPadding(Configuration conf) {
     // Sizeof(IV) + long(start-offset)
-    return isShuffleEncrypted(conf) ? CryptoCodec.getInstance(conf)
+    return isEncryptedSpillEnabled(conf) ? CryptoCodec.getInstance(conf)
         .getCipherSuite().getAlgorithmBlockSize() + 8 : 0;
   }
 
   private static byte[] getEncryptionKey() throws IOException {
-    return TokenCache.getShuffleSecretKey(UserGroupInformation.getCurrentUser()
-        .getCredentials());
+    return TokenCache.getEncryptedSpillKey(UserGroupInformation.getCurrentUser()
+            .getCredentials());
   }
 
   private static int getBufferSize(Configuration conf) {
@@ -102,7 +101,7 @@ public class CryptoUtils {
    */
   public static FSDataOutputStream wrapIfNecessary(Configuration conf,
       FSDataOutputStream out) throws IOException {
-    if (isShuffleEncrypted(conf)) {
+    if (isEncryptedSpillEnabled(conf)) {
       out.write(ByteBuffer.allocate(8).putLong(out.getPos()).array());
       byte[] iv = createIV(conf);
       out.write(iv);
@@ -137,7 +136,7 @@ public class CryptoUtils {
    */
   public static InputStream wrapIfNecessary(Configuration conf, InputStream in,
       long length) throws IOException {
-    if (isShuffleEncrypted(conf)) {
+    if (isEncryptedSpillEnabled(conf)) {
       int bufferSize = getBufferSize(conf);
       if (length > -1) {
         in = new LimitInputStream(in, length);
@@ -174,7 +173,7 @@ public class CryptoUtils {
    */
   public static FSDataInputStream wrapIfNecessary(Configuration conf,
       FSDataInputStream in) throws IOException {
-    if (isShuffleEncrypted(conf)) {
+    if (isEncryptedSpillEnabled(conf)) {
       CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf);
       int bufferSize = getBufferSize(conf);
       // Not going to be used... but still has to be read...
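
wrapIfNecessary() above prefixes each encrypted stream with the start offset
and a fresh IV, then hands the rest to Hadoop's CryptoFSDataOutputStream. The
same "write the IV in the clear, then wrap" idea in plain JCE, for illustration
only (the real code uses Hadoop's CryptoCodec, and also records an 8-byte
offset before the IV):

    import java.io.ByteArrayOutputStream;
    import java.io.OutputStream;
    import java.security.SecureRandom;
    import javax.crypto.Cipher;
    import javax.crypto.CipherOutputStream;
    import javax.crypto.spec.IvParameterSpec;
    import javax.crypto.spec.SecretKeySpec;

    public class WrapIfNecessarySketch {
      static OutputStream wrap(OutputStream out, byte[] key) throws Exception {
        byte[] iv = new byte[16];
        new SecureRandom().nextBytes(iv); // analogous to createIV(conf)
        out.write(iv);                    // IV travels in the clear ahead of the ciphertext
        Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding");
        cipher.init(Cipher.ENCRYPT_MODE,
            new SecretKeySpec(key, "AES"), new IvParameterSpec(iv));
        return new CipherOutputStream(out, cipher);
      }

      public static void main(String[] args) throws Exception {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (OutputStream enc = wrap(sink, new byte[16])) { // all-zero demo key
          enc.write("spill data".getBytes("UTF-8"));
        }
        System.out.println("wrote " + sink.size() + " bytes (16-byte IV + ciphertext)");
      }
    }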

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b710a42/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
index 023bd63..a458e2c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
@@ -18,12 +18,10 @@
 package org.apache.hadoop.mapreduce;
 
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.net.UnknownHostException;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -42,7 +40,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.Text;
@@ -52,7 +49,6 @@ import org.apache.hadoop.mapred.QueueACL;
 import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
 
 import org.apache.hadoop.mapreduce.counters.Limits;
-import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.mapreduce.security.TokenCache;
@@ -176,13 +172,8 @@ class JobSubmitter {
       if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
         KeyGenerator keyGen;
         try {
-         
-          int keyLen = CryptoUtils.isShuffleEncrypted(conf) 
-              ? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS, 
-                  MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
-              : SHUFFLE_KEY_LENGTH;
           keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
-          keyGen.init(keyLen);
+          keyGen.init(SHUFFLE_KEY_LENGTH);
         } catch (NoSuchAlgorithmException e) {
           throw new IOException("Error generating shuffle secret key", e);
         }
@@ -190,6 +181,11 @@ class JobSubmitter {
         TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
             job.getCredentials());
       }
+      if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
+        conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
+        LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
+                "data spill is enabled");
+      }
 
       copyAndConfigureFiles(job, submitJobDir);
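
The new guard above pins the job to a single AM attempt whenever spill
encryption is on, presumably because a restarted AM would generate a fresh key
and could not recover spills written under the old one. The client-visible
effect, sketched with raw configuration keys (property names assumed from
MRJobConfig):

    import org.apache.hadoop.conf.Configuration;

    public class SubmitTimeGuardSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration(false);
        conf.setBoolean("mapreduce.job.encrypted-intermediate-data", true);
        if (conf.getBoolean("mapreduce.job.encrypted-intermediate-data", false)) {
          conf.setInt("mapreduce.am.max-attempts", 1); // mirrors the hunk above
        }
        System.out.println("AM attempts = " + conf.getInt("mapreduce.am.max-attempts", 2));
      }
    }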
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b710a42/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java
index e66f770..78f6c16 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java
@@ -176,6 +176,7 @@ public class TokenCache {
   public static final String JOB_TOKENS_FILENAME = "mapreduce.job.jobTokenFile";
   private static final Text JOB_TOKEN = new Text("JobToken");
   private static final Text SHUFFLE_TOKEN = new Text("MapReduceShuffleToken");
+  private static final Text ENC_SPILL_KEY = new Text("MapReduceEncryptedSpillKey");
   
   /**
    * load job token from a file
@@ -244,6 +245,15 @@ public class TokenCache {
     return getSecretKey(credentials, SHUFFLE_TOKEN);
   }
 
+  @InterfaceAudience.Private
+  public static void setEncryptedSpillKey(byte[] key, Credentials credentials) {
+    credentials.addSecretKey(ENC_SPILL_KEY, key);
+  }
+
+  @InterfaceAudience.Private
+  public static byte[] getEncryptedSpillKey(Credentials credentials) {
+    return getSecretKey(credentials, ENC_SPILL_KEY);
+  }
   /**
    * @deprecated Use {@link Credentials#getToken(org.apache.hadoop.io.Text)}
    * instead, this method is included for compatibility against Hadoop-1

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b710a42/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
index 6794c99..de2382c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
@@ -127,6 +127,9 @@ class LocalFetcher<K,V> extends Fetcher<K, V> {
     long compressedLength = ir.partLength;
     long decompressedLength = ir.rawLength;
 
+    compressedLength -= CryptoUtils.cryptoPadding(job);
+    decompressedLength -= CryptoUtils.cryptoPadding(job);
+
     // Get the location for the map output - either in-memory or on-disk
     MapOutput<K, V> mapOutput = merger.reserve(mapTaskId, decompressedLength,
         id);
@@ -150,8 +153,7 @@ class LocalFetcher<K,V> extends Fetcher<K, V> {
     inStream = CryptoUtils.wrapIfNecessary(job, inStream);
 
     try {
-      inStream.seek(ir.startOffset);
-
+      inStream.seek(ir.startOffset + CryptoUtils.cryptoPadding(job));
       mapOutput.shuffle(LOCALHOST, inStream, compressedLength, decompressedLength, metrics, reporter);
     } finally {
       try {
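
The fetcher arithmetic above subtracts the per-file crypto header from both
lengths and seeks past it before shuffling. The header size is
cryptoPadding(conf), i.e. cipher block size plus 8 bytes for the recorded
offset; a toy recalculation with an assumed 16-byte AES block:

    public class CryptoPaddingSketch {
      public static void main(String[] args) {
        int cryptoPadding = 16 + 8; // IV + long start-offset, per CryptoUtils.cryptoPadding
        long partLength = 1024, rawLength = 4096, startOffset = 0;
        long compressedLength = partLength - cryptoPadding;   // payload actually shuffled
        long decompressedLength = rawLength - cryptoPadding;
        long seekTo = startOffset + cryptoPadding;            // skip the header bytes
        System.out.println(compressedLength + ", " + decompressedLength
            + ", seek to " + seekTo);
      }
    }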

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b710a42/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/EncryptedShuffle.md
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/EncryptedShuffle.md b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/EncryptedShuffle.md
index 58fd52a..c23be7a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/EncryptedShuffle.md
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/EncryptedShuffle.md
@@ -253,3 +253,11 @@ You can do this on a per-job basis, or by means of a cluster-wide setting in the
 To set this property in NodeManager, set it in the `yarn-env.sh` file:
 
       YARN_NODEMANAGER_OPTS="-Djavax.net.debug=all"
+
+Encrypted Intermediate Data Spill files
+---------------------------------------
+
+This capability allows encryption of the intermediate files generated during the merge and shuffle phases.
+It can be enabled by setting the `mapreduce.job.encrypted-intermediate-data` job property to `true`.
+
**NOTE:** Currently, enabling encrypted intermediate data spills restricts the number of job attempts to 1.
\ No newline at end of file
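
For a job client, enabling the feature documented above is one configuration
flag; a hedged sketch (the key-size property name and its 128-bit default are
assumed from MRJobConfig):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class EnableEncryptedSpillSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean("mapreduce.job.encrypted-intermediate-data", true);
        conf.setInt("mapreduce.job.encrypted-intermediate-data-key-size-bits", 128);
        Job job = Job.getInstance(conf, "encrypted-spill-demo");
        // set mapper/reducer/input/output as usual, then job.submit();
      }
    }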

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b710a42/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
index 6e3bedf..a6b1964 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
@@ -87,7 +87,7 @@ public class TestMerger {
     jobConf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
     conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
     Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
-    TokenCache.setShuffleSecretKey(new byte[16], credentials);
+    TokenCache.setEncryptedSpillKey(new byte[16], credentials);
     UserGroupInformation.getCurrentUser().addCredentials(credentials);
     testInMemoryAndOnDiskMerger();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b710a42/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java
index ebc32ad..28b2295 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java
@@ -52,24 +52,31 @@ public class TestMRIntermediateDataEncryption {
 
   @Test
   public void testSingleReducer() throws Exception {
-    doEncryptionTest(3, 1, 2);
+    doEncryptionTest(3, 1, 2, false);
+  }
+
+  @Test
+  public void testUberMode() throws Exception {
+    doEncryptionTest(3, 1, 2, true);
   }
 
   @Test
   public void testMultipleMapsPerNode() throws Exception {
-    doEncryptionTest(8, 1, 2);
+    doEncryptionTest(8, 1, 2, false);
   }
 
   @Test
   public void testMultipleReducers() throws Exception {
-    doEncryptionTest(2, 4, 2);
+    doEncryptionTest(2, 4, 2, false);
   }
 
-  public void doEncryptionTest(int numMappers, int numReducers, int numNodes) throws Exception {
-    doEncryptionTest(numMappers, numReducers, numNodes, 1000);
+  public void doEncryptionTest(int numMappers, int numReducers, int numNodes,
+                               boolean isUber) throws Exception {
+    doEncryptionTest(numMappers, numReducers, numNodes, 1000, isUber);
   }
 
-  public void doEncryptionTest(int numMappers, int numReducers, int numNodes, int numLines) throws Exception {
+  public void doEncryptionTest(int numMappers, int numReducers, int numNodes,
+                               int numLines, boolean isUber) throws Exception {
     MiniDFSCluster dfsCluster = null;
     MiniMRClientCluster mrCluster = null;
     FileSystem fileSystem = null;
@@ -85,7 +92,8 @@ public class TestMRIntermediateDataEncryption {
       // Generate input.
       createInput(fileSystem, numMappers, numLines);
       // Run the test.
-      runMergeTest(new JobConf(mrCluster.getConfig()), fileSystem, numMappers, numReducers, numLines);
+      runMergeTest(new JobConf(mrCluster.getConfig()), fileSystem,
+              numMappers, numReducers, numLines, isUber);
     } finally {
       if (dfsCluster != null) {
         dfsCluster.shutdown();
@@ -111,8 +119,9 @@ public class TestMRIntermediateDataEncryption {
     }
   }
 
-  private void runMergeTest(JobConf job, FileSystem fileSystem, int numMappers, int numReducers, int numLines)
-    throws Exception {
+  private void runMergeTest(JobConf job, FileSystem fileSystem, int
+          numMappers, int numReducers, int numLines, boolean isUber)
+          throws Exception {
     fileSystem.delete(OUTPUT, true);
     job.setJobName("Test");
     JobClient client = new JobClient(job);
@@ -133,6 +142,9 @@ public class TestMRIntermediateDataEncryption {
     job.setInt("mapreduce.map.maxattempts", 1);
     job.setInt("mapreduce.reduce.maxattempts", 1);
     job.setInt("mapred.test.num_lines", numLines);
+    if (isUber) {
+      job.setBoolean("mapreduce.job.ubertask.enable", true);
+    }
     job.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
     try {
       submittedJob = client.submitJob(job);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b710a42/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java
index 9268f60..db6348b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java
@@ -119,12 +119,14 @@ public class TestMapProgress extends TestCase {
     throws IOException, InterruptedException {
       StringBuffer buf = new StringBuffer("Task ");
       buf.append(taskId);
-      buf.append(" making progress to ");
-      buf.append(taskStatus.getProgress());
-      String state = taskStatus.getStateString();
-      if (state != null) {
-        buf.append(" and state of ");
-        buf.append(state);
+      if (taskStatus != null) {
+        buf.append(" making progress to ");
+        buf.append(taskStatus.getProgress());
+        String state = taskStatus.getStateString();
+        if (state != null) {
+          buf.append(" and state of ");
+          buf.append(state);
+        }
       }
       LOG.info(buf.toString());
       // ignore phase


[2/2] hadoop git commit: Fixing HDFS state-store. Contributed by Arun Suresh.

Posted by vi...@apache.org.
Fixing HDFS state-store. Contributed by Arun Suresh.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9a2a9553
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9a2a9553
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9a2a9553

Branch: refs/heads/trunk
Commit: 9a2a9553eee454ecd18120535d3e845f86fc3584
Parents: 6b710a4
Author: Vinod Kumar Vavilapalli <vi...@apache.org>
Authored: Thu May 14 16:13:51 2015 -0700
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Thu May 14 16:13:51 2015 -0700

----------------------------------------------------------------------
 .../recovery/FileSystemRMStateStore.java        |  83 +++++++---
 .../recovery/RMStateStoreTestBase.java          |  16 +-
 .../recovery/TestFSRMStateStore.java            | 151 ++++++++++++++++---
 .../src/site/markdown/ResourceManagerHA.md      |   2 +-
 4 files changed, 207 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a2a9553/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
index 68d26bb..6920bb5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
@@ -25,6 +25,7 @@ import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -38,6 +39,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -84,7 +86,8 @@ public class FileSystemRMStateStore extends RMStateStore {
   protected static final String AMRMTOKEN_SECRET_MANAGER_NODE =
       "AMRMTokenSecretManagerNode";
 
-  @VisibleForTesting
+  private static final String UNREADABLE_BY_SUPERUSER_XATTRIB =
+          "security.hdfs.unreadable.by.superuser";
   protected FileSystem fs;
   @VisibleForTesting
   protected Configuration fsConf;
@@ -97,6 +100,7 @@ public class FileSystemRMStateStore extends RMStateStore {
   private Path dtSequenceNumberPath = null;
   private int fsNumRetries;
   private long fsRetryInterval;
+  private boolean isHDFS;
 
   @VisibleForTesting
   Path fsWorkingPath;
@@ -141,11 +145,17 @@ public class FileSystemRMStateStore extends RMStateStore {
     }
 
     fs = fsWorkingPath.getFileSystem(fsConf);
+    isHDFS = fs.getScheme().toLowerCase().contains("hdfs");
     mkdirsWithRetries(rmDTSecretManagerRoot);
     mkdirsWithRetries(rmAppRoot);
     mkdirsWithRetries(amrmTokenSecretManagerRoot);
   }
 
+  @VisibleForTesting
+  void setIsHDFS(boolean isHDFS) {
+    this.isHDFS = isHDFS;
+  }
+
   @Override
   protected synchronized void closeInternal() throws Exception {
     closeWithRetries();
@@ -175,9 +185,9 @@ public class FileSystemRMStateStore extends RMStateStore {
     byte[] data =
         ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
     if (existsWithRetries(versionNodePath)) {
-      updateFile(versionNodePath, data);
+      updateFile(versionNodePath, data, false);
     } else {
-      writeFileWithRetries(versionNodePath, data);
+      writeFileWithRetries(versionNodePath, data, false);
     }
   }
   
@@ -194,12 +204,12 @@ public class FileSystemRMStateStore extends RMStateStore {
       // increment epoch and store it
       byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
           .toByteArray();
-      updateFile(epochNodePath, storeData);
+      updateFile(epochNodePath, storeData, false);
     } else {
       // initialize epoch file with 1 for the next time.
       byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
           .toByteArray();
-      writeFileWithRetries(epochNodePath, storeData);
+      writeFileWithRetries(epochNodePath, storeData, false);
     }
     return currentEpoch;
   }
@@ -253,7 +263,9 @@ public class FileSystemRMStateStore extends RMStateStore {
             continue;
           }
           byte[] childData = readFileWithRetries(childNodeStatus.getPath(),
-              childNodeStatus.getLen());
+                  childNodeStatus.getLen());
+          // Set attribute if not already set
+          setUnreadableBySuperuserXattrib(childNodeStatus.getPath());
           if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
             // application
             if (LOG.isDebugEnabled()) {
@@ -326,7 +338,7 @@ public class FileSystemRMStateStore extends RMStateStore {
       assert newChildNodeStatus.isFile();
       String newChildNodeName = newChildNodeStatus.getPath().getName();
       String childNodeName = newChildNodeName.substring(
-          0, newChildNodeName.length() - ".new".length());
+              0, newChildNodeName.length() - ".new".length());
       Path childNodePath =
           new Path(newChildNodeStatus.getPath().getParent(), childNodeName);
       replaceFile(newChildNodeStatus.getPath(), childNodePath);
@@ -394,7 +406,7 @@ public class FileSystemRMStateStore extends RMStateStore {
     try {
       // currently throw all exceptions. May need to respond differently for HA
       // based on whether we have lost the right to write to FS
-      writeFileWithRetries(nodeCreatePath, appStateData);
+      writeFileWithRetries(nodeCreatePath, appStateData, true);
     } catch (Exception e) {
       LOG.info("Error storing info for app: " + appId, e);
       throw e;
@@ -412,7 +424,7 @@ public class FileSystemRMStateStore extends RMStateStore {
     try {
       // currently throw all exceptions. May need to respond differently for HA
       // based on whether we have lost the right to write to FS
-      updateFile(nodeCreatePath, appStateData);
+      updateFile(nodeCreatePath, appStateData, true);
     } catch (Exception e) {
       LOG.info("Error updating info for app: " + appId, e);
       throw e;
@@ -433,7 +445,7 @@ public class FileSystemRMStateStore extends RMStateStore {
     try {
       // currently throw all exceptions. May need to respond differently for HA
       // based on whether we have lost the right to write to FS
-      writeFileWithRetries(nodeCreatePath, attemptStateData);
+      writeFileWithRetries(nodeCreatePath, attemptStateData, true);
     } catch (Exception e) {
       LOG.info("Error storing info for attempt: " + appAttemptId, e);
       throw e;
@@ -454,7 +466,7 @@ public class FileSystemRMStateStore extends RMStateStore {
     try {
       // currently throw all exceptions. May need to respond differently for HA
       // based on whether we have lost the right to write to FS
-      updateFile(nodeCreatePath, attemptStateData);
+      updateFile(nodeCreatePath, attemptStateData, true);
     } catch (Exception e) {
       LOG.info("Error updating info for attempt: " + appAttemptId, e);
       throw e;
@@ -483,7 +495,7 @@ public class FileSystemRMStateStore extends RMStateStore {
   public synchronized void removeRMDelegationTokenState(
       RMDelegationTokenIdentifier identifier) throws Exception {
     Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
-      DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
+            DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
     LOG.info("Removing RMDelegationToken_" + identifier.getSequenceNumber());
     deleteFileWithRetries(nodeCreatePath);
   }
@@ -505,10 +517,10 @@ public class FileSystemRMStateStore extends RMStateStore {
         new RMDelegationTokenIdentifierData(identifier, renewDate);
     if (isUpdate) {
       LOG.info("Updating RMDelegationToken_" + identifier.getSequenceNumber());
-      updateFile(nodeCreatePath, identifierData.toByteArray());
+      updateFile(nodeCreatePath, identifierData.toByteArray(), true);
     } else {
       LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber());
-      writeFileWithRetries(nodeCreatePath, identifierData.toByteArray());
+      writeFileWithRetries(nodeCreatePath, identifierData.toByteArray(), true);
 
       // store sequence number
       Path latestSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
@@ -539,7 +551,7 @@ public class FileSystemRMStateStore extends RMStateStore {
     try (DataOutputStream fsOut = new DataOutputStream(os)) {
       LOG.info("Storing RMDelegationKey_" + masterKey.getKeyId());
       masterKey.write(fsOut);
-      writeFileWithRetries(nodeCreatePath, os.toByteArray());
+      writeFileWithRetries(nodeCreatePath, os.toByteArray(), true);
     }
   }
 
@@ -572,6 +584,16 @@ public class FileSystemRMStateStore extends RMStateStore {
     return getNodePath(root, appId.toString());
   }
 
+  @VisibleForTesting
+  protected Path getAppDir(ApplicationId appId) {
+    return getAppDir(rmAppRoot, appId);
+  }
+
+  @VisibleForTesting
+  protected Path getAppAttemptDir(ApplicationAttemptId appAttId) {
+    return getNodePath(getAppDir(appAttId.getApplicationId()), appAttId
+            .toString());
+  }
   // FileSystem related code
 
   private boolean checkAndRemovePartialRecordWithRetries(final Path record)
@@ -594,12 +616,13 @@ public class FileSystemRMStateStore extends RMStateStore {
     }.runWithRetries();
   }
 
-  private void writeFileWithRetries(final Path outputPath,final byte[] data)
-      throws Exception {
+  private void writeFileWithRetries(final Path outputPath, final byte[] data,
+                                    final boolean makeUnreadableByAdmin)
+          throws Exception {
     new FSAction<Void>() {
       @Override
       public Void run() throws Exception {
-        writeFile(outputPath, data);
+        writeFile(outputPath, data, makeUnreadableByAdmin);
         return null;
       }
     }.runWithRetries();
@@ -746,7 +769,8 @@ public class FileSystemRMStateStore extends RMStateStore {
    * data to .tmp file and then rename it. Here we are assuming that rename is
    * atomic for underlying file system.
    */
-  private void writeFile(Path outputPath, byte[] data) throws Exception {
+  protected void writeFile(Path outputPath, byte[] data, boolean
+          makeUnreadableByAdmin) throws Exception {
     Path tempPath =
         new Path(outputPath.getParent(), outputPath.getName() + ".tmp");
     FSDataOutputStream fsOut = null;
@@ -754,6 +778,9 @@ public class FileSystemRMStateStore extends RMStateStore {
     // final status.
     try {
       fsOut = fs.create(tempPath, true);
+      if (makeUnreadableByAdmin) {
+        setUnreadableBySuperuserXattrib(tempPath);
+      }
       fsOut.write(data);
       fsOut.close();
       fsOut = null;
@@ -768,10 +795,11 @@ public class FileSystemRMStateStore extends RMStateStore {
    * data to .new file and then rename it. Here we are assuming that rename is
    * atomic for underlying file system.
    */
-  protected void updateFile(Path outputPath, byte[] data) throws Exception {
+  protected void updateFile(Path outputPath, byte[] data, boolean
+          makeUnreadableByAdmin) throws Exception {
     Path newPath = new Path(outputPath.getParent(), outputPath.getName() + ".new");
     // use writeFileWithRetries to make sure .new file is created atomically
-    writeFileWithRetries(newPath, data);
+    writeFileWithRetries(newPath, data, makeUnreadableByAdmin);
     replaceFile(newPath, outputPath);
   }
 
@@ -810,9 +838,9 @@ public class FileSystemRMStateStore extends RMStateStore {
         AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState);
     byte[] stateData = data.getProto().toByteArray();
     if (isUpdate) {
-      updateFile(nodeCreatePath, stateData);
+      updateFile(nodeCreatePath, stateData, true);
     } else {
-      writeFileWithRetries(nodeCreatePath, stateData);
+      writeFileWithRetries(nodeCreatePath, stateData, true);
     }
   }
 
@@ -825,4 +853,13 @@ public class FileSystemRMStateStore extends RMStateStore {
   public long getRetryInterval() {
     return fsRetryInterval;
   }
+
+  private void setUnreadableBySuperuserXattrib(Path p)
+          throws IOException {
+    if (isHDFS &&
+            !fs.getXAttrs(p).containsKey(UNREADABLE_BY_SUPERUSER_XATTRIB)) {
+      fs.setXAttr(p, UNREADABLE_BY_SUPERUSER_XATTRIB, null,
+              EnumSet.of(XAttrSetFlag.CREATE));
+    }
+  }
 }
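
setUnreadableBySuperuserXattrib() above marks each sensitive state file with
the HDFS-only "security.hdfs.unreadable.by.superuser" extended attribute. A
standalone sketch of the same call, with a hypothetical path (the xattr takes
no value and is set with CREATE, exactly as in the method above):

    import java.util.EnumSet;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.XAttrSetFlag;

    public class UnreadableBySuperuserSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration()); // must resolve to HDFS
        Path node = new Path("/rmstore/some-node");          // hypothetical path
        String xattr = "security.hdfs.unreadable.by.superuser";
        if (!fs.getXAttrs(node).containsKey(xattr)) {
          fs.setXAttr(node, xattr, null, EnumSet.of(XAttrSetFlag.CREATE));
        }
      }
    }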

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a2a9553/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
index 3bc0709..4d0e560 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
@@ -112,6 +112,12 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
 
   }
 
+  public static class StoreStateVerifier {
+    void afterStoreApp(RMStateStore store, ApplicationId appId) {}
+    void afterStoreAppAttempt(RMStateStore store, ApplicationAttemptId
+            appAttId) {}
+  }
+
   interface RMStateStoreHelper {
     RMStateStore getRMStateStore() throws Exception;
     boolean isFinalStateValid() throws Exception;
@@ -173,7 +179,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
     when(mockAttempt.getRMAppAttemptMetrics())
         .thenReturn(mockRmAppAttemptMetrics);
     when(mockRmAppAttemptMetrics.getAggregateAppResourceUsage())
-        .thenReturn(new AggregateAppResourceUsage(0,0));
+        .thenReturn(new AggregateAppResourceUsage(0, 0));
     dispatcher.attemptId = attemptId;
     store.storeNewApplicationAttempt(mockAttempt);
     waitNotify(dispatcher);
@@ -181,6 +187,12 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
   }
 
   void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
+          throws Exception {
+    testRMAppStateStore(stateStoreHelper, new StoreStateVerifier());
+  }
+
+  void testRMAppStateStore(RMStateStoreHelper stateStoreHelper,
+                           StoreStateVerifier verifier)
       throws Exception {
     long submitTime = System.currentTimeMillis();
     long startTime = System.currentTimeMillis() + 1234;
@@ -205,6 +217,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
         .toApplicationAttemptId("appattempt_1352994193343_0001_000001");
     ApplicationId appId1 = attemptId1.getApplicationId();
     storeApp(store, appId1, submitTime, startTime);
+    verifier.afterStoreApp(store, appId1);
 
     // create application token and client token key for attempt1
     Token<AMRMTokenIdentifier> appAttemptToken1 =
@@ -236,6 +249,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
     storeApp(store, appIdRemoved, submitTime, startTime);
     storeAttempt(store, attemptIdRemoved,
         "container_1352994193343_0002_01_000001", null, null, dispatcher);
+    verifier.afterStoreAppAttempt(store, attemptIdRemoved);
 
     RMApp mockRemovedApp = mock(RMApp.class);
     RMAppAttemptMetrics mockRmAppAttemptMetrics = 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a2a9553/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
index b1e7a0b..2dba8b4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
@@ -21,8 +21,15 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.LinkedList;
+import java.util.Queue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,7 +45,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
 import org.apache.hadoop.yarn.server.records.Version;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.TestZKRMStateStore.TestZKRMStateStoreTester;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@@ -56,6 +62,7 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
     Path workingDirPathURI;
     TestFileSystemRMStore store;
     MiniDFSCluster cluster;
+    boolean adminCheckEnable;
 
     class TestFileSystemRMStore extends FileSystemRMStateStore {
 
@@ -83,8 +90,9 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
       }
     }
 
-    public TestFSRMStateStoreTester(MiniDFSCluster cluster) throws Exception {
-      Path workingDirPath = new Path("/Test");
+    public TestFSRMStateStoreTester(MiniDFSCluster cluster, boolean adminCheckEnable) throws Exception {
+      Path workingDirPath = new Path("/yarn/Test");
+      this.adminCheckEnable = adminCheckEnable;
       this.cluster = cluster;
       FileSystem fs = cluster.getFileSystem();
       fs.mkdirs(workingDirPath);
@@ -99,10 +107,10 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
       conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI,
           workingDirPathURI.toString());
       conf.set(YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_SPEC,
-        "100,6000");
+              "100,6000");
       conf.setInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES, 8);
       conf.setLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS,
-          900L);
+              900L);
       this.store = new TestFileSystemRMStore(conf);
       Assert.assertEquals(store.getNumRetries(), 8);
       Assert.assertEquals(store.getRetryInterval(), 900L);
@@ -111,6 +119,11 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
       store.startInternal();
       Assert.assertTrue(store.fs != previousFs);
       Assert.assertTrue(store.fs.getConf() == store.fsConf);
+      if (adminCheckEnable) {
+        store.setIsHDFS(true);
+      } else {
+        store.setIsHDFS(false);
+      }
       return store;
     }
 
@@ -123,8 +136,9 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
 
     @Override
     public void writeVersion(Version version) throws Exception {
-      store.updateFile(store.getVersionNode(), ((VersionPBImpl) version)
-        .getProto().toByteArray());
+      store.updateFile(store.getVersionNode(), ((VersionPBImpl)
+              version)
+              .getProto().toByteArray(), false);
     }
 
     @Override
@@ -135,7 +149,7 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
     public boolean appExists(RMApp app) throws IOException {
       FileSystem fs = cluster.getFileSystem();
       Path nodePath =
-          store.getAppDir(app.getApplicationId().toString());
+              store.getAppDir(app.getApplicationId().toString());
       return fs.exists(nodePath);
     }
   }
@@ -144,28 +158,28 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
   public void testFSRMStateStore() throws Exception {
     HdfsConfiguration conf = new HdfsConfiguration();
     MiniDFSCluster cluster =
-        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+            new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
     try {
-      fsTester = new TestFSRMStateStoreTester(cluster);
+      fsTester = new TestFSRMStateStoreTester(cluster, false);
       // If the state store is a FileSystemRMStateStore, add a corrupted
       // entry; the store should discard it and remove it from the file system.
       FSDataOutputStream fsOut = null;
       FileSystemRMStateStore fileSystemRMStateStore =
-          (FileSystemRMStateStore) fsTester.getRMStateStore();
+              (FileSystemRMStateStore) fsTester.getRMStateStore();
       String appAttemptIdStr3 = "appattempt_1352994193343_0001_000003";
       ApplicationAttemptId attemptId3 =
-          ConverterUtils.toApplicationAttemptId(appAttemptIdStr3);
+              ConverterUtils.toApplicationAttemptId(appAttemptIdStr3);
       Path appDir =
-          fsTester.store.getAppDir(attemptId3.getApplicationId().toString());
+              fsTester.store.getAppDir(attemptId3.getApplicationId().toString());
       Path tempAppAttemptFile =
-          new Path(appDir, attemptId3.toString() + ".tmp");
+              new Path(appDir, attemptId3.toString() + ".tmp");
       fsOut = fileSystemRMStateStore.fs.create(tempAppAttemptFile, false);
       fsOut.write("Some random data ".getBytes());
       fsOut.close();
 
       testRMAppStateStore(fsTester);
       Assert.assertFalse(fsTester.workingDirPathURI
-          .getFileSystem(conf).exists(tempAppAttemptFile));
+              .getFileSystem(conf).exists(tempAppAttemptFile));
       testRMDTSecretManagerStateStore(fsTester);
       testCheckVersion(fsTester);
       testEpoch(fsTester);
@@ -179,12 +193,109 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
   }
 
   @Test(timeout = 60000)
+  public void testHDFSRMStateStore() throws Exception {
+    final HdfsConfiguration conf = new HdfsConfiguration();
+    UserGroupInformation yarnAdmin =
+            UserGroupInformation.createUserForTesting("yarn",
+                    new String[]{"admin"});
+    final MiniDFSCluster cluster =
+            new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.getFileSystem().mkdir(new Path("/yarn"),
+            FsPermission.valueOf("-rwxrwxrwx"));
+    cluster.getFileSystem().setOwner(new Path("/yarn"), "yarn", "admin");
+    final UserGroupInformation hdfsAdmin = UserGroupInformation.getCurrentUser();
+    final StoreStateVerifier verifier = new StoreStateVerifier() {
+      @Override
+      void afterStoreApp(final RMStateStore store, final ApplicationId appId) {
+        try {
+          // Wait for the state-store write to land before checking permissions
+          Thread.sleep(5000);
+          hdfsAdmin.doAs(
+                  new PrivilegedExceptionAction<Void>() {
+                    @Override
+                    public Void run() throws Exception {
+                      verifyFilesUnreadablebyHDFS(cluster,
+                              ((FileSystemRMStateStore) store).getAppDir
+                                      (appId));
+                      return null;
+                    }
+                  });
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      void afterStoreAppAttempt(final RMStateStore store,
+                                final ApplicationAttemptId appAttId) {
+        try {
+          // Wait for the state-store write to land before checking permissions
+          Thread.sleep(5000);
+          hdfsAdmin.doAs(
+                  new PrivilegedExceptionAction<Void>() {
+                    @Override
+                    public Void run() throws Exception {
+                      verifyFilesUnreadablebyHDFS(cluster,
+                              ((FileSystemRMStateStore) store)
+                                      .getAppAttemptDir(appAttId));
+                      return null;
+                    }
+                  });
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+    try {
+      yarnAdmin.doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          fsTester = new TestFSRMStateStoreTester(cluster, true);
+          testRMAppStateStore(fsTester, verifier);
+          return null;
+        }
+      });
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
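+  // Breadth-first walk of 'root': every regular file found must fail to
+  // open with an AccessControlException for the current (superuser) caller.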
+  private void verifyFilesUnreadablebyHDFS(MiniDFSCluster cluster,
+      Path root) throws Exception {
+    DistributedFileSystem fs = cluster.getFileSystem();
+    Queue<Path> paths = new LinkedList<>();
+    paths.add(root);
+    while (!paths.isEmpty()) {
+      Path p = paths.poll();
+      FileStatus stat = fs.getFileStatus(p);
+      if (!stat.isDirectory()) {
+        try {
+          LOG.warn("\n\n ##Testing path [" + p + "]\n\n");
+          fs.open(p);
+          Assert.fail("Super user should not be able to read ["+ UserGroupInformation.getCurrentUser() + "] [" + p.getName() + "]");
+        } catch (AccessControlException e) {
+          Assert.assertTrue(e.getMessage().contains("superuser is not allowed to perform this operation"));
+        } catch (Exception e) {
+          Assert.fail("Should get an AccessControlException here");
+        }
+      } else {
+        FileStatus[] ls = fs.listStatus(p);
+        for (FileStatus f : ls) {
+          paths.add(f.getPath());
+        }
+      }
+    }
+  }
+
+  @Test(timeout = 60000)
   public void testCheckMajorVersionChange() throws Exception {
     HdfsConfiguration conf = new HdfsConfiguration();
     MiniDFSCluster cluster =
         new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
     try {
-      fsTester = new TestFSRMStateStoreTester(cluster) {
+      fsTester = new TestFSRMStateStoreTester(cluster, false) {
         Version VERSION_INFO = Version.newInstance(Integer.MAX_VALUE, 0);
 
         @Override
@@ -238,14 +349,14 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
     ApplicationAttemptId attemptId1 =
         ConverterUtils.toApplicationAttemptId(appAttemptIdStr1);
     Path appDir =
-        fsTester.store.getAppDir(attemptId1.getApplicationId().toString());
+            fsTester.store.getAppDir(attemptId1.getApplicationId().toString());
     Path appAttemptFile1 =
         new Path(appDir, attemptId1.toString() + ".new");
     FileSystemRMStateStore fileSystemRMStateStore =
         (FileSystemRMStateStore) fsTester.getRMStateStore();
     fileSystemRMStateStore.renameFile(appAttemptFile1,
-        new Path(appAttemptFile1.getParent(),
-            appAttemptFile1.getName() + ".new"));
+            new Path(appAttemptFile1.getParent(),
+                    appAttemptFile1.getName() + ".new"));
   }
 
   @Override
@@ -268,7 +379,7 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
         new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
     cluster.waitActive();
     try {
-      TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster);
+      TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster, false);
       final RMStateStore store = fsTester.getRMStateStore();
       store.setRMDispatcher(new TestDispatcher());
       final AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
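
The new testHDFSRMStateStore drives HDFS access through
UserGroupInformation.doAs so that the RPC caller identity is the chosen
test user rather than the JVM user. A minimal standalone sketch of that
pattern (class, method, and user names are illustrative, not part of this
patch):

    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.security.AccessControlException;
    import org.apache.hadoop.security.UserGroupInformation;

    public class DoAsReadCheckSketch {
      // Returns true iff HDFS denies 'user' read access to 'file'. The
      // FileSystem is created inside doAs so the RPC identity really is
      // 'user', not the process user.
      static boolean isReadDenied(final Configuration conf, final Path file,
          String user) throws Exception {
        UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
        return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
          @Override
          public Boolean run() throws IOException {
            FileSystem fs = FileSystem.get(conf);
            try {
              fs.open(file).close();
              return false;   // the read went through
            } catch (AccessControlException e) {
              return true;    // denied, which is what the state store intends
            }
          }
        });
      }
    }

The test applies the same idea twice: the whole run happens under a "yarn"
test user, while the per-file read checks run under the HDFS superuser and
are expected to fail.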

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a2a9553/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerHA.md
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerHA.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerHA.md
index 491b885..596cba7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerHA.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerHA.md
@@ -53,7 +53,7 @@ When there are multiple RMs, the configuration (yarn-site.xml) used by clients a
 
 ### Recovering previous active-RM's state
 
-With the [ResourceManger Restart](./ResourceManagerRestart.html) enabled, the RM being promoted to an active state loads the RM internal state and continues to operate from where the previous active left off as much as possible depending on the RM restart feature. A new attempt is spawned for each managed application previously submitted to the RM. Applications can checkpoint periodically to avoid losing any work. The state-store must be visible from the both of Active/Standby RMs. Currently, there are two RMStateStore implementations for persistence - FileSystemRMStateStore and ZKRMStateStore. The `ZKRMStateStore` implicitly allows write access to a single RM at any point in time, and hence is the recommended store to use in an HA cluster. When using the ZKRMStateStore, there is no need for a separate fencing mechanism to address a potential split-brain situation where multiple RMs can potentially assume the Active role.
+With the [ResourceManager Restart](./ResourceManagerRestart.html) enabled, the RM being promoted to an active state loads the RM internal state and continues to operate from where the previous active left off as much as possible depending on the RM restart feature. A new attempt is spawned for each managed application previously submitted to the RM. Applications can checkpoint periodically to avoid losing any work. The state-store must be visible to both the Active and Standby RMs. Currently, there are two RMStateStore implementations for persistence - FileSystemRMStateStore and ZKRMStateStore. The `ZKRMStateStore` implicitly allows write access to a single RM at any point in time, and hence is the recommended store to use in an HA cluster. When using the ZKRMStateStore, there is no need for a separate fencing mechanism to address a potential split-brain situation where multiple RMs can potentially assume the Active role. It is also advisable to NOT set the "`zookeeper.DigestAuthenticationProvider.superDigest`" property on the ZooKeeper cluster, so that the ZooKeeper admin does not have access to YARN application/user credential information.
 
 Deployment
 ----------