You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cu...@apache.org on 2014/08/20 03:34:59 UTC

svn commit: r1619019 [5/10] - in /hadoop/common/branches/YARN-1051/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ hadoop-yarn/had...

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c Wed Aug 20 01:34:29 2014
@@ -33,6 +33,7 @@
 #include <limits.h>
 #include <sys/stat.h>
 #include <sys/mount.h>
+#include <sys/wait.h>
 
 static const int DEFAULT_MIN_USERID = 1000;
 
@@ -111,16 +112,16 @@ int check_executor_permissions(char *exe
     return -1;
   }
 
-  // check others do not have read/write/execute permissions
-  if ((filestat.st_mode & S_IROTH) == S_IROTH || (filestat.st_mode & S_IWOTH)
-      == S_IWOTH || (filestat.st_mode & S_IXOTH) == S_IXOTH) {
+  // check others do not have write/execute permissions
+  if ((filestat.st_mode & S_IWOTH) == S_IWOTH ||
+      (filestat.st_mode & S_IXOTH) == S_IXOTH) {
     fprintf(LOGFILE,
-            "The container-executor binary should not have read or write or"
-            " execute for others.\n");
+            "The container-executor binary should not have write or execute "
+            "for others.\n");
     return -1;
   }
 
-  // Binary should be setuid/setgid executable
+  // Binary should be setuid executable
   if ((filestat.st_mode & S_ISUID) == 0) {
     fprintf(LOGFILE, "The container-executor binary should be set setuid.\n");
     return -1;
@@ -245,6 +246,85 @@ static int write_pid_to_file_as_nm(const
 }
 
 /**
+ * Write exit_code as decimal text to exit_code_file via a "<file>.tmp" file
+ * that is then renamed into place. Returns 0 on success, -1 on any failure.
+ */
+static int write_exit_code_file(const char* exit_code_file, int exit_code) {
+  char *tmp_ecode_file = concatenate("%s.tmp", "exit_code_path", 1,
+      exit_code_file);
+  if (tmp_ecode_file == NULL) {
+    return -1;
+  }
+
+  // create with 700; O_EXCL makes this fail if a stale .tmp file exists
+  int ecode_fd = open(tmp_ecode_file, O_WRONLY|O_CREAT|O_EXCL, S_IRWXU);
+  if (ecode_fd == -1) {
+    fprintf(LOGFILE, "Can't open file %s - %s\n", tmp_ecode_file,
+           strerror(errno));
+    free(tmp_ecode_file);
+    return -1;
+  }
+
+  char ecode_buf[21];
+  int ecode_len = snprintf(ecode_buf, sizeof(ecode_buf), "%d", exit_code);
+  ssize_t written = write(ecode_fd, ecode_buf, ecode_len);
+  // a failed or short write, or a failed close, leaves an incomplete file
+  if (close(ecode_fd) != 0 || written != ecode_len) {
+    fprintf(LOGFILE, "Failed to write exit code to file %s - %s\n",
+       tmp_ecode_file, strerror(errno));
+    unlink(tmp_ecode_file);
+    free(tmp_ecode_file);
+    return -1;
+  }
+  // rename(2) atomically replaces the target, so the exit code file never
+  // appears partially written
+  if (rename(tmp_ecode_file, exit_code_file)) {
+    fprintf(LOGFILE, "Can't move exit code file from %s to %s - %s\n",
+        tmp_ecode_file, exit_code_file, strerror(errno));
+    unlink(tmp_ecode_file);
+    free(tmp_ecode_file);
+    return -1;
+  }
+
+  free(tmp_ecode_file);
+  return 0;
+}
+
+/**
+ * Switch to the NM user, wait for child pid to exit and record its status in
+ * exit_code_file. Returns the exit status (0x80 + signal number if it was
+ * killed by a signal), or -1 on failure or if the status is undeterminable.
+ */
+static int wait_and_write_exit_code(pid_t pid, const char* exit_code_file) {
+  int child_status = -1;
+  int exit_code = -1;
+  int waitpid_result;
+
+  if (change_effective_user(nm_uid, nm_gid) != 0) {
+    return -1;
+  }
+  do {
+    waitpid_result = waitpid(pid, &child_status, 0);
+  } while (waitpid_result == -1 && errno == EINTR); // retry if interrupted
+  if (waitpid_result < 0) {
+    fprintf(LOGFILE, "Error waiting for container process %d - %s\n",
+        pid, strerror(errno));
+    return -1;
+  }
+  if (WIFEXITED(child_status)) {
+    exit_code = WEXITSTATUS(child_status);
+  } else if (WIFSIGNALED(child_status)) {
+    exit_code = 0x80 + WTERMSIG(child_status);
+  } else {
+    fprintf(LOGFILE, "Unable to determine exit status for pid %d\n", pid);
+  }
+  if (write_exit_code_file(exit_code_file, exit_code) < 0) {
+    return -1;
+  }
+  return exit_code;
+}
+
+/**
  * Change the real and effective user and group to abandon the super user
  * priviledges.
  */
@@ -337,6 +417,10 @@ char *get_container_work_directory(const
                      nm_root, user, app_id, container_id);
 }
 
+char *get_exit_code_file(const char* pid_file) { // "<pid_file>.exitcode"
+  return concatenate("%s.exitcode", "exit_code_file", 1, pid_file);
+} // result is heap-allocated (NULL on failure); the caller frees it
+
 char *get_container_launcher_file(const char* work_dir) {
   return concatenate("%s/%s", "container launcher", 2, work_dir, CONTAINER_SCRIPT);
 }
@@ -879,6 +963,8 @@ int launch_container_as_user(const char 
   int exit_code = -1;
   char *script_file_dest = NULL;
   char *cred_file_dest = NULL;
+  char *exit_code_file = NULL;
+
   script_file_dest = get_container_launcher_file(work_dir);
   if (script_file_dest == NULL) {
     exit_code = OUT_OF_MEMORY;
@@ -889,6 +975,11 @@ int launch_container_as_user(const char 
     exit_code = OUT_OF_MEMORY;
     goto cleanup;
   }
+  exit_code_file = get_exit_code_file(pid_file);
+  if (NULL == exit_code_file) {
+    exit_code = OUT_OF_MEMORY;
+    goto cleanup;
+  }
 
   // open launch script
   int container_file_source = open_file_as_nm(script_name);
@@ -902,6 +993,13 @@ int launch_container_as_user(const char 
     goto cleanup;
   }
 
+  pid_t child_pid = fork();
+  if (child_pid != 0) {
+    // parent
+    exit_code = wait_and_write_exit_code(child_pid, exit_code_file);
+    goto cleanup;
+  }
+
   // setsid 
   pid_t pid = setsid();
   if (pid == -1) {
@@ -986,6 +1084,7 @@ int launch_container_as_user(const char 
   exit_code = 0;
 
  cleanup:
+  free(exit_code_file);
   free(script_file_dest);
   free(cred_file_dest);
   return exit_code;

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto Wed Aug 20 01:34:29 2014
@@ -24,6 +24,13 @@ package hadoop.yarn;
 
 import "yarn_protos.proto";
 
+message ContainerManagerApplicationProto { // per-app NM recovery state
+  optional ApplicationIdProto id = 1;
+  optional string user = 2;
+  optional bytes credentials = 3;
+  repeated ApplicationACLMapProto acls = 4;
+}
+
 message DeletionServiceDeleteTaskProto {
   optional int32 id = 1;
   optional string user = 2;
@@ -39,8 +46,3 @@ message LocalizedResourceProto {
   optional int64 size = 3;
 }
 
-message NMDBSchemaVersionProto {
-  optional int32 majorVersion = 1;
-  optional int32 minorVersion = 2;
-}
-

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Wed Aug 20 01:34:29 2014
@@ -82,6 +82,8 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -91,8 +93,6 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
-import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 
 @SuppressWarnings("rawtypes")
 public class TestNodeStatusUpdater {
@@ -201,6 +201,7 @@ public class TestNodeStatusUpdater {
       Dispatcher mockDispatcher = mock(Dispatcher.class);
       EventHandler mockEventHandler = mock(EventHandler.class);
       when(mockDispatcher.getEventHandler()).thenReturn(mockEventHandler);
+      NMStateStoreService stateStore = new NMNullStateStoreService();
       nodeStatus.setResponseId(heartBeatID++);
       Map<ApplicationId, List<ContainerStatus>> appToContainers =
           getAppToContainerStatusMap(nodeStatus.getContainersStatuses());
@@ -226,9 +227,8 @@ public class TestNodeStatusUpdater {
                 firstContainerID, InetAddress.getByName("localhost")
                     .getCanonicalHostName(), 1234, user, resource,
                 currentTime + 10000, 123, "password".getBytes(), currentTime));
-        Container container =
-            new ContainerImpl(conf, mockDispatcher, launchContext, null,
-              mockMetrics, containerToken);
+        Container container = new ContainerImpl(conf, mockDispatcher,
+            stateStore, launchContext, null, mockMetrics, containerToken);
         this.context.getContainers().put(firstContainerID, container);
       } else if (heartBeatID == 2) {
         // Checks on the RM end
@@ -257,9 +257,8 @@ public class TestNodeStatusUpdater {
                 secondContainerID, InetAddress.getByName("localhost")
                     .getCanonicalHostName(), 1234, user, resource,
                 currentTime + 10000, 123, "password".getBytes(), currentTime));
-        Container container =
-            new ContainerImpl(conf, mockDispatcher, launchContext, null,
-              mockMetrics, containerToken);
+        Container container = new ContainerImpl(conf, mockDispatcher,
+            stateStore, launchContext, null, mockMetrics, containerToken);
         this.context.getContainers().put(secondContainerID, container);
       } else if (heartBeatID == 3) {
         // Checks on the RM end
@@ -784,7 +783,7 @@ public class TestNodeStatusUpdater {
     ContainerId cId = ContainerId.newInstance(appAttemptId, 0);               
                                                                               
                                                                               
-    nodeStatusUpdater.updateStoppedContainersInCache(cId);
+    nodeStatusUpdater.addCompletedContainer(cId);
     Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId));     
                                                                               
     long time1 = System.currentTimeMillis();                                  

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java Wed Aug 20 01:34:29 2014
@@ -233,7 +233,7 @@ public abstract class BaseContainerManag
   protected DeletionService createDeletionService() {
     return new DeletionService(exec) {
       @Override
-      public void delete(String user, Path subDir, Path[] baseDirs) {
+      public void delete(String user, Path subDir, Path... baseDirs) {
         // Don't do any deletions.
         LOG.info("Psuedo delete: user - " + user + ", subDir - " + subDir
             + ", baseDirs - " + baseDirs); 

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java Wed Aug 20 01:34:29 2014
@@ -191,7 +191,8 @@ public class TestAuxServices {
     ContainerTokenIdentifier cti = new ContainerTokenIdentifier(
         ContainerId.newInstance(attemptId, 1), "", "",
         Resource.newInstance(1, 1), 0,0,0, Priority.newInstance(0), 0);
-    Container container = new ContainerImpl(null, null, null, null, null, cti);
+    Container container = new ContainerImpl(null, null, null, null, null,
+        null, cti);
     ContainerId containerId = container.getContainerId();
     Resource resource = container.getResource();
     event = new AuxServicesEvent(AuxServicesEventType.CONTAINER_INIT,container);

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java Wed Aug 20 01:34:29 2014
@@ -88,6 +88,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -722,6 +723,8 @@ public class TestContainer {
       Context context = mock(Context.class);
       when(context.getApplications()).thenReturn(
           new ConcurrentHashMap<ApplicationId, Application>());
+      NMNullStateStoreService stateStore = new NMNullStateStoreService();
+      when(context.getNMStateStore()).thenReturn(stateStore);
       ContainerExecutor executor = mock(ContainerExecutor.class);
       launcher =
           new ContainersLauncher(context, dispatcher, executor, null, null);
@@ -777,7 +780,8 @@ public class TestContainer {
       }
       when(ctxt.getServiceData()).thenReturn(serviceData);
 
-      c = new ContainerImpl(conf, dispatcher, ctxt, null, metrics, identifier);
+      c = new ContainerImpl(conf, dispatcher, new NMNullStateStoreService(),
+          ctxt, null, metrics, identifier);
       dispatcher.register(ContainerEventType.class,
           new EventHandler<ContainerEvent>() {
             @Override

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java Wed Aug 20 01:34:29 2014
@@ -21,20 +21,29 @@ package org.apache.hadoop.yarn.server.no
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 
 public class NMMemoryStateStoreService extends NMStateStoreService {
+  private Map<ApplicationId, ContainerManagerApplicationProto> apps;
+  private Set<ApplicationId> finishedApps;
+  private Map<ContainerId, RecoveredContainerState> containerStates;
   private Map<TrackerKey, TrackerState> trackerStates;
   private Map<Integer, DeletionServiceDeleteTaskProto> deleteTasks;
   private RecoveredNMTokensState nmTokenState;
@@ -44,6 +53,130 @@ public class NMMemoryStateStoreService e
     super(NMMemoryStateStoreService.class.getName());
   }
 
+  @Override
+  protected void initStorage(Configuration conf) { // conf is not used
+    apps = new HashMap<ApplicationId, ContainerManagerApplicationProto>();
+    finishedApps = new HashSet<ApplicationId>();
+    containerStates = new HashMap<ContainerId, RecoveredContainerState>();
+    nmTokenState = new RecoveredNMTokensState();
+    nmTokenState.applicationMasterKeys =
+        new HashMap<ApplicationAttemptId, MasterKey>();
+    containerTokenState = new RecoveredContainerTokensState();
+    containerTokenState.activeTokens = new HashMap<ContainerId, Long>();
+    trackerStates = new HashMap<TrackerKey, TrackerState>();
+    deleteTasks = new HashMap<Integer, DeletionServiceDeleteTaskProto>();
+  }
+
+  @Override
+  protected void startStorage() { // no-op: state lives only in memory
+  }
+
+  @Override
+  protected void closeStorage() { // no-op: nothing to release
+  }
+
+  // Application state: stored app protos plus the set of finished app ids.
+  @Override
+  public RecoveredApplicationsState loadApplicationsState()
+      throws IOException {
+    RecoveredApplicationsState state = new RecoveredApplicationsState();
+    state.applications = new ArrayList<ContainerManagerApplicationProto>(
+        apps.values());
+    state.finishedApplications = new ArrayList<ApplicationId>(finishedApps);
+    return state;
+  }
+
+  @Override
+  public void storeApplication(ApplicationId appId,
+      ContainerManagerApplicationProto proto) throws IOException {
+    ContainerManagerApplicationProto protoCopy =
+        ContainerManagerApplicationProto.parseFrom(proto.toByteString());
+    apps.put(appId, protoCopy); // store a round-tripped copy, not the caller's
+  }
+
+  @Override
+  public void storeFinishedApplication(ApplicationId appId) {
+    finishedApps.add(appId);
+  }
+
+  @Override
+  public void removeApplication(ApplicationId appId) throws IOException {
+    apps.remove(appId);
+    finishedApps.remove(appId); // also drop any finished marker
+  }
+
+  @Override
+  public List<RecoveredContainerState> loadContainersState()
+      throws IOException {
+    // copy each state (startRequest is shared) so callers can't alter ours
+    List<RecoveredContainerState> result =
+        new ArrayList<RecoveredContainerState>(containerStates.size());
+    for (RecoveredContainerState rcs : containerStates.values()) {
+      RecoveredContainerState rcsCopy = new RecoveredContainerState();
+      rcsCopy.status = rcs.status;
+      rcsCopy.exitCode = rcs.exitCode;
+      rcsCopy.killed = rcs.killed;
+      rcsCopy.diagnostics = rcs.diagnostics;
+      rcsCopy.startRequest = rcs.startRequest;
+      result.add(rcsCopy);
+    }
+    return result;
+  }
+
+  @Override
+  public void storeContainer(ContainerId containerId,
+      StartContainerRequest startRequest) throws IOException {
+    RecoveredContainerState rcs = new RecoveredContainerState();
+    rcs.startRequest = startRequest;
+    containerStates.put(containerId, rcs); // replaces any existing state
+  }
+
+  @Override
+  public void storeContainerDiagnostics(ContainerId containerId,
+      StringBuilder diagnostics) throws IOException {
+    RecoveredContainerState rcs = getRecoveredContainerState(containerId);
+    rcs.diagnostics = diagnostics.toString(); // overwrite, not append
+  }
+
+  @Override
+  public void storeContainerLaunched(ContainerId containerId)
+      throws IOException {
+    RecoveredContainerState rcs = getRecoveredContainerState(containerId);
+    if (rcs.exitCode != ContainerExitStatus.INVALID) {
+      throw new IOException("Container already completed");
+    }
+    rcs.status = RecoveredContainerStatus.LAUNCHED;
+  }
+
+  @Override
+  public void storeContainerKilled(ContainerId containerId)
+      throws IOException {
+    RecoveredContainerState rcs = getRecoveredContainerState(containerId);
+    rcs.killed = true;
+  }
+
+  @Override
+  public void storeContainerCompleted(ContainerId containerId, int exitCode)
+      throws IOException {
+    RecoveredContainerState rcs = getRecoveredContainerState(containerId);
+    rcs.status = RecoveredContainerStatus.COMPLETED;
+    rcs.exitCode = exitCode;
+  }
+
+  @Override
+  public void removeContainer(ContainerId containerId) throws IOException {
+    containerStates.remove(containerId);
+  }
+
+  private RecoveredContainerState getRecoveredContainerState(
+      ContainerId containerId) throws IOException {
+    RecoveredContainerState rcs = containerStates.get(containerId);
+    if (rcs == null) { // never stored via storeContainer, or since removed
+      throw new IOException("No start request for " + containerId);
+    }
+    return rcs;
+  }
+
   private LocalResourceTrackerState loadTrackerState(TrackerState ts) {
     LocalResourceTrackerState result = new LocalResourceTrackerState();
     result.localizedResources.addAll(ts.localizedResources.values());
@@ -117,25 +250,6 @@ public class NMMemoryStateStoreService e
     }
   }
 
-  @Override
-  protected void initStorage(Configuration conf) {
-    nmTokenState = new RecoveredNMTokensState();
-    nmTokenState.applicationMasterKeys =
-        new HashMap<ApplicationAttemptId, MasterKey>();
-    containerTokenState = new RecoveredContainerTokensState();
-    containerTokenState.activeTokens = new HashMap<ContainerId, Long>();
-    trackerStates = new HashMap<TrackerKey, TrackerState>();
-    deleteTasks = new HashMap<Integer, DeletionServiceDeleteTaskProto>();
-  }
-
-  @Override
-  protected void startStorage() {
-  }
-
-  @Override
-  protected void closeStorage() {
-  }
-
 
   @Override
   public RecoveredDeletionServiceState loadDeletionServiceState()

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java Wed Aug 20 01:34:29 2014
@@ -25,31 +25,49 @@ import static org.junit.Assert.assertTru
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.service.ServiceStateException;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokensState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
-import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
+import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -114,12 +132,12 @@ public class TestNMLeveldbStateStoreServ
   @Test
   public void testCheckVersion() throws IOException {
     // default version
-    NMDBSchemaVersion defaultVersion = stateStore.getCurrentVersion();
+    Version defaultVersion = stateStore.getCurrentVersion();
     Assert.assertEquals(defaultVersion, stateStore.loadVersion());
 
     // compatible version
-    NMDBSchemaVersion compatibleVersion =
-        NMDBSchemaVersion.newInstance(defaultVersion.getMajorVersion(),
+    Version compatibleVersion =
+        Version.newInstance(defaultVersion.getMajorVersion(),
           defaultVersion.getMinorVersion() + 2);
     stateStore.storeVersion(compatibleVersion);
     Assert.assertEquals(compatibleVersion, stateStore.loadVersion());
@@ -128,8 +146,8 @@ public class TestNMLeveldbStateStoreServ
     Assert.assertEquals(defaultVersion, stateStore.loadVersion());
 
     // incompatible version
-    NMDBSchemaVersion incompatibleVersion =
-      NMDBSchemaVersion.newInstance(defaultVersion.getMajorVersion() + 1,
+    Version incompatibleVersion =
+      Version.newInstance(defaultVersion.getMajorVersion() + 1,
           defaultVersion.getMinorVersion());
     stateStore.storeVersion(incompatibleVersion);
     try {
@@ -142,6 +160,163 @@ public class TestNMLeveldbStateStoreServ
   }
 
   @Test
+  public void testApplicationStorage() throws IOException {
+    // test empty when no state
+    RecoveredApplicationsState state = stateStore.loadApplicationsState();
+    assertTrue(state.getApplications().isEmpty());
+    assertTrue(state.getFinishedApplications().isEmpty());
+
+    // store an application and verify recovered
+    final ApplicationId appId1 = ApplicationId.newInstance(1234, 1);
+    ContainerManagerApplicationProto.Builder builder =
+        ContainerManagerApplicationProto.newBuilder();
+    builder.setId(((ApplicationIdPBImpl) appId1).getProto());
+    builder.setUser("user1");
+    ContainerManagerApplicationProto appProto1 = builder.build();
+    stateStore.storeApplication(appId1, appProto1);
+    restartStateStore();
+    state = stateStore.loadApplicationsState();
+    assertEquals(1, state.getApplications().size());
+    assertEquals(appProto1, state.getApplications().get(0));
+    assertTrue(state.getFinishedApplications().isEmpty());
+
+    // finish an application and add a new one
+    stateStore.storeFinishedApplication(appId1);
+    final ApplicationId appId2 = ApplicationId.newInstance(1234, 2);
+    builder = ContainerManagerApplicationProto.newBuilder();
+    builder.setId(((ApplicationIdPBImpl) appId2).getProto());
+    builder.setUser("user2");
+    ContainerManagerApplicationProto appProto2 = builder.build();
+    stateStore.storeApplication(appId2, appProto2);
+    restartStateStore();
+    state = stateStore.loadApplicationsState();
+    assertEquals(2, state.getApplications().size());
+    assertTrue(state.getApplications().contains(appProto1));
+    assertTrue(state.getApplications().contains(appProto2));
+    assertEquals(1, state.getFinishedApplications().size());
+    assertEquals(appId1, state.getFinishedApplications().get(0));
+
+    // test removing an application
+    stateStore.storeFinishedApplication(appId2);
+    stateStore.removeApplication(appId2);
+    restartStateStore();
+    state = stateStore.loadApplicationsState();
+    assertEquals(1, state.getApplications().size());
+    assertEquals(appProto1, state.getApplications().get(0));
+    assertEquals(1, state.getFinishedApplications().size());
+    assertEquals(appId1, state.getFinishedApplications().get(0));
+  }
+
+  @Test
+  public void testContainerStorage() throws IOException {
+    // test empty when no state
+    List<RecoveredContainerState> recoveredContainers =
+        stateStore.loadContainersState();
+    assertTrue(recoveredContainers.isEmpty());
+
+    // create a container request
+    ApplicationId appId = ApplicationId.newInstance(1234, 3);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 4);
+    ContainerId containerId = ContainerId.newInstance(appAttemptId, 5);
+    LocalResource lrsrc = LocalResource.newInstance(
+        URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"),
+        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L,
+        1234567890L);
+    Map<String, LocalResource> localResources =
+        new HashMap<String, LocalResource>();
+    localResources.put("rsrc", lrsrc);
+    Map<String, String> env = new HashMap<String, String>();
+    env.put("somevar", "someval");
+    List<String> containerCmds = new ArrayList<String>();
+    containerCmds.add("somecmd");
+    containerCmds.add("somearg");
+    Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
+    serviceData.put("someservice",
+        ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 }));
+    ByteBuffer containerTokens =
+        ByteBuffer.wrap(new byte[] { 0x7, 0x8, 0x9, 0xa });
+    Map<ApplicationAccessType, String> acls =
+        new HashMap<ApplicationAccessType, String>();
+    acls.put(ApplicationAccessType.VIEW_APP, "viewuser");
+    acls.put(ApplicationAccessType.MODIFY_APP, "moduser");
+    ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
+        localResources, env, containerCmds, serviceData, containerTokens,
+        acls);
+    Resource containerRsrc = Resource.newInstance(1357, 3);
+    ContainerTokenIdentifier containerTokenId =
+        new ContainerTokenIdentifier(containerId, "host", "user",
+            containerRsrc, 9876543210L, 42, 2468, Priority.newInstance(7),
+            13579);
+    Token containerToken = Token.newInstance(containerTokenId.getBytes(),
+        ContainerTokenIdentifier.KIND.toString(), "password".getBytes(),
+        "tokenservice");
+    StartContainerRequest containerReq =
+        StartContainerRequest.newInstance(clc, containerToken);
+
+    // store a container and verify recovered
+    stateStore.storeContainer(containerId, containerReq);
+    restartStateStore();
+    recoveredContainers = stateStore.loadContainersState();
+    assertEquals(1, recoveredContainers.size());
+    RecoveredContainerState rcs = recoveredContainers.get(0);
+    assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus());
+    assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
+    assertEquals(false, rcs.getKilled());
+    assertEquals(containerReq, rcs.getStartRequest());
+    assertTrue(rcs.getDiagnostics().isEmpty());
+
+    // launch the container, add some diagnostics, and verify recovered
+    StringBuilder diags = new StringBuilder();
+    stateStore.storeContainerLaunched(containerId);
+    diags.append("some diags for container");
+    stateStore.storeContainerDiagnostics(containerId, diags);
+    restartStateStore();
+    recoveredContainers = stateStore.loadContainersState();
+    assertEquals(1, recoveredContainers.size());
+    rcs = recoveredContainers.get(0);
+    assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
+    assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
+    assertEquals(false, rcs.getKilled());
+    assertEquals(containerReq, rcs.getStartRequest());
+    assertEquals(diags.toString(), rcs.getDiagnostics());
+
+    // mark the container killed, add some more diags, and verify recovered
+    diags.append("some more diags for container");
+    stateStore.storeContainerDiagnostics(containerId, diags);
+    stateStore.storeContainerKilled(containerId);
+    restartStateStore();
+    recoveredContainers = stateStore.loadContainersState();
+    assertEquals(1, recoveredContainers.size());
+    rcs = recoveredContainers.get(0);
+    assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
+    assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
+    assertTrue(rcs.getKilled());
+    assertEquals(containerReq, rcs.getStartRequest());
+    assertEquals(diags.toString(), rcs.getDiagnostics());
+
+    // add yet more diags, mark container completed, and verify recovered
+    diags.append("some final diags");
+    stateStore.storeContainerDiagnostics(containerId, diags);
+    stateStore.storeContainerCompleted(containerId, 21);
+    restartStateStore();
+    recoveredContainers = stateStore.loadContainersState();
+    assertEquals(1, recoveredContainers.size());
+    rcs = recoveredContainers.get(0);
+    assertEquals(RecoveredContainerStatus.COMPLETED, rcs.getStatus());
+    assertEquals(21, rcs.getExitCode());
+    assertTrue(rcs.getKilled());
+    assertEquals(containerReq, rcs.getStartRequest());
+    assertEquals(diags.toString(), rcs.getDiagnostics());
+
+    // remove the container and verify not recovered
+    stateStore.removeContainer(containerId);
+    restartStateStore();
+    recoveredContainers = stateStore.loadContainersState();
+    assertTrue(recoveredContainers.isEmpty());
+  }
+
+  @Test
   public void testStartResourceLocalization() throws IOException {
     String user = "somebody";
     ApplicationId appId = ApplicationId.newInstance(1, 1);

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java Wed Aug 20 01:34:29 2014
@@ -209,7 +209,7 @@ public class TestNMWebServer {
             BuilderUtils.newResource(1024, 1), currentTime + 10000L, 123,
             "password".getBytes(), currentTime);
       Container container =
-          new ContainerImpl(conf, dispatcher, launchContext,
+          new ContainerImpl(conf, dispatcher, stateStore, launchContext,
             null, metrics,
             BuilderUtils.newContainerTokenIdentifier(containerToken)) {
 

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml Wed Aug 20 01:34:29 2014
@@ -108,4 +108,27 @@
     </description>
   </property>
 
+  <property>
+    <name>yarn.scheduler.capacity.queue-mappings</name>
+    <value></value>
+    <description>
+      A list of mappings that will be used to assign jobs to queues.
+      The syntax for this list is [u|g]:[name]:[queue_name][,next mapping]*
+      Typically this list will be used to map users to queues,
+      for example, u:%user:%user maps all users to queues with the same name
+      as the user.
+    </description>
+  </property>
+
+  <property>
+    <name>yarn.scheduler.capacity.queue-mappings-override.enable</name>
+    <value>false</value>
+    <description>
+      If a queue mapping is present, will it override the value specified
+      by the user? This can be used by administrators to place jobs in queues
+      that are different from the one specified by the user.
+      The default is false.
+    </description>
+  </property>
+
 </configuration>

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml Wed Aug 20 01:34:29 2014
@@ -244,6 +244,37 @@
           </execution>
         </executions>
       </plugin>
+
+     <plugin>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-maven-plugins</artifactId>
+        <executions>
+          <execution>
+            <id>compile-protoc</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>protoc</goal>
+            </goals>
+            <configuration>
+              <protocVersion>${protobuf.version}</protocVersion>
+              <protocCommand>${protoc.path}</protocCommand>
+              <imports>
+                <param>${basedir}/../../../../hadoop-common-project/hadoop-common/src/main/proto</param>
+                <param>${basedir}/../../hadoop-yarn-api/src/main/proto</param>
+                <param>${basedir}/../hadoop-yarn-server-common/src/main/proto</param>
+                <param>${basedir}/src/main/proto</param>
+              </imports>
+              <source>
+                <directory>${basedir}/src/main/proto</directory>
+                <includes>
+		          <include>yarn_server_resourcemanager_recovery.proto</include>
+                </includes>
+              </source>
+              <output>${project.build.directory}/generated-sources/java</output>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java Wed Aug 20 01:34:29 2014
@@ -90,7 +90,9 @@ public class AdminService extends Compos
   private EmbeddedElectorService embeddedElector;
 
   private Server server;
-  private InetSocketAddress masterServiceAddress;
+
+  // Address to use for binding. May be a wildcard address.
+  private InetSocketAddress masterServiceBindAddress;
   private AccessControlList adminAcl;
 
   private final RecordFactory recordFactory = 
@@ -114,10 +116,12 @@ public class AdminService extends Compos
       }
     }
 
-    masterServiceAddress = conf.getSocketAddr(
+    masterServiceBindAddress = conf.getSocketAddr(
+        YarnConfiguration.RM_BIND_HOST,
         YarnConfiguration.RM_ADMIN_ADDRESS,
         YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
         YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
+
     adminAcl = new AccessControlList(conf.get(
         YarnConfiguration.YARN_ADMIN_ACL,
         YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
@@ -141,7 +145,7 @@ public class AdminService extends Compos
     Configuration conf = getConfig();
     YarnRPC rpc = YarnRPC.create(conf);
     this.server = (Server) rpc.getServer(
-        ResourceManagerAdministrationProtocol.class, this, masterServiceAddress,
+        ResourceManagerAdministrationProtocol.class, this, masterServiceBindAddress,
         conf, null,
         conf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT,
             YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT));
@@ -170,8 +174,10 @@ public class AdminService extends Compos
     }
 
     this.server.start();
-    conf.updateConnectAddr(YarnConfiguration.RM_ADMIN_ADDRESS,
-        server.getListenerAddress());
+    conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
+                           YarnConfiguration.RM_ADMIN_ADDRESS,
+                           YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
+                           server.getListenerAddress());
   }
 
   protected void stopServer() throws Exception {

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Wed Aug 20 01:34:29 2014
@@ -39,6 +39,7 @@ import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.StringUtils;
@@ -80,6 +81,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
@@ -89,6 +91,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -127,6 +130,7 @@ public class ApplicationMasterService ex
     YarnRPC rpc = YarnRPC.create(conf);
 
     InetSocketAddress masterServiceAddress = conf.getSocketAddr(
+        YarnConfiguration.RM_BIND_HOST,
         YarnConfiguration.RM_SCHEDULER_ADDRESS,
         YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
         YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
@@ -159,7 +163,9 @@ public class ApplicationMasterService ex
     
     this.server.start();
     this.bindAddress =
-        conf.updateConnectAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
+        conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
+                               YarnConfiguration.RM_SCHEDULER_ADDRESS,
+                               YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
                                server.getListenerAddress());
     super.serviceStart();
   }
@@ -186,7 +192,7 @@ public class ApplicationMasterService ex
     return result;
   }
 
-  private ApplicationAttemptId authorizeRequest()
+  private AMRMTokenIdentifier authorizeRequest()
       throws YarnException {
 
     UserGroupInformation remoteUgi;
@@ -223,7 +229,7 @@ public class ApplicationMasterService ex
       throw RPCUtil.getRemoteException(message);
     }
 
-    return appTokenIdentifier.getApplicationAttemptId();
+    return appTokenIdentifier;
   }
 
   @Override
@@ -231,7 +237,9 @@ public class ApplicationMasterService ex
       RegisterApplicationMasterRequest request) throws YarnException,
       IOException {
 
-    ApplicationAttemptId applicationAttemptId = authorizeRequest();
+    AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest();
+    ApplicationAttemptId applicationAttemptId =
+        amrmTokenIdentifier.getApplicationAttemptId();
 
     ApplicationId appID = applicationAttemptId.getApplicationId();
     AllocateResponseLock lock = responseMap.get(applicationAttemptId);
@@ -330,7 +338,8 @@ public class ApplicationMasterService ex
       FinishApplicationMasterRequest request) throws YarnException,
       IOException {
 
-    ApplicationAttemptId applicationAttemptId = authorizeRequest();
+    ApplicationAttemptId applicationAttemptId =
+        authorizeRequest().getApplicationAttemptId();
 
     AllocateResponseLock lock = responseMap.get(applicationAttemptId);
     if (lock == null) {
@@ -405,7 +414,10 @@ public class ApplicationMasterService ex
   public AllocateResponse allocate(AllocateRequest request)
       throws YarnException, IOException {
 
-    ApplicationAttemptId appAttemptId = authorizeRequest();
+    AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest();
+
+    ApplicationAttemptId appAttemptId =
+        amrmTokenIdentifier.getApplicationAttemptId();
 
     this.amLivelinessMonitor.receivedPing(appAttemptId);
 
@@ -554,6 +566,23 @@ public class ApplicationMasterService ex
       allocateResponse
           .setPreemptionMessage(generatePreemptionMessage(allocation));
 
+      // update AMRMToken if the master key has been rolled over
+      MasterKeyData nextMasterKey =
+          this.rmContext.getAMRMTokenSecretManager().getNextMasterKeyData();
+
+      if (nextMasterKey != null
+          && nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier
+            .getKeyId()) {
+        Token<AMRMTokenIdentifier> amrmToken =
+            rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
+              appAttemptId);
+        ((RMAppAttemptImpl)appAttempt).setAMRMToken(amrmToken);
+        allocateResponse.setAMRMToken(org.apache.hadoop.yarn.api.records.Token
+          .newInstance(amrmToken.getIdentifier(), amrmToken.getKind()
+            .toString(), amrmToken.getPassword(), amrmToken.getService()
+            .toString()));
+      }
+
       /*
        * As we are updating the response inside the lock object so we don't
        * need to worry about unregister call occurring in between (which

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Wed Aug 20 01:34:29 2014
@@ -199,7 +199,9 @@ public class ClientRMService extends Abs
     }
     
     this.server.start();
-    clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_ADDRESS,
+    clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
+                                               YarnConfiguration.RM_ADDRESS,
+                                               YarnConfiguration.DEFAULT_RM_ADDRESS,
                                                server.getListenerAddress());
     super.serviceStart();
   }
@@ -213,7 +215,9 @@ public class ClientRMService extends Abs
   }
 
   InetSocketAddress getBindAddress(Configuration conf) {
-    return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
+    return conf.getSocketAddr(
+            YarnConfiguration.RM_BIND_HOST,
+            YarnConfiguration.RM_ADDRESS,
             YarnConfiguration.DEFAULT_RM_ADDRESS,
             YarnConfiguration.DEFAULT_RM_PORT);
   }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMSecretManagerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMSecretManagerService.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMSecretManagerService.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMSecretManagerService.java Wed Aug 20 01:34:29 2014
@@ -60,7 +60,7 @@ public class RMSecretManagerService exte
     clientToAMSecretManager = createClientToAMTokenSecretManager();
     rmContext.setClientToAMTokenSecretManager(clientToAMSecretManager);
 
-    amRmTokenSecretManager = createAMRMTokenSecretManager(conf);
+    amRmTokenSecretManager = createAMRMTokenSecretManager(conf, this.rmContext);
     rmContext.setAMRMTokenSecretManager(amRmTokenSecretManager);
 
     rmDTSecretManager =
@@ -115,8 +115,8 @@ public class RMSecretManagerService exte
   }
 
   protected AMRMTokenSecretManager createAMRMTokenSecretManager(
-      Configuration conf) {
-    return new AMRMTokenSecretManager(conf);
+      Configuration conf, RMContext rmContext) {
+    return new AMRMTokenSecretManager(conf, rmContext);
   }
 
   protected ClientToAMTokenSecretManagerInRM createClientToAMTokenSecretManager() {

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Wed Aug 20 01:34:29 2014
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -32,11 +33,14 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.http.lib.StaticUserWebFilter;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.source.JvmMetrics;
+import org.apache.hadoop.security.AuthenticationFilterInitializer;
 import org.apache.hadoop.security.Groups;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.CompositeService;
@@ -88,8 +92,11 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
 import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMAuthenticationHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilter;
+import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilterInitializer;
 import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher;
 import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
 import org.apache.hadoop.yarn.server.webproxy.WebAppProxy;
@@ -150,7 +157,8 @@ public class ResourceManager extends Com
   private AppReportFetcher fetcher = null;
   protected ResourceTrackerService resourceTracker;
 
-  private String webAppAddress;
+  @VisibleForTesting
+  protected String webAppAddress;
   private ConfigurationProvider configurationProvider = null;
   /** End of Active services */
 
@@ -225,7 +233,9 @@ public class ResourceManager extends Com
     }
     createAndInitActiveServices();
 
-    webAppAddress = WebAppUtils.getRMWebAppURLWithoutScheme(this.conf);
+    webAppAddress = WebAppUtils.getWebAppBindURL(this.conf,
+                      YarnConfiguration.RM_BIND_HOST,
+                      WebAppUtils.getRMWebAppURLWithoutScheme(this.conf));
 
     this.rmLoginUGI = UserGroupInformation.getCurrentUser();
 
@@ -453,7 +463,6 @@ public class ResourceManager extends Com
       rmDispatcher.register(RMAppManagerEventType.class, rmAppManager);
 
       clientRM = createClientRMService();
-      rmContext.setClientRMService(clientRM);
       addService(clientRM);
       rmContext.setClientRMService(clientRM);
 
@@ -789,6 +798,88 @@ public class ResourceManager extends Com
   }
   
   protected void startWepApp() {
+
+    // Use the customized YARN filter instead of the standard Kerberos filter to
+    // allow users to authenticate using delegation tokens.
+    // All 4 of these conditions must be satisfied:
+    // 1. security is enabled
+    // 2. hadoop.http.authentication.type is set to kerberos
+    // 3. YarnConfiguration.RM_WEBAPP_DELEGATION_TOKEN_AUTH_FILTER is true
+    // 4. hadoop.http.filter.initializers contains AuthenticationFilterInitializer
+
+    Configuration conf = getConfig();
+    boolean useYarnAuthenticationFilter =
+        conf.getBoolean(
+          YarnConfiguration.RM_WEBAPP_DELEGATION_TOKEN_AUTH_FILTER,
+          YarnConfiguration.DEFAULT_RM_WEBAPP_DELEGATION_TOKEN_AUTH_FILTER);
+    String authPrefix = "hadoop.http.authentication.";
+    String authTypeKey = authPrefix + "type";
+    String filterInitializerConfKey = "hadoop.http.filter.initializers";
+    String actualInitializers = "";
+    Class<?>[] initializersClasses =
+        conf.getClasses(filterInitializerConfKey);
+
+    boolean hasHadoopAuthFilterInitializer = false;
+    boolean hasRMAuthFilterInitializer = false;
+    if (initializersClasses != null) {
+      for (Class<?> initializer : initializersClasses) {
+        if (initializer.getName().equals(
+          AuthenticationFilterInitializer.class.getName())) {
+          hasHadoopAuthFilterInitializer = true;
+        }
+        if (initializer.getName().equals(
+          RMAuthenticationFilterInitializer.class.getName())) {
+          hasRMAuthFilterInitializer = true;
+        }
+      }
+      if (UserGroupInformation.isSecurityEnabled()
+          && useYarnAuthenticationFilter
+          && hasHadoopAuthFilterInitializer
+          && conf.get(authTypeKey, "").equals(
+            KerberosAuthenticationHandler.TYPE)) {
+        ArrayList<String> target = new ArrayList<String>();
+        for (Class<?> filterInitializer : initializersClasses) {
+          if (filterInitializer.getName().equals(
+            AuthenticationFilterInitializer.class.getName())) {
+            if (hasRMAuthFilterInitializer == false) {
+              target.add(RMAuthenticationFilterInitializer.class.getName());
+            }
+            continue;
+          }
+          target.add(filterInitializer.getName());
+        }
+        actualInitializers = StringUtils.join(",", target);
+
+        LOG.info("Using RM authentication filter(kerberos/delegation-token)"
+            + " for RM webapp authentication");
+        RMAuthenticationHandler
+          .setSecretManager(getClientRMService().rmDTSecretManager);
+        String yarnAuthKey =
+            authPrefix + RMAuthenticationFilter.AUTH_HANDLER_PROPERTY;
+        conf.setStrings(yarnAuthKey, RMAuthenticationHandler.class.getName());
+        conf.set(filterInitializerConfKey, actualInitializers);
+      }
+    }
+
+    // If security is not enabled and either no filter initializer is set or
+    // only StaticUserWebFilter is set, set the initializers to include
+    // RMAuthenticationFilterInitializer, which in turn will set up the simple
+    // auth filter.
+
+    String initializers = conf.get(filterInitializerConfKey);
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      if (initializersClasses == null || initializersClasses.length == 0) {
+        conf.set(filterInitializerConfKey,
+          RMAuthenticationFilterInitializer.class.getName());
+        conf.set(authTypeKey, "simple");
+      } else if (initializers.equals(StaticUserWebFilter.class.getName())) {
+        conf.set(filterInitializerConfKey,
+          RMAuthenticationFilterInitializer.class.getName() + ","
+              + initializers);
+        conf.set(authTypeKey, "simple");
+      }
+    }
+
     Builder<ApplicationMasterService> builder = 
         WebApps
             .$for("cluster", ApplicationMasterService.class, masterService,
@@ -1026,6 +1117,9 @@ public class ResourceManager extends Com
     // recover RMdelegationTokenSecretManager
     rmContext.getRMDelegationTokenSecretManager().recover(state);
 
+    // recover AMRMTokenSecretManager
+    rmContext.getAMRMTokenSecretManager().recover(state);
+
     // recover applications
     rmAppManager.recover(state);
   }
@@ -1067,6 +1161,9 @@ public class ResourceManager extends Com
     ((Service)dispatcher).init(this.conf);
     ((Service)dispatcher).start();
     removeService((Service)rmDispatcher);
+    // Stop the previous rmDispatcher before assigning the new dispatcher;
+    // otherwise its "AsyncDispatcher event handler" thread is leaked.
+    ((Service) rmDispatcher).stop();
     rmDispatcher = dispatcher;
     addIfService(rmDispatcher);
     rmContext.setDispatcher(rmDispatcher);

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Wed Aug 20 01:34:29 2014
@@ -121,6 +121,7 @@ public class ResourceTrackerService exte
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     resourceTrackerAddress = conf.getSocketAddr(
+        YarnConfiguration.RM_BIND_HOST,
         YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
         YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
         YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
@@ -175,9 +176,11 @@ public class ResourceTrackerService exte
       }
       refreshServiceAcls(conf, RMPolicyProvider.getInstance());
     }
-
+ 
     this.server.start();
-    conf.updateConnectAddr(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
+    conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
+			   YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
+			   YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
                            server.getListenerAddress());
   }
 
@@ -308,7 +311,8 @@ public class ResourceTrackerService exte
       LOG.info("Reconnect from the node at: " + host);
       this.nmLivelinessMonitor.unregister(nodeId);
       this.rmContext.getDispatcher().getEventHandler().handle(
-          new RMNodeReconnectEvent(nodeId, rmNode));
+          new RMNodeReconnectEvent(nodeId, rmNode,
+              request.getRunningApplications()));
     }
     // On every node manager register we will be clearing NMToken keys if
     // present for any running application.

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java Wed Aug 20 01:34:29 2014
@@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
@@ -226,7 +227,7 @@ public class AMLauncher implements Runna
     }
 
     // Add AMRMToken
-    Token<AMRMTokenIdentifier> amrmToken = getAMRMToken();
+    Token<AMRMTokenIdentifier> amrmToken = createAndSetAMRMToken();
     if (amrmToken != null) {
       credentials.addToken(amrmToken.getService(), amrmToken);
     }
@@ -236,8 +237,12 @@ public class AMLauncher implements Runna
   }
 
   @VisibleForTesting
-  protected Token<AMRMTokenIdentifier> getAMRMToken() {
-    return application.getAMRMToken();
+  protected Token<AMRMTokenIdentifier> createAndSetAMRMToken() {
+    Token<AMRMTokenIdentifier> amrmToken =
+        this.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
+          application.getAppAttemptId());
+    ((RMAppAttemptImpl)application).setAMRMToken(amrmToken);
+    return amrmToken;
   }
   
   @SuppressWarnings("unchecked")

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java Wed Aug 20 01:34:29 2014
@@ -22,6 +22,7 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -43,20 +44,22 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
-import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
-
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -68,14 +71,20 @@ import com.google.common.annotations.Vis
  * FileSystem interface. Does not use directories so that simple key-value
  * stores can be used. The retry policy for the real filesystem client must be
  * configured separately to enable retry of filesystem operations when needed.
+ *
+ * Changes from 1.1 to 1.2: AMRMTokenSecretManager state is now saved
+ * separately, storing both the currentMasterKey and the nextMasterKey.
+ * Also, AMRMToken has been removed from ApplicationAttemptState.
  */
 public class FileSystemRMStateStore extends RMStateStore {
 
   public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class);
 
   protected static final String ROOT_DIR_NAME = "FSRMStateRoot";
-  protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion
-    .newInstance(1, 1);
+  protected static final Version CURRENT_VERSION_INFO = Version
+    .newInstance(1, 2);
+  protected static final String AMRMTOKEN_SECRET_MANAGER_NODE =
+      "AMRMTokenSecretManagerNode";
 
   protected FileSystem fs;
 
@@ -89,6 +98,7 @@ public class FileSystemRMStateStore exte
   @VisibleForTesting
   Path fsWorkingPath;
 
+  Path amrmTokenSecretManagerRoot;
   @Override
   public synchronized void initInternal(Configuration conf)
       throws Exception{
@@ -96,6 +106,8 @@ public class FileSystemRMStateStore exte
     rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
     rmDTSecretManagerRoot = new Path(rootDirPath, RM_DT_SECRET_MANAGER_ROOT);
     rmAppRoot = new Path(rootDirPath, RM_APP_ROOT);
+    amrmTokenSecretManagerRoot =
+        new Path(rootDirPath, AMRMTOKEN_SECRET_MANAGER_ROOT);
   }
 
   @Override
@@ -113,6 +125,7 @@ public class FileSystemRMStateStore exte
     fs = fsWorkingPath.getFileSystem(conf);
     fs.mkdirs(rmDTSecretManagerRoot);
     fs.mkdirs(rmAppRoot);
+    fs.mkdirs(amrmTokenSecretManagerRoot);
   }
 
   @Override
@@ -121,18 +134,18 @@ public class FileSystemRMStateStore exte
   }
 
   @Override
-  protected RMStateVersion getCurrentVersion() {
+  protected Version getCurrentVersion() {
     return CURRENT_VERSION_INFO;
   }
 
   @Override
-  protected synchronized RMStateVersion loadVersion() throws Exception {
+  protected synchronized Version loadVersion() throws Exception {
     Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE);
     if (fs.exists(versionNodePath)) {
       FileStatus status = fs.getFileStatus(versionNodePath);
       byte[] data = readFile(versionNodePath, status.getLen());
-      RMStateVersion version =
-          new RMStateVersionPBImpl(RMStateVersionProto.parseFrom(data));
+      Version version =
+          new VersionPBImpl(VersionProto.parseFrom(data));
       return version;
     }
     return null;
@@ -142,7 +155,7 @@ public class FileSystemRMStateStore exte
   protected synchronized void storeVersion() throws Exception {
     Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE);
     byte[] data =
-        ((RMStateVersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
+        ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
     if (fs.exists(versionNodePath)) {
       updateFile(versionNodePath, data);
     } else {
@@ -180,9 +193,32 @@ public class FileSystemRMStateStore exte
     loadRMDTSecretManagerState(rmState);
     // recover RM applications
     loadRMAppState(rmState);
+    // recover AMRMTokenSecretManager
+    loadAMRMTokenSecretManagerState(rmState);
     return rmState;
   }
 
+  private void loadAMRMTokenSecretManagerState(RMState rmState)
+      throws Exception {
+    checkAndResumeUpdateOperation(amrmTokenSecretManagerRoot);
+    Path amrmTokenSecretManagerStateDataDir =
+        new Path(amrmTokenSecretManagerRoot, AMRMTOKEN_SECRET_MANAGER_NODE);
+    FileStatus status;
+    try {
+      status = fs.getFileStatus(amrmTokenSecretManagerStateDataDir);
+      assert status.isFile();
+    } catch (FileNotFoundException ex) {
+      return;
+    }
+    byte[] data = readFile(amrmTokenSecretManagerStateDataDir, status.getLen());
+    AMRMTokenSecretManagerStatePBImpl stateData =
+        new AMRMTokenSecretManagerStatePBImpl(
+          AMRMTokenSecretManagerStateProto.parseFrom(data));
+    rmState.amrmTokenSecretManagerState =
+        AMRMTokenSecretManagerState.newInstance(
+          stateData.getCurrentMasterKey(), stateData.getNextMasterKey());
+  }
+
   private void loadRMAppState(RMState rmState) throws Exception {
     try {
       List<ApplicationAttemptState> attempts =
@@ -597,4 +633,25 @@ public class FileSystemRMStateStore exte
     return new Path(root, nodeName);
   }
 
+  @Override
+  public synchronized void storeOrUpdateAMRMTokenSecretManagerState(
+      AMRMTokenSecretManagerState amrmTokenSecretManagerState,
+      boolean isUpdate){
+    Path nodeCreatePath =
+        getNodePath(amrmTokenSecretManagerRoot, AMRMTOKEN_SECRET_MANAGER_NODE);
+    AMRMTokenSecretManagerState data =
+        AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState);
+    byte[] stateData = data.getProto().toByteArray();
+    try {
+      if (isUpdate) {
+        updateFile(nodeCreatePath, stateData);
+      } else {
+        writeFile(nodeCreatePath, stateData);
+      }
+    } catch (Exception ex) {
+      LOG.info("Error storing info for AMRMTokenSecretManager", ex);
+      notifyStoreOperationFailed(ex);
+    }
+  }
+
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java Wed Aug 20 01:34:29 2014
@@ -32,9 +32,10 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -72,6 +73,10 @@ public class MemoryRMStateStore extends 
       state.rmSecretManagerState.getTokenState());
     returnState.rmSecretManagerState.dtSequenceNumber =
         state.rmSecretManagerState.dtSequenceNumber;
+    returnState.amrmTokenSecretManagerState =
+        state.amrmTokenSecretManagerState == null ? null
+            : AMRMTokenSecretManagerState
+              .newInstance(state.amrmTokenSecretManagerState);
     return returnState;
   }
   
@@ -254,7 +259,7 @@ public class MemoryRMStateStore extends 
   }
 
   @Override
-  protected RMStateVersion loadVersion() throws Exception {
+  protected Version loadVersion() throws Exception {
     return null;
   }
 
@@ -263,11 +268,21 @@ public class MemoryRMStateStore extends 
   }
 
   @Override
-  protected RMStateVersion getCurrentVersion() {
+  protected Version getCurrentVersion() {
     return null;
   }
 
   @Override
+  public void storeOrUpdateAMRMTokenSecretManagerState(
+      AMRMTokenSecretManagerState amrmTokenSecretManagerState,
+      boolean isUpdate) {
+    if (amrmTokenSecretManagerState != null) {
+      state.amrmTokenSecretManagerState = AMRMTokenSecretManagerState
+          .newInstance(amrmTokenSecretManagerState);
+    }
+  }
+
+  @Override
   public void deleteStore() throws Exception {
   }
 

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java Wed Aug 20 01:34:29 2014
@@ -25,9 +25,10 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
 
 @Unstable
 public class NullRMStateStore extends RMStateStore {
@@ -122,7 +123,7 @@ public class NullRMStateStore extends RM
   }
 
   @Override
-  protected RMStateVersion loadVersion() throws Exception {
+  protected Version loadVersion() throws Exception {
     // Do nothing
     return null;
   }
@@ -133,12 +134,18 @@ public class NullRMStateStore extends RM
   }
 
   @Override
-  protected RMStateVersion getCurrentVersion() {
+  protected Version getCurrentVersion() {
     // Do nothing
     return null;
   }
 
   @Override
+  public void storeOrUpdateAMRMTokenSecretManagerState(
+      AMRMTokenSecretManagerState state, boolean isUpdate) {
+    //DO Nothing
+  }
+
+  @Override
   public void deleteStore() throws Exception {
     // Do nothing
   }