Posted to yarn-commits@hadoop.apache.org by cl...@apache.org on 2014/06/16 20:14:12 UTC

svn commit: r1602947 [4/5] - in /hadoop/common/branches/fs-encryption/hadoop-yarn-project: ./ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/ha...

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java Mon Jun 16 18:13:57 2014
@@ -35,7 +35,9 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 
 @Private
 @Unstable
@@ -228,4 +230,11 @@ public class FSParentQueue extends FSQue
     // Should never be called since all applications are submitted to LeafQueues
     return null;
   }
+
+  @Override
+  public void recoverContainer(Resource clusterResource,
+      SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
+    // TODO Auto-generated method stub
+    
+  }
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Mon Jun 16 18:13:57 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
+import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -117,7 +118,6 @@ import com.google.common.annotations.Vis
 @SuppressWarnings("unchecked")
 public class FairScheduler extends
     AbstractYarnScheduler<FSSchedulerApp, FSSchedulerNode> {
-  private boolean initialized;
   private FairSchedulerConfiguration conf;
 
   private Resource incrAllocation;
@@ -137,6 +137,11 @@ public class FairScheduler extends
   // How often fair shares are re-calculated (ms)
   protected long UPDATE_INTERVAL = 500;
 
+  private Thread updateThread;
+  private Thread schedulingThread;
+  // timeout to join when we stop this service
+  protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
+
   // Aggregate metrics
   FSQueueMetrics rootMetrics;
 
@@ -182,6 +187,7 @@ public class FairScheduler extends
   AllocationConfiguration allocConf;
   
   public FairScheduler() {
+    super(FairScheduler.class.getName());
     clock = new SystemClock();
     allocsLoader = new AllocationFileLoaderService();
     queueMgr = new QueueManager(this);
@@ -473,7 +479,8 @@ public class FairScheduler extends
     return resToPreempt;
   }
 
-  public RMContainerTokenSecretManager getContainerTokenSecretManager() {
+  public synchronized RMContainerTokenSecretManager
+      getContainerTokenSecretManager() {
     return rmContext.getContainerTokenSecretManager();
   }
 
@@ -829,6 +836,12 @@ public class FairScheduler extends
     SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(),
         clusterResource, minimumAllocation, maximumAllocation, incrAllocation);
 
+    // Set amResource for this app
+    if (!application.getUnmanagedAM() && ask.size() == 1
+        && application.getLiveContainers().isEmpty()) {
+      application.setAMResource(ask.get(0).getCapability());
+    }
+
     // Release containers
     for (ContainerId releasedContainerId : release) {
       RMContainer rmContainer = getRMContainer(releasedContainerId);
@@ -1059,8 +1072,8 @@ public class FairScheduler extends
   private boolean shouldAttemptPreemption() {
     if (preemptionEnabled) {
       return (preemptionUtilizationThreshold < Math.max(
-          (float) rootMetrics.getAvailableMB() / clusterResource.getMemory(),
-          (float) rootMetrics.getAvailableVirtualCores() /
+          (float) rootMetrics.getAllocatedMB() / clusterResource.getMemory(),
+          (float) rootMetrics.getAllocatedVirtualCores() /
               clusterResource.getVirtualCores()));
     }
     return false;
@@ -1154,87 +1167,130 @@ public class FairScheduler extends
     // NOT IMPLEMENTED
   }
 
-  @Override
-  public synchronized void reinitialize(Configuration conf, RMContext rmContext)
-      throws IOException {
-    if (!initialized) {
-      this.conf = new FairSchedulerConfiguration(conf);
-      validateConf(this.conf);
-      minimumAllocation = this.conf.getMinimumAllocation();
-      maximumAllocation = this.conf.getMaximumAllocation();
-      incrAllocation = this.conf.getIncrementAllocation();
-      continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
-      continuousSchedulingSleepMs =
-              this.conf.getContinuousSchedulingSleepMs();
-      nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
-      rackLocalityThreshold = this.conf.getLocalityThresholdRack();
-      nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
-      rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
-      preemptionEnabled = this.conf.getPreemptionEnabled();
-      preemptionUtilizationThreshold =
-          this.conf.getPreemptionUtilizationThreshold();
-      assignMultiple = this.conf.getAssignMultiple();
-      maxAssign = this.conf.getMaxAssign();
-      sizeBasedWeight = this.conf.getSizeBasedWeight();
-      preemptionInterval = this.conf.getPreemptionInterval();
-      waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
-      usePortForNodeName = this.conf.getUsePortForNodeName();
-      
-      rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
-      this.rmContext = rmContext;
-      // This stores per-application scheduling information
-      this.applications =
-          new ConcurrentHashMap<ApplicationId, SchedulerApplication<FSSchedulerApp>>();
-      this.eventLog = new FairSchedulerEventLog();
-      eventLog.init(this.conf);
+  public synchronized void setRMContext(RMContext rmContext) {
+    this.rmContext = rmContext;
+  }
 
-      initialized = true;
+  private synchronized void initScheduler(Configuration conf)
+      throws IOException {
+    this.conf = new FairSchedulerConfiguration(conf);
+    validateConf(this.conf);
+    minimumAllocation = this.conf.getMinimumAllocation();
+    maximumAllocation = this.conf.getMaximumAllocation();
+    incrAllocation = this.conf.getIncrementAllocation();
+    continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
+    continuousSchedulingSleepMs =
+        this.conf.getContinuousSchedulingSleepMs();
+    nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
+    rackLocalityThreshold = this.conf.getLocalityThresholdRack();
+    nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
+    rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
+    preemptionEnabled = this.conf.getPreemptionEnabled();
+    preemptionUtilizationThreshold =
+        this.conf.getPreemptionUtilizationThreshold();
+    assignMultiple = this.conf.getAssignMultiple();
+    maxAssign = this.conf.getMaxAssign();
+    sizeBasedWeight = this.conf.getSizeBasedWeight();
+    preemptionInterval = this.conf.getPreemptionInterval();
+    waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
+    usePortForNodeName = this.conf.getUsePortForNodeName();
+
+    rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
+    // This stores per-application scheduling information
+    this.applications =
+        new ConcurrentHashMap<ApplicationId,SchedulerApplication<FSSchedulerApp>>();
+    this.eventLog = new FairSchedulerEventLog();
+    eventLog.init(this.conf);
 
-      allocConf = new AllocationConfiguration(conf);
-      try {
-        queueMgr.initialize(conf);
-      } catch (Exception e) {
-        throw new IOException("Failed to start FairScheduler", e);
-      }
+    allocConf = new AllocationConfiguration(conf);
+    try {
+      queueMgr.initialize(conf);
+    } catch (Exception e) {
+      throw new IOException("Failed to start FairScheduler", e);
+    }
 
-      Thread updateThread = new Thread(new UpdateThread());
-      updateThread.setName("FairSchedulerUpdateThread");
-      updateThread.setDaemon(true);
-      updateThread.start();
+    updateThread = new Thread(new UpdateThread());
+    updateThread.setName("FairSchedulerUpdateThread");
+    updateThread.setDaemon(true);
 
-      if (continuousSchedulingEnabled) {
-        // start continuous scheduling thread
-        Thread schedulingThread = new Thread(
+    if (continuousSchedulingEnabled) {
+      // start continuous scheduling thread
+      schedulingThread = new Thread(
           new Runnable() {
             @Override
             public void run() {
               continuousScheduling();
             }
           }
-        );
-        schedulingThread.setName("ContinuousScheduling");
-        schedulingThread.setDaemon(true);
-        schedulingThread.start();
+      );
+      schedulingThread.setName("ContinuousScheduling");
+      schedulingThread.setDaemon(true);
+    }
+
+    allocsLoader.init(conf);
+    allocsLoader.setReloadListener(new AllocationReloadListener());
+    // If we fail to load allocations file on initialize, we want to fail
+    // immediately.  After a successful load, exceptions on future reloads
+    // will just result in leaving things as they are.
+    try {
+      allocsLoader.reloadAllocations();
+    } catch (Exception e) {
+      throw new IOException("Failed to initialize FairScheduler", e);
+    }
+  }
+
+  private synchronized void startSchedulerThreads() {
+    Preconditions.checkNotNull(updateThread, "updateThread is null");
+    Preconditions.checkNotNull(allocsLoader, "allocsLoader is null");
+    updateThread.start();
+    if (continuousSchedulingEnabled) {
+      Preconditions.checkNotNull(schedulingThread, "schedulingThread is null");
+      schedulingThread.start();
+    }
+    allocsLoader.start();
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    initScheduler(conf);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
+    startSchedulerThreads();
+    super.serviceStart();
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    synchronized (this) {
+      if (updateThread != null) {
+        updateThread.interrupt();
+        updateThread.join(THREAD_JOIN_TIMEOUT_MS);
       }
-      
-      allocsLoader.init(conf);
-      allocsLoader.setReloadListener(new AllocationReloadListener());
-      // If we fail to load allocations file on initialize, we want to fail
-      // immediately.  After a successful load, exceptions on future reloads
-      // will just result in leaving things as they are.
-      try {
-        allocsLoader.reloadAllocations();
-      } catch (Exception e) {
-        throw new IOException("Failed to initialize FairScheduler", e);
+      if (continuousSchedulingEnabled) {
+        if (schedulingThread != null) {
+          schedulingThread.interrupt();
+          schedulingThread.join(THREAD_JOIN_TIMEOUT_MS);
+        }
       }
-      allocsLoader.start();
-    } else {
-      try {
-        allocsLoader.reloadAllocations();
-      } catch (Exception e) {
-        LOG.error("Failed to reload allocations file", e);
+      if (allocsLoader != null) {
+        allocsLoader.stop();
       }
     }
+
+    super.serviceStop();
+  }
+
+  @Override
+  public synchronized void reinitialize(Configuration conf, RMContext rmContext)
+      throws IOException {
+    try {
+      allocsLoader.reloadAllocations();
+    } catch (Exception e) {
+      LOG.error("Failed to reload allocations file", e);
+    }
   }
 
   @Override

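For reference, FairScheduler is now brought up through the standard YARN service lifecycle instead of lazily inside reinitialize(). A minimal illustrative sketch of the new flow, assuming a Configuration and an RMContext (e.g. from MockRM) are already available; it mirrors the FifoScheduler setup used in the test changes further down:

    // Illustrative sketch, not part of the patch; conf and rmContext are assumed.
    FairScheduler scheduler = new FairScheduler();
    scheduler.setRMContext(rmContext);        // must be set before init/start
    scheduler.init(conf);                     // serviceInit: read config, build queues, create threads
    scheduler.start();                        // serviceStart: start update/continuous-scheduling threads
    // ...
    scheduler.reinitialize(conf, rmContext);  // now only reloads the allocations file
    // ...
    scheduler.stop();                         // serviceStop: interrupt and join the scheduler threads
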
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java Mon Jun 16 18:13:57 2014
@@ -149,4 +149,15 @@ public abstract class SchedulingPolicy {
    */
   public abstract boolean checkIfUsageOverFairShare(
       Resource usage, Resource fairShare);
+
+  /**
+   * Check if a leaf queue's AM resource usage is over its limit under this policy.
+   *
+   * @param usage {@link Resource} the resource used by application masters
+   * @param maxAMResource {@link Resource} the maximum allowed resource for
+   *                                      application masters
+   * @return true if AM resource usage is over the limit
+   */
+  public abstract boolean checkIfAMResourceUsageOverLimit(
+      Resource usage, Resource maxAMResource);
 }

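The three policies below implement this new hook. A hedged sketch of how a leaf queue could use it when deciding whether another application master may start; maxAMShare, amResourceUsage and appAMResource are hypothetical names used only for illustration:

    // Hypothetical caller sketch, not part of this hunk.
    Resource maxAMResource = Resources.multiply(getFairShare(), maxAMShare);
    Resource usageIfStarted = Resources.add(amResourceUsage, appAMResource);
    if (policy.checkIfAMResourceUsageOverLimit(usageIfStarted, maxAMResource)) {
      // defer launching this AM until AM resource usage drops below the limit
    }
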
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java Mon Jun 16 18:13:57 2014
@@ -75,6 +75,11 @@ public class DominantResourceFairnessPol
   }
 
   @Override
+  public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) {
+    return !Resources.fitsIn(usage, maxAMResource);
+  }
+
+  @Override
   public void initialize(Resource clusterCapacity) {
     comparator.setClusterCapacity(clusterCapacity);
   }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java Mon Jun 16 18:13:57 2014
@@ -125,6 +125,11 @@ public class FairSharePolicy extends Sch
   }
 
   @Override
+  public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) {
+    return usage.getMemory() > maxAMResource.getMemory();
+  }
+
+  @Override
   public byte getApplicableDepth() {
     return SchedulingPolicy.DEPTH_ANY;
   }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java Mon Jun 16 18:13:57 2014
@@ -95,6 +95,11 @@ public class FifoPolicy extends Scheduli
   }
 
   @Override
+  public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) {
+    return usage.getMemory() > maxAMResource.getMemory();
+  }
+
+  @Override
   public byte getApplicableDepth() {
     return SchedulingPolicy.DEPTH_LEAF;
   }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Mon Jun 16 18:13:57 2014
@@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -89,6 +90,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -111,7 +113,6 @@ public class FifoScheduler extends
 
   Configuration conf;
 
-  private boolean initialized;
   private boolean usePortForNodeName;
 
   private ActiveUsersManager activeUsersManager;
@@ -178,8 +179,60 @@ public class FifoScheduler extends
     public ActiveUsersManager getActiveUsersManager() {
       return activeUsersManager;
     }
+
+    @Override
+    public void recoverContainer(Resource clusterResource,
+        SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
+      if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+        return;
+      }
+      increaseUsedResources(rmContainer);
+      updateAppHeadRoom(schedulerAttempt);
+      updateAvailableResourcesMetrics();
+    }
   };
 
+  public FifoScheduler() {
+    super(FifoScheduler.class.getName());
+  }
+
+  private synchronized void initScheduler(Configuration conf) {
+    validateConf(conf);
+    //Use ConcurrentSkipListMap because applications need to be ordered
+    this.applications =
+        new ConcurrentSkipListMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>();
+    this.minimumAllocation =
+        Resources.createResource(conf.getInt(
+            YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
+    this.maximumAllocation =
+        Resources.createResource(conf.getInt(
+            YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
+    this.usePortForNodeName = conf.getBoolean(
+        YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
+    this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false,
+        conf);
+    this.activeUsersManager = new ActiveUsersManager(metrics);
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    initScheduler(conf);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
+    super.serviceStart();
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    super.serviceStop();
+  }
+
   @Override
   public synchronized void setConf(Configuration conf) {
     this.conf = conf;
@@ -216,35 +269,17 @@ public class FifoScheduler extends
   }
 
   @Override
+  public synchronized void setRMContext(RMContext rmContext) {
+    this.rmContext = rmContext;
+  }
+
+  @Override
   public synchronized void
       reinitialize(Configuration conf, RMContext rmContext) throws IOException
   {
     setConf(conf);
-    if (!this.initialized) {
-      validateConf(conf);
-      this.rmContext = rmContext;
-      //Use ConcurrentSkipListMap because applications need to be ordered
-      this.applications =
-          new ConcurrentSkipListMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>();
-      this.minimumAllocation = 
-        Resources.createResource(conf.getInt(
-            YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
-      this.maximumAllocation = 
-        Resources.createResource(conf.getInt(
-            YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
-      this.usePortForNodeName = conf.getBoolean(
-          YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, 
-          YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
-      this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false,
-          conf);
-      this.activeUsersManager = new ActiveUsersManager(metrics);
-      this.initialized = true;
-    }
   }
 
-
   @Override
   public Allocation allocate(
       ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
@@ -465,7 +500,7 @@ public class FifoScheduler extends
       if (attempt == null) {
         continue;
       }
-      attempt.setHeadroom(Resources.subtract(clusterResource, usedResource));
+      updateAppHeadRoom(attempt);
     }
   }
 
@@ -636,11 +671,10 @@ public class FifoScheduler extends
             application.allocate(type, node, priority, request, container);
         
         // Inform the node
-        node.allocateContainer(application.getApplicationId(), 
-            rmContainer);
+        node.allocateContainer(rmContainer);
 
         // Update usage for this container
-        Resources.addTo(usedResource, capability);
+        increaseUsedResources(rmContainer);
       }
 
     }
@@ -684,9 +718,22 @@ public class FifoScheduler extends
       LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = "
           + node.getAvailableResource());
     }
-    
-    metrics.setAvailableResourcesToQueue(
-        Resources.subtract(clusterResource, usedResource));
+
+    updateAvailableResourcesMetrics();
+  }
+
+  private void increaseUsedResources(RMContainer rmContainer) {
+    Resources.addTo(usedResource, rmContainer.getAllocatedResource());
+  }
+
+  private void updateAppHeadRoom(SchedulerApplicationAttempt schedulerAttempt) {
+    schedulerAttempt.setHeadroom(Resources.subtract(clusterResource,
+      usedResource));
+  }
+
+  private void updateAvailableResourcesMetrics() {
+    metrics.setAvailableResourcesToQueue(Resources.subtract(clusterResource,
+      usedResource));
   }
 
   @Override
@@ -696,6 +743,9 @@ public class FifoScheduler extends
     {
       NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
       addNode(nodeAddedEvent.getAddedRMNode());
+      recoverContainersOnNode(nodeAddedEvent.getContainerReports(),
+        nodeAddedEvent.getAddedRMNode());
+
     }
     break;
     case NODE_REMOVED:
@@ -900,4 +950,8 @@ public class FifoScheduler extends
       return null;
     }
   }
+
+  public Resource getUsedResource() {
+    return usedResource;
+  }
 }

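With this change the NODE_ADDED path feeds container reports into recoverContainersOnNode(), so containers that were still running are added back to usedResource and the owning attempt's headroom via recoverContainer(). A hedged, test-style sketch of exercising that path with the MockRM/MockNM helpers from this patch; runningContainerReport is a hypothetical NMContainerStatus for a RUNNING container:

    // Illustrative sketch, not part of the patch.
    MockRM rm = new MockRM(conf);
    rm.start();
    MockNM nm = rm.registerNode("127.0.0.1:1234", 8192);
    // ... later, re-register the node with its live containers so the
    // scheduler's recoverContainer() re-adds their resources to usedResource.
    nm.registerNode(Arrays.asList(runningContainerReport));
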
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java Mon Jun 16 18:13:57 2014
@@ -25,6 +25,8 @@ import static org.apache.hadoop.yarn.web
 
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.lang.StringEscapeUtils;
@@ -35,6 +37,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerInfo;
@@ -60,7 +63,14 @@ public class FairSchedulerAppsBlock exte
     super(ctx);
     FairScheduler scheduler = (FairScheduler) rm.getResourceScheduler();
     fsinfo = new FairSchedulerInfo(scheduler);
-    apps = rmContext.getRMApps();
+    apps = new ConcurrentHashMap<ApplicationId, RMApp>();
+    for (Map.Entry<ApplicationId, RMApp> entry : rmContext.getRMApps().entrySet()) {
+      if (!(RMAppState.NEW.equals(entry.getValue().getState())
+          || RMAppState.NEW_SAVING.equals(entry.getValue().getState())
+          || RMAppState.SUBMITTED.equals(entry.getValue().getState()))) {
+        apps.put(entry.getKey(), entry.getValue());
+      }
+    }
     this.conf = conf;
   }
   

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java Mon Jun 16 18:13:57 2014
@@ -19,6 +19,9 @@
 package org.apache.hadoop.yarn.server.resourcemanager.webapp;
 
 import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.AccessControlException;
+import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumSet;
@@ -31,19 +34,27 @@ import java.util.concurrent.ConcurrentMa
 
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.Consumes;
 import javax.ws.rs.GET;
+import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -56,6 +67,8 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -66,10 +79,10 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
@@ -82,7 +95,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
-import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.webapp.BadRequestException;
 import org.apache.hadoop.yarn.webapp.NotFoundException;
@@ -584,4 +596,166 @@ public class RMWebServices {
 
     return appAttemptsInfo;
   }
+
+  @GET
+  @Path("/apps/{appid}/state")
+  @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public AppState getAppState(@Context HttpServletRequest hsr,
+      @PathParam("appid") String appId) throws AuthorizationException {
+    init();
+    UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr);
+    String userName = "";
+    if (callerUGI != null) {
+      userName = callerUGI.getUserName();
+    }
+    RMApp app = null;
+    try {
+      app = getRMAppForAppId(appId);
+    } catch (NotFoundException e) {
+      RMAuditLogger.logFailure(userName, AuditConstants.KILL_APP_REQUEST,
+        "UNKNOWN", "RMWebService",
+        "Trying to get state of an absent application " + appId);
+      throw e;
+    }
+
+    AppState ret = new AppState();
+    ret.setState(app.getState().toString());
+
+    return ret;
+  }
+
+  // Can't return a POJO here because then we couldn't control the status
+  // code: it is always set to 200 when we need to be able to return
+  // 202 instead.
+
+  @PUT
+  @Path("/apps/{appid}/state")
+  @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public Response updateAppState(AppState targetState,
+      @Context HttpServletRequest hsr, @PathParam("appid") String appId)
+      throws AuthorizationException, YarnException, InterruptedException,
+      IOException {
+
+    init();
+    UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr);
+    if (callerUGI == null) {
+      String msg = "Unable to obtain user name, user not authenticated";
+      throw new AuthorizationException(msg);
+    }
+
+    String userName = callerUGI.getUserName();
+    RMApp app = null;
+    try {
+      app = getRMAppForAppId(appId);
+    } catch (NotFoundException e) {
+      RMAuditLogger.logFailure(userName, AuditConstants.KILL_APP_REQUEST,
+        "UNKNOWN", "RMWebService", "Trying to kill/move an absent application "
+            + appId);
+      throw e;
+    }
+
+    if (!app.getState().toString().equals(targetState.getState())) {
+      // user is attempting to change state. Right now we only
+      // allow users to kill the app.
+
+      if (targetState.getState().equals(YarnApplicationState.KILLED.toString())) {
+        return killApp(app, callerUGI, hsr);
+      }
+      throw new BadRequestException("Only '"
+          + YarnApplicationState.KILLED.toString()
+          + "' is allowed as a target state.");
+    }
+
+    AppState ret = new AppState();
+    ret.setState(app.getState().toString());
+
+    return Response.status(Status.OK).entity(ret).build();
+  }
+
+  protected Response killApp(RMApp app, UserGroupInformation callerUGI,
+      HttpServletRequest hsr) throws IOException, InterruptedException {
+
+    if (app == null) {
+      throw new IllegalArgumentException("app cannot be null");
+    }
+    String userName = callerUGI.getUserName();
+    final ApplicationId appid = app.getApplicationId();
+    KillApplicationResponse resp = null;
+    try {
+      resp =
+          callerUGI
+            .doAs(new PrivilegedExceptionAction<KillApplicationResponse>() {
+              @Override
+              public KillApplicationResponse run() throws IOException,
+                  YarnException {
+                KillApplicationRequest req =
+                    KillApplicationRequest.newInstance(appid);
+                return rm.getClientRMService().forceKillApplication(req);
+              }
+            });
+    } catch (UndeclaredThrowableException ue) {
+      // if the root cause is a permissions issue
+      // bubble that up to the user
+      if (ue.getCause() instanceof YarnException) {
+        YarnException ye = (YarnException) ue.getCause();
+        if (ye.getCause() instanceof AccessControlException) {
+          String appId = app.getApplicationId().toString();
+          String msg =
+              "Unauthorized attempt to kill appid " + appId
+                  + " by remote user " + userName;
+          return Response.status(Status.FORBIDDEN).entity(msg).build();
+        } else {
+          throw ue;
+        }
+      } else {
+        throw ue;
+      }
+    }
+
+    AppState ret = new AppState();
+    ret.setState(app.getState().toString());
+
+    if (resp.getIsKillCompleted()) {
+      RMAuditLogger.logSuccess(userName, AuditConstants.KILL_APP_REQUEST,
+        "RMWebService", app.getApplicationId());
+    } else {
+      return Response.status(Status.ACCEPTED).entity(ret)
+        .header(HttpHeaders.LOCATION, hsr.getRequestURL()).build();
+    }
+    return Response.status(Status.OK).entity(ret).build();
+  }
+
+  private RMApp getRMAppForAppId(String appId) {
+
+    if (appId == null || appId.isEmpty()) {
+      throw new NotFoundException("appId, " + appId + ", is empty or null");
+    }
+    ApplicationId id;
+    try {
+      id = ConverterUtils.toApplicationId(recordFactory, appId);
+    } catch (NumberFormatException e) {
+      throw new NotFoundException("appId is invalid");
+    }
+    if (id == null) {
+      throw new NotFoundException("appId is invalid");
+    }
+    RMApp app = rm.getRMContext().getRMApps().get(id);
+    if (app == null) {
+      throw new NotFoundException("app with id: " + appId + " not found");
+    }
+    return app;
+  }
+
+  private UserGroupInformation getCallerUserGroupInformation(
+      HttpServletRequest hsr) {
+
+    String remoteUser = hsr.getRemoteUser();
+    UserGroupInformation callerUGI = null;
+    if (remoteUser != null) {
+      callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
+    }
+
+    return callerUGI;
+  }
 }

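The two new endpoints expose application state over REST: GET returns the current state, and a PUT with a target state of KILLED asks the RM to kill the application, answering 202 Accepted while the kill is still in flight and 200 OK once it has completed. A self-contained client sketch, assuming the RM web services are mounted at the usual /ws/v1/cluster prefix; the host, port and application id are placeholders:

    // Illustrative client sketch only; host, port and application id are hypothetical.
    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    public class KillAppViaRmRest {
      public static void main(String[] args) throws Exception {
        URL url = new URL("http://rm-host:8088/ws/v1/cluster/apps/"
            + "application_1402942400000_0001/state");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("PUT");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
          out.write("{\"state\":\"KILLED\"}".getBytes(StandardCharsets.UTF_8));
        }
        // 202 Accepted: kill still in progress (the Location header points back at
        // this resource); 200 OK: the kill has completed.
        System.out.println("HTTP " + conn.getResponseCode());
      }
    }
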
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java Mon Jun 16 18:13:57 2014
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -32,7 +31,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@@ -105,14 +104,14 @@ public class MockNM {
   }
 
   public RegisterNodeManagerResponse registerNode(
-      List<ContainerStatus> containerStatus) throws Exception{
+      List<NMContainerStatus> containerReports) throws Exception{
     RegisterNodeManagerRequest req = Records.newRecord(
         RegisterNodeManagerRequest.class);
     req.setNodeId(nodeId);
     req.setHttpPort(httpPort);
     Resource resource = BuilderUtils.newResource(memory, vCores);
     req.setResource(resource);
-    req.setContainerStatuses(containerStatus);
+    req.setContainerStatuses(containerReports);
     req.setNMVersion(version);
     RegisterNodeManagerResponse registrationResponse =
         resourceTracker.registerNodeManager(req);
@@ -185,4 +184,11 @@ public class MockNM {
     return heartbeatResponse;
   }
 
+  public int getMemory() {
+    return memory;
+  }
+
+  public int getvCores() {
+    return vCores;
+  }
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Mon Jun 16 18:13:57 2014
@@ -21,10 +21,9 @@ package org.apache.hadoop.yarn.server.re
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedAction;
+import java.util.List;
 import java.util.Map;
 
-import org.junit.Assert;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.security.Credentials;
@@ -47,12 +46,14 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
@@ -69,6 +70,10 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -76,6 +81,7 @@ import org.apache.hadoop.yarn.util.Recor
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.junit.Assert;
 
 @SuppressWarnings("unchecked")
 public class MockRM extends ResourceManager {
@@ -144,11 +150,26 @@ public class MockRM extends ResourceMana
     }
   }
 
+  public void waitForContainerToComplete(RMAppAttempt attempt,
+      NMContainerStatus completedContainer) throws InterruptedException {
+    while (true) {
+      List<ContainerStatus> containers = attempt.getJustFinishedContainers();
+      System.out.println("Received completed containers " + containers);
+      for (ContainerStatus container : containers) {
+        if (container.getContainerId().equals(
+          completedContainer.getContainerId())) {
+          return;
+        }
+      }
+      Thread.sleep(200);
+    }
+  }
+
   public void waitForState(MockNM nm, ContainerId containerId,
       RMContainerState containerState) throws Exception {
     RMContainer container = getResourceScheduler().getRMContainer(containerId);
     int timeoutSecs = 0;
-    while(container == null && timeoutSecs++ < 20) {
+    while(container == null && timeoutSecs++ < 100) {
       nm.nodeHeartbeat(true);
       container = getResourceScheduler().getRMContainer(containerId);
       System.out.println("Waiting for container " + containerId + " to be allocated.");
@@ -333,7 +354,7 @@ public class MockRM extends ResourceMana
   public void sendNodeStarted(MockNM nm) throws Exception {
     RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get(
         nm.getNodeId());
-    node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.STARTED));
+    node.handle(new RMNodeStartedEvent(nm.getNodeId(), null));
   }
   
   public void sendNodeLost(MockNM nm) throws Exception {
@@ -542,4 +563,16 @@ public class MockRM extends ResourceMana
             .newInstance(appId));
     return response.getApplicationReport();
   }
+
+  // Explicitly reset queue metrics for testing.
+  @SuppressWarnings("static-access")
+  public void clearQueueMetrics(RMApp app) {
+    ((AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>) getResourceScheduler())
+      .getSchedulerApplications().get(app.getApplicationId()).getQueue()
+      .getMetrics().clearQueueMetrics();
+  }
+  
+  public RMActiveServices getRMActiveService() {
+    return activeServices;
+  }
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java Mon Jun 16 18:13:57 2014
@@ -77,6 +77,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -259,6 +260,28 @@ public class TestClientRMService {
   }
 
   @Test
+  public void testGetApplicationResourceUsageReportDummy() throws YarnException,
+      IOException {
+    ApplicationAttemptId attemptId = getApplicationAttemptId(1);
+    YarnScheduler yarnScheduler = mockYarnScheduler();
+    RMContext rmContext = mock(RMContext.class);
+    mockRMContext(yarnScheduler, rmContext);
+    when(rmContext.getDispatcher().getEventHandler()).thenReturn(
+        new EventHandler<Event>() {
+          public void handle(Event event) {
+          }
+        });
+    ApplicationSubmissionContext asContext = 
+        mock(ApplicationSubmissionContext.class);
+    YarnConfiguration config = new YarnConfiguration();
+    RMAppAttemptImpl rmAppAttemptImpl = new RMAppAttemptImpl(attemptId,
+        rmContext, yarnScheduler, null, asContext, config, false);
+    ApplicationResourceUsageReport report = rmAppAttemptImpl
+        .getApplicationResourceUsageReport();
+    assertEquals(report, RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT);
+  }
+
+  @Test
   public void testGetApplicationAttempts() throws YarnException, IOException {
     ClientRMService rmService = createRMService();
     RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
@@ -646,7 +669,8 @@ public class TestClientRMService {
     ApplicationId[] appIds =
         {getApplicationId(101), getApplicationId(102), getApplicationId(103)};
     List<String> tags = Arrays.asList("Tag1", "Tag2", "Tag3");
-
+    
+    long[] submitTimeMillis = new long[3];
     // Submit applications
     for (int i = 0; i < appIds.length; i++) {
       ApplicationId appId = appIds[i];
@@ -656,6 +680,7 @@ public class TestClientRMService {
           appId, appNames[i], queues[i % queues.length],
           new HashSet<String>(tags.subList(0, i + 1)));
       rmService.submitApplication(submitRequest);
+      submitTimeMillis[i] = System.currentTimeMillis();
     }
 
     // Test different cases of ClientRMService#getApplications()
@@ -667,6 +692,24 @@ public class TestClientRMService {
     request.setLimit(1L);
     assertEquals("Failed to limit applications", 1,
         rmService.getApplications(request).getApplicationList().size());
+    
+    // Check start range
+    request = GetApplicationsRequest.newInstance();
+    request.setStartRange(submitTimeMillis[0], System.currentTimeMillis());
+    
+    // 2 applications are submitted after the first submit time
+    assertEquals("Incorrect number of matching start range", 
+        2, rmService.getApplications(request).getApplicationList().size());
+    
+    // 1 application is submitted after the second submit time
+    request.setStartRange(submitTimeMillis[1], System.currentTimeMillis());
+    assertEquals("Incorrect number of matching start range", 
+        1, rmService.getApplications(request).getApplicationList().size());
+    
+    // no application is submitted after the third submit time
+    request.setStartRange(submitTimeMillis[2], System.currentTimeMillis());
+    assertEquals("Incorrect number of matching start range", 
+        0, rmService.getApplications(request).getApplicationList().size());
 
     // Check queue
     request = GetApplicationsRequest.newInstance();
@@ -944,6 +987,8 @@ public class TestClientRMService {
         Arrays.asList(getApplicationAttemptId(101), getApplicationAttemptId(102)));
     when(yarnScheduler.getAppsInQueue(QUEUE_2)).thenReturn(
         Arrays.asList(getApplicationAttemptId(103)));
+    ApplicationAttemptId attemptId = getApplicationAttemptId(1);
+    when(yarnScheduler.getAppResourceUsageReport(attemptId)).thenReturn(null);
     return yarnScheduler;
   }
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Mon Jun 16 18:13:57 2014
@@ -18,16 +18,16 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import org.junit.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -56,11 +57,10 @@ import org.apache.hadoop.yarn.util.resou
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
 
 public class TestFifoScheduler {
   private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
@@ -77,12 +77,12 @@ public class TestFifoScheduler {
 
   @Test (timeout = 30000)
   public void testConfValidation() throws Exception {
-    ResourceScheduler scheduler = new FifoScheduler();
+    FifoScheduler scheduler = new FifoScheduler();
     Configuration conf = new YarnConfiguration();
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048);
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
     try {
-      scheduler.reinitialize(conf, null);
+      scheduler.serviceInit(conf);
       fail("Exception is expected because the min memory allocation is" +
         " larger than the max memory allocation.");
     } catch (YarnRuntimeException e) {
@@ -218,6 +218,9 @@ public class TestFifoScheduler {
   public void testNodeUpdateBeforeAppAttemptInit() throws Exception {
     FifoScheduler scheduler = new FifoScheduler();
     MockRM rm = new MockRM(conf);
+    scheduler.setRMContext(rm.getRMContext());
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, rm.getRMContext());
 
     RMNode node = MockNodes.newNodeInfo(1,
@@ -293,7 +296,12 @@ public class TestFifoScheduler {
     conf.setQueues("default", new String[] {"default"});
     conf.setCapacity("default", 100);
     FifoScheduler fs = new FifoScheduler();
+    fs.init(conf);
+    fs.start();
+    // Mock an RMContext to avoid a NullPointerException.
+    RMContext context = mock(RMContext.class);
     fs.reinitialize(conf, null);
+    fs.setRMContext(context);
 
     RMNode n1 =
         MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, "127.0.0.2");
@@ -313,6 +321,7 @@ public class TestFifoScheduler {
     fs.handle(new NodeUpdateSchedulerEvent(n1));
 
     Assert.assertEquals(4 * GB, fs.getRootQueueMetrics().getAvailableMB());
+    fs.stop();
   }
   
   @Test (timeout = 50000)

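The TestFifoScheduler changes above switch the tests from calling reinitialize() alone to driving the scheduler through its full service lifecycle. A condensed sketch of the ordering used in testNodeUpdateBeforeAppAttemptInit (conf is a YarnConfiguration and rm a MockRM, as in the tests; this is illustrative, not part of the patch):

    FifoScheduler scheduler = new FifoScheduler();
    scheduler.setRMContext(rm.getRMContext());
    scheduler.init(conf);    // serviceInit validates min/max allocation settings
    scheduler.start();
    scheduler.reinitialize(conf, rm.getRMContext());
    // ... dispatch scheduler events and run assertions ...
    scheduler.stop();
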
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java Mon Jun 16 18:13:57 2014
@@ -43,10 +43,11 @@ import org.junit.Test;
 public class TestMoveApplication {
   private ResourceManager resourceManager = null;
   private static boolean failMove;
-  
+  private Configuration conf;
+
   @Before
   public void setUp() throws Exception {
-    Configuration conf = new YarnConfiguration();
+    conf = new YarnConfiguration();
     conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoSchedulerWithMove.class,
         FifoSchedulerWithMove.class);
     conf.set(YarnConfiguration.YARN_ADMIN_ACL, " ");
@@ -119,28 +120,23 @@ public class TestMoveApplication {
     }
   }
   
-  @Test (timeout = 5000)
-  public void testMoveSuccessful() throws Exception {
-    // Submit application
-    Application application = new Application("user1", resourceManager);
-    ApplicationId appId = application.getApplicationId();
-    application.submit();
-    
-    // Wait for app to be accepted
-    RMApp app = resourceManager.rmContext.getRMApps().get(appId);
-    while (app.getState() != RMAppState.ACCEPTED) {
-      Thread.sleep(100);
-    }
-
-    ClientRMService clientRMService = resourceManager.getClientRMService();
+  @Test (timeout = 10000)
+  public void testMoveSuccessful()
+      throws Exception {
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+    RMApp app = rm1.submitApp(1024);
+    ClientRMService clientRMService = rm1.getClientRMService();
     // FIFO scheduler does not support moves
-    clientRMService.moveApplicationAcrossQueues(
-        MoveApplicationAcrossQueuesRequest.newInstance(appId, "newqueue"));
-    
-    RMApp rmApp = resourceManager.getRMContext().getRMApps().get(appId);
+    clientRMService.moveApplicationAcrossQueues(
+        MoveApplicationAcrossQueuesRequest.newInstance(
+            app.getApplicationId(), "newqueue"));
+
+    RMApp rmApp = rm1.getRMContext().getRMApps().get(app.getApplicationId());
     assertEquals("newqueue", rmApp.getQueue());
+    rm1.stop();
   }
-  
+
   @Test
   public void testMoveRejectedByPermissions() throws Exception {
     failMove = true;

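In TestMoveApplication above, testMoveSuccessful now runs against a MockRM instead of the hand-built resourceManager fixture. The essential pattern, reduced to a sketch (queue name and memory are taken from the test; the configured scheduler is assumed to support moves):

    MockRM rm1 = new MockRM(conf);
    rm1.start();
    RMApp app = rm1.submitApp(1024);
    rm1.getClientRMService().moveApplicationAcrossQueues(
        MoveApplicationAcrossQueuesRequest.newInstance(
            app.getApplicationId(), "newqueue"));
    assertEquals("newqueue",
        rm1.getRMContext().getRMApps().get(app.getApplicationId()).getQueue());
    rm1.stop();
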
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java Mon Jun 16 18:13:57 2014
@@ -21,15 +21,14 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import org.junit.Assert;
-
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -49,6 +48,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -160,7 +161,7 @@ public class TestRMNodeTransitions {
   @Test (timeout = 5000)
   public void testExpiredContainer() {
     // Start the node
-    node.handle(new RMNodeEvent(null, RMNodeEventType.STARTED));
+    node.handle(new RMNodeStartedEvent(null, null));
     verify(scheduler).handle(any(NodeAddedSchedulerEvent.class));
     
     // Expire a container
@@ -188,11 +189,11 @@ public class TestRMNodeTransitions {
   @Test (timeout = 5000)
   public void testContainerUpdate() throws InterruptedException{
     //Start the node
-    node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
+    node.handle(new RMNodeStartedEvent(null, null));
     
     NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1);
     RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
-    node2.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
+    node2.handle(new RMNodeStartedEvent(null, null));
     
     ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId(
         BuilderUtils.newApplicationAttemptId(
@@ -248,7 +249,7 @@ public class TestRMNodeTransitions {
   @Test (timeout = 5000)
   public void testStatusChange(){
     //Start the node
-    node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
+    node.handle(new RMNodeStartedEvent(null, null));
     //Add info to the queue first
     node.setNextHeartBeat(false);
 
@@ -464,7 +465,7 @@ public class TestRMNodeTransitions {
     RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0,
         null, ResourceOption.newInstance(capability,
             RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), nmVersion);
-    node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED));
+    node.handle(new RMNodeStartedEvent(node.getNodeID(), null));
     Assert.assertEquals(NodeState.RUNNING, node.getState());
     return node;
   }
@@ -495,7 +496,7 @@ public class TestRMNodeTransitions {
     int initialUnhealthy = cm.getUnhealthyNMs();
     int initialDecommissioned = cm.getNumDecommisionedNMs();
     int initialRebooted = cm.getNumRebootedNMs();
-    node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED));
+    node.handle(new RMNodeStartedEvent(node.getNodeID(), null));
     Assert.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs());
     Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
     Assert.assertEquals("Unhealthy Nodes",

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Mon Jun 16 18:13:57 2014
@@ -30,6 +30,7 @@ import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -67,13 +68,14 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
@@ -82,8 +84,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -303,13 +305,11 @@ public class TestRMRestart {
     nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
     nm2 = new MockNM("127.0.0.2:5678", 15120, rm2.getResourceTrackerService());
 
-    List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
-    ContainerStatus containerStatus =
-        BuilderUtils.newContainerStatus(BuilderUtils.newContainerId(loadedApp1
-            .getCurrentAppAttempt().getAppAttemptId(), 1),
-            ContainerState.COMPLETE, "Killed AM container", 143);
-    containerStatuses.add(containerStatus);
-    nm1.registerNode(containerStatuses);
+    NMContainerStatus status =
+        TestRMRestart
+          .createNMContainerStatus(loadedApp1.getCurrentAppAttempt()
+            .getAppAttemptId(), 1, ContainerState.COMPLETE);
+    nm1.registerNode(Arrays.asList(status));
     nm2.registerNode();
     
     rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED);
@@ -510,14 +510,11 @@ public class TestRMRestart {
     Assert.assertEquals(RMAppAttemptState.LAUNCHED,
         rmApp.getAppAttempts().get(am2.getApplicationAttemptId())
             .getAppAttemptState());
-    
-    List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
-    ContainerStatus containerStatus =
-        BuilderUtils.newContainerStatus(
-            BuilderUtils.newContainerId(am2.getApplicationAttemptId(), 1),
-            ContainerState.COMPLETE, "Killed AM container", 143);
-    containerStatuses.add(containerStatus);
-    nm1.registerNode(containerStatuses);
+
+    NMContainerStatus status =
+        TestRMRestart.createNMContainerStatus(
+          am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    nm1.registerNode(Arrays.asList(status));
     rm2.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
     launchAM(rmApp, rm2, nm1);
     Assert.assertEquals(3, rmApp.getAppAttempts().size());
@@ -615,7 +612,7 @@ public class TestRMRestart {
 
       @Override
       public void updateApplicationStateInternal(ApplicationId appId,
-          ApplicationStateDataPBImpl appStateData) throws Exception {
+          ApplicationStateData appStateData) throws Exception {
         if (count == 0) {
           // do nothing; simulate app final state is not saved.
           LOG.info(appId + " final state is not saved.");
@@ -763,14 +760,14 @@ public class TestRMRestart {
       @Override
       public synchronized void storeApplicationAttemptStateInternal(
           ApplicationAttemptId attemptId,
-          ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
+          ApplicationAttemptStateData attemptStateData) throws Exception {
         // ignore attempt saving request.
       }
 
       @Override
       public synchronized void updateApplicationAttemptStateInternal(
           ApplicationAttemptId attemptId,
-          ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
+          ApplicationAttemptStateData attemptStateData) throws Exception {
         // ignore attempt saving request.
       }
     };
@@ -1678,13 +1675,12 @@ public class TestRMRestart {
     am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
     nm1.nodeHeartbeat(true);
     nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
-    List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
-    ContainerStatus containerStatus =
-        BuilderUtils.newContainerStatus(BuilderUtils.newContainerId(loadedApp1
-            .getCurrentAppAttempt().getAppAttemptId(), 1),
-            ContainerState.COMPLETE, "Killed AM container", 143);
-    containerStatuses.add(containerStatus);
-    nm1.registerNode(containerStatuses);
+
+    NMContainerStatus status =
+        TestRMRestart
+          .createNMContainerStatus(loadedApp1.getCurrentAppAttempt()
+            .getAppAttemptId(), 1, ContainerState.COMPLETE);
+    nm1.registerNode(Arrays.asList(status));
     while (loadedApp1.getAppAttempts().size() != 2) {
       Thread.sleep(200);
     }
@@ -1808,12 +1804,10 @@ public class TestRMRestart {
             // ResourceTrackerService is started.
             super.serviceStart();
             nm1.setResourceTrackerService(getResourceTrackerService());
-            List<ContainerStatus> status = new ArrayList<ContainerStatus>();
-            ContainerId amContainer =
-                ContainerId.newInstance(am0.getApplicationAttemptId(), 1);
-            status.add(ContainerStatus.newInstance(amContainer,
-              ContainerState.COMPLETE, "AM container exit", 143));
-            nm1.registerNode(status);
+            NMContainerStatus status =
+                TestRMRestart.createNMContainerStatus(
+                  am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+            nm1.registerNode(Arrays.asList(status));
           }
         };
       }
@@ -1852,6 +1846,15 @@ public class TestRMRestart {
     }
   }
 
+  public static NMContainerStatus createNMContainerStatus(
+      ApplicationAttemptId appAttemptId, int id, ContainerState containerState) {
+    ContainerId containerId = ContainerId.newInstance(appAttemptId, id);
+    NMContainerStatus containerReport =
+        NMContainerStatus.newInstance(containerId, containerState,
+          Resource.newInstance(1024, 1), "recover container", 0);
+    return containerReport;
+  }
+
   public class TestMemoryRMStateStore extends MemoryRMStateStore {
     int count = 0;
     public int updateApp = 0;
@@ -1859,7 +1862,7 @@ public class TestRMRestart {
 
     @Override
     public void updateApplicationStateInternal(ApplicationId appId,
-        ApplicationStateDataPBImpl appStateData) throws Exception {
+        ApplicationStateData appStateData) throws Exception {
       updateApp = ++count;
       super.updateApplicationStateInternal(appId, appStateData);
     }
@@ -1868,7 +1871,7 @@ public class TestRMRestart {
     public synchronized void
         updateApplicationAttemptStateInternal(
             ApplicationAttemptId attemptId,
-            ApplicationAttemptStateDataPBImpl attemptStateData)
+            ApplicationAttemptStateData attemptStateData)
             throws Exception {
       updateAttempt = ++count;
       super.updateApplicationAttemptStateInternal(attemptId,

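The new TestRMRestart.createNMContainerStatus helper above replaces the repeated ContainerStatus/BuilderUtils boilerplate when re-registering a NodeManager after an RM restart. A sketch of the call pattern the updated tests use (am and nm stand for an existing application attempt and a MockNM pointed at the restarted RM; both come from the surrounding test, not from this helper):

    NMContainerStatus status =
        TestRMRestart.createNMContainerStatus(
            am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
    nm.registerNode(Arrays.asList(status));
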
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java Mon Jun 16 18:13:57 2014
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.event.Disp
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
@@ -487,33 +488,37 @@ public class TestResourceTrackerService 
     RMApp app = rm.submitApp(1024, true);
 
     // Case 1.1: AppAttemptId is null
-    ContainerStatus status = ContainerStatus.newInstance(
-        ContainerId.newInstance(ApplicationAttemptId.newInstance(
-            app.getApplicationId(), 2), 1),
-        ContainerState.COMPLETE, "Dummy Completed", 0);
-    rm.getResourceTrackerService().handleContainerStatus(status);
+    NMContainerStatus report =
+        NMContainerStatus.newInstance(
+          ContainerId.newInstance(
+            ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1),
+          ContainerState.COMPLETE, Resource.newInstance(1024, 1),
+          "Dummy Completed", 0);
+    rm.getResourceTrackerService().handleNMContainerStatus(report);
     verify(handler, never()).handle((Event) any());
 
     // Case 1.2: Master container is null
     RMAppAttemptImpl currentAttempt =
         (RMAppAttemptImpl) app.getCurrentAppAttempt();
     currentAttempt.setMasterContainer(null);
-    status = ContainerStatus.newInstance(
-        ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0),
-        ContainerState.COMPLETE, "Dummy Completed", 0);
-    rm.getResourceTrackerService().handleContainerStatus(status);
+    report = NMContainerStatus.newInstance(
+          ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0),
+          ContainerState.COMPLETE, Resource.newInstance(1024, 1),
+          "Dummy Completed", 0);
+    rm.getResourceTrackerService().handleNMContainerStatus(report);
     verify(handler, never()).handle((Event)any());
 
     // Case 2: Managed AM
     app = rm.submitApp(1024);
 
     // Case 2.1: AppAttemptId is null
-    status = ContainerStatus.newInstance(
-        ContainerId.newInstance(ApplicationAttemptId.newInstance(
-            app.getApplicationId(), 2), 1),
-        ContainerState.COMPLETE, "Dummy Completed", 0);
+    report = NMContainerStatus.newInstance(
+          ContainerId.newInstance(
+            ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1),
+          ContainerState.COMPLETE, Resource.newInstance(1024, 1),
+          "Dummy Completed", 0);
     try {
-      rm.getResourceTrackerService().handleContainerStatus(status);
+      rm.getResourceTrackerService().handleNMContainerStatus(report);
     } catch (Exception e) {
       // expected - ignore
     }
@@ -523,11 +528,12 @@ public class TestResourceTrackerService 
     currentAttempt =
         (RMAppAttemptImpl) app.getCurrentAppAttempt();
     currentAttempt.setMasterContainer(null);
-    status = ContainerStatus.newInstance(
-        ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0),
-        ContainerState.COMPLETE, "Dummy Completed", 0);
+    report = NMContainerStatus.newInstance(
+          ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0),
+          ContainerState.COMPLETE, Resource.newInstance(1024, 1),
+          "Dummy Completed", 0);
     try {
-      rm.getResourceTrackerService().handleContainerStatus(status);
+      rm.getResourceTrackerService().handleNMContainerStatus(report);
     } catch (Exception e) {
       // expected - ignore
     }

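The TestResourceTrackerService changes above follow the ResourceTrackerService API move from handleContainerStatus(ContainerStatus) to handleNMContainerStatus(NMContainerStatus), which additionally carries the container's Resource. A sketch of building and feeding one report, mirroring the test (rm and currentAttempt come from the surrounding test; the 1024 MB / 1 vcore resource and the diagnostics string are illustrative):

    NMContainerStatus report = NMContainerStatus.newInstance(
        ContainerId.newInstance(currentAttempt.getAppAttemptId(), 1),
        ContainerState.COMPLETE, Resource.newInstance(1024, 1),
        "Dummy Completed", 0);
    rm.getResourceTrackerService().handleNMContainerStatus(report);
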
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java Mon Jun 16 18:13:57 2014
@@ -17,6 +17,25 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Deque;
@@ -27,12 +46,16 @@ import java.util.Random;
 import java.util.TreeSet;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
@@ -52,17 +75,6 @@ import org.junit.rules.TestName;
 import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatcher;
 
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY;
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL;
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR;
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
 public class TestProportionalCapacityPreemptionPolicy {
 
   static final long TS = 3141592653L;
@@ -424,6 +436,36 @@ public class TestProportionalCapacityPre
     assert containers.get(4).equals(rm5);
 
   }
+  
+  @Test
+  public void testPolicyInitializeAfterSchedulerInitialized() {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
+        ProportionalCapacityPreemptionPolicy.class.getCanonicalName());
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
+    
+    @SuppressWarnings("resource")
+    MockRM rm = new MockRM(conf);
+    rm.init(conf);
+    
+    // ProportionalCapacityPreemptionPolicy should be initialized after the
+    // CapacityScheduler has been initialized. To verify this, we
+    // 1) find the SchedulingMonitor in the RMActiveService's service list, and
+    // 2) check whether the policy's ResourceCalculator is null.
+    // If it is non-null, we can conclude that the policy was initialized
+    // after the scheduler was initialized.
+    for (Service service : rm.getRMActiveService().getServices()) {
+      if (service instanceof SchedulingMonitor) {
+        ProportionalCapacityPreemptionPolicy policy =
+            (ProportionalCapacityPreemptionPolicy) ((SchedulingMonitor) service)
+                .getSchedulingEditPolicy();
+        assertNotNull(policy.getResourceCalculator());
+        return;
+      }
+    }
+    
+    fail("Failed to find the SchedulingMonitor service among the RM's active services");
+  }
 
   static class IsPreemptionRequestFor
       extends ArgumentMatcher<ContainerPreemptEvent> {