You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cl...@apache.org on 2014/06/16 20:14:12 UTC

svn commit: r1602947 [3/5] - in /hadoop/common/branches/fs-encryption/hadoop-yarn-project: ./ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/ha...

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java Mon Jun 16 18:13:57 2014
@@ -20,21 +20,15 @@ package org.apache.hadoop.yarn.server.re
 
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProtoOrBuilder;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMAppStateProto;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 
-public class ApplicationStateDataPBImpl 
-extends ProtoBase<ApplicationStateDataProto> 
-implements ApplicationStateData {
-  private static final RecordFactory recordFactory = RecordFactoryProvider
-      .getRecordFactory(null);
+import com.google.protobuf.TextFormat;
 
+public class ApplicationStateDataPBImpl extends ApplicationStateData {
   ApplicationStateDataProto proto = 
             ApplicationStateDataProto.getDefaultInstance();
   ApplicationStateDataProto.Builder builder = null;
@@ -51,7 +45,8 @@ implements ApplicationStateData {
     this.proto = proto;
     viaProto = true;
   }
-  
+
+  @Override
   public ApplicationStateDataProto getProto() {
     mergeLocalToProto();
     proto = viaProto ? proto : builder.build();
@@ -136,7 +131,7 @@ implements ApplicationStateData {
     }
     applicationSubmissionContext = 
         new ApplicationSubmissionContextPBImpl(
-                                          p.getApplicationSubmissionContext());
+            p.getApplicationSubmissionContext());
     return applicationSubmissionContext;
   }
 
@@ -200,21 +195,24 @@ implements ApplicationStateData {
     builder.setFinishTime(finishTime);
   }
 
-  public static ApplicationStateData newApplicationStateData(long submitTime,
-      long startTime, String user,
-      ApplicationSubmissionContext submissionContext, RMAppState state,
-      String diagnostics, long finishTime) {
-
-    ApplicationStateData appState =
-        recordFactory.newRecordInstance(ApplicationStateData.class);
-    appState.setSubmitTime(submitTime);
-    appState.setStartTime(startTime);
-    appState.setUser(user);
-    appState.setApplicationSubmissionContext(submissionContext);
-    appState.setState(state);
-    appState.setDiagnostics(diagnostics);
-    appState.setFinishTime(finishTime);
-    return appState;
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null)
+      return false;
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
   }
 
   private static String RM_APP_PREFIX = "RMAPP_";

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Mon Jun 16 18:13:57 2014
@@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -293,11 +294,6 @@ public class RMAppImpl implements RMApp,
   private final StateMachine<RMAppState, RMAppEventType, RMAppEvent>
                                                                  stateMachine;
 
-  private static final ApplicationResourceUsageReport
-    DUMMY_APPLICATION_RESOURCE_USAGE_REPORT =
-      BuilderUtils.newApplicationResourceUsageReport(-1, -1,
-          Resources.createResource(-1, -1), Resources.createResource(-1, -1),
-          Resources.createResource(-1, -1));
   private static final int DUMMY_APPLICATION_ATTEMPT_NUMBER = -1;
   
   public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
@@ -498,7 +494,7 @@ public class RMAppImpl implements RMApp,
       String origTrackingUrl = UNAVAILABLE;
       int rpcPort = -1;
       ApplicationResourceUsageReport appUsageReport =
-          DUMMY_APPLICATION_RESOURCE_USAGE_REPORT;
+          RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT;
       FinalApplicationStatus finishState = getFinalApplicationStatus();
       String diags = UNAVAILABLE;
       float progress = 0.0f;
@@ -723,29 +719,36 @@ public class RMAppImpl implements RMApp,
     }
   }
 
+  // synchronously recover attempt to ensure any incoming external events
+  // to be processed after the attempt processes the recover event.
+  private void recoverAppAttempts() {
+    for (RMAppAttempt attempt : getAppAttempts().values()) {
+      attempt.handle(new RMAppAttemptEvent(attempt.getAppAttemptId(),
+        RMAppAttemptEventType.RECOVER));
+    }
+  }
+
   private static final class RMAppRecoveredTransition implements
       MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
 
     @Override
     public RMAppState transition(RMAppImpl app, RMAppEvent event) {
 
-      for (RMAppAttempt attempt : app.getAppAttempts().values()) {
-        // synchronously recover attempt to ensure any incoming external events
-        // to be processed after the attempt processes the recover event.
-        attempt.handle(
-          new RMAppAttemptEvent(attempt.getAppAttemptId(),
-            RMAppAttemptEventType.RECOVER));
-      }
-
       // The app has completed.
       if (app.recoveredFinalState != null) {
+        app.recoverAppAttempts();
         new FinalTransition(app.recoveredFinalState).transition(app, event);
         return app.recoveredFinalState;
       }
 
-      // Last attempt is in final state, do not add to scheduler and just return
-      // ACCEPTED waiting for last RMAppAttempt to send finished or failed event
-      // back.
+      // Notify scheduler about the app on recovery
+      new AddApplicationToSchedulerTransition().transition(app, event);
+
+      // recover attempts
+      app.recoverAppAttempts();
+
+      // Last attempt is in final state, return ACCEPTED waiting for last
+      // RMAppAttempt to send finished or failed event back.
       if (app.currentAttempt != null
           && (app.currentAttempt.getState() == RMAppAttemptState.KILLED
               || app.currentAttempt.getState() == RMAppAttemptState.FINISHED
@@ -754,9 +757,6 @@ public class RMAppImpl implements RMApp,
         return RMAppState.ACCEPTED;
       }
 
-      // Notify scheduler about the app on recovery
-      new AddApplicationToSchedulerTransition().transition(app, event);
-
       // No existent attempts means the attempt associated with this app was not
       // started or started but not yet saved.
       if (app.attempts.isEmpty()) {
@@ -1055,8 +1055,12 @@ public class RMAppImpl implements RMApp,
       if (app.finishTime == 0 ) {
         app.finishTime = System.currentTimeMillis();
       }
-      app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId,
-        finalState));
+      // Recovered apps that are completed were not added to scheduler, so no
+      // need to remove them from scheduler.
+      if (app.recoveredFinalState == null) {
+        app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId,
+          finalState));
+      }
       app.handler.handle(
           new RMAppManagerEvent(app.applicationId,
           RMAppManagerEventType.APP_COMPLETED));

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Mon Jun 16 18:13:57 2014
@@ -267,15 +267,17 @@ public class RMAppAttemptImpl implements
       .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.CONTAINER_FINISHED,
           new FinalSavingTransition(
-            new AMContainerCrashedTransition(), RMAppAttemptState.FAILED))
+            new AMContainerCrashedBeforeRunningTransition(), RMAppAttemptState.FAILED))
 
        // Transitions from LAUNCHED State
       .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.RUNNING,
           RMAppAttemptEventType.REGISTERED, new AMRegisteredTransition())
-      .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING,
+      .addTransition(RMAppAttemptState.LAUNCHED,
+          EnumSet.of(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING),
           RMAppAttemptEventType.CONTAINER_FINISHED,
-          new FinalSavingTransition(
-            new AMContainerCrashedTransition(), RMAppAttemptState.FAILED))
+          new ContainerFinishedTransition(
+            new AMContainerCrashedBeforeRunningTransition(),
+            RMAppAttemptState.LAUNCHED))
       .addTransition(
           RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.EXPIRE,
@@ -302,7 +304,9 @@ public class RMAppAttemptImpl implements
           RMAppAttemptState.RUNNING,
           EnumSet.of(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING),
           RMAppAttemptEventType.CONTAINER_FINISHED,
-          new ContainerFinishedTransition())
+          new ContainerFinishedTransition(
+            new AMContainerCrashedAtRunningTransition(),
+            RMAppAttemptState.RUNNING))
       .addTransition(
           RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.EXPIRE,
@@ -671,9 +675,7 @@ public class RMAppAttemptImpl implements
       ApplicationResourceUsageReport report =
           scheduler.getAppResourceUsageReport(this.getAppAttemptId());
       if (report == null) {
-        Resource none = Resource.newInstance(0, 0);
-        report = ApplicationResourceUsageReport.newInstance(0, 0, none, none,
-            none);
+        report = RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT;
       }
       return report;
     } finally {
@@ -904,6 +906,12 @@ public class RMAppAttemptImpl implements
         }
         return appAttempt.recoveredFinalState;
       } else {
+        // Add the current attempt to the scheduler.
+        if (appAttempt.rmContext.isWorkPreservingRecoveryEnabled()) {
+          appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
+            appAttempt.getAppAttemptId(), false));
+        }
+
         /*
          * Since the application attempt's final state is not saved that means
          * for AM container (previous attempt) state must be one of these.
@@ -1207,17 +1215,16 @@ public class RMAppAttemptImpl implements
     }
   }
 
-  private static final class AMContainerCrashedTransition extends
+  private static final class AMContainerCrashedBeforeRunningTransition extends
       BaseFinalTransition {
 
-    public AMContainerCrashedTransition() {
+    public AMContainerCrashedBeforeRunningTransition() {
       super(RMAppAttemptState.FAILED);
     }
 
     @Override
     public void transition(RMAppAttemptImpl appAttempt,
         RMAppAttemptEvent event) {
-
       RMAppAttemptContainerFinishedEvent finishEvent =
           ((RMAppAttemptContainerFinishedEvent)event);
 
@@ -1410,6 +1417,16 @@ public class RMAppAttemptImpl implements
       implements
       MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
 
+    // The transition To Do after attempt final state is saved.
+    private BaseTransition transitionToDo;
+    private RMAppAttemptState currentState;
+
+    public ContainerFinishedTransition(BaseTransition transitionToDo,
+        RMAppAttemptState currentState) {
+      this.transitionToDo = transitionToDo;
+      this.currentState = currentState;
+    }
+
     @Override
     public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
         RMAppAttemptEvent event) {
@@ -1426,14 +1443,13 @@ public class RMAppAttemptImpl implements
             containerStatus.getContainerId())) {
         // Remember the follow up transition and save the final attempt state.
         appAttempt.rememberTargetTransitionsAndStoreState(event,
-          new ContainerFinishedFinalStateSavedTransition(),
-          RMAppAttemptState.FAILED, RMAppAttemptState.FAILED);
+          transitionToDo, RMAppAttemptState.FAILED, RMAppAttemptState.FAILED);
         return RMAppAttemptState.FINAL_SAVING;
       }
 
       // Normal container.Put it in completedcontainers list
       appAttempt.justFinishedContainers.add(containerStatus);
-      return RMAppAttemptState.RUNNING;
+      return this.currentState;
     }
   }
 
@@ -1451,7 +1467,7 @@ public class RMAppAttemptImpl implements
     }
   }
 
-  private static class ContainerFinishedFinalStateSavedTransition extends
+  private static class AMContainerCrashedAtRunningTransition extends
       BaseTransition {
     @Override
     public void

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java Mon Jun 16 18:13:57 2014
@@ -33,5 +33,7 @@ public enum RMContainerEventType {
   RELEASED,
 
   // Source: ContainerAllocationExpirer  
-  EXPIRE
+  EXPIRE,
+
+  RECOVER
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java Mon Jun 16 18:13:57 2014
@@ -35,12 +35,14 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
@@ -65,6 +67,9 @@ public class RMContainerImpl implements 
         RMContainerEventType.KILL)
     .addTransition(RMContainerState.NEW, RMContainerState.RESERVED,
         RMContainerEventType.RESERVED, new ContainerReservedTransition())
+    .addTransition(RMContainerState.NEW,
+        EnumSet.of(RMContainerState.RUNNING, RMContainerState.COMPLETED),
+        RMContainerEventType.RECOVER, new ContainerRecoveredTransition())
 
     // Transitions from RESERVED state
     .addTransition(RMContainerState.RESERVED, RMContainerState.RESERVED, 
@@ -341,6 +346,38 @@ public class RMContainerImpl implements 
     }
   }
 
+  private static final class ContainerRecoveredTransition
+      implements
+      MultipleArcTransition<RMContainerImpl, RMContainerEvent, RMContainerState> {
+    @Override
+    public RMContainerState transition(RMContainerImpl container,
+        RMContainerEvent event) {
+      NMContainerStatus report =
+          ((RMContainerRecoverEvent) event).getContainerReport();
+      if (report.getContainerState().equals(ContainerState.COMPLETE)) {
+        ContainerStatus status =
+            ContainerStatus.newInstance(report.getContainerId(),
+              report.getContainerState(), report.getDiagnostics(),
+              report.getContainerExitStatus());
+
+        new FinishedTransition().transition(container,
+          new RMContainerFinishedEvent(container.containerId, status,
+            RMContainerEventType.FINISHED));
+        return RMContainerState.COMPLETED;
+      } else if (report.getContainerState().equals(ContainerState.RUNNING)) {
+        // Tell the appAttempt
+        container.eventHandler.handle(new RMAppAttemptContainerAcquiredEvent(
+            container.getApplicationAttemptId(), container.getContainer()));
+        return RMContainerState.RUNNING;
+      } else {
+        // This can never happen.
+        LOG.warn("RMContainer received unexpected recover event with container"
+            + " state " + report.getContainerState() + " while recovering.");
+        return RMContainerState.RUNNING;
+      }
+    }
+  }
+
   private static final class ContainerReservedTransition extends
   BaseTransition {
 
@@ -398,7 +435,6 @@ public class RMContainerImpl implements 
       // Inform AppAttempt
       container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent(
           container.appAttemptId, finishedEvent.getRemoteContainerStatus()));
-
       container.rmContext.getRMApplicationHistoryWriter()
           .containerFinished(container);
     }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Mon Jun 16 18:13:57 2014
@@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
@@ -460,13 +461,9 @@ public class RMNodeImpl implements RMNod
     @Override
     public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
       // Inform the scheduler
+      RMNodeStartedEvent startEvent = (RMNodeStartedEvent) event;
+      List<NMContainerStatus> containers = null;
 
-      rmNode.context.getDispatcher().getEventHandler().handle(
-          new NodeAddedSchedulerEvent(rmNode));
-      rmNode.context.getDispatcher().getEventHandler().handle(
-          new NodesListManagerEvent(
-              NodesListManagerEventType.NODE_USABLE, rmNode));
- 
       String host = rmNode.nodeId.getHost();
       if (rmNode.context.getInactiveRMNodes().containsKey(host)) {
         // Old node rejoining
@@ -476,10 +473,17 @@ public class RMNodeImpl implements RMNod
       } else {
         // Increment activeNodes explicitly because this is a new node.
         ClusterMetrics.getMetrics().incrNumActiveNodes();
+        containers = startEvent.getContainerRecoveryReports();
       }
+
+      rmNode.context.getDispatcher().getEventHandler()
+        .handle(new NodeAddedSchedulerEvent(rmNode, containers));
+      rmNode.context.getDispatcher().getEventHandler().handle(
+        new NodesListManagerEvent(
+            NodesListManagerEventType.NODE_USABLE, rmNode));
     }
   }
-  
+
   public static class ReconnectNodeTransition implements
       SingleArcTransition<RMNodeImpl, RMNodeEvent> {
 
@@ -513,7 +517,7 @@ public class RMNodeImpl implements RMNod
         }
         rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode);
         rmNode.context.getDispatcher().getEventHandler().handle(
-            new RMNodeEvent(newNode.getNodeID(), RMNodeEventType.STARTED));
+            new RMNodeStartedEvent(newNode.getNodeID(), null));
       }
       rmNode.context.getDispatcher().getEventHandler().handle(
           new NodesListManagerEvent(

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java Mon Jun 16 18:13:57 2014
@@ -26,28 +26,35 @@ import java.util.concurrent.ConcurrentHa
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+@SuppressWarnings("unchecked")
 public abstract class AbstractYarnScheduler
     <T extends SchedulerApplicationAttempt, N extends SchedulerNode>
-    implements ResourceScheduler {
+    extends AbstractService implements ResourceScheduler {
 
   private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class);
 
   // Nodes in the cluster, indexed by NodeId
-  protected Map<NodeId, N> nodes =
-      new ConcurrentHashMap<NodeId, N>();
+  protected Map<NodeId, N> nodes = new ConcurrentHashMap<NodeId, N>();
 
   // Whole capacity of the cluster
   protected Resource clusterResource = Resource.newInstance(0, 0);
@@ -57,11 +64,21 @@ public abstract class AbstractYarnSchedu
 
   protected RMContext rmContext;
   protected Map<ApplicationId, SchedulerApplication<T>> applications;
+
   protected final static List<Container> EMPTY_CONTAINER_LIST =
       new ArrayList<Container>();
   protected static final Allocation EMPTY_ALLOCATION = new Allocation(
     EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null);
 
+  /**
+   * Construct the service.
+   *
+   * @param name service name
+   */
+  public AbstractYarnScheduler(String name) {
+    super(name);
+  }
+
   public synchronized List<Container> getTransferredContainers(
       ApplicationAttemptId currentAttempt) {
     ApplicationId appId = currentAttempt.getApplicationId();
@@ -159,4 +176,90 @@ public abstract class AbstractYarnSchedu
     throw new YarnException(getClass().getSimpleName()
         + " does not support moving apps between queues");
   }
+
+  private void killOrphanContainerOnNode(RMNode node,
+      NMContainerStatus container) {
+    if (!container.getContainerState().equals(ContainerState.COMPLETE)) {
+      this.rmContext.getDispatcher().getEventHandler().handle(
+        new RMNodeCleanContainerEvent(node.getNodeID(),
+          container.getContainerId()));
+    }
+  }
+
+  public synchronized void recoverContainersOnNode(
+      List<NMContainerStatus> containerReports, RMNode nm) {
+    if (!rmContext.isWorkPreservingRecoveryEnabled()
+        || containerReports == null
+        || (containerReports != null && containerReports.isEmpty())) {
+      return;
+    }
+
+    for (NMContainerStatus container : containerReports) {
+      ApplicationId appId =
+          container.getContainerId().getApplicationAttemptId().getApplicationId();
+      RMApp rmApp = rmContext.getRMApps().get(appId);
+      if (rmApp == null) {
+        LOG.error("Skip recovering container " + container
+            + " for unknown application.");
+        killOrphanContainerOnNode(nm, container);
+        continue;
+      }
+
+      // Unmanaged AM recovery is addressed in YARN-1815
+      if (rmApp.getApplicationSubmissionContext().getUnmanagedAM()) {
+        LOG.info("Skip recovering container " + container + " for unmanaged AM."
+            + rmApp.getApplicationId());
+        killOrphanContainerOnNode(nm, container);
+        continue;
+      }
+
+      SchedulerApplication<T> schedulerApp = applications.get(appId);
+      if (schedulerApp == null) {
+        LOG.info("Skip recovering container  " + container
+            + " for unknown SchedulerApplication. Application current state is "
+            + rmApp.getState());
+        killOrphanContainerOnNode(nm, container);
+        continue;
+      }
+
+      LOG.info("Recovering container " + container);
+      SchedulerApplicationAttempt schedulerAttempt =
+          schedulerApp.getCurrentAppAttempt();
+
+      // create container
+      RMContainer rmContainer = recoverAndCreateContainer(container, nm);
+
+      // recover RMContainer
+      rmContainer.handle(new RMContainerRecoverEvent(container.getContainerId(),
+        container));
+
+      // recover scheduler node
+      nodes.get(nm.getNodeID()).recoverContainer(rmContainer);
+
+      // recover queue: update headroom etc.
+      Queue queue = schedulerAttempt.getQueue();
+      queue.recoverContainer(clusterResource, schedulerAttempt, rmContainer);
+
+      // recover scheduler attempt
+      schedulerAttempt.recoverContainer(rmContainer);
+    }
+  }
+
+  private RMContainer recoverAndCreateContainer(NMContainerStatus report,
+      RMNode node) {
+    Container container =
+        Container.newInstance(report.getContainerId(), node.getNodeID(),
+          node.getHttpAddress(), report.getAllocatedResource(),
+          report.getPriority(), null);
+    ApplicationAttemptId attemptId =
+        container.getId().getApplicationAttemptId();
+    RMContainer rmContainer =
+        new RMContainerImpl(container, attemptId, node.getNodeID(),
+          applications.get(attemptId.getApplicationId()).getUser(), rmContext);
+    return rmContainer;
+  }
+
+  public SchedulerNode getSchedulerNode(NodeId nodeId) {
+    return nodes.get(nodeId);
+  }
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Mon Jun 16 18:13:57 2014
@@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 /**
@@ -409,4 +411,25 @@ public class AppSchedulingInfo {
     //    this.requests = appInfo.getRequests();
     this.blacklist = appInfo.getBlackList();
   }
+
+  public synchronized void recoverContainer(RMContainer rmContainer) {
+    // ContainerIdCounter on recovery will be addressed in YARN-2052
+    this.containerIdCounter.incrementAndGet();
+
+    QueueMetrics metrics = queue.getMetrics();
+    if (pending) {
+      // If there was any container to recover, the application was
+      // running from scheduler's POV.
+      pending = false;
+      metrics.runAppAttempt(applicationId, user);
+    }
+
+    // Container is completed. Skip recovering resources.
+    if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+      return;
+    }
+
+    metrics.allocateResources(user, 1, rmContainer.getAllocatedResource(),
+      false);
+  }
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java Mon Jun 16 18:13:57 2014
@@ -26,6 +26,8 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 
 @Evolving
 @LimitedPrivate("yarn")
@@ -60,4 +62,13 @@ public interface Queue {
   boolean hasAccess(QueueACL acl, UserGroupInformation user);
   
   public ActiveUsersManager getActiveUsersManager();
+
+  /**
+   * Recover the state of the queue for a given container.
+   * @param clusterResource the resource of the cluster
+   * @param schedulerAttempt the application for which the container was allocated
+   * @param rmContainer the container that was recovered.
+   */
+  public void recoverContainer(Resource clusterResource,
+      SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer);
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java Mon Jun 16 18:13:57 2014
@@ -34,6 +34,15 @@ import org.apache.hadoop.yarn.server.res
 @LimitedPrivate("yarn")
 @Evolving
 public interface ResourceScheduler extends YarnScheduler, Recoverable {
+
+  /**
+   * Set RMContext for <code>ResourceScheduler</code>.
+   * This method should be called immediately after instantiating
+   * a scheduler once.
+   * @param rmContext created by ResourceManager
+   */
+  void setRMContext(RMContext rmContext);
+
   /**
    * Re-initialize the <code>ResourceScheduler</code>.
    * @param conf configuration

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java Mon Jun 16 18:13:57 2014
@@ -32,6 +32,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NMToken;
@@ -46,6 +47,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -76,6 +78,9 @@ public class SchedulerApplicationAttempt
   protected final Resource currentReservation = Resource.newInstance(0, 0);
   private Resource resourceLimit = Resource.newInstance(0, 0);
   protected Resource currentConsumption = Resource.newInstance(0, 0);
+  private Resource amResource;
+  private boolean unmanagedAM = true;
+  private boolean amRunning = false;
 
   protected List<RMContainer> newlyAllocatedContainers = 
       new ArrayList<RMContainer>();
@@ -106,6 +111,17 @@ public class SchedulerApplicationAttempt
         new AppSchedulingInfo(applicationAttemptId, user, queue,  
             activeUsersManager);
     this.queue = queue;
+
+    if (rmContext != null && rmContext.getRMApps() != null &&
+        rmContext.getRMApps()
+            .containsKey(applicationAttemptId.getApplicationId())) {
+      ApplicationSubmissionContext appSubmissionContext =
+          rmContext.getRMApps().get(applicationAttemptId.getApplicationId())
+              .getApplicationSubmissionContext();
+      if (appSubmissionContext != null) {
+        unmanagedAM = appSubmissionContext.getUnmanagedAM();
+      }
+    }
   }
   
   /**
@@ -168,6 +184,26 @@ public class SchedulerApplicationAttempt
     return appSchedulingInfo.getQueueName();
   }
   
+  public Resource getAMResource() {
+    return amResource;
+  }
+
+  public void setAMResource(Resource amResource) {
+    this.amResource = amResource;
+  }
+
+  public boolean isAmRunning() {
+    return amRunning;
+  }
+
+  public void setAmRunning(boolean bool) {
+    amRunning = bool;
+  }
+
+  public boolean getUnmanagedAM() {
+    return unmanagedAM;
+  }
+
   public synchronized RMContainer getRMContainer(ContainerId id) {
     return liveContainers.get(id);
   }
@@ -499,5 +535,24 @@ public class SchedulerApplicationAttempt
 
     appSchedulingInfo.move(newQueue);
     this.queue = newQueue;
-  }  
+  }
+
+  public synchronized void recoverContainer(RMContainer rmContainer) {
+    // recover app scheduling info
+    appSchedulingInfo.recoverContainer(rmContainer);
+
+    if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+      return;
+    }
+    LOG.info("SchedulerAttempt " + getApplicationAttemptId()
+      + " is recovering container " + rmContainer.getContainerId());
+    liveContainers.put(rmContainer.getContainerId(), rmContainer);
+    Resources.addTo(currentConsumption, rmContainer.getContainer()
+      .getResource());
+    // resourceLimit: updated when LeafQueue#recoverContainer#allocateResource
+    // is called.
+    // newlyAllocatedContainers.add(rmContainer);
+    // schedulingOpportunities
+    // lastScheduledContainer
+  }
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java Mon Jun 16 18:13:57 2014
@@ -27,7 +27,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -35,10 +34,10 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
-import com.google.common.base.Preconditions;
 
 /**
  * Represents a YARN Cluster Node from the viewpoint of the scheduler.
@@ -119,13 +118,10 @@ public abstract class SchedulerNode {
    * The Scheduler has allocated containers on this node to the given
    * application.
    * 
-   * @param applicationId
-   *          application
    * @param rmContainer
    *          allocated container
    */
-  public synchronized void allocateContainer(ApplicationId applicationId,
-      RMContainer rmContainer) {
+  public synchronized void allocateContainer(RMContainer rmContainer) {
     Container container = rmContainer.getContainer();
     deductAvailableResource(container.getResource());
     ++numContainers;
@@ -166,8 +162,8 @@ public abstract class SchedulerNode {
     return this.totalResourceCapability;
   }
 
-  private synchronized boolean isValidContainer(Container c) {
-    if (launchedContainers.containsKey(c.getId())) {
+  public synchronized boolean isValidContainer(ContainerId containerId) {
+    if (launchedContainers.containsKey(containerId)) {
       return true;
     }
     return false;
@@ -185,7 +181,7 @@ public abstract class SchedulerNode {
    *          container to be released
    */
   public synchronized void releaseContainer(Container container) {
-    if (!isValidContainer(container)) {
+    if (!isValidContainer(container.getId())) {
       LOG.error("Invalid container released " + container);
       return;
     }
@@ -274,4 +270,12 @@ public abstract class SchedulerNode {
     // we can only adjust available resource if total resource is changed.
     Resources.addTo(this.availableResource, deltaResource);
   }
+
+
+  public synchronized void recoverContainer(RMContainer rmContainer) {
+    if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+      return;
+    }
+    allocateContainer(rmContainer);
+  }
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java Mon Jun 16 18:13:57 2014
@@ -28,7 +28,6 @@ import org.apache.hadoop.security.Access
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueState;
@@ -235,15 +234,6 @@ extends org.apache.hadoop.yarn.server.re
   public ActiveUsersManager getActiveUsersManager();
   
   /**
-   * Recover the state of the queue
-   * @param clusterResource the resource of the cluster
-   * @param application the application for which the container was allocated
-   * @param container the container that was recovered.
-   */
-  public void recoverContainer(Resource clusterResource, FiCaSchedulerApp application, 
-      Container container);
-  
-  /**
    * Adds all applications in the queue and its subqueues to the given collection.
    * @param apps the collection to add the applications to
    */

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Mon Jun 16 18:13:57 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
+import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -102,6 +103,8 @@ public class CapacityScheduler extends
   private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
 
   private CSQueue root;
+  // timeout to join when we stop this service
+  protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
 
   static final Comparator<CSQueue> queueComparator = new Comparator<CSQueue>() {
     @Override
@@ -179,8 +182,6 @@ public class CapacityScheduler extends
 
   private int numNodeManagers = 0;
 
-  private boolean initialized = false;
-
   private ResourceCalculator calculator;
   private boolean usePortForNodeName;
 
@@ -196,7 +197,9 @@ public class CapacityScheduler extends
           + ".scheduling-interval-ms";
   private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
   
-  public CapacityScheduler() {}
+  public CapacityScheduler() {
+    super(CapacityScheduler.class.getName());
+  }
 
   @Override
   public QueueMetrics getRootQueueMetrics() {
@@ -238,56 +241,91 @@ public class CapacityScheduler extends
   }
 
   @Override
-  public RMContext getRMContext() {
+  public synchronized RMContext getRMContext() {
     return this.rmContext;
   }
-  
+
   @Override
-  public synchronized void
-      reinitialize(Configuration conf, RMContext rmContext) throws IOException {
+  public synchronized void setRMContext(RMContext rmContext) {
+    this.rmContext = rmContext;
+  }
+
+  private synchronized void initScheduler(Configuration configuration) throws
+      IOException {
+    this.conf = loadCapacitySchedulerConfiguration(configuration);
+    validateConf(this.conf);
+    this.minimumAllocation = this.conf.getMinimumAllocation();
+    this.maximumAllocation = this.conf.getMaximumAllocation();
+    this.calculator = this.conf.getResourceCalculator();
+    this.usePortForNodeName = this.conf.getUsePortForNodeName();
+    this.applications =
+        new ConcurrentHashMap<ApplicationId,
+            SchedulerApplication<FiCaSchedulerApp>>();
+
+    initializeQueues(this.conf);
+
+    scheduleAsynchronously = this.conf.getScheduleAynschronously();
+    asyncScheduleInterval =
+        this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
+            DEFAULT_ASYNC_SCHEDULER_INTERVAL);
+    if (scheduleAsynchronously) {
+      asyncSchedulerThread = new AsyncScheduleThread(this);
+    }
+
+    LOG.info("Initialized CapacityScheduler with " +
+        "calculator=" + getResourceCalculator().getClass() + ", " +
+        "minimumAllocation=<" + getMinimumResourceCapability() + ">, " +
+        "maximumAllocation=<" + getMaximumResourceCapability() + ">, " +
+        "asynchronousScheduling=" + scheduleAsynchronously + ", " +
+        "asyncScheduleInterval=" + asyncScheduleInterval + "ms");
+  }
+
+  private synchronized void startSchedulerThreads() {
+    if (scheduleAsynchronously) {
+      Preconditions.checkNotNull(asyncSchedulerThread,
+          "asyncSchedulerThread is null");
+      asyncSchedulerThread.start();
+    }
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
     Configuration configuration = new Configuration(conf);
-    if (!initialized) {
-      this.rmContext = rmContext;
-      this.conf = loadCapacitySchedulerConfiguration(configuration);
-      validateConf(this.conf);
-      this.minimumAllocation = this.conf.getMinimumAllocation();
-      this.maximumAllocation = this.conf.getMaximumAllocation();
-      this.calculator = this.conf.getResourceCalculator();
-      this.usePortForNodeName = this.conf.getUsePortForNodeName();
-      this.applications =
-          new ConcurrentHashMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>();
+    initScheduler(configuration);
+    super.serviceInit(conf);
+  }
 
-      initializeQueues(this.conf);
-      
-      scheduleAsynchronously = this.conf.getScheduleAynschronously();
-      asyncScheduleInterval = 
-          this.conf.getLong(ASYNC_SCHEDULER_INTERVAL, 
-              DEFAULT_ASYNC_SCHEDULER_INTERVAL);
-      if (scheduleAsynchronously) {
-        asyncSchedulerThread = new AsyncScheduleThread(this);
-        asyncSchedulerThread.start();
-      }
-      
-      initialized = true;
-      LOG.info("Initialized CapacityScheduler with " +
-          "calculator=" + getResourceCalculator().getClass() + ", " +
-          "minimumAllocation=<" + getMinimumResourceCapability() + ">, " +
-          "maximumAllocation=<" + getMaximumResourceCapability() + ">, " +
-          "asynchronousScheduling=" + scheduleAsynchronously + ", " +
-          "asyncScheduleInterval=" + asyncScheduleInterval + "ms");
-      
-    } else {
-      CapacitySchedulerConfiguration oldConf = this.conf; 
-      this.conf = loadCapacitySchedulerConfiguration(configuration);
-      validateConf(this.conf);
-      try {
-        LOG.info("Re-initializing queues...");
-        reinitializeQueues(this.conf);
-      } catch (Throwable t) {
-        this.conf = oldConf;
-        throw new IOException("Failed to re-init queues", t);
+  @Override
+  public void serviceStart() throws Exception {
+    startSchedulerThreads();
+    super.serviceStart();
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    synchronized (this) {
+      if (scheduleAsynchronously && asyncSchedulerThread != null) {
+        asyncSchedulerThread.interrupt();
+        asyncSchedulerThread.join(THREAD_JOIN_TIMEOUT_MS);
       }
     }
+    super.serviceStop();
+  }
+
+  @Override
+  public synchronized void
+  reinitialize(Configuration conf, RMContext rmContext) throws IOException {
+    Configuration configuration = new Configuration(conf);
+    CapacitySchedulerConfiguration oldConf = this.conf;
+    this.conf = loadCapacitySchedulerConfiguration(configuration);
+    validateConf(this.conf);
+    try {
+      LOG.info("Re-initializing queues...");
+      reinitializeQueues(this.conf);
+    } catch (Throwable t) {
+      this.conf = oldConf;
+      throw new IOException("Failed to re-init queues", t);
+    }
   }
   
   long getAsyncScheduleInterval() {
@@ -834,6 +872,8 @@ public class CapacityScheduler extends
     {
       NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
       addNode(nodeAddedEvent.getAddedRMNode());
+      recoverContainersOnNode(nodeAddedEvent.getContainerReports(),
+        nodeAddedEvent.getAddedRMNode());
     }
     break;
     case NODE_REMOVED:

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Mon Jun 16 18:13:57 2014
@@ -59,15 +59,17 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import com.google.common.annotations.VisibleForTesting;
+
 @Private
 @Unstable
 public class LeafQueue implements CSQueue {
@@ -564,7 +566,8 @@ public class LeafQueue implements CSQueu
         "numContainers=" + getNumContainers();  
   }
 
-  private synchronized User getUser(String userName) {
+  @VisibleForTesting
+  public synchronized User getUser(String userName) {
     User user = users.get(userName);
     if (user == null) {
       user = new User();
@@ -1346,8 +1349,7 @@ public class LeafQueue implements CSQueu
       }
 
       // Inform the node
-      node.allocateContainer(application.getApplicationId(), 
-          allocatedContainer);
+      node.allocateContainer(allocatedContainer);
 
       LOG.info("assignedContainer" +
           " application attempt=" + application.getApplicationAttemptId() +
@@ -1446,7 +1448,7 @@ public class LeafQueue implements CSQueu
   }
 
   synchronized void allocateResource(Resource clusterResource, 
-      FiCaSchedulerApp application, Resource resource) {
+      SchedulerApplicationAttempt application, Resource resource) {
     // Update queue metrics
     Resources.addTo(usedResources, resource);
     CSQueueUtils.updateQueueStatistics(
@@ -1530,7 +1532,8 @@ public class LeafQueue implements CSQueu
     return metrics;
   }
 
-  static class User {
+  @VisibleForTesting
+  public static class User {
     Resource consumed = Resources.createResource(0, 0);
     int pendingApplications = 0;
     int activeApplications = 0;
@@ -1580,13 +1583,16 @@ public class LeafQueue implements CSQueu
 
   @Override
   public void recoverContainer(Resource clusterResource,
-      FiCaSchedulerApp application, Container container) {
+      SchedulerApplicationAttempt attempt, RMContainer rmContainer) {
+    if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+      return;
+    }
     // Careful! Locking order is important! 
     synchronized (this) {
-      allocateResource(clusterResource, application, container.getResource());
+      allocateResource(clusterResource, attempt, rmContainer.getContainer()
+        .getResource());
     }
-    getParent().recoverContainer(clusterResource, application, container);
-
+    getParent().recoverContainer(clusterResource, attempt, rmContainer);
   }
 
   /**
@@ -1613,5 +1619,4 @@ public class LeafQueue implements CSQueu
       apps.add(app.getApplicationAttemptId());
     }
   }
-
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Mon Jun 16 18:13:57 2014
@@ -38,7 +38,6 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -49,9 +48,11 @@ import org.apache.hadoop.yarn.factories.
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -770,13 +771,16 @@ public class ParentQueue implements CSQu
   
   @Override
   public void recoverContainer(Resource clusterResource,
-      FiCaSchedulerApp application, Container container) {
+      SchedulerApplicationAttempt attempt, RMContainer rmContainer) {
+    if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+      return;
+    }
     // Careful! Locking order is important! 
     synchronized (this) {
-      allocateResource(clusterResource, container.getResource());
+      allocateResource(clusterResource,rmContainer.getContainer().getResource());
     }
     if (parent != null) {
-      parent.recoverContainer(clusterResource, application, container);
+      parent.recoverContainer(clusterResource, attempt, rmContainer);
     }
   }
 

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java Mon Jun 16 18:13:57 2014
@@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java Mon Jun 16 18:13:57 2014
@@ -18,19 +18,34 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
 
+import java.util.List;
+
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
 public class NodeAddedSchedulerEvent extends SchedulerEvent {
 
   private final RMNode rmNode;
+  private final List<NMContainerStatus> containerReports;
 
   public NodeAddedSchedulerEvent(RMNode rmNode) {
     super(SchedulerEventType.NODE_ADDED);
     this.rmNode = rmNode;
+    this.containerReports = null;
+  }
+
+  public NodeAddedSchedulerEvent(RMNode rmNode,
+      List<NMContainerStatus> containerReports) {
+    super(SchedulerEventType.NODE_ADDED);
+    this.rmNode = rmNode;
+    this.containerReports = containerReports;
   }
 
   public RMNode getAddedRMNode() {
     return rmNode;
   }
 
+  public List<NMContainerStatus> getContainerReports() {
+    return containerReports;
+  }
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java Mon Jun 16 18:13:57 2014
@@ -53,6 +53,10 @@ public class AllocationConfiguration {
   private final int userMaxAppsDefault;
   private final int queueMaxAppsDefault;
 
+  // Maximum resource share for each leaf queue that can be used to run AMs
+  final Map<String, Float> queueMaxAMShares;
+  private final float queueMaxAMShareDefault;
+
   // ACL's for each queue. Only specifies non-default ACL's from configuration.
   private final Map<String, Map<QueueACL, AccessControlList>> queueAcls;
 
@@ -84,8 +88,9 @@ public class AllocationConfiguration {
   public AllocationConfiguration(Map<String, Resource> minQueueResources,
       Map<String, Resource> maxQueueResources,
       Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
-      Map<String, ResourceWeights> queueWeights, int userMaxAppsDefault,
-      int queueMaxAppsDefault,
+      Map<String, ResourceWeights> queueWeights,
+      Map<String, Float> queueMaxAMShares, int userMaxAppsDefault,
+      int queueMaxAppsDefault, float queueMaxAMShareDefault,
       Map<String, SchedulingPolicy> schedulingPolicies,
       SchedulingPolicy defaultSchedulingPolicy,
       Map<String, Long> minSharePreemptionTimeouts,
@@ -97,9 +102,11 @@ public class AllocationConfiguration {
     this.maxQueueResources = maxQueueResources;
     this.queueMaxApps = queueMaxApps;
     this.userMaxApps = userMaxApps;
+    this.queueMaxAMShares = queueMaxAMShares;
     this.queueWeights = queueWeights;
     this.userMaxAppsDefault = userMaxAppsDefault;
     this.queueMaxAppsDefault = queueMaxAppsDefault;
+    this.queueMaxAMShareDefault = queueMaxAMShareDefault;
     this.defaultSchedulingPolicy = defaultSchedulingPolicy;
     this.schedulingPolicies = schedulingPolicies;
     this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
@@ -116,8 +123,10 @@ public class AllocationConfiguration {
     queueWeights = new HashMap<String, ResourceWeights>();
     queueMaxApps = new HashMap<String, Integer>();
     userMaxApps = new HashMap<String, Integer>();
+    queueMaxAMShares = new HashMap<String, Float>();
     userMaxAppsDefault = Integer.MAX_VALUE;
     queueMaxAppsDefault = Integer.MAX_VALUE;
+    queueMaxAMShareDefault = 1.0f;
     queueAcls = new HashMap<String, Map<QueueACL, AccessControlList>>();
     minSharePreemptionTimeouts = new HashMap<String, Long>();
     defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
@@ -184,6 +193,11 @@ public class AllocationConfiguration {
     return (maxApps == null) ? queueMaxAppsDefault : maxApps;
   }
   
+  public float getQueueMaxAMShare(String queue) {
+    Float maxAMShare = queueMaxAMShares.get(queue);
+    return (maxAMShare == null) ? queueMaxAMShareDefault : maxAMShare;
+  }
+
   /**
    * Get the minimum resource allocation for the given queue.
    * @return the cap set on this queue, or 0 if not set.

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java Mon Jun 16 18:13:57 2014
@@ -68,7 +68,9 @@ public class AllocationFileLoaderService
    * (this is done to prevent loading a file that hasn't been fully written).
    */
   public static final long ALLOC_RELOAD_WAIT_MS = 5 * 1000;
-  
+
+  public static final long THREAD_JOIN_TIMEOUT_MS = 1000;
+
   private final Clock clock;
 
   private long lastSuccessfulReload; // Last time we successfully reloaded queues
@@ -96,58 +98,69 @@ public class AllocationFileLoaderService
   }
   
   @Override
-  public void init(Configuration conf) {
+  public void serviceInit(Configuration conf) throws Exception {
     this.allocFile = getAllocationFile(conf);
-    super.init(conf);
-  }
-  
-  @Override
-  public void start() {
-    if (allocFile == null) {
-      return;
-    }
-    reloadThread = new Thread() {
-      public void run() {
-        while (running) {
-          long time = clock.getTime();
-          long lastModified = allocFile.lastModified();
-          if (lastModified > lastSuccessfulReload &&
-              time > lastModified + ALLOC_RELOAD_WAIT_MS) {
-            try {
-              reloadAllocations();
-            } catch (Exception ex) {
+    if (allocFile != null) {
+      reloadThread = new Thread() {
+        @Override
+        public void run() {
+          while (running) {
+            long time = clock.getTime();
+            long lastModified = allocFile.lastModified();
+            if (lastModified > lastSuccessfulReload &&
+                time > lastModified + ALLOC_RELOAD_WAIT_MS) {
+              try {
+                reloadAllocations();
+              } catch (Exception ex) {
+                if (!lastReloadAttemptFailed) {
+                  LOG.error("Failed to reload fair scheduler config file - " +
+                      "will use existing allocations.", ex);
+                }
+                lastReloadAttemptFailed = true;
+              }
+            } else if (lastModified == 0l) {
               if (!lastReloadAttemptFailed) {
-                LOG.error("Failed to reload fair scheduler config file - " +
-                    "will use existing allocations.", ex);
+                LOG.warn("Failed to reload fair scheduler config file because" +
+                    " last modified returned 0. File exists: "
+                    + allocFile.exists());
               }
               lastReloadAttemptFailed = true;
             }
-          } else if (lastModified == 0l) {
-            if (!lastReloadAttemptFailed) {
-              LOG.warn("Failed to reload fair scheduler config file because" +
-                  " last modified returned 0. File exists: " + allocFile.exists());
+            try {
+              Thread.sleep(reloadIntervalMs);
+            } catch (InterruptedException ex) {
+              LOG.info(
+                  "Interrupted while waiting to reload alloc configuration");
             }
-            lastReloadAttemptFailed = true;
-          }
-          try {
-            Thread.sleep(reloadIntervalMs);
-          } catch (InterruptedException ex) {
-            LOG.info("Interrupted while waiting to reload alloc configuration");
           }
         }
-      }
-    };
-    reloadThread.setName("AllocationFileReloader");
-    reloadThread.setDaemon(true);
-    reloadThread.start();
-    super.start();
+      };
+      reloadThread.setName("AllocationFileReloader");
+      reloadThread.setDaemon(true);
+    }
+    super.serviceInit(conf);
   }
   
   @Override
-  public void stop() {
+  public void serviceStart() throws Exception {
+    if (reloadThread != null) {
+      reloadThread.start();
+    }
+    super.serviceStart();
+  }
+  
+  @Override
+  public void serviceStop() throws Exception {
     running = false;
-    reloadThread.interrupt();
-    super.stop();
+    if (reloadThread != null) {
+      reloadThread.interrupt();
+      try {
+        reloadThread.join(THREAD_JOIN_TIMEOUT_MS);
+      } catch (InterruptedException e) {
+        LOG.warn("reloadThread fails to join.");
+      }
+    }
+    super.serviceStop();
   }
   
   /**
@@ -200,6 +213,7 @@ public class AllocationFileLoaderService
     Map<String, Resource> maxQueueResources = new HashMap<String, Resource>();
     Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
     Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
+    Map<String, Float> queueMaxAMShares = new HashMap<String, Float>();
     Map<String, ResourceWeights> queueWeights = new HashMap<String, ResourceWeights>();
     Map<String, SchedulingPolicy> queuePolicies = new HashMap<String, SchedulingPolicy>();
     Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
@@ -207,6 +221,7 @@ public class AllocationFileLoaderService
         new HashMap<String, Map<QueueACL, AccessControlList>>();
     int userMaxAppsDefault = Integer.MAX_VALUE;
     int queueMaxAppsDefault = Integer.MAX_VALUE;
+    float queueMaxAMShareDefault = 1.0f;
     long fairSharePreemptionTimeout = Long.MAX_VALUE;
     long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
     SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY;
@@ -273,6 +288,11 @@ public class AllocationFileLoaderService
           String text = ((Text)element.getFirstChild()).getData().trim();
           int val = Integer.parseInt(text);
           queueMaxAppsDefault = val;
+        } else if ("queueMaxAMShareDefault".equals(element.getTagName())) {
+          String text = ((Text)element.getFirstChild()).getData().trim();
+          float val = Float.parseFloat(text);
+          val = Math.min(val, 1.0f);
+          queueMaxAMShareDefault = val;
         } else if ("defaultQueueSchedulingPolicy".equals(element.getTagName())
             || "defaultQueueSchedulingMode".equals(element.getTagName())) {
           String text = ((Text)element.getFirstChild()).getData().trim();
@@ -297,8 +317,8 @@ public class AllocationFileLoaderService
         parent = null;
       }
       loadQueue(parent, element, minQueueResources, maxQueueResources,
-          queueMaxApps, userMaxApps, queueWeights, queuePolicies,
-          minSharePreemptionTimeouts, queueAcls,
+          queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
+          queuePolicies, minSharePreemptionTimeouts, queueAcls,
           configuredQueues);
     }
     
@@ -313,8 +333,8 @@ public class AllocationFileLoaderService
     }
     
     AllocationConfiguration info = new AllocationConfiguration(minQueueResources, maxQueueResources,
-        queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault,
-        queueMaxAppsDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts,
+        queueMaxApps, userMaxApps, queueWeights, queueMaxAMShares, userMaxAppsDefault,
+        queueMaxAppsDefault, queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts,
         queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout,
         newPlacementPolicy, configuredQueues);
     
@@ -329,7 +349,8 @@ public class AllocationFileLoaderService
    */
   private void loadQueue(String parentName, Element element, Map<String, Resource> minQueueResources,
       Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps,
-      Map<String, Integer> userMaxApps, Map<String, ResourceWeights> queueWeights,
+      Map<String, Integer> userMaxApps, Map<String, Float> queueMaxAMShares,
+      Map<String, ResourceWeights> queueWeights,
       Map<String, SchedulingPolicy> queuePolicies,
       Map<String, Long> minSharePreemptionTimeouts,
       Map<String, Map<QueueACL, AccessControlList>> queueAcls, 
@@ -361,6 +382,11 @@ public class AllocationFileLoaderService
         String text = ((Text)field.getFirstChild()).getData().trim();
         int val = Integer.parseInt(text);
         queueMaxApps.put(queueName, val);
+      } else if ("maxAMShare".equals(field.getTagName())) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        float val = Float.parseFloat(text);
+        val = Math.min(val, 1.0f);
+        queueMaxAMShares.put(queueName, val);
       } else if ("weight".equals(field.getTagName())) {
         String text = ((Text)field.getFirstChild()).getData().trim();
         double val = Double.parseDouble(text);
@@ -383,8 +409,9 @@ public class AllocationFileLoaderService
       } else if ("queue".endsWith(field.getTagName()) || 
           "pool".equals(field.getTagName())) {
         loadQueue(queueName, field, minQueueResources, maxQueueResources,
-            queueMaxApps, userMaxApps, queueWeights, queuePolicies,
-            minSharePreemptionTimeouts, queueAcls, configuredQueues);
+            queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
+            queuePolicies, minSharePreemptionTimeouts, queueAcls,
+            configuredQueues);
         configuredQueues.get(FSQueueType.PARENT).add(queueName);
         isLeaf = false;
       }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java Mon Jun 16 18:13:57 2014
@@ -264,8 +264,14 @@ public class AppSchedulable extends Sche
       }
 
       // Inform the node
-      node.allocateContainer(app.getApplicationId(),
-          allocatedContainer);
+      node.allocateContainer(allocatedContainer);
+
+      // If this container is used to run AM, update the leaf queue's AM usage
+      if (app.getLiveContainers().size() == 1 &&
+          !app.getUnmanagedAM()) {
+        queue.addAMResourceUsage(container.getResource());
+        app.setAmRunning(true);
+      }
 
       return container.getResource();
     } else {
@@ -297,6 +303,14 @@ public class AppSchedulable extends Sche
         
         app.addSchedulingOpportunity(priority);
 
+        // Check the AM resource usage for the leaf queue
+        if (app.getLiveContainers().size() == 0
+            && !app.getUnmanagedAM()) {
+          if (!queue.canRunAppAM(app.getAMResource())) {
+            return Resources.none();
+          }
+        }
+
         ResourceRequest rackLocalRequest = app.getResourceRequest(priority,
             node.getRackName());
         ResourceRequest localRequest = app.getResourceRequest(priority,

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java Mon Jun 16 18:13:57 2014
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 @Private
@@ -55,6 +56,9 @@ public class FSLeafQueue extends FSQueue
   private long lastTimeAtMinShare;
   private long lastTimeAtHalfFairShare;
   
+  // Track the AM resource usage for this queue
+  private Resource amResourceUsage;
+
   private final ActiveUsersManager activeUsersManager;
   
   public FSLeafQueue(String name, FairScheduler scheduler,
@@ -63,6 +67,7 @@ public class FSLeafQueue extends FSQueue
     this.lastTimeAtMinShare = scheduler.getClock().getTime();
     this.lastTimeAtHalfFairShare = scheduler.getClock().getTime();
     activeUsersManager = new ActiveUsersManager(getMetrics());
+    amResourceUsage = Resource.newInstance(0, 0);
   }
   
   public void addApp(FSSchedulerApp app, boolean runnable) {
@@ -86,6 +91,10 @@ public class FSLeafQueue extends FSQueue
    */
   public boolean removeApp(FSSchedulerApp app) {
     if (runnableAppScheds.remove(app.getAppSchedulable())) {
+      // Update AM resource usage
+      if (app.isAmRunning() && app.getAMResource() != null) {
+        Resources.subtractFrom(amResourceUsage, app.getAMResource());
+      }
       return true;
     } else if (nonRunnableAppScheds.remove(app.getAppSchedulable())) {
       return false;
@@ -145,6 +154,10 @@ public class FSLeafQueue extends FSQueue
     return usage;
   }
 
+  public Resource getAmResourceUsage() {
+    return amResourceUsage;
+  }
+
   @Override
   public void updateDemand() {
     // Compute demand by iterating through apps in the queue
@@ -284,4 +297,32 @@ public class FSLeafQueue extends FSQueue
   public ActiveUsersManager getActiveUsersManager() {
     return activeUsersManager;
   }
+
+  /**
+   * Check whether this queue can run this application master under the
+   * maxAMShare limit
+   *
+   * @param amResource
+   * @return true if this queue can run
+   */
+  public boolean canRunAppAM(Resource amResource) {
+    float maxAMShare =
+        scheduler.getAllocationConfiguration().getQueueMaxAMShare(getName());
+    Resource maxAMResource = Resources.multiply(getFairShare(), maxAMShare);
+    Resource ifRunAMResource = Resources.add(amResourceUsage, amResource);
+    return !policy
+        .checkIfAMResourceUsageOverLimit(ifRunAMResource, maxAMResource);
+  }
+
+  public void addAMResourceUsage(Resource amResource) {
+    if (amResource != null) {
+      Resources.addTo(amResourceUsage, amResource);
+    }
+  }
+
+  @Override
+  public void recoverContainer(Resource clusterResource,
+      SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
+    // TODO Auto-generated method stub
+  }
 }