You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2011/09/27 22:30:26 UTC

svn commit: r1176600 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/ap...

Author: mahadev
Date: Tue Sep 27 20:30:24 2011
New Revision: 1176600

URL: http://svn.apache.org/viewvc?rev=1176600&view=rev
Log:
MAPREDUCE-3054. Unable to kill submitted jobs. (mahadev)

Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationRequest.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationRequestPBImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java
Removed:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/FinishApplicationRequest.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/FinishApplicationResponse.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/FinishApplicationRequestPBImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/FinishApplicationResponsePBImpl.java
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_RM_protocol.proto
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientRMProtocolPBClientImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientRMProtocolPBServiceImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/InlineDispatcher.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1176600&r1=1176599&r2=1176600&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Tue Sep 27 20:30:24 2011
@@ -1452,6 +1452,8 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-3095. fairscheduler ivy including wrong version for hdfs.
     (John George via mahadev)
 
+    MAPREDUCE-3054. Unable to kill submitted jobs. (mahadev)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java?rev=1176600&r1=1176599&r2=1176600&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java Tue Sep 27 20:30:24 2011
@@ -1,20 +1,20 @@
 /**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.hadoop.mapred;
 
@@ -42,29 +42,29 @@ public class ClientCache {
 
   private final Configuration conf;
   private final ResourceMgrDelegate rm;
-  
+
   private static final Log LOG = LogFactory.getLog(ClientCache.class);
 
   private Map<JobID, ClientServiceDelegate> cache = 
-    new HashMap<JobID, ClientServiceDelegate>();
-  
+      new HashMap<JobID, ClientServiceDelegate>();
+
   private MRClientProtocol hsProxy;
 
-  ClientCache(Configuration conf, ResourceMgrDelegate rm) {
+  public ClientCache(Configuration conf, ResourceMgrDelegate rm) {
     this.conf = conf;
     this.rm = rm;
   }
 
   //TODO: evict from the cache on some threshold
-  synchronized ClientServiceDelegate getClient(JobID jobId) {
-	if (hsProxy == null) {
+  public synchronized ClientServiceDelegate getClient(JobID jobId) {
+    if (hsProxy == null) {
       try {
-		hsProxy = instantiateHistoryProxy();
-	  } catch (IOException e) {
-		LOG.warn("Could not connect to History server.", e);
-		throw new YarnException("Could not connect to History server.", e);
-	  }
-	}
+        hsProxy = instantiateHistoryProxy();
+      } catch (IOException e) {
+        LOG.warn("Could not connect to History server.", e);
+        throw new YarnException("Could not connect to History server.", e);
+      }
+    }
     ClientServiceDelegate client = cache.get(jobId);
     if (client == null) {
       client = new ClientServiceDelegate(conf, rm, jobId, hsProxy);
@@ -74,7 +74,7 @@ public class ClientCache {
   }
 
   private MRClientProtocol instantiateHistoryProxy()
-  throws IOException {
+      throws IOException {
     final String serviceAddr = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS);
     if (StringUtils.isEmpty(serviceAddr)) {
       return null;

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1176600&r1=1176599&r2=1176600&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Tue Sep 27 20:30:24 2011
@@ -70,7 +70,7 @@ import org.apache.hadoop.yarn.ipc.YarnRP
 import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
 import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
 
-class ClientServiceDelegate {
+public class ClientServiceDelegate {
   private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
 
   // Caches for per-user NotRunningJobs
@@ -87,7 +87,7 @@ class ClientServiceDelegate {
   private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   private static String UNKNOWN_USER = "Unknown User";
 
-  ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm, 
+  public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm, 
       JobID jobId, MRClientProtocol historyServerProxy) {
     this.conf = new Configuration(conf); // Cloning for modifying.
     // For faster redirects from AM to HS.
@@ -279,7 +279,7 @@ class ClientServiceDelegate {
     }
   }
 
-  org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException,
+  public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException,
   InterruptedException {
     org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter.toYarn(arg0);
       GetCountersRequest request = recordFactory.newRecordInstance(GetCountersRequest.class);
@@ -290,7 +290,7 @@ class ClientServiceDelegate {
       
   }
 
-  TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1, int arg2)
+  public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1, int arg2)
       throws IOException, InterruptedException {
     org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter
         .toYarn(arg0);
@@ -308,7 +308,7 @@ class ClientServiceDelegate {
             .toArray(new org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent[0]));
   }
 
-  String[] getTaskDiagnostics(org.apache.hadoop.mapreduce.TaskAttemptID arg0)
+  public String[] getTaskDiagnostics(org.apache.hadoop.mapreduce.TaskAttemptID arg0)
       throws IOException, InterruptedException {
 
     org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = TypeConverter
@@ -326,7 +326,7 @@ class ClientServiceDelegate {
     return result;
   }
   
-  JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException {
+  public JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException {
     org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = 
       TypeConverter.toYarn(oldJobID);
     GetJobReportRequest request = 
@@ -339,7 +339,7 @@ class ClientServiceDelegate {
     return TypeConverter.fromYarn(report, jobFile);
   }
 
-  org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID oldJobID, TaskType taskType)
+  public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID oldJobID, TaskType taskType)
        throws YarnRemoteException, YarnRemoteException {
     org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = 
       TypeConverter.toYarn(oldJobID);
@@ -356,7 +356,7 @@ class ClientServiceDelegate {
     (taskReports).toArray(new org.apache.hadoop.mapreduce.TaskReport[0]);
   }
 
-  boolean killTask(TaskAttemptID taskAttemptID, boolean fail)
+  public boolean killTask(TaskAttemptID taskAttemptID, boolean fail)
        throws YarnRemoteException {
     org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID 
       = TypeConverter.toYarn(taskAttemptID);
@@ -372,7 +372,7 @@ class ClientServiceDelegate {
     return true;
   }
   
-  boolean killJob(JobID oldJobID)
+  public boolean killJob(JobID oldJobID)
        throws YarnRemoteException {
     org.apache.hadoop.mapreduce.v2.api.records.JobId jobId 
     = TypeConverter.toYarn(oldJobID);

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java?rev=1176600&r1=1176599&r2=1176600&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java Tue Sep 27 20:30:24 2011
@@ -44,7 +44,7 @@ import org.apache.hadoop.security.Securi
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@@ -79,6 +79,10 @@ public class ResourceMgrDelegate {
   private ApplicationId applicationId;
   private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
 
+  /**
+   * Delegate responsible for communicating with the Resource Manager's {@link ClientRMProtocol}.
+   * @param conf the configuration object.
+   */
   public ResourceMgrDelegate(YarnConfiguration conf) {
     this.conf = conf;
     YarnRPC rpc = YarnRPC.create(this.conf);
@@ -97,6 +101,16 @@ public class ResourceMgrDelegate {
     LOG.info("Connected to ResourceManager at " + rmAddress);
   }
   
+  /**
+   * Used for injecting applicationsManager, mostly for testing.
+   * @param conf the configuration object
+   * @param applicationsManager the handle used to talk to the resource manager's {@link ClientRMProtocol}.
+   */
+  public ResourceMgrDelegate(YarnConfiguration conf, ClientRMProtocol applicationsManager) {
+    this.conf = conf;
+    this.applicationsManager = applicationsManager;
+  }
+  
   public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
       throws IOException, InterruptedException {
     return;
@@ -294,9 +308,9 @@ public class ResourceMgrDelegate {
   }
   
   public void killApplication(ApplicationId applicationId) throws IOException {
-    FinishApplicationRequest request = recordFactory.newRecordInstance(FinishApplicationRequest.class);
+    KillApplicationRequest request = recordFactory.newRecordInstance(KillApplicationRequest.class);
     request.setApplicationId(applicationId);
-    applicationsManager.finishApplication(request);
+    applicationsManager.forceKillApplication(request);
     LOG.info("Killing application " + applicationId);
   }
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java?rev=1176600&r1=1176599&r2=1176600&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java Tue Sep 27 20:30:24 2011
@@ -105,10 +105,22 @@ public class YARNRunner implements Clien
    * @param resMgrDelegate the resourcemanager client handle.
    */
   public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) {
+   this(conf, resMgrDelegate, new ClientCache(conf, resMgrDelegate));
+  }
+  
+  /**
+   * Similar to {@link YARNRunner#YARNRunner(Configuration, ResourceMgrDelegate)} 
+   * but allowing injecting {@link ClientCache}. Enable mocking and testing.
+   * @param conf the configuration object
+   * @param resMgrDelegate the resource manager delegate 
+   * @param clientCache the client cache object.
+   */
+  public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
+      ClientCache clientCache) {
     this.conf = conf;
     try {
       this.resMgrDelegate = resMgrDelegate;
-      this.clientCache = new ClientCache(this.conf, resMgrDelegate);
+      this.clientCache = clientCache;
       this.defaultFileContext = FileContext.getFileContext(this.conf);
     } catch (UnsupportedFileSystemException ufe) {
       throw new RuntimeException("Error in instantiating YarnClient", ufe);
@@ -429,9 +441,35 @@ public class YARNRunner implements Clien
 
   @Override
   public void killJob(JobID arg0) throws IOException, InterruptedException {
-    if (!clientCache.getClient(arg0).killJob(arg0)) {
-    resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
-  }
+    /* if the job is not in the RUNNING state, send the kill straight to the RM */
+    JobStatus status = clientCache.getClient(arg0).getJobStatus(arg0);
+    if (status.getState() != JobStatus.State.RUNNING) {
+      resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
+      return;
+    } 
+    
+    try {
+      /* send a kill to the AM */
+      clientCache.getClient(arg0).killJob(arg0);
+      long currentTimeMillis = System.currentTimeMillis();
+      long timeKillIssued = currentTimeMillis;
+      while ((currentTimeMillis < timeKillIssued + 10000L) && (status.getState()
+          != JobStatus.State.KILLED)) {
+          try {
+            Thread.sleep(1000L);
+          } catch(InterruptedException ie) {
+            /** interrupted, just break */
+            break;
+          }
+          currentTimeMillis = System.currentTimeMillis();
+          status = clientCache.getClient(arg0).getJobStatus(arg0);
+      }
+    } catch(IOException io) {
+      LOG.debug("Error when checking for application status", io);
+    }
+    if (status.getState() != JobStatus.State.KILLED) {
+      resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
+    }
   }
 
   @Override

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java?rev=1176600&r1=1176599&r2=1176600&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java Tue Sep 27 20:30:24 2011
@@ -68,8 +68,8 @@ import org.apache.hadoop.metrics2.lib.De
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@@ -288,9 +288,9 @@ public class TestClientRedirect {
     }
 
     @Override
-    public FinishApplicationResponse finishApplication(
-        FinishApplicationRequest request) throws YarnRemoteException {
-      return null;
+    public KillApplicationResponse forceKillApplication(
+        KillApplicationRequest request) throws YarnRemoteException {
+      return recordFactory.newRecordInstance(KillApplicationResponse.class);
     }
 
     @Override
@@ -451,7 +451,7 @@ public class TestClientRedirect {
     @Override
     public KillJobResponse killJob(KillJobRequest request)
         throws YarnRemoteException {
-      return null;
+      return recordFactory.newRecordInstance(KillJobResponse.class);
     }
 
     @Override

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java?rev=1176600&r1=1176599&r2=1176600&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java Tue Sep 27 20:30:24 2011
@@ -22,6 +22,7 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.File;
@@ -36,15 +37,38 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.ClientCache;
+import org.apache.hadoop.mapred.ClientServiceDelegate;
+import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapred.ResourceMgrDelegate;
 import org.apache.hadoop.mapred.YARNRunner;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobPriority;
+import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationState;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -54,9 +78,8 @@ import org.mockito.invocation.Invocation
 import org.mockito.stubbing.Answer;
 
 /**
- * Test if the jobclient shows enough diagnostics 
- * on a job failure.
- *
+ * Tests YARNRunner to make sure the client-side plugin works
+ * correctly.
  */
 public class TestYARNRunner extends TestCase {
   private static final Log LOG = LogFactory.getLog(TestYARNRunner.class);
@@ -65,18 +88,22 @@ public class TestYARNRunner extends Test
   private YARNRunner yarnRunner;
   private ResourceMgrDelegate resourceMgrDelegate;
   private YarnConfiguration conf;
+  private ClientCache clientCache;
   private ApplicationId appId;
   private JobID jobId;
   private File testWorkDir = 
       new File("target", TestYARNRunner.class.getName());
   private ApplicationSubmissionContext submissionContext;
+  private  ClientServiceDelegate clientDelegate;
   private static final String failString = "Rejected job";
  
   @Before
   public void setUp() throws Exception {
     resourceMgrDelegate = mock(ResourceMgrDelegate.class);
     conf = new YarnConfiguration();
-    yarnRunner = new YARNRunner(conf, resourceMgrDelegate);
+    clientCache = new ClientCache(conf, resourceMgrDelegate);
+    clientCache = spy(clientCache);
+    yarnRunner = new YARNRunner(conf, resourceMgrDelegate, clientCache);
     yarnRunner = spy(yarnRunner);
     submissionContext = mock(ApplicationSubmissionContext.class);
     doAnswer(
@@ -102,6 +129,31 @@ public class TestYARNRunner extends Test
   
   
   @Test
+  public void testJobKill() throws Exception {
+    clientDelegate = mock(ClientServiceDelegate.class);
+    when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new 
+        org.apache.hadoop.mapreduce.JobStatus(jobId, 0f, 0f, 0f, 0f, 
+            State.PREP, JobPriority.HIGH, "tmp", "tmp", "tmp", "tmp"));
+    when(clientDelegate.killJob(any(JobID.class))).thenReturn(true);
+    doAnswer(
+        new Answer<ClientServiceDelegate>() {
+          @Override
+          public ClientServiceDelegate answer(InvocationOnMock invocation)
+              throws Throwable {
+            return clientDelegate;
+          }
+        }
+        ).when(clientCache).getClient(any(JobID.class));
+    yarnRunner.killJob(jobId);
+    verify(resourceMgrDelegate).killApplication(appId);
+    when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new 
+        org.apache.hadoop.mapreduce.JobStatus(jobId, 0f, 0f, 0f, 0f, 
+            State.RUNNING, JobPriority.HIGH, "tmp", "tmp", "tmp", "tmp"));
+    yarnRunner.killJob(jobId);
+    verify(clientDelegate).killJob(jobId);
+  }
+  
+  @Test
   public void testJobSubmissionFailure() throws Exception {
     when(resourceMgrDelegate.submitApplication(any(ApplicationSubmissionContext.class))).
     thenReturn(appId);
@@ -122,4 +174,66 @@ public class TestYARNRunner extends Test
       assertTrue(io.getLocalizedMessage().contains(failString));
     }
   }
+  
+  @Test
+  public void testResourceMgrDelegate() throws Exception {
+    /* we do not want a mock of the resource manager delegate here */
+    ClientRMProtocol clientRMProtocol = mock(ClientRMProtocol.class);
+    ResourceMgrDelegate delegate = new ResourceMgrDelegate(conf, clientRMProtocol);
+    /* make sure kill calls forceKillApplication on the resource manager */
+    when(clientRMProtocol.forceKillApplication(any(KillApplicationRequest.class)))
+    .thenReturn(null);
+    delegate.killApplication(appId);
+    verify(clientRMProtocol).forceKillApplication(any(KillApplicationRequest.class));
+    
+    /* make sure getalljobs calls get all applications */
+    when(clientRMProtocol.getAllApplications(any(GetAllApplicationsRequest.class))).
+    thenReturn(recordFactory.newRecordInstance(GetAllApplicationsResponse.class));
+    delegate.getAllJobs();
+    verify(clientRMProtocol).getAllApplications(any(GetAllApplicationsRequest.class));
+    
+    /* make sure getapplication report is called */
+    when(clientRMProtocol.getApplicationReport(any(GetApplicationReportRequest.class)))
+    .thenReturn(recordFactory.newRecordInstance(GetApplicationReportResponse.class));
+    delegate.getApplicationReport(appId);
+    verify(clientRMProtocol).getApplicationReport(any(GetApplicationReportRequest.class));
+    
+    /* make sure metrics is called */
+    GetClusterMetricsResponse clusterMetricsResponse = recordFactory.newRecordInstance
+        (GetClusterMetricsResponse.class);
+    clusterMetricsResponse.setClusterMetrics(recordFactory.newRecordInstance(
+        YarnClusterMetrics.class));
+    when(clientRMProtocol.getClusterMetrics(any(GetClusterMetricsRequest.class)))
+    .thenReturn(clusterMetricsResponse);
+    delegate.getClusterMetrics();
+    verify(clientRMProtocol).getClusterMetrics(any(GetClusterMetricsRequest.class));
+    
+    when(clientRMProtocol.getClusterNodes(any(GetClusterNodesRequest.class))).
+    thenReturn(recordFactory.newRecordInstance(GetClusterNodesResponse.class));
+    delegate.getActiveTrackers();
+    verify(clientRMProtocol).getClusterNodes(any(GetClusterNodesRequest.class));
+    
+    GetNewApplicationIdResponse newAppIdResponse = recordFactory.newRecordInstance(
+        GetNewApplicationIdResponse.class);
+    newAppIdResponse.setApplicationId(appId);
+    when(clientRMProtocol.getNewApplicationId(any(GetNewApplicationIdRequest.class))).
+    thenReturn(newAppIdResponse);
+    delegate.getNewJobID();
+    verify(clientRMProtocol).getNewApplicationId(any(GetNewApplicationIdRequest.class));
+    
+    GetQueueInfoResponse queueInfoResponse = recordFactory.newRecordInstance(
+        GetQueueInfoResponse.class);
+    queueInfoResponse.setQueueInfo(recordFactory.newRecordInstance(QueueInfo.class));
+    when(clientRMProtocol.getQueueInfo(any(GetQueueInfoRequest.class))).
+    thenReturn(queueInfoResponse);
+    delegate.getQueues();
+    verify(clientRMProtocol).getQueueInfo(any(GetQueueInfoRequest.class));
+    
+    GetQueueUserAclsInfoResponse aclResponse = recordFactory.newRecordInstance(
+        GetQueueUserAclsInfoResponse.class);
+    when(clientRMProtocol.getQueueUserAcls(any(GetQueueUserAclsInfoRequest.class)))
+    .thenReturn(aclResponse);
+    delegate.getQueueAclsForCurrentUser();
+    verify(clientRMProtocol).getQueueUserAcls(any(GetQueueUserAclsInfoRequest.class));
+  }
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java?rev=1176600&r1=1176599&r2=1176600&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java Tue Sep 27 20:30:24 2011
@@ -21,8 +21,8 @@ package org.apache.hadoop.yarn.api;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
 
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@@ -102,7 +102,7 @@ public interface ClientRMProtocol {
    * <p>The interface used by clients to request the 
    * <code>ResourceManager</code> to abort submitted application.</p>
    * 
-   * <p>The client, via {@link FinishApplicationRequest} provides the
+   * <p>The client, via {@link KillApplicationRequest} provides the
    * {@link ApplicationId} of the application to be aborted.</p>
    * 
    * <p> In secure mode,the <code>ResourceManager</code> verifies access to the
@@ -117,8 +117,8 @@ public interface ClientRMProtocol {
    * @throws YarnRemoteException
    * @see #getQueueUserAcls(GetQueueUserAclsInfoRequest) 
    */
-  public FinishApplicationResponse finishApplication(
-      FinishApplicationRequest request) 
+  public KillApplicationResponse forceKillApplication(
+      KillApplicationRequest request) 
   throws YarnRemoteException;
 
   /**

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationRequest.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationRequest.java?rev=1176600&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationRequest.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationRequest.java Tue Sep 27 20:30:24 2011
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * <p>The request sent by the client to the <code>ResourceManager</code>
+ * to abort a submitted application.</p>
+ * 
+ * <p>The request includes the {@link ApplicationId} of the application to be
+ * aborted.</p>
+ * 
+ * @see ClientRMProtocol#forceKillApplication(KillApplicationRequest)
+ */
+@Public
+@Stable
+public interface KillApplicationRequest {
+  /**
+   * Get the <code>ApplicationId</code> of the application to be aborted.
+   * @return <code>ApplicationId</code> of the application to be aborted
+   */
+  @Public
+  @Stable
+  public abstract ApplicationId getApplicationId();
+  /** Set the <code>ApplicationId</code> of the application to be aborted. */
+  @Private
+  @Unstable
+  public abstract void setApplicationId(ApplicationId applicationId);
+}

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java?rev=1176600&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java Tue Sep 27 20:30:24 2011
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.yarn.api.ClientRMProtocol;
+
+/**
+ * <p>The response sent by the <code>ResourceManager</code> to a client
+ * that asked for a submitted application to be aborted.</p>
+ *
+ * <p>Currently it declares no fields or methods.</p>
+ * 
+ * @see ClientRMProtocol#forceKillApplication(KillApplicationRequest)
+ */
+@Public
+@Stable
+public interface KillApplicationResponse {
+
+}

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationRequestPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationRequestPBImpl.java?rev=1176600&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationRequestPBImpl.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationRequestPBImpl.java Tue Sep 27 20:30:24 2011
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ProtoBase;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationRequestProtoOrBuilder;
+
+
+    
+public class KillApplicationRequestPBImpl extends ProtoBase<KillApplicationRequestProto> implements KillApplicationRequest {
+  KillApplicationRequestProto proto = KillApplicationRequestProto.getDefaultInstance(); // most recently received or built message
+  KillApplicationRequestProto.Builder builder = null; // created by the no-arg constructor or maybeInitBuilder()
+  boolean viaProto = false; // true while reads should go to proto rather than builder
+  // Id set via setApplicationId() or cached by getApplicationId(); copied into the builder by getProto().
+  private ApplicationId applicationId = null;
+  
+  // Creates an empty request backed by a fresh builder.
+  public KillApplicationRequestPBImpl() {
+    builder = KillApplicationRequestProto.newBuilder();
+  }
+  // Wraps an existing message; it is read directly until setApplicationId() switches to a builder.
+  public KillApplicationRequestPBImpl(KillApplicationRequestProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+  // Folds the local applicationId into the message, rebuilds it and returns the result.
+  public KillApplicationRequestProto getProto() {
+      mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+  // Writes the local applicationId, if one is held, into the builder.
+  private void mergeLocalToBuilder() {
+    if (this.applicationId != null) {
+      builder.setApplicationId(convertToProtoFormat(this.applicationId));
+    }
+  }
+  // Rebuilds proto from the builder (re-seeded from proto when viaProto) plus the local applicationId.
+  private void mergeLocalToProto() {
+    if (viaProto) 
+      maybeInitBuilder();
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+  // Recreates the builder from proto when viaProto is set or no builder exists, then clears viaProto.
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = KillApplicationRequestProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+    
+  // Returns the cached id, else converts the one in proto/builder and caches it; null when none is set.
+  @Override
+  public ApplicationId getApplicationId() {
+    KillApplicationRequestProtoOrBuilder p = viaProto ? proto : builder;
+    if (this.applicationId != null) {
+      return this.applicationId;
+    }
+    if (!p.hasApplicationId()) {
+      return null;
+    }
+    this.applicationId = convertFromProtoFormat(p.getApplicationId());
+    return this.applicationId;
+  }
+  // Stores the id locally; a null id additionally clears the field in the builder.
+  @Override
+  public void setApplicationId(ApplicationId applicationId) {
+    maybeInitBuilder();
+    if (applicationId == null) 
+      builder.clearApplicationId();
+    this.applicationId = applicationId;
+  }
+  // Wraps a protobuf application id in its record implementation.
+  private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
+    return new ApplicationIdPBImpl(p);
+  }
+  // Extracts the protobuf form; the cast requires t to be an ApplicationIdPBImpl.
+  private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+    return ((ApplicationIdPBImpl)t).getProto();
+  }
+
+
+
+}  

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java?rev=1176600&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java Tue Sep 27 20:30:24 2011
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ProtoBase;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationResponseProto;
+
+
+    
+public class KillApplicationResponsePBImpl extends ProtoBase<KillApplicationResponseProto> implements KillApplicationResponse {
+  KillApplicationResponseProto proto = KillApplicationResponseProto.getDefaultInstance(); // most recently received or built message
+  KillApplicationResponseProto.Builder builder = null; // created by the no-arg constructor or maybeInitBuilder()
+  boolean viaProto = false; // true once proto, rather than builder, is the value to return
+  // Creates an empty response backed by a fresh builder.
+  public KillApplicationResponsePBImpl() {
+    builder = KillApplicationResponseProto.newBuilder();
+  }
+  // Wraps an existing message, which getProto() then returns as is.
+  public KillApplicationResponsePBImpl(KillApplicationResponseProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+  // Returns the message, building it from the builder first when viaProto is not yet set.
+  public KillApplicationResponseProto getProto() {
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+  // Not called anywhere in this class. NOTE(review): presumably kept for future setters - confirm.
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = KillApplicationResponseProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+    
+  
+
+
+
+}  

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_RM_protocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_RM_protocol.proto?rev=1176600&r1=1176599&r2=1176600&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_RM_protocol.proto (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_RM_protocol.proto Tue Sep 27 20:30:24 2011
@@ -27,7 +27,7 @@ service ClientRMProtocolService {
   rpc getNewApplicationId (GetNewApplicationIdRequestProto) returns (GetNewApplicationIdResponseProto);
   rpc getApplicationReport (GetApplicationReportRequestProto) returns (GetApplicationReportResponseProto);
   rpc submitApplication (SubmitApplicationRequestProto) returns (SubmitApplicationResponseProto);
-  rpc finishApplication (FinishApplicationRequestProto) returns (FinishApplicationResponseProto);
+  rpc forceKillApplication (KillApplicationRequestProto) returns (KillApplicationResponseProto);
   rpc getClusterMetrics (GetClusterMetricsRequestProto) returns (GetClusterMetricsResponseProto);
   rpc getAllApplications (GetAllApplicationsRequestProto) returns (GetAllApplicationsResponseProto);
   rpc getClusterNodes (GetClusterNodesRequestProto) returns (GetClusterNodesResponseProto);

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto?rev=1176600&r1=1176599&r2=1176600&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto Tue Sep 27 20:30:24 2011
@@ -88,11 +88,11 @@ message SubmitApplicationRequestProto {
 message SubmitApplicationResponseProto {
 }
 
-message FinishApplicationRequestProto {
+message KillApplicationRequestProto {
   optional ApplicationIdProto application_id = 1;
 }
 
-message FinishApplicationResponseProto {
+message KillApplicationResponseProto {
 }
 
 message GetClusterMetricsRequestProto {

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientRMProtocolPBClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientRMProtocolPBClientImpl.java?rev=1176600&r1=1176599&r2=1176600&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientRMProtocolPBClientImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientRMProtocolPBClientImpl.java Tue Sep 27 20:30:24 2011
@@ -25,8 +25,6 @@ import java.net.InetSocketAddress;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@@ -41,10 +39,10 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationReportRequestPBImpl;
@@ -59,21 +57,22 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueInfoResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.ipc.ProtoOverHadoopRpcEngine;
 import org.apache.hadoop.yarn.proto.ClientRMProtocol.ClientRMProtocolService;
-import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAllApplicationsRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationReportRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterMetricsRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterNodesRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNewApplicationIdRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueInfoRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto;
-import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueInfoRequestProto;
-import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueInfoResponseProto;
 
 import com.google.protobuf.ServiceException;
 
@@ -88,11 +87,11 @@ public class ClientRMProtocolPBClientImp
   }
   
   @Override
-  public FinishApplicationResponse finishApplication(
-      FinishApplicationRequest request) throws YarnRemoteException {
-    FinishApplicationRequestProto requestProto = ((FinishApplicationRequestPBImpl)request).getProto();
+  public KillApplicationResponse forceKillApplication(
+      KillApplicationRequest request) throws YarnRemoteException {
+    KillApplicationRequestProto requestProto = ((KillApplicationRequestPBImpl)request).getProto();
     try {
-      return new FinishApplicationResponsePBImpl(proxy.finishApplication(null, requestProto));
+      return new KillApplicationResponsePBImpl(proxy.forceKillApplication(null, requestProto));
     } catch (ServiceException e) {
       if (e.getCause() instanceof YarnRemoteException) {
         throw (YarnRemoteException)e.getCause();

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientRMProtocolPBServiceImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientRMProtocolPBServiceImpl.java?rev=1176600&r1=1176599&r2=1176600&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientRMProtocolPBServiceImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientRMProtocolPBServiceImpl.java Tue Sep 27 20:30:24 2011
@@ -19,7 +19,6 @@
 package org.apache.hadoop.yarn.api.impl.pb.service;
 
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
@@ -27,9 +26,8 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationReportRequestPBImpl;
@@ -44,12 +42,12 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueInfoResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.proto.ClientRMProtocol.ClientRMProtocolService.BlockingInterface;
-import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationRequestProto;
-import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAllApplicationsRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAllApplicationsResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationReportRequestProto;
@@ -64,6 +62,8 @@ import org.apache.hadoop.yarn.proto.Yarn
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueInfoResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseProto;
 
@@ -79,12 +79,12 @@ public class ClientRMProtocolPBServiceIm
   }
   
   @Override
-  public FinishApplicationResponseProto finishApplication(RpcController arg0,
-      FinishApplicationRequestProto proto) throws ServiceException {
-    FinishApplicationRequestPBImpl request = new FinishApplicationRequestPBImpl(proto);
+  public KillApplicationResponseProto forceKillApplication(RpcController arg0,
+      KillApplicationRequestProto proto) throws ServiceException {
+    KillApplicationRequestPBImpl request = new KillApplicationRequestPBImpl(proto);
     try {
-      FinishApplicationResponse response = real.finishApplication(request);
-      return ((FinishApplicationResponsePBImpl)response).getProto();
+      KillApplicationResponse response = real.forceKillApplication(request);
+      return ((KillApplicationResponsePBImpl)response).getProto();
     } catch (YarnRemoteException e) {
       throw new ServiceException(e);
     }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1176600&r1=1176599&r2=1176600&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Tue Sep 27 20:30:24 2011
@@ -36,8 +36,8 @@ import org.apache.hadoop.security.Securi
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@@ -228,8 +228,8 @@ public class ClientRMService extends Abs
 
   @SuppressWarnings("unchecked")
   @Override
-  public FinishApplicationResponse finishApplication(
-      FinishApplicationRequest request) throws YarnRemoteException {
+  public KillApplicationResponse forceKillApplication(
+      KillApplicationRequest request) throws YarnRemoteException {
 
     ApplicationId applicationId = request.getApplicationId();
 
@@ -262,8 +262,8 @@ public class ClientRMService extends Abs
 
     RMAuditLogger.logSuccess(callerUGI.getShortUserName(), 
         AuditConstants.KILL_APP_REQUEST, "ClientRMService" , applicationId);
-    FinishApplicationResponse response = recordFactory
-        .newRecordInstance(FinishApplicationResponse.class);
+    KillApplicationResponse response = recordFactory
+        .newRecordInstance(KillApplicationResponse.class);
     return response;
   }
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1176600&r1=1176599&r2=1176600&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Tue Sep 27 20:30:24 2011
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
@@ -86,7 +87,8 @@ public class RMAppImpl implements RMApp 
   private long startTime;
   private long finishTime;
   private RMAppAttempt currentAttempt;
-
+  @SuppressWarnings("rawtypes")
+  private EventHandler handler;
   private static final FinalTransition FINAL_TRANSITION = new FinalTransition();
 
   private static final StateMachineFactory<RMAppImpl,
@@ -99,9 +101,6 @@ public class RMAppImpl implements RMApp 
                                            RMAppEvent>(RMAppState.NEW)
 
 
-     // TODO - ATTEMPT_KILLED not sent right now but should handle if 
-     // attempt starts sending
-
      // Transitions from NEW state
     .addTransition(RMAppState.NEW, RMAppState.SUBMITTED,
         RMAppEventType.START, new StartAppAttemptTransition())
@@ -116,7 +115,7 @@ public class RMAppImpl implements RMApp 
     .addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
         RMAppEventType.APP_ACCEPTED)
     .addTransition(RMAppState.SUBMITTED, RMAppState.KILLED,
-        RMAppEventType.KILL, new AppKilledTransition())
+        RMAppEventType.KILL, new KillAppAndAttemptTransition())
 
      // Transitions from ACCEPTED state
     .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
@@ -126,7 +125,7 @@ public class RMAppImpl implements RMApp 
         RMAppEventType.ATTEMPT_FAILED,
         new AttemptFailedTransition(RMAppState.SUBMITTED))
     .addTransition(RMAppState.ACCEPTED, RMAppState.KILLED,
-        RMAppEventType.KILL, new AppKilledTransition())
+        RMAppEventType.KILL, new KillAppAndAttemptTransition())
 
      // Transitions from RUNNING state
     .addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
@@ -136,7 +135,7 @@ public class RMAppImpl implements RMApp 
         RMAppEventType.ATTEMPT_FAILED,
         new AttemptFailedTransition(RMAppState.SUBMITTED))
     .addTransition(RMAppState.RUNNING, RMAppState.KILLED,
-        RMAppEventType.KILL, new AppKilledTransition())
+        RMAppEventType.KILL, new KillAppAndAttemptTransition())
 
      // Transitions from FINISHED state
     .addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
@@ -168,6 +167,7 @@ public class RMAppImpl implements RMApp 
     this.name = name;
     this.rmContext = rmContext;
     this.dispatcher = rmContext.getDispatcher();
+    this.handler = dispatcher.getEventHandler();
     this.conf = config;
     this.user = user;
     this.queue = queue;
@@ -403,7 +403,7 @@ public class RMAppImpl implements RMApp 
         submissionContext);
     attempts.put(appAttemptId, attempt);
     currentAttempt = attempt;
-    dispatcher.getEventHandler().handle(
+    handler.handle(
         new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START));
   }
 
@@ -420,13 +420,23 @@ public class RMAppImpl implements RMApp 
     };
   }
 
-  private static final class AppKilledTransition extends FinalTransition {
+  private static class AppKilledTransition extends FinalTransition {
+    @Override
     public void transition(RMAppImpl app, RMAppEvent event) {
       app.diagnostics.append("Application killed by user.");
       super.transition(app, event);
     };
   }
 
+  private static class KillAppAndAttemptTransition extends AppKilledTransition {
+    @SuppressWarnings("unchecked")
+    @Override
+    public void transition(RMAppImpl app, RMAppEvent event) {
+      app.handler.handle(new RMAppAttemptEvent(app.currentAttempt.getAppAttemptId(), 
+          RMAppAttemptEventType.KILL));
+      super.transition(app, event);
+    }
+  }
   private static final class AppRejectedTransition extends
       FinalTransition{
     public void transition(RMAppImpl app, RMAppEvent event) {
@@ -450,11 +460,11 @@ public class RMAppImpl implements RMApp 
     public void transition(RMAppImpl app, RMAppEvent event) {
       Set<NodeId> nodes = getNodesOnWhichAttemptRan(app);
       for (NodeId nodeId : nodes) {
-        app.dispatcher.getEventHandler().handle(
+        app.handler.handle(
             new RMNodeCleanAppEvent(nodeId, app.applicationId));
       }
       app.finishTime = System.currentTimeMillis();
-      app.dispatcher.getEventHandler().handle(
+      app.handler.handle(
           new RMAppManagerEvent(app.applicationId, 
           RMAppManagerEventType.APP_COMPLETED));
     };

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1176600&r1=1176599&r2=1176600&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Tue Sep 27 20:30:24 2011
@@ -22,7 +22,7 @@ import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
@@ -109,9 +109,9 @@ public class MockRM extends ResourceMana
 
   public void killApp(ApplicationId appId) throws Exception {
     ClientRMProtocol client = getClientRMService();
-    FinishApplicationRequest req = Records.newRecord(FinishApplicationRequest.class);
+    KillApplicationRequest req = Records.newRecord(KillApplicationRequest.class);
     req.setApplicationId(appId);
-    client.finishApplication(req);
+    client.forceKillApplication(req);
   }
 
   //from AMLauncher

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/InlineDispatcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/InlineDispatcher.java?rev=1176600&r1=1176599&r2=1176600&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/InlineDispatcher.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/InlineDispatcher.java Tue Sep 27 20:30:24 2011
@@ -1,52 +1,57 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
 
 package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
 
-import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 
-@Private
 public class InlineDispatcher extends AsyncDispatcher {
-  private class InlineEventHandler implements EventHandler {
-    private final InlineDispatcher dispatcher;
-    public InlineEventHandler(InlineDispatcher dispatcher) {
-      this.dispatcher = dispatcher;
-    }
+  private static final Log LOG = LogFactory.getLog(InlineDispatcher.class);
+
+  private class TestEventHandler implements EventHandler {
     @Override
     public void handle(Event event) {
-      this.dispatcher.dispatch(event);
+      dispatch(event);
     }
   }
-  public void dispatch(Event event) {
-    super.dispatch(event);
+  @Override
+  protected void dispatch(Event event) {
+      LOG.info("Dispatching the event " + event.getClass().getName() + "."
+        + event.toString());
+
+    Class<? extends Enum> type = event.getType().getDeclaringClass();
+    if (eventDispatchers.get(type) != null) {
+      eventDispatchers.get(type).handle(event);
+    }
   }
   @Override
   public EventHandler getEventHandler() {
-    return new InlineEventHandler(this);
+    return new TestEventHandler();
   }
-
+  
   static class EmptyEventHandler implements EventHandler<Event> {
     @Override
     public void handle(Event event) {
-      ; // ignore
-    }
+      //do nothing      
+    }    
   }
 }
\ No newline at end of file

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java?rev=1176600&r1=1176599&r2=1176600&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java Tue Sep 27 20:30:24 2011
@@ -1,26 +1,27 @@
 /**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
 
 import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
+import java.util.List;
 
 import junit.framework.Assert;
 
@@ -37,12 +38,14 @@ import org.apache.hadoop.yarn.security.A
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 
@@ -52,27 +55,40 @@ import org.junit.Test;
 
 
 public class TestRMAppTransitions {
-  private static final Log LOG = LogFactory.getLog(TestRMAppTransitions.class);
-  
+  static final Log LOG = LogFactory.getLog(TestRMAppTransitions.class);
+
   private RMContext rmContext;
   private static int maxRetries = 4;
   private static int appId = 1;
+  private AsyncDispatcher rmDispatcher;
 
-  // ignore all the RM application attempt events
+  // forward each RM application attempt event to the attempt it targets
   private static final class TestApplicationAttemptEventDispatcher implements
-      EventHandler<RMAppAttemptEvent> {
+  EventHandler<RMAppAttemptEvent> {
 
-    public TestApplicationAttemptEventDispatcher() {
+    private final RMContext rmContext;
+    public  TestApplicationAttemptEventDispatcher(RMContext rmContext) {
+      this.rmContext = rmContext;
     }
 
     @Override
     public void handle(RMAppAttemptEvent event) {
+      ApplicationId appId = event.getApplicationAttemptId().getApplicationId();
+      RMApp rmApp = this.rmContext.getRMApps().get(appId);
+      if (rmApp != null) {
+        try {
+          rmApp.getRMAppAttempt(event.getApplicationAttemptId()).handle(event);
+        } catch (Throwable t) {
+          LOG.error("Error in handling event type " + event.getType()
+              + " for application " + appId, t);
+        }    
+      }
     }
   }
 
   // handle all the RM application events - same as in ResourceManager.java
   private static final class TestApplicationEventDispatcher implements
-      EventHandler<RMAppEvent> {
+  EventHandler<RMAppEvent> {
 
     private final RMContext rmContext;
     public TestApplicationEventDispatcher(RMContext rmContext) {
@@ -97,18 +113,22 @@ public class TestRMAppTransitions {
   @Before
   public void setUp() throws Exception {
-    AsyncDispatcher rmDispatcher = new AsyncDispatcher();
+    // Assign the rmDispatcher field; redeclaring it as a local would shadow it.
+    Configuration conf = new Configuration();
+    rmDispatcher = new InlineDispatcher();
 
     ContainerAllocationExpirer containerAllocationExpirer = 
         mock(ContainerAllocationExpirer.class);
     AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class);
     this.rmContext = new RMContextImpl(new MemStore(), rmDispatcher,
-      containerAllocationExpirer, amLivelinessMonitor);
+        containerAllocationExpirer, amLivelinessMonitor);
 
     rmDispatcher.register(RMAppAttemptEventType.class,
-        new TestApplicationAttemptEventDispatcher());
+        new TestApplicationAttemptEventDispatcher(this.rmContext));
 
     rmDispatcher.register(RMAppEventType.class,
         new TestApplicationEventDispatcher(rmContext));
+    rmDispatcher.init(conf);
+    rmDispatcher.start();
   }
 
   protected RMApp createNewTestApp() {
@@ -128,10 +148,10 @@ public class TestRMAppTransitions {
             new ApplicationTokenSecretManager(), scheduler);
 
     RMApp application = new RMAppImpl(applicationId, rmContext,
-          conf, name, user,
-          queue, submissionContext, clientTokenStr,
-          appStore, scheduler,
-          masterService);
+        conf, name, user,
+        queue, submissionContext, clientTokenStr,
+        appStore, scheduler,
+        masterService);
 
     testAppStartState(applicationId, user, name, queue, application);
     return application;
@@ -193,6 +213,14 @@ public class TestRMAppTransitions {
         "Application killed by user.", diag.toString());
   }
 
+  private static void assertAppAndAttemptKilled(RMApp application) {
+    assertKilled(application);
+    /* also check if the attempt is killed */
+    Assert.assertEquals( RMAppAttemptState.KILLED, 
+        application.getCurrentAppAttempt().getAppAttemptState() 
+        );
+  }
+
   private static void assertFailed(RMApp application, String regex) {
     assertTimesAtFinish(application);
     assertAppState(RMAppState.FAILED, application);
@@ -298,10 +326,10 @@ public class TestRMAppTransitions {
 
     RMApp application = testCreateAppAccepted();
     // SUBMITTED => KILLED event RMAppEventType.KILL 
-    RMAppEvent event = 
-        new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
+    RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
+    this.rmContext.getRMApps().putIfAbsent(application.getApplicationId(), application);
     application.handle(event);
-    assertKilled(application);
+    assertAppAndAttemptKilled(application);
   }
 
   @Test