You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/08/12 23:26:19 UTC

svn commit: r1513258 [4/10] - in /hadoop/common/branches/YARN-321/hadoop-yarn-project: ./ hadoop-yarn/ hadoop-yarn/conf/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ hadoop-...

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java Mon Aug 12 21:25:49 2013
@@ -46,6 +46,7 @@ public class ApplicationReportPBImpl ext
   private ApplicationId applicationId;
   private ApplicationAttemptId currentApplicationAttemptId;
   private Token clientToAMToken = null;
+  private Token amRmToken = null;
 
   public ApplicationReportPBImpl() {
     builder = ApplicationReportProto.newBuilder();
@@ -228,7 +229,20 @@ public class ApplicationReportPBImpl ext
     }
     return p.getApplicationType();
   }
-  
+
+  @Override
+  public Token getAMRMToken() {
+    ApplicationReportProtoOrBuilder p = viaProto ? proto : builder;
+    if (amRmToken != null) {
+      return amRmToken;
+    }
+    if (!p.hasAmRmToken()) {
+      return null;
+    }
+    amRmToken = convertFromProtoFormat(p.getAmRmToken());
+    return amRmToken;
+  }
+
   @Override
   public void setApplicationId(ApplicationId applicationId) {
     maybeInitBuilder();
@@ -377,6 +391,15 @@ public class ApplicationReportPBImpl ext
     builder.setProgress(progress);
   }
 
+  @Override
+  public void setAMRMToken(Token amRmToken) {
+    maybeInitBuilder();
+    if (amRmToken == null) {
+      builder.clearAmRmToken();
+    }
+    this.amRmToken = amRmToken;
+  }
+
   public ApplicationReportProto getProto() {
     mergeLocalToProto();
     proto = viaProto ? proto : builder.build();
@@ -420,6 +443,11 @@ public class ApplicationReportPBImpl ext
             builder.getClientToAmToken())) {
       builder.setClientToAmToken(convertToProtoFormat(this.clientToAMToken));
     }
+    if (this.amRmToken != null
+      && !((TokenPBImpl) this.amRmToken).getProto().equals(
+      builder.getAmRmToken())) {
+      builder.setAmRmToken(convertToProtoFormat(this.amRmToken));
+    }
   }
 
   private void mergeLocalToProto() {

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RpcServerFactoryPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RpcServerFactoryPBImpl.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RpcServerFactoryPBImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RpcServerFactoryPBImpl.java Mon Aug 12 21:25:49 2013
@@ -166,9 +166,11 @@ public class RpcServerFactoryPBImpl impl
       SecretManager<? extends TokenIdentifier> secretManager, int numHandlers, 
       BlockingService blockingService, String portRangeConfig) throws IOException {
     RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class);
-    RPC.Server server = RPC.getServer(pbProtocol, blockingService, 
-        addr.getHostName(), addr.getPort(), numHandlers, false, conf, 
-        secretManager, portRangeConfig);
+    RPC.Server server = new RPC.Builder(conf).setProtocol(pbProtocol)
+        .setInstance(blockingService).setBindAddress(addr.getHostName())
+        .setPort(addr.getPort()).setNumHandlers(numHandlers).setVerbose(false)
+        .setSecretManager(secretManager).setPortRangeConfig(portRangeConfig)
+        .build();
     LOG.info("Adding protocol "+pbProtocol.getCanonicalName()+" to the server");
     server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, pbProtocol, blockingService);
     return server;

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java Mon Aug 12 21:25:49 2013
@@ -41,6 +41,7 @@ public class AggregatedLogDeletionServic
   private static final Log LOG = LogFactory.getLog(AggregatedLogDeletionService.class);
   
   private Timer timer = null;
+  private long checkIntervalMsecs;
   
   static class LogDeletionTask extends TimerTask {
     private Configuration conf;
@@ -133,37 +134,71 @@ public class AggregatedLogDeletionServic
 
   @Override
   protected void serviceStart() throws Exception {
+    scheduleLogDeletionTask();
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    stopTimer();
+    super.serviceStop();
+  }
+  
+  private void setLogAggCheckIntervalMsecs(long retentionSecs) {
+    Configuration conf = getConfig();
+    checkIntervalMsecs = 1000 * conf
+        .getLong(
+            YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
+            YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS);
+    if (checkIntervalMsecs <= 0) {
+      // when unspecified compute check interval as 1/10th of retention
+      checkIntervalMsecs = (retentionSecs * 1000) / 10;
+    }
+  }
+  
+  public void refreshLogRetentionSettings() {
+    if (getServiceState() == STATE.STARTED) {
+      Configuration conf = createConf();
+      setConfig(conf);
+      stopTimer();
+      scheduleLogDeletionTask();
+    } else {
+      LOG.warn("Failed to execute refreshLogRetentionSettings : Aggregated Log Deletion Service is not started");
+    }
+  }
+  
+  private void scheduleLogDeletionTask() {
     Configuration conf = getConfig();
     if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
         YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
-      //Log aggregation is not enabled so don't bother
+      // Log aggregation is not enabled so don't bother
       return;
     }
-    long retentionSecs = conf.getLong(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS,
+    long retentionSecs = conf.getLong(
+        YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS,
         YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS);
-    if(retentionSecs < 0) {
-      LOG.info("Log Aggregation deletion is disabled because retention is" +
-      		" too small (" + retentionSecs + ")");
+    if (retentionSecs < 0) {
+      LOG.info("Log Aggregation deletion is disabled because retention is"
+          + " too small (" + retentionSecs + ")");
       return;
     }
-    long checkIntervalMsecs = 1000 * conf.getLong(
-        YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
-        YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS);
-    if (checkIntervalMsecs <= 0) {
-      // when unspecified compute check interval as 1/10th of retention
-      checkIntervalMsecs = (retentionSecs * 1000) / 10;
-    }
+    setLogAggCheckIntervalMsecs(retentionSecs);
     TimerTask task = new LogDeletionTask(conf, retentionSecs);
     timer = new Timer();
     timer.scheduleAtFixedRate(task, 0, checkIntervalMsecs);
-    super.serviceStart();
   }
 
-  @Override
-  protected void serviceStop() throws Exception {
-    if(timer != null) {
+  private void stopTimer() {
+    if (timer != null) {
       timer.cancel();
     }
-    super.serviceStop();
+  }
+  
+  public long getCheckIntervalMsecs() {
+    return checkIntervalMsecs;
+  }
+
+  protected Configuration createConf() {
+    return new Configuration();
   }
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocolPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocolPB.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocolPB.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocolPB.java Mon Aug 12 21:25:49 2013
@@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.proto.Reso
 @Private
 @Unstable
 @ProtocolInfo(
-    protocolName = "org.apache.hadoop.yarn.api.ResourceManagerAdministrationProtocolPB",
+    protocolName = "org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocolPB",
     protocolVersion = 1)
 public interface ResourceManagerAdministrationProtocolPB extends ResourceManagerAdministrationProtocolService.BlockingInterface {
 

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java Mon Aug 12 21:25:49 2013
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.List;
 
 import junit.framework.Assert;
 
@@ -31,12 +33,13 @@ import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -104,12 +107,15 @@ public class TestContainerLaunchRPC {
           TestRPC.newContainerToken(nodeId, "password".getBytes(),
             containerTokenIdentifier);
 
-      StartContainerRequest scRequest = recordFactory
-          .newRecordInstance(StartContainerRequest.class);
-      scRequest.setContainerLaunchContext(containerLaunchContext);
-      scRequest.setContainerToken(containerToken);
+      StartContainerRequest scRequest =
+          StartContainerRequest.newInstance(containerLaunchContext,
+            containerToken);
+      List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+      list.add(scRequest);
+      StartContainersRequest allRequests =
+          StartContainersRequest.newInstance(list);
       try {
-        proxy.startContainer(scRequest);
+        proxy.startContainers(allRequests);
       } catch (Exception e) {
         LOG.info(StringUtils.stringifyException(e));
         Assert.assertEquals("Error, exception is not: "
@@ -129,17 +135,8 @@ public class TestContainerLaunchRPC {
     private ContainerStatus status = null;
 
     @Override
-    public GetContainerStatusResponse getContainerStatus(
-        GetContainerStatusRequest request) throws YarnException {
-      GetContainerStatusResponse response = recordFactory
-          .newRecordInstance(GetContainerStatusResponse.class);
-      response.setStatus(status);
-      return response;
-    }
-
-    @Override
-    public StartContainerResponse startContainer(StartContainerRequest request)
-        throws YarnException, IOException {
+    public StartContainersResponse startContainers(
+        StartContainersRequest requests) throws YarnException, IOException {
       try {
         // make the thread sleep to look like its not going to respond
         Thread.sleep(10000);
@@ -151,11 +148,22 @@ public class TestContainerLaunchRPC {
     }
 
     @Override
-    public StopContainerResponse stopContainer(StopContainerRequest request)
-        throws YarnException {
+    public StopContainersResponse
+        stopContainers(StopContainersRequest requests) throws YarnException,
+            IOException {
       Exception e = new Exception("Dummy function", new Exception(
           "Dummy function cause"));
       throw new YarnException(e);
     }
+
+    @Override
+    public GetContainerStatusesResponse getContainerStatuses(
+        GetContainerStatusesRequest request) throws YarnException, IOException {
+      List<ContainerStatus> list = new ArrayList<ContainerStatus>();
+      list.add(status);
+      GetContainerStatusesResponse response =
+          GetContainerStatusesResponse.newInstance(list, null);
+      return response;
+    }
   }
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java Mon Aug 12 21:25:49 2013
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
 
 import junit.framework.Assert;
 
@@ -33,13 +35,14 @@ import org.apache.hadoop.security.Securi
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -122,9 +125,6 @@ public class TestRPC {
         ApplicationAttemptId.newInstance(applicationId, 0);
     ContainerId containerId =
         ContainerId.newInstance(applicationAttemptId, 100);
-    StartContainerRequest scRequest =
-        recordFactory.newRecordInstance(StartContainerRequest.class);
-    scRequest.setContainerLaunchContext(containerLaunchContext);
     NodeId nodeId = NodeId.newInstance("localhost", 1234);
     Resource resource = Resource.newInstance(1234, 2);
     ContainerTokenIdentifier containerTokenIdentifier =
@@ -132,22 +132,32 @@ public class TestRPC {
           resource, System.currentTimeMillis() + 10000, 42, 42);
     Token containerToken = newContainerToken(nodeId, "password".getBytes(),
           containerTokenIdentifier);
-    scRequest.setContainerToken(containerToken);
-    proxy.startContainer(scRequest);
-    
-    GetContainerStatusRequest gcsRequest = 
-        recordFactory.newRecordInstance(GetContainerStatusRequest.class);
-    gcsRequest.setContainerId(containerId);
-    GetContainerStatusResponse response =  proxy.getContainerStatus(gcsRequest);
-    ContainerStatus status = response.getStatus();
-    
+
+    StartContainerRequest scRequest =
+        StartContainerRequest.newInstance(containerLaunchContext,
+          containerToken);
+    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    list.add(scRequest);
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+    proxy.startContainers(allRequests);
+
+    List<ContainerId> containerIds = new ArrayList<ContainerId>();
+    containerIds.add(containerId);
+    GetContainerStatusesRequest gcsRequest =
+        GetContainerStatusesRequest.newInstance(containerIds);
+    GetContainerStatusesResponse response =
+        proxy.getContainerStatuses(gcsRequest);
+    List<ContainerStatus> statuses = response.getContainerStatuses();
+
     //test remote exception
     boolean exception = false;
     try {
-      StopContainerRequest stopRequest = recordFactory.newRecordInstance(StopContainerRequest.class);
-      stopRequest.setContainerId(containerId);
-      proxy.stopContainer(stopRequest);
-    } catch (YarnException e) {
+      StopContainersRequest stopRequest =
+          recordFactory.newRecordInstance(StopContainersRequest.class);
+      stopRequest.setContainerIds(containerIds);
+      proxy.stopContainers(stopRequest);
+    } catch (YarnException e) {
       exception = true;
       Assert.assertTrue(e.getMessage().contains(EXCEPTION_MSG));
       Assert.assertTrue(e.getMessage().contains(EXCEPTION_CAUSE));
@@ -158,46 +168,51 @@ public class TestRPC {
     Assert.assertTrue(exception);
     
     server.stop();
-    Assert.assertNotNull(status);
-    Assert.assertEquals(ContainerState.RUNNING, status.getState());
+    Assert.assertNotNull(statuses.get(0));
+    Assert.assertEquals(ContainerState.RUNNING, statuses.get(0).getState());
   }
 
   public class DummyContainerManager implements ContainerManagementProtocol {
 
-    private ContainerStatus status = null;    
-    
+    private List<ContainerStatus> statuses = new ArrayList<ContainerStatus>();
+
     @Override
-    public GetContainerStatusResponse getContainerStatus(
-        GetContainerStatusRequest request)
+    public GetContainerStatusesResponse getContainerStatuses(
+        GetContainerStatusesRequest request)
     throws YarnException {
-      GetContainerStatusResponse response = 
-          recordFactory.newRecordInstance(GetContainerStatusResponse.class);
-      response.setStatus(status);
+      GetContainerStatusesResponse response = 
+          recordFactory.newRecordInstance(GetContainerStatusesResponse.class);
+      response.setContainerStatuses(statuses);
       return response;
     }
 
     @Override
-    public StartContainerResponse startContainer(StartContainerRequest request) 
-        throws YarnException {
-      Token containerToken = request.getContainerToken();
-      ContainerTokenIdentifier tokenId = null;
-
-      try {
-        tokenId = newContainerTokenIdentifier(containerToken);
-      } catch (IOException e) {
-        throw RPCUtil.getRemoteException(e);
+    public StartContainersResponse startContainers(
+        StartContainersRequest requests) throws YarnException {
+      StartContainersResponse response =
+          recordFactory.newRecordInstance(StartContainersResponse.class);
+      for (StartContainerRequest request : requests.getStartContainerRequests()) {
+        Token containerToken = request.getContainerToken();
+        ContainerTokenIdentifier tokenId = null;
+
+        try {
+          tokenId = newContainerTokenIdentifier(containerToken);
+        } catch (IOException e) {
+          throw RPCUtil.getRemoteException(e);
+        }
+        ContainerStatus status =
+            recordFactory.newRecordInstance(ContainerStatus.class);
+        status.setState(ContainerState.RUNNING);
+        status.setContainerId(tokenId.getContainerID());
+        status.setExitStatus(0);
+        statuses.add(status);
+
       }
-      StartContainerResponse response = 
-          recordFactory.newRecordInstance(StartContainerResponse.class);
-      status = recordFactory.newRecordInstance(ContainerStatus.class);
-      status.setState(ContainerState.RUNNING);
-      status.setContainerId(tokenId.getContainerID());
-      status.setExitStatus(0);
       return response;
     }
 
     @Override
-    public StopContainerResponse stopContainer(StopContainerRequest request) 
+    public StopContainersResponse stopContainers(StopContainersRequest request) 
     throws YarnException {
       Exception e = new Exception(EXCEPTION_MSG, 
           new Exception(EXCEPTION_CAUSE));

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestApplicatonReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestApplicatonReport.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestApplicatonReport.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestApplicatonReport.java Mon Aug 12 21:25:49 2013
@@ -46,6 +46,7 @@ public class TestApplicatonReport {
     appReport2.setCurrentApplicationAttemptId(null);
     Assert.assertNull(appReport2.getCurrentApplicationAttemptId());
     Assert.assertNotSame(appReport2, appReport3);
+    Assert.assertNull(appReport1.getAMRMToken());
   }
 
   protected static ApplicationReport createApplicationReport(
@@ -57,7 +58,7 @@ public class TestApplicatonReport {
         ApplicationReport.newInstance(appId, appAttemptId, "user", "queue",
           "appname", "host", 124, null, YarnApplicationState.FINISHED,
           "diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null,
-          "N/A", 0.53789f, YarnConfiguration.DEFAULT_APPLICATION_TYPE);
+          "N/A", 0.53789f, YarnConfiguration.DEFAULT_APPLICATION_TYPE, null);
     return appReport;
   }
 

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java Mon Aug 12 21:25:49 2013
@@ -30,6 +30,7 @@ import org.apache.hadoop.security.Access
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.Assert;
 
 import static org.mockito.Mockito.*;
 
@@ -129,6 +130,99 @@ public class TestAggregatedLogDeletionSe
   }
 
   @Test
+  public void testRefreshLogRetentionSettings() throws IOException {
+    long now = System.currentTimeMillis();
+    //time before 2000 sec
+    long before2000Secs = now - (2000 * 1000);
+    //time before 50 sec
+    long before50Secs = now - (50 * 1000);
+    String root = "mockfs://foo/";
+    String remoteRootLogDir = root + "tmp/logs";
+    String suffix = "logs";
+    final Configuration conf = new Configuration();
+    conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
+    conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
+    conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800");
+    conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
+        "1");
+    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
+    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
+
+    Path rootPath = new Path(root);
+    FileSystem rootFs = rootPath.getFileSystem(conf);
+    FileSystem mockFs = ((FilterFileSystem) rootFs).getRawFileSystem();
+
+    Path remoteRootLogPath = new Path(remoteRootLogDir);
+
+    Path userDir = new Path(remoteRootLogPath, "me");
+    FileStatus userDirStatus = new FileStatus(0, true, 0, 0, before50Secs,
+        userDir);
+
+    when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
+        new FileStatus[] { userDirStatus });
+
+    Path userLogDir = new Path(userDir, suffix);
+
+    //Set time last modified of app1Dir directory and its files to before2000Secs 
+    Path app1Dir = new Path(userLogDir, "application_1_1");
+    FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, before2000Secs,
+        app1Dir);
+    
+    //Set time last modified of app2Dir directory and its files to before50Secs
+    Path app2Dir = new Path(userLogDir, "application_1_2");
+    FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, before50Secs,
+        app2Dir);
+
+    when(mockFs.listStatus(userLogDir)).thenReturn(
+        new FileStatus[] { app1DirStatus, app2DirStatus });
+
+    Path app1Log1 = new Path(app1Dir, "host1");
+    FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, before2000Secs,
+        app1Log1);
+
+    when(mockFs.listStatus(app1Dir)).thenReturn(
+        new FileStatus[] { app1Log1Status });
+
+    Path app2Log1 = new Path(app2Dir, "host1");
+    FileStatus app2Log1Status = new FileStatus(10, false, 1, 1, before50Secs,
+        app2Log1);
+
+    when(mockFs.listStatus(app2Dir)).thenReturn(
+        new FileStatus[] { app2Log1Status });
+
+    AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionService() {
+      @Override
+      protected Configuration createConf() {
+        return conf;
+      }
+    };
+    
+    deletionSvc.init(conf);
+    deletionSvc.start();
+    
+    //app1Dir would be deleted since its done above log retention period
+    verify(mockFs, timeout(10000)).delete(app1Dir, true);
+    //app2Dir is not expected to be deleted since its below the threshold
+    verify(mockFs, timeout(3000).times(0)).delete(app2Dir, true);
+
+    //Now, let's change the confs
+    conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "50");
+    conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
+        "2");
+    //We have not called refreshLogRetentionSettings, hence don't expect to see the changed conf values
+    Assert.assertTrue(2000l != deletionSvc.getCheckIntervalMsecs());
+    
+    //refresh the log settings
+    deletionSvc.refreshLogRetentionSettings();
+
+    //Check interval time should reflect the new value
+    Assert.assertTrue(2000l == deletionSvc.getCheckIntervalMsecs());
+    //app2Dir should be deleted since it falls above the threshold
+    verify(mockFs, timeout(10000)).delete(app2Dir, true);
+    deletionSvc.stop();
+  }
+  
+  @Test
   public void testCheckInterval() throws Exception {
     long RETENTION_SECS = 10 * 24 * 3600;
     long now = System.currentTimeMillis();
@@ -176,7 +270,7 @@ public class TestAggregatedLogDeletionSe
         new AggregatedLogDeletionService();
     deletionSvc.init(conf);
     deletionSvc.start();
-
+ 
     verify(mockFs, timeout(10000).atLeast(4)).listStatus(any(Path.class));
     verify(mockFs, never()).delete(app1Dir, true);
 

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml Mon Aug 12 21:25:49 2013
@@ -19,12 +19,12 @@
   <parent>
     <artifactId>hadoop-yarn-server</artifactId>
     <groupId>org.apache.hadoop</groupId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.3.0-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-yarn-server-common</artifactId>
-  <version>2.2.0-SNAPSHOT</version>
+  <version>2.3.0-SNAPSHOT</version>
   <name>hadoop-yarn-server-common</name>
 
   <properties>

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceTrackerPBClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceTrackerPBClientImpl.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceTrackerPBClientImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceTrackerPBClientImpl.java Mon Aug 12 21:25:49 2013
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.api.impl.pb.client;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
@@ -41,7 +42,7 @@ import org.apache.hadoop.yarn.server.api
 
 import com.google.protobuf.ServiceException;
 
-public class ResourceTrackerPBClientImpl implements ResourceTracker {
+public class ResourceTrackerPBClientImpl implements ResourceTracker, Closeable {
 
 private ResourceTrackerPB proxy;
   
@@ -50,7 +51,14 @@ private ResourceTrackerPB proxy;
     proxy = (ResourceTrackerPB)RPC.getProxy(
         ResourceTrackerPB.class, clientVersion, addr, conf);
   }
-  
+
+  @Override
+  public void close() {
+    if(this.proxy != null) {
+      RPC.stopProxy(this.proxy);
+    }
+  }
+
   @Override
   public RegisterNodeManagerResponse registerNodeManager(
       RegisterNodeManagerRequest request) throws YarnException,

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java Mon Aug 12 21:25:49 2013
@@ -31,7 +31,6 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.AMCommand;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -232,6 +231,11 @@ public class BuilderUtils {
     return newToken(Token.class, identifier, kind, password, service);
   }
 
+  public static Token newAMRMToken(byte[] identifier, String kind,
+                                   byte[] password, String service) {
+    return newToken(Token.class, identifier, kind, password, service);
+  }
+
   @Private
   @VisibleForTesting
   public static Token newContainerToken(NodeId nodeId,
@@ -308,7 +312,7 @@ public class BuilderUtils {
       String url, long startTime, long finishTime,
       FinalApplicationStatus finalStatus,
       ApplicationResourceUsageReport appResources, String origTrackingUrl,
-      float progress, String appType) {
+      float progress, String appType, Token amRmToken) {
     ApplicationReport report = recordFactory
         .newRecordInstance(ApplicationReport.class);
     report.setApplicationId(applicationId);
@@ -329,6 +333,7 @@ public class BuilderUtils {
     report.setOriginalTrackingUrl(origTrackingUrl);
     report.setProgress(progress);
     report.setApplicationType(appType);
+    report.setAMRMToken(amRmToken);
     return report;
   }
   
@@ -390,20 +395,6 @@ public class BuilderUtils {
     url.setFile(file);
     return url;
   }
-
-  public static AllocateRequest newAllocateRequest(
-      ApplicationAttemptId applicationAttemptId, int responseID,
-      float appProgress, List<ResourceRequest> resourceAsk,
-      List<ContainerId> containersToBeReleased) {
-    AllocateRequest allocateRequest = recordFactory
-        .newRecordInstance(AllocateRequest.class);
-    allocateRequest.setApplicationAttemptId(applicationAttemptId);
-    allocateRequest.setResponseId(responseID);
-    allocateRequest.setProgress(appProgress);
-    allocateRequest.setAskList(resourceAsk);
-    allocateRequest.setReleaseList(containersToBeReleased);
-    return allocateRequest;
-  }
   
   public static AllocateResponse newAllocateResponse(int responseId,
       List<ContainerStatus> completedContainers,

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java Mon Aug 12 21:25:49 2013
@@ -22,12 +22,12 @@ import java.util.List;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.SerializedException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
-import org.apache.hadoop.yarn.server.api.records.SerializedException;
 import org.apache.hadoop.yarn.util.Records;
 
 /**
@@ -59,10 +59,4 @@ public class YarnServerBuilderUtils {
     }
     return response;
   }
-  
-  public static SerializedException newSerializedException(Throwable e) {
-    SerializedException se = Records.newRecord(SerializedException.class);
-    se.init(e);
-    return se;
-  }
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java Mon Aug 12 21:25:49 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatRequestPBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl;
@@ -45,7 +46,6 @@ import org.apache.hadoop.yarn.server.api
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeStatusPBImpl;
-import org.apache.hadoop.yarn.server.api.records.impl.pb.SerializedExceptionPBImpl;
 import org.junit.Test;
 
 /**

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml Mon Aug 12 21:25:49 2013
@@ -19,12 +19,12 @@
   <parent>
     <artifactId>hadoop-yarn-server</artifactId>
     <groupId>org.apache.hadoop</groupId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.3.0-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-yarn-server-nodemanager</artifactId>
-  <version>2.2.0-SNAPSHOT</version>
+  <version>2.3.0-SNAPSHOT</version>
   <name>hadoop-yarn-server-nodemanager</name>
 
   <properties>
@@ -143,6 +143,10 @@
               <name>application.submitter</name>
               <value>${application.submitter}</value>
             </property>
+	    <property>
+	      <name>yarn.log.dir</name>
+	      <value>${maven.project.build.directory}/logs</value>
+	    </property>
           </systemPropertyVariables>
           <excludes>
             <exclude>**/TestFSDownload.java</exclude>

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java Mon Aug 12 21:25:49 2013
@@ -39,8 +39,10 @@ import org.apache.hadoop.util.Shell.Shel
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
 import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader;
 import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
 
 public abstract class ContainerExecutor implements Configurable {
 
@@ -298,15 +300,16 @@ public abstract class ContainerExecutor 
   }
 
   public static class DelayedProcessKiller extends Thread {
+    private Container container;
     private final String user;
     private final String pid;
     private final long delay;
     private final Signal signal;
     private final ContainerExecutor containerExecutor;
 
-    public DelayedProcessKiller(String user, String pid, long delay,
-        Signal signal,
-        ContainerExecutor containerExecutor) {
+    public DelayedProcessKiller(Container container, String user, String pid,
+        long delay, Signal signal, ContainerExecutor containerExecutor) {
+      this.container = container;
       this.user = user;
       this.pid = pid;
       this.delay = delay;
@@ -323,7 +326,11 @@ public abstract class ContainerExecutor 
       } catch (InterruptedException e) {
         return;
       } catch (IOException e) {
-        LOG.warn("Exception when killing task " + pid, e);
+        String message = "Exception when user " + user + " killing task " + pid
+            + " in DelayedProcessKiller: " + StringUtils.stringifyException(e);
+        LOG.warn(message);
+        container.handle(new ContainerDiagnosticsUpdateEvent(container
+          .getContainerId(), message));
       }
     }
   }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java Mon Aug 12 21:25:49 2013
@@ -41,6 +41,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Shell.ExitCodeException;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -203,11 +204,23 @@ public class DefaultContainerExecutor ex
         return -1;
       }
       int exitCode = shExec.getExitCode();
-      LOG.warn("Exit code from task is : " + exitCode);
-      String message = shExec.getOutput();
-      logOutput(message);
-      container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
-          message));
+      LOG.warn("Exit code from container " + containerId + " is : " + exitCode);
+      // 143 (SIGTERM) and 137 (SIGKILL) exit codes mean the container was
+      // terminated/killed forcefully. In all other cases, log the
+      // container-executor's output
+      if (exitCode != ExitCode.FORCE_KILLED.getExitCode()
+          && exitCode != ExitCode.TERMINATED.getExitCode()) {
+        LOG.warn("Exception from container-launch with container ID: "
+            + containerId + " and exit code: " + exitCode , e);
+        logOutput(shExec.getOutput());
+        String diagnostics = "Exception from container-launch: \n"
+            + StringUtils.stringifyException(e) + "\n" + shExec.getOutput();
+        container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
+            diagnostics));
+      } else {
+        container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
+            "Container killed on request. Exit code is " + exitCode));
+      }
       return exitCode;
     } finally {
       ; //

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java Mon Aug 12 21:25:49 2013
@@ -18,23 +18,30 @@
 
 package org.apache.hadoop.yarn.server.nodemanager;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
+
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import static java.util.concurrent.TimeUnit.*;
+import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.hadoop.fs.Path;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 public class DeletionService extends AbstractService {
@@ -42,7 +49,8 @@ public class DeletionService extends Abs
   private int debugDelay;
   private final ContainerExecutor exec;
   private ScheduledThreadPoolExecutor sched;
-  private final FileContext lfs = getLfs();
+  private static final FileContext lfs = getLfs();
+
   static final FileContext getLfs() {
     try {
       return FileContext.getLocalFSFileContext();
@@ -68,11 +76,23 @@ public class DeletionService extends Abs
   public void delete(String user, Path subDir, Path... baseDirs) {
     // TODO if parent owned by NM, rename within parent inline
     if (debugDelay != -1) {
-      sched.schedule(new FileDeletion(user, subDir, baseDirs), debugDelay,
-          TimeUnit.SECONDS);
+      if (baseDirs == null || baseDirs.length == 0) {
+        sched.schedule(new FileDeletionTask(this, user, subDir, null),
+          debugDelay, TimeUnit.SECONDS);
+      } else {
+        sched.schedule(
+          new FileDeletionTask(this, user, subDir, Arrays.asList(baseDirs)),
+          debugDelay, TimeUnit.SECONDS);
+      }
     }
   }
-
+  
+  public void scheduleFileDeletionTask(FileDeletionTask fileDeletionTask) {
+    if (debugDelay != -1) {
+      sched.schedule(fileDeletionTask, debugDelay, TimeUnit.SECONDS);
+    }
+  }
+  
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     ThreadFactory tf = new ThreadFactoryBuilder()
@@ -118,46 +138,184 @@ public class DeletionService extends Abs
     return getServiceState() == STATE.STOPPED && sched.isTerminated();
   }
 
-  private class FileDeletion implements Runnable {
-    final String user;
-    final Path subDir;
-    final Path[] baseDirs;
-    FileDeletion(String user, Path subDir, Path[] baseDirs) {
+  public static class FileDeletionTask implements Runnable {
+    private final String user;
+    private final Path subDir;
+    private final List<Path> baseDirs;
+    private final AtomicInteger numberOfPendingPredecessorTasks;
+    private final Set<FileDeletionTask> successorTaskSet;
+    private final DeletionService delService;
+    // By default all tasks will start as success=true; however, if any of
+    // the dependent tasks fails, then it will be marked as false in
+    // fileDeletionTaskFinished().
+    private boolean success;
+    
+    private FileDeletionTask(DeletionService delService, String user,
+        Path subDir, List<Path> baseDirs) {
+      this.delService = delService;
       this.user = user;
       this.subDir = subDir;
       this.baseDirs = baseDirs;
+      this.successorTaskSet = new HashSet<FileDeletionTask>();
+      this.numberOfPendingPredecessorTasks = new AtomicInteger(0);
+      success = true;
+    }
+    
+    /**
+     * increments and returns pending predecessor task count
+     */
+    public int incrementAndGetPendingPredecessorTasks() {
+      return numberOfPendingPredecessorTasks.incrementAndGet();
+    }
+    
+    /**
+     * decrements and returns pending predecessor task count
+     */
+    public int decrementAndGetPendingPredecessorTasks() {
+      return numberOfPendingPredecessorTasks.decrementAndGet();
     }
+    
+    @VisibleForTesting
+    public String getUser() {
+      return this.user;
+    }
+    
+    @VisibleForTesting
+    public Path getSubDir() {
+      return this.subDir;
+    }
+    
+    @VisibleForTesting
+    public List<Path> getBaseDirs() {
+      return this.baseDirs;
+    }
+    
+    public synchronized void setSuccess(boolean success) {
+      this.success = success;
+    }
+    
+    public synchronized boolean getSucess() {
+      return this.success;
+    }
+    
     @Override
     public void run() {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(this);
+      }
+      boolean error = false;
       if (null == user) {
-        if (baseDirs == null || baseDirs.length == 0) {
+        if (baseDirs == null || baseDirs.size() == 0) {
           LOG.debug("NM deleting absolute path : " + subDir);
           try {
             lfs.delete(subDir, true);
           } catch (IOException e) {
+            error = true;
             LOG.warn("Failed to delete " + subDir);
           }
-          return;
-        }
-        for (Path baseDir : baseDirs) {
-          Path del = subDir == null? baseDir : new Path(baseDir, subDir);
-          LOG.debug("NM deleting path : " + del);
-          try {
-            lfs.delete(del, true);
-          } catch (IOException e) {
-            LOG.warn("Failed to delete " + subDir);
+        } else {
+          for (Path baseDir : baseDirs) {
+            Path del = subDir == null? baseDir : new Path(baseDir, subDir);
+            LOG.debug("NM deleting path : " + del);
+            try {
+              lfs.delete(del, true);
+            } catch (IOException e) {
+              error = true;
+              LOG.warn("Failed to delete " + subDir);
+            }
           }
         }
       } else {
         try {
           LOG.debug("Deleting path: [" + subDir + "] as user: [" + user + "]");
-          exec.deleteAsUser(user, subDir, baseDirs);
+          if (baseDirs == null || baseDirs.size() == 0) {
+            delService.exec.deleteAsUser(user, subDir, (Path[])null);
+          } else {
+            delService.exec.deleteAsUser(user, subDir,
+              baseDirs.toArray(new Path[0]));
+          }
         } catch (IOException e) {
+          error = true;
           LOG.warn("Failed to delete as user " + user, e);
         } catch (InterruptedException e) {
+          error = true;
           LOG.warn("Failed to delete as user " + user, e);
         }
       }
+      if (error) {
+        setSuccess(!error);        
+      }
+      fileDeletionTaskFinished();
     }
+
+    @Override
+    public String toString() {
+      StringBuffer sb = new StringBuffer("\nFileDeletionTask : ");
+      sb.append("  user : ").append(this.user);
+      sb.append("  subDir : ").append(
+        subDir == null ? "null" : subDir.toString());
+      sb.append("  baseDir : ");
+      if (baseDirs == null || baseDirs.size() == 0) {
+        sb.append("null");
+      } else {
+        for (Path baseDir : baseDirs) {
+          sb.append(baseDir.toString()).append(',');
+        }
+      }
+      return sb.toString();
+    }
+    
+    /**
+     * If there is a task dependency between say tasks 1,2,3 such that
+     * task2 and task3 can be started only after task1 then we should define
+     * task2 and task3 as successor tasks for task1.
+     * Note:- Task dependency should be defined prior to scheduling the task.
+     * @param successorTask
+     */
+    public synchronized void addFileDeletionTaskDependency(
+        FileDeletionTask successorTask) {
+      if (successorTaskSet.add(successorTask)) {
+        successorTask.incrementAndGetPendingPredecessorTasks();
+      }
+    }
+    
+    /*
+     * This is called when
+     * 1) Current file deletion task ran and finished.
+     * 2) This can even be directly called by a predecessor task if one of
+     * its dependent tasks has failed, marking its success = false.
+     */
+    private synchronized void fileDeletionTaskFinished() {
+      Iterator<FileDeletionTask> successorTaskI =
+          this.successorTaskSet.iterator();
+      while (successorTaskI.hasNext()) {
+        FileDeletionTask successorTask = successorTaskI.next();
+        if (!success) {
+          successorTask.setSuccess(success);
+        }
+        int count = successorTask.decrementAndGetPendingPredecessorTasks();
+        if (count == 0) {
+          if (successorTask.getSucess()) {
+            successorTask.delService.scheduleFileDeletionTask(successorTask);
+          } else {
+            successorTask.fileDeletionTaskFinished();
+          }
+        }
+      }
+    }
+  }
+  
+  /**
+   * Helper method to create file deletion task. To be used only if we need
+   * a way to define dependencies between deletion tasks.
+   * @param user user on whose behalf this task is supposed to run
+   * @param subDir sub directory as required in 
+   * {@link DeletionService#delete(String, Path, Path...)}
+   * @param baseDirs base directories as required in
+   * {@link DeletionService#delete(String, Path, Path...)}
+   */
+  public FileDeletionTask createFileDeletionTask(String user, Path subDir,
+      Path[] baseDirs) {
+    return new FileDeletionTask(this, user, subDir, Arrays.asList(baseDirs));
   }
-}
+}
\ No newline at end of file

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java Mon Aug 12 21:25:49 2013
@@ -147,7 +147,8 @@ public class LinuxContainerExecutor exte
       shExec.execute();
     } catch (ExitCodeException e) {
       int exitCode = shExec.getExitCode();
-      LOG.warn("Exit code from container is : " + exitCode);
+      LOG.warn("Exit code from container executor initialization is : "
+          + exitCode, e);
       logOutput(shExec.getOutput());
       throw new IOException("Linux container executor not configured properly"
           + " (error=" + exitCode + ")", e);
@@ -204,10 +205,11 @@ public class LinuxContainerExecutor exte
       }
     } catch (ExitCodeException e) {
       int exitCode = shExec.getExitCode();
-      LOG.warn("Exit code from container is : " + exitCode);
+      LOG.warn("Exit code from container " + locId + " startLocalizer is : "
+          + exitCode, e);
       logOutput(shExec.getOutput());
-      throw new IOException("App initialization failed (" + exitCode + 
-          ") with output: " + shExec.getOutput(), e);
+      throw new IOException("Application " + appId + " initialization failed" +
+      		" (exitCode=" + exitCode + ") with output: " + shExec.getOutput(), e);
     }
   }
 
@@ -256,19 +258,18 @@ public class LinuxContainerExecutor exte
         return ExitCode.TERMINATED.getExitCode();
       }
     } catch (ExitCodeException e) {
-
       if (null == shExec) {
         return -1;
       }
-
       int exitCode = shExec.getExitCode();
-      LOG.warn("Exit code from container is : " + exitCode);
+      LOG.warn("Exit code from container " + containerId + " is : " + exitCode);
       // 143 (SIGTERM) and 137 (SIGKILL) exit codes means the container was
       // terminated/killed forcefully. In all other cases, log the
       // container-executor's output
       if (exitCode != ExitCode.FORCE_KILLED.getExitCode()
           && exitCode != ExitCode.TERMINATED.getExitCode()) {
-        LOG.warn("Exception from container-launch : ", e);
+        LOG.warn("Exception from container-launch with container ID: "
+            + containerId + " and exit code: " + exitCode , e);
         logOutput(shExec.getOutput());
         String diagnostics = "Exception from container-launch: \n"
             + StringUtils.stringifyException(e) + "\n" + shExec.getOutput();
@@ -310,9 +311,12 @@ public class LinuxContainerExecutor exte
       if (ret_code == ResultCode.INVALID_CONTAINER_PID.getValue()) {
         return false;
       }
+      LOG.warn("Error in signalling container " + pid + " with " + signal
+          + "; exit = " + ret_code, e);
       logOutput(shExec.getOutput());
-      throw new IOException("Problem signalling container " + pid + " with " +
-                            signal + "; exit = " + ret_code);
+      throw new IOException("Problem signalling container " + pid + " with "
+          + signal + "; output: " + shExec.getOutput() + " and exitCode: "
+          + ret_code, e);
     }
     return true;
   }
@@ -346,13 +350,10 @@ public class LinuxContainerExecutor exte
       }
     } catch (IOException e) {
       int exitCode = shExec.getExitCode();
-      LOG.warn("Exit code from container is : " + exitCode);
-      if (exitCode != 0) {
-        LOG.error("DeleteAsUser for " + dir.toUri().getPath()
-            + " returned with non-zero exit code" + exitCode);
-        LOG.error("Output from LinuxContainerExecutor's deleteAsUser follows:");
-        logOutput(shExec.getOutput());
-      }
+      LOG.error("DeleteAsUser for " + dir.toUri().getPath()
+          + " returned with exit code: " + exitCode, e);
+      LOG.error("Output from LinuxContainerExecutor's deleteAsUser follows:");
+      logOutput(shExec.getOutput());
     }
   }
   
@@ -373,9 +374,10 @@ public class LinuxContainerExecutor exte
         shExec.execute();
     } catch (IOException e) {
         int ret_code = shExec.getExitCode();
+        LOG.warn("Exception in LinuxContainerExecutor mountCgroups ", e);
         logOutput(shExec.getOutput());
         throw new IOException("Problem mounting cgroups " + cgroupKVs + 
-                  "; exit code = " + ret_code, e);
+          "; exit code = " + ret_code + " and output: " + shExec.getOutput(), e);
     }
   }  
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Mon Aug 12 21:25:49 2013
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.SecurityUtil;
@@ -229,6 +230,15 @@ public class NodeManager extends Composi
     return "NodeManager";
   }
 
+  protected void shutDown() {
+    new Thread() {
+      @Override
+      public void run() {
+        NodeManager.this.stop();
+      }
+    }.start();
+  }
+
   protected void resyncWithRM() {
     //we do not want to block dispatcher thread here
     new Thread() {
@@ -265,6 +275,8 @@ public class NodeManager extends Composi
       while (!containers.isEmpty()
           && System.currentTimeMillis() - waitStartTime < waitForContainersOnShutdownMillis) {
         try {
+          // To remove completed containers from the NM context
+          nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext();
           Thread.sleep(1000);
         } catch (InterruptedException ex) {
           LOG.warn("Interrupted while sleeping on container kill on shutdown",
@@ -276,7 +288,6 @@ public class NodeManager extends Composi
       while (!containers.isEmpty()) {
         try {
           Thread.sleep(1000);
-          //to remove done containers from the map
           nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext();
         } catch (InterruptedException ex) {
           LOG.warn("Interrupted while sleeping on container kill on resync",
@@ -409,7 +420,7 @@ public class NodeManager extends Composi
   public void handle(NodeManagerEvent event) {
     switch (event.getType()) {
     case SHUTDOWN:
-      stop();
+      shutDown();
       break;
     case RESYNC:
       resyncWithRM();
@@ -446,4 +457,10 @@ public class NodeManager extends Composi
     Configuration conf = new YarnConfiguration();
     nodeManager.initAndStartNodeManager(conf, false);
   }
+  
+  @VisibleForTesting
+  @Private
+  public NodeStatusUpdater getNodeStatusUpdater() {
+    return nodeStatusUpdater;
+  }
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java Mon Aug 12 21:25:49 2013
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.nodemanager;
 
 import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 
 public interface NodeStatusUpdater extends Service {
@@ -28,4 +29,8 @@ public interface NodeStatusUpdater exten
   NodeStatus getNodeStatusAndUpdateContainersInContext();
 
   long getRMIdentifier();
+  
+  public boolean isContainerRecentlyStopped(ContainerId containerId);
+  
+  public void clearFinishedContainersFromCache();
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Mon Aug 12 21:25:49 2013
@@ -19,11 +19,12 @@
 package org.apache.hadoop.yarn.server.nodemanager;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
+import java.net.ConnectException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -33,6 +34,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -47,9 +49,9 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.ServerRMProxy;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@@ -67,6 +69,9 @@ import com.google.common.annotations.Vis
 public class NodeStatusUpdaterImpl extends AbstractService implements
     NodeStatusUpdater {
 
+  public static final String YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS =
+      YarnConfiguration.NM_PREFIX + "duration-to-track-stopped-containers";
+
   private static final Log LOG = LogFactory.getLog(NodeStatusUpdaterImpl.class);
 
   private final Object heartbeatMonitor = new Object();
@@ -77,7 +82,6 @@ public class NodeStatusUpdaterImpl exten
   private NodeId nodeId;
   private long nextHeartBeatInterval;
   private ResourceTracker resourceTracker;
-  private InetSocketAddress rmAddress;
   private Resource totalResource;
   private int httpPort;
   private volatile boolean isStopped;
@@ -88,12 +92,13 @@ public class NodeStatusUpdaterImpl exten
   private Map<ApplicationId, Long> appTokenKeepAliveMap =
       new HashMap<ApplicationId, Long>();
   private Random keepAliveDelayRandom = new Random();
+  // It will be used to track recently stopped containers on node manager.
+  private final Map<ContainerId, Long> recentlyStoppedContainers;
+  // Duration for which to track recently stopped containers.
+  private long durationToTrackStoppedContainers;
 
   private final NodeHealthCheckerService healthChecker;
   private final NodeManagerMetrics metrics;
-  private long rmConnectWaitMS;
-  private long rmConnectionRetryIntervalMS;
-  private boolean waitForEver;
 
   private Runnable statusUpdaterRunnable;
   private Thread  statusUpdater;
@@ -106,15 +111,12 @@ public class NodeStatusUpdaterImpl exten
     this.context = context;
     this.dispatcher = dispatcher;
     this.metrics = metrics;
+    this.recentlyStoppedContainers =
+        new LinkedHashMap<ContainerId, Long>();
   }
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
-    this.rmAddress = conf.getSocketAddr(
-        YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
-
     int memoryMb = 
         conf.getInt(
             YarnConfiguration.NM_PMEM_MB, YarnConfiguration.DEFAULT_NM_PMEM_MB);
@@ -137,11 +139,27 @@ public class NodeStatusUpdaterImpl exten
         conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
             YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
     
+    // Default duration to track stopped containers on the nodemanager is
+    // 10 minutes (600000 ms). This should not be set to a very large value,
+    // since the cache retains every container stopped within that window.
+    durationToTrackStoppedContainers =
+        conf.getLong(YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS,
+          600000);
+    if (durationToTrackStoppedContainers < 0) {
+      String message = "Invalid configuration for "
+        + YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS + " default "
+          + "value is 10Min(600000).";
+      LOG.error(message);
+      throw new YarnException(message);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS + " :"
+        + durationToTrackStoppedContainers);
+    }
+    super.serviceInit(conf);
     LOG.info("Initialized nodemanager for " + nodeId + ":" +
         " physical-memory=" + memoryMb + " virtual-memory=" + virtualMemoryMb +
         " virtual-cores=" + virtualCores);
-    
-    super.serviceInit(conf);
   }
 
   @Override
@@ -153,6 +171,7 @@ public class NodeStatusUpdaterImpl exten
     try {
       // Registration has to be in start so that ContainerManager can get the
       // perNM tokens needed to authenticate ContainerTokens.
+      this.resourceTracker = getRMClient();
       registerWithRM();
       super.serviceStart();
       startStatusUpdater();
@@ -167,6 +186,7 @@ public class NodeStatusUpdaterImpl exten
   protected void serviceStop() throws Exception {
     // Interrupt the updater.
     this.isStopped = true;
+    stopRMProxy();
     super.serviceStop();
   }
 
@@ -188,6 +208,13 @@ public class NodeStatusUpdaterImpl exten
     }
   }
 
+  @VisibleForTesting
+  protected void stopRMProxy() {
+    if(this.resourceTracker != null) {
+      RPC.stopProxy(this.resourceTracker);
+    }
+  }
+
   @Private
   protected boolean isTokenKeepAliveEnabled(Configuration conf) {
     return conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
@@ -195,93 +222,22 @@ public class NodeStatusUpdaterImpl exten
         && UserGroupInformation.isSecurityEnabled();
   }
 
-  protected ResourceTracker getRMClient() {
+  @VisibleForTesting
+  protected ResourceTracker getRMClient() throws IOException {
     Configuration conf = getConfig();
-    YarnRPC rpc = YarnRPC.create(conf);
-    return (ResourceTracker) rpc.getProxy(ResourceTracker.class, rmAddress,
-        conf);
+    return ServerRMProxy.createRMProxy(conf, ResourceTracker.class);
   }
 
   @VisibleForTesting
   protected void registerWithRM() throws YarnException, IOException {
-    Configuration conf = getConfig();
-    rmConnectWaitMS =
-        conf.getInt(
-            YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
-            YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS)
-        * 1000;
-    rmConnectionRetryIntervalMS =
-        conf.getLong(
-            YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
-            YarnConfiguration
-                .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS)
-        * 1000;
-
-    if(rmConnectionRetryIntervalMS < 0) {
-      throw new YarnRuntimeException("Invalid Configuration. " +
-          YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS +
-          " should not be negative.");
-    }
-
-    waitForEver = (rmConnectWaitMS == -1000);
-
-    if(! waitForEver) {
-      if(rmConnectWaitMS < 0) {
-          throw new YarnRuntimeException("Invalid Configuration. " +
-              YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS +
-              " can be -1, but can not be other negative numbers");
-      }
-
-      //try connect once
-      if(rmConnectWaitMS < rmConnectionRetryIntervalMS) {
-        LOG.warn(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS
-            + " is smaller than "
-            + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS
-            + ". Only try connect once.");
-        rmConnectWaitMS = 0;
-      }
-    }
-
-    int rmRetryCount = 0;
-    long waitStartTime = System.currentTimeMillis();
-
     RegisterNodeManagerRequest request =
         recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
     request.setHttpPort(this.httpPort);
     request.setResource(this.totalResource);
     request.setNodeId(this.nodeId);
-    RegisterNodeManagerResponse regNMResponse;
-
-    while(true) {
-      try {
-        rmRetryCount++;
-        LOG.info("Connecting to ResourceManager at " + this.rmAddress
-            + ". current no. of attempts is " + rmRetryCount);
-        this.resourceTracker = getRMClient();
-        regNMResponse =
-            this.resourceTracker.registerNodeManager(request);
-        this.rmIdentifier = regNMResponse.getRMIdentifier();
-        break;
-      } catch(Throwable e) {
-        LOG.warn("Trying to connect to ResourceManager, " +
-            "current no. of failed attempts is "+rmRetryCount);
-        if(System.currentTimeMillis() - waitStartTime < rmConnectWaitMS
-            || waitForEver) {
-          try {
-            LOG.info("Sleeping for " + rmConnectionRetryIntervalMS/1000
-                + " seconds before next connection retry to RM");
-            Thread.sleep(rmConnectionRetryIntervalMS);
-          } catch(InterruptedException ex) {
-            //done nothing
-          }
-        } else {
-          String errorMessage = "Failed to Connect to RM, " +
-              "no. of failed attempts is "+rmRetryCount;
-          LOG.error(errorMessage,e);
-          throw new YarnRuntimeException(errorMessage,e);
-        }
-      }
-    }
+    RegisterNodeManagerResponse regNMResponse =
+        resourceTracker.registerNodeManager(request);
+    this.rmIdentifier = regNMResponse.getRMIdentifier();
     // if the Resourcemanager instructs NM to shutdown.
     if (NodeAction.SHUTDOWN.equals(regNMResponse.getNodeAction())) {
       String message =
@@ -360,7 +316,11 @@ public class NodeStatusUpdaterImpl exten
       if (containerStatus.getState() == ContainerState.COMPLETE) {
         // Remove
         i.remove();
-
+        // Add to the finished-containers cache, which keeps the entry for at
+        // least #durationToTrackStoppedContainers ms. A subsequent call to
+        // stop the container will remove it from the cache.
+        addStoppedContainersToCache(containerId);
+        
         LOG.info("Removed completed container " + containerId);
       }
     }
@@ -410,6 +370,46 @@ public class NodeStatusUpdaterImpl exten
     }
   }
 
+  public boolean isContainerRecentlyStopped(ContainerId containerId) {
+    synchronized (recentlyStoppedContainers) {
+      return recentlyStoppedContainers.containsKey(containerId);
+    }
+  }
+  
+  @Private
+  @VisibleForTesting
+  public void addStoppedContainersToCache(ContainerId containerId) {
+    synchronized (recentlyStoppedContainers) {
+      removeVeryOldStoppedContainersFromCache();
+      recentlyStoppedContainers.put(containerId,
+        System.currentTimeMillis() + durationToTrackStoppedContainers);
+    }
+  }
+  
+  @Override
+  public void clearFinishedContainersFromCache() {
+    synchronized (recentlyStoppedContainers) {
+      recentlyStoppedContainers.clear();
+    }
+  }
+  
+  @Private
+  @VisibleForTesting
+  public void removeVeryOldStoppedContainersFromCache() {
+    synchronized (recentlyStoppedContainers) {
+      long currentTime = System.currentTimeMillis();
+      Iterator<ContainerId> i =
+          recentlyStoppedContainers.keySet().iterator();
+      while (i.hasNext()) {
+        if (recentlyStoppedContainers.get(i.next()) < currentTime) {
+          i.remove();
+        } else {
+          break;
+        }
+      }
+    }
+  }
+  
   @Override
   public long getRMIdentifier() {
     return this.rmIdentifier;
@@ -426,8 +426,6 @@ public class NodeStatusUpdaterImpl exten
           // Send heartbeat
           try {
             NodeHeartbeatResponse response = null;
-            int rmRetryCount = 0;
-            long waitStartTime = System.currentTimeMillis();
             NodeStatus nodeStatus = getNodeStatusAndUpdateContainersInContext();
             nodeStatus.setResponseId(lastHeartBeatID);
             
@@ -440,31 +438,7 @@ public class NodeStatusUpdaterImpl exten
             request
               .setLastKnownNMTokenMasterKey(NodeStatusUpdaterImpl.this.context
                 .getNMTokenSecretManager().getCurrentKey());
-            while (!isStopped) {
-              try {
-                rmRetryCount++;
-                response = resourceTracker.nodeHeartbeat(request);
-                break;
-              } catch (Throwable e) {
-                LOG.warn("Trying to heartbeat to ResourceManager, "
-                    + "current no. of failed attempts is " + rmRetryCount);
-                if(System.currentTimeMillis() - waitStartTime < rmConnectWaitMS
-                    || waitForEver) {
-                  try {
-                    LOG.info("Sleeping for " + rmConnectionRetryIntervalMS/1000
-                        + " seconds before next heartbeat to RM");
-                    Thread.sleep(rmConnectionRetryIntervalMS);
-                  } catch(InterruptedException ex) {
-                    //done nothing
-                  }
-                } else {
-                  String errorMessage = "Failed to heartbeat to RM, " +
-                      "no. of failed attempts is "+rmRetryCount;
-                  LOG.error(errorMessage,e);
-                  throw new YarnRuntimeException(errorMessage,e);
-                }
-              }
-            }
+            response = resourceTracker.nodeHeartbeat(request);
             //get next heartbeat interval from response
             nextHeartBeatInterval = response.getNextHeartBeatInterval();
             updateMasterKeys(response);
@@ -481,7 +455,7 @@ public class NodeStatusUpdaterImpl exten
             }
             if (response.getNodeAction() == NodeAction.RESYNC) {
               LOG.warn("Node is out of sync with ResourceManager,"
-                  + " hence rebooting.");
+                  + " hence resyncing.");
               LOG.warn("Message from ResourceManager: "
                   + response.getDiagnosticsMessage());
               // Invalidate the RMIdentifier while resync
@@ -508,12 +482,13 @@ public class NodeStatusUpdaterImpl exten
               dispatcher.getEventHandler().handle(
                   new CMgrCompletedAppsEvent(appsToCleanup));
             }
-          } catch (YarnRuntimeException e) {
+          } catch (ConnectException e) {
             //catch and throw the exception if tried MAX wait time to connect RM
             dispatcher.getEventHandler().handle(
                 new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
-            throw e;
+            throw new YarnRuntimeException(e);
           } catch (Throwable e) {
+
             // TODO Better error handling. Thread can die with the rest of the
             // NM still running.
             LOG.error("Caught exception in status-updater", e);
@@ -550,4 +525,6 @@ public class NodeStatusUpdaterImpl exten
         new Thread(statusUpdaterRunnable, "Node Status Updater");
     statusUpdater.start();
   }
+  
+  
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalResourceStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalResourceStatus.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalResourceStatus.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalResourceStatus.java Mon Aug 12 21:25:49 2013
@@ -18,8 +18,8 @@
 package org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords;
 
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.SerializedException;
 import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.server.api.records.SerializedException;
 
 public interface LocalResourceStatus {
   public LocalResource getResource();

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalResourceStatusPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalResourceStatusPBImpl.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalResourceStatusPBImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalResourceStatusPBImpl.java Mon Aug 12 21:25:49 2013
@@ -18,9 +18,11 @@
 package org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb;
 
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.SerializedException;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
+import org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.URLPBImpl;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.SerializedExceptionProto;
@@ -28,8 +30,6 @@ import org.apache.hadoop.yarn.proto.Yarn
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalResourceStatusProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalResourceStatusProtoOrBuilder;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.ResourceStatusTypeProto;
-import org.apache.hadoop.yarn.server.api.records.SerializedException;
-import org.apache.hadoop.yarn.server.api.records.impl.pb.SerializedExceptionPBImpl;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;