You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2011/09/27 22:36:21 UTC
svn commit: r1176604 - in
/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/t...
Author: mahadev
Date: Tue Sep 27 20:36:20 2011
New Revision: 1176604
URL: http://svn.apache.org/viewvc?rev=1176604&view=rev
Log:
MAPREDUCE-3054. Unable to kill submitted jobs. (mahadev) - Merging r1176600 from trunk
Added:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationRequest.java
- copied unchanged from r1176600, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationRequest.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java
- copied unchanged from r1176600, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationRequestPBImpl.java
- copied unchanged from r1176600, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationRequestPBImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java
- copied unchanged from r1176600, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java
Removed:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/FinishApplicationRequest.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/FinishApplicationResponse.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/FinishApplicationRequestPBImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/FinishApplicationResponsePBImpl.java
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_RM_protocol.proto
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientRMProtocolPBClientImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientRMProtocolPBServiceImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/InlineDispatcher.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1176604&r1=1176603&r2=1176604&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Tue Sep 27 20:36:20 2011
@@ -1420,6 +1420,8 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3095. fairscheduler ivy including wrong version for hdfs.
(John George via mahadev)
+ MAPREDUCE-3054. Unable to kill submitted jobs. (mahadev)
+
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java?rev=1176604&r1=1176603&r2=1176604&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java Tue Sep 27 20:36:20 2011
@@ -1,20 +1,20 @@
/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hadoop.mapred;
@@ -42,29 +42,29 @@ public class ClientCache {
private final Configuration conf;
private final ResourceMgrDelegate rm;
-
+
private static final Log LOG = LogFactory.getLog(ClientCache.class);
private Map<JobID, ClientServiceDelegate> cache =
- new HashMap<JobID, ClientServiceDelegate>();
-
+ new HashMap<JobID, ClientServiceDelegate>();
+
private MRClientProtocol hsProxy;
- ClientCache(Configuration conf, ResourceMgrDelegate rm) {
+ public ClientCache(Configuration conf, ResourceMgrDelegate rm) {
this.conf = conf;
this.rm = rm;
}
//TODO: evict from the cache on some threshold
- synchronized ClientServiceDelegate getClient(JobID jobId) {
- if (hsProxy == null) {
+ public synchronized ClientServiceDelegate getClient(JobID jobId) {
+ if (hsProxy == null) {
try {
- hsProxy = instantiateHistoryProxy();
- } catch (IOException e) {
- LOG.warn("Could not connect to History server.", e);
- throw new YarnException("Could not connect to History server.", e);
- }
- }
+ hsProxy = instantiateHistoryProxy();
+ } catch (IOException e) {
+ LOG.warn("Could not connect to History server.", e);
+ throw new YarnException("Could not connect to History server.", e);
+ }
+ }
ClientServiceDelegate client = cache.get(jobId);
if (client == null) {
client = new ClientServiceDelegate(conf, rm, jobId, hsProxy);
@@ -74,7 +74,7 @@ public class ClientCache {
}
private MRClientProtocol instantiateHistoryProxy()
- throws IOException {
+ throws IOException {
final String serviceAddr = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS);
if (StringUtils.isEmpty(serviceAddr)) {
return null;
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1176604&r1=1176603&r2=1176604&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Tue Sep 27 20:36:20 2011
@@ -70,7 +70,7 @@ import org.apache.hadoop.yarn.ipc.YarnRP
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
-class ClientServiceDelegate {
+public class ClientServiceDelegate {
private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
// Caches for per-user NotRunningJobs
@@ -87,7 +87,7 @@ class ClientServiceDelegate {
private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private static String UNKNOWN_USER = "Unknown User";
- ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm,
+ public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm,
JobID jobId, MRClientProtocol historyServerProxy) {
this.conf = new Configuration(conf); // Cloning for modifying.
// For faster redirects from AM to HS.
@@ -279,7 +279,7 @@ class ClientServiceDelegate {
}
}
- org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException,
+ public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException,
InterruptedException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter.toYarn(arg0);
GetCountersRequest request = recordFactory.newRecordInstance(GetCountersRequest.class);
@@ -290,7 +290,7 @@ class ClientServiceDelegate {
}
- TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1, int arg2)
+ public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1, int arg2)
throws IOException, InterruptedException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter
.toYarn(arg0);
@@ -308,7 +308,7 @@ class ClientServiceDelegate {
.toArray(new org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent[0]));
}
- String[] getTaskDiagnostics(org.apache.hadoop.mapreduce.TaskAttemptID arg0)
+ public String[] getTaskDiagnostics(org.apache.hadoop.mapreduce.TaskAttemptID arg0)
throws IOException, InterruptedException {
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = TypeConverter
@@ -326,7 +326,7 @@ class ClientServiceDelegate {
return result;
}
- JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException {
+ public JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
TypeConverter.toYarn(oldJobID);
GetJobReportRequest request =
@@ -339,7 +339,7 @@ class ClientServiceDelegate {
return TypeConverter.fromYarn(report, jobFile);
}
- org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID oldJobID, TaskType taskType)
+ public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID oldJobID, TaskType taskType)
throws YarnRemoteException, YarnRemoteException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
TypeConverter.toYarn(oldJobID);
@@ -356,7 +356,7 @@ class ClientServiceDelegate {
(taskReports).toArray(new org.apache.hadoop.mapreduce.TaskReport[0]);
}
- boolean killTask(TaskAttemptID taskAttemptID, boolean fail)
+ public boolean killTask(TaskAttemptID taskAttemptID, boolean fail)
throws YarnRemoteException {
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID
= TypeConverter.toYarn(taskAttemptID);
@@ -372,7 +372,7 @@ class ClientServiceDelegate {
return true;
}
- boolean killJob(JobID oldJobID)
+ public boolean killJob(JobID oldJobID)
throws YarnRemoteException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId
= TypeConverter.toYarn(oldJobID);
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java?rev=1176604&r1=1176603&r2=1176604&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java Tue Sep 27 20:36:20 2011
@@ -44,7 +44,7 @@ import org.apache.hadoop.security.Securi
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@@ -79,6 +79,10 @@ public class ResourceMgrDelegate {
private ApplicationId applicationId;
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+ /**
+ * Delegate responsible for communicating with the Resource Manager's {@link ClientRMProtocol}.
+ * @param conf the configuration object.
+ */
public ResourceMgrDelegate(YarnConfiguration conf) {
this.conf = conf;
YarnRPC rpc = YarnRPC.create(this.conf);
@@ -97,6 +101,16 @@ public class ResourceMgrDelegate {
LOG.info("Connected to ResourceManager at " + rmAddress);
}
+ /**
+ * Used for injecting applicationsManager, mostly for testing.
+ * @param conf the configuration object
+ * @param applicationsManager the handle to talk to the resource manager's {@link ClientRMProtocol}.
+ */
+ public ResourceMgrDelegate(YarnConfiguration conf, ClientRMProtocol applicationsManager) {
+ this.conf = conf;
+ this.applicationsManager = applicationsManager;
+ }
+
public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
throws IOException, InterruptedException {
return;
@@ -294,9 +308,9 @@ public class ResourceMgrDelegate {
}
public void killApplication(ApplicationId applicationId) throws IOException {
- FinishApplicationRequest request = recordFactory.newRecordInstance(FinishApplicationRequest.class);
+ KillApplicationRequest request = recordFactory.newRecordInstance(KillApplicationRequest.class);
request.setApplicationId(applicationId);
- applicationsManager.finishApplication(request);
+ applicationsManager.forceKillApplication(request);
LOG.info("Killing application " + applicationId);
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java?rev=1176604&r1=1176603&r2=1176604&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java Tue Sep 27 20:36:20 2011
@@ -105,10 +105,22 @@ public class YARNRunner implements Clien
* @param resMgrDelegate the resourcemanager client handle.
*/
public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) {
+ this(conf, resMgrDelegate, new ClientCache(conf, resMgrDelegate));
+ }
+
+ /**
+ * Similar to {@link YARNRunner#YARNRunner(Configuration, ResourceMgrDelegate)}
+ * but allowing injecting {@link ClientCache}. Enable mocking and testing.
+ * @param conf the configuration object
+ * @param resMgrDelegate the resource manager delegate
+ * @param clientCache the client cache object.
+ */
+ public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
+ ClientCache clientCache) {
this.conf = conf;
try {
this.resMgrDelegate = resMgrDelegate;
- this.clientCache = new ClientCache(this.conf, resMgrDelegate);
+ this.clientCache = clientCache;
this.defaultFileContext = FileContext.getFileContext(this.conf);
} catch (UnsupportedFileSystemException ufe) {
throw new RuntimeException("Error in instantiating YarnClient", ufe);
@@ -429,9 +441,35 @@ public class YARNRunner implements Clien
@Override
public void killJob(JobID arg0) throws IOException, InterruptedException {
- if (!clientCache.getClient(arg0).killJob(arg0)) {
- resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
- }
+ /* if the job is not in the RUNNING state, send the kill directly to the RM */
+ JobStatus status = clientCache.getClient(arg0).getJobStatus(arg0);
+ if (status.getState() != JobStatus.State.RUNNING) {
+ resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
+ return;
+ }
+
+ try {
+ /* send a kill to the AM */
+ clientCache.getClient(arg0).killJob(arg0);
+ long currentTimeMillis = System.currentTimeMillis();
+ long timeKillIssued = currentTimeMillis;
+ while ((currentTimeMillis < timeKillIssued + 10000L) && (status.getState()
+ != JobStatus.State.KILLED)) {
+ try {
+ Thread.sleep(1000L);
+ } catch(InterruptedException ie) {
+ /** interrupted, just break */
+ break;
+ }
+ currentTimeMillis = System.currentTimeMillis();
+ status = clientCache.getClient(arg0).getJobStatus(arg0);
+ }
+ } catch(IOException io) {
+ LOG.debug("Error when checking for application status", io);
+ }
+ if (status.getState() != JobStatus.State.KILLED) {
+ resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
+ }
}
@Override
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java?rev=1176604&r1=1176603&r2=1176604&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java Tue Sep 27 20:36:20 2011
@@ -68,8 +68,8 @@ import org.apache.hadoop.metrics2.lib.De
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@@ -288,9 +288,9 @@ public class TestClientRedirect {
}
@Override
- public FinishApplicationResponse finishApplication(
- FinishApplicationRequest request) throws YarnRemoteException {
- return null;
+ public KillApplicationResponse forceKillApplication(
+ KillApplicationRequest request) throws YarnRemoteException {
+ return recordFactory.newRecordInstance(KillApplicationResponse.class);
}
@Override
@@ -451,7 +451,7 @@ public class TestClientRedirect {
@Override
public KillJobResponse killJob(KillJobRequest request)
throws YarnRemoteException {
- return null;
+ return recordFactory.newRecordInstance(KillJobResponse.class);
}
@Override
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java?rev=1176604&r1=1176603&r2=1176604&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java Tue Sep 27 20:36:20 2011
@@ -22,6 +22,7 @@ import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.File;
@@ -36,15 +37,38 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.ClientCache;
+import org.apache.hadoop.mapred.ClientServiceDelegate;
+import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.ResourceMgrDelegate;
import org.apache.hadoop.mapred.YARNRunner;
import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobPriority;
+import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationState;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -54,9 +78,8 @@ import org.mockito.invocation.Invocation
import org.mockito.stubbing.Answer;
/**
- * Test if the jobclient shows enough diagnostics
- * on a job failure.
- *
+ * Test YarnRunner and make sure the client side plugin works
+ * fine
*/
public class TestYARNRunner extends TestCase {
private static final Log LOG = LogFactory.getLog(TestYARNRunner.class);
@@ -65,18 +88,22 @@ public class TestYARNRunner extends Test
private YARNRunner yarnRunner;
private ResourceMgrDelegate resourceMgrDelegate;
private YarnConfiguration conf;
+ private ClientCache clientCache;
private ApplicationId appId;
private JobID jobId;
private File testWorkDir =
new File("target", TestYARNRunner.class.getName());
private ApplicationSubmissionContext submissionContext;
+ private ClientServiceDelegate clientDelegate;
private static final String failString = "Rejected job";
@Before
public void setUp() throws Exception {
resourceMgrDelegate = mock(ResourceMgrDelegate.class);
conf = new YarnConfiguration();
- yarnRunner = new YARNRunner(conf, resourceMgrDelegate);
+ clientCache = new ClientCache(conf, resourceMgrDelegate);
+ clientCache = spy(clientCache);
+ yarnRunner = new YARNRunner(conf, resourceMgrDelegate, clientCache);
yarnRunner = spy(yarnRunner);
submissionContext = mock(ApplicationSubmissionContext.class);
doAnswer(
@@ -102,6 +129,31 @@ public class TestYARNRunner extends Test
@Test
+ public void testJobKill() throws Exception {
+ clientDelegate = mock(ClientServiceDelegate.class);
+ when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new
+ org.apache.hadoop.mapreduce.JobStatus(jobId, 0f, 0f, 0f, 0f,
+ State.PREP, JobPriority.HIGH, "tmp", "tmp", "tmp", "tmp"));
+ when(clientDelegate.killJob(any(JobID.class))).thenReturn(true);
+ doAnswer(
+ new Answer<ClientServiceDelegate>() {
+ @Override
+ public ClientServiceDelegate answer(InvocationOnMock invocation)
+ throws Throwable {
+ return clientDelegate;
+ }
+ }
+ ).when(clientCache).getClient(any(JobID.class));
+ yarnRunner.killJob(jobId);
+ verify(resourceMgrDelegate).killApplication(appId);
+ when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new
+ org.apache.hadoop.mapreduce.JobStatus(jobId, 0f, 0f, 0f, 0f,
+ State.RUNNING, JobPriority.HIGH, "tmp", "tmp", "tmp", "tmp"));
+ yarnRunner.killJob(jobId);
+ verify(clientDelegate).killJob(jobId);
+ }
+
+ @Test
public void testJobSubmissionFailure() throws Exception {
when(resourceMgrDelegate.submitApplication(any(ApplicationSubmissionContext.class))).
thenReturn(appId);
@@ -122,4 +174,66 @@ public class TestYARNRunner extends Test
assertTrue(io.getLocalizedMessage().contains(failString));
}
}
+
+ @Test
+ public void testResourceMgrDelegate() throws Exception {
+ /* we do not want a mock of the resourcemgr delegate */
+ ClientRMProtocol clientRMProtocol = mock(ClientRMProtocol.class);
+ ResourceMgrDelegate delegate = new ResourceMgrDelegate(conf, clientRMProtocol);
+ /* make sure kill calls forceKillApplication on the client RM protocol */
+ when(clientRMProtocol.forceKillApplication(any(KillApplicationRequest.class)))
+ .thenReturn(null);
+ delegate.killApplication(appId);
+ verify(clientRMProtocol).forceKillApplication(any(KillApplicationRequest.class));
+
+ /* make sure getalljobs calls get all applications */
+ when(clientRMProtocol.getAllApplications(any(GetAllApplicationsRequest.class))).
+ thenReturn(recordFactory.newRecordInstance(GetAllApplicationsResponse.class));
+ delegate.getAllJobs();
+ verify(clientRMProtocol).getAllApplications(any(GetAllApplicationsRequest.class));
+
+ /* make sure getapplication report is called */
+ when(clientRMProtocol.getApplicationReport(any(GetApplicationReportRequest.class)))
+ .thenReturn(recordFactory.newRecordInstance(GetApplicationReportResponse.class));
+ delegate.getApplicationReport(appId);
+ verify(clientRMProtocol).getApplicationReport(any(GetApplicationReportRequest.class));
+
+ /* make sure metrics is called */
+ GetClusterMetricsResponse clusterMetricsResponse = recordFactory.newRecordInstance
+ (GetClusterMetricsResponse.class);
+ clusterMetricsResponse.setClusterMetrics(recordFactory.newRecordInstance(
+ YarnClusterMetrics.class));
+ when(clientRMProtocol.getClusterMetrics(any(GetClusterMetricsRequest.class)))
+ .thenReturn(clusterMetricsResponse);
+ delegate.getClusterMetrics();
+ verify(clientRMProtocol).getClusterMetrics(any(GetClusterMetricsRequest.class));
+
+ when(clientRMProtocol.getClusterNodes(any(GetClusterNodesRequest.class))).
+ thenReturn(recordFactory.newRecordInstance(GetClusterNodesResponse.class));
+ delegate.getActiveTrackers();
+ verify(clientRMProtocol).getClusterNodes(any(GetClusterNodesRequest.class));
+
+ GetNewApplicationIdResponse newAppIdResponse = recordFactory.newRecordInstance(
+ GetNewApplicationIdResponse.class);
+ newAppIdResponse.setApplicationId(appId);
+ when(clientRMProtocol.getNewApplicationId(any(GetNewApplicationIdRequest.class))).
+ thenReturn(newAppIdResponse);
+ delegate.getNewJobID();
+ verify(clientRMProtocol).getNewApplicationId(any(GetNewApplicationIdRequest.class));
+
+ GetQueueInfoResponse queueInfoResponse = recordFactory.newRecordInstance(
+ GetQueueInfoResponse.class);
+ queueInfoResponse.setQueueInfo(recordFactory.newRecordInstance(QueueInfo.class));
+ when(clientRMProtocol.getQueueInfo(any(GetQueueInfoRequest.class))).
+ thenReturn(queueInfoResponse);
+ delegate.getQueues();
+ verify(clientRMProtocol).getQueueInfo(any(GetQueueInfoRequest.class));
+
+ GetQueueUserAclsInfoResponse aclResponse = recordFactory.newRecordInstance(
+ GetQueueUserAclsInfoResponse.class);
+ when(clientRMProtocol.getQueueUserAcls(any(GetQueueUserAclsInfoRequest.class)))
+ .thenReturn(aclResponse);
+ delegate.getQueueAclsForCurrentUser();
+ verify(clientRMProtocol).getQueueUserAcls(any(GetQueueUserAclsInfoRequest.class));
+ }
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java?rev=1176604&r1=1176603&r2=1176604&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java Tue Sep 27 20:36:20 2011
@@ -21,8 +21,8 @@ package org.apache.hadoop.yarn.api;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@@ -102,7 +102,7 @@ public interface ClientRMProtocol {
* <p>The interface used by clients to request the
* <code>ResourceManager</code> to abort submitted application.</p>
*
- * <p>The client, via {@link FinishApplicationRequest} provides the
+ * <p>The client, via {@link KillApplicationRequest} provides the
* {@link ApplicationId} of the application to be aborted.</p>
*
* <p> In secure mode,the <code>ResourceManager</code> verifies access to the
@@ -117,8 +117,8 @@ public interface ClientRMProtocol {
* @throws YarnRemoteException
* @see #getQueueUserAcls(GetQueueUserAclsInfoRequest)
*/
- public FinishApplicationResponse finishApplication(
- FinishApplicationRequest request)
+ public KillApplicationResponse forceKillApplication(
+ KillApplicationRequest request)
throws YarnRemoteException;
/**
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_RM_protocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_RM_protocol.proto?rev=1176604&r1=1176603&r2=1176604&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_RM_protocol.proto (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_RM_protocol.proto Tue Sep 27 20:36:20 2011
@@ -27,7 +27,7 @@ service ClientRMProtocolService {
rpc getNewApplicationId (GetNewApplicationIdRequestProto) returns (GetNewApplicationIdResponseProto);
rpc getApplicationReport (GetApplicationReportRequestProto) returns (GetApplicationReportResponseProto);
rpc submitApplication (SubmitApplicationRequestProto) returns (SubmitApplicationResponseProto);
- rpc finishApplication (FinishApplicationRequestProto) returns (FinishApplicationResponseProto);
+ rpc forceKillApplication (KillApplicationRequestProto) returns (KillApplicationResponseProto);
rpc getClusterMetrics (GetClusterMetricsRequestProto) returns (GetClusterMetricsResponseProto);
rpc getAllApplications (GetAllApplicationsRequestProto) returns (GetAllApplicationsResponseProto);
rpc getClusterNodes (GetClusterNodesRequestProto) returns (GetClusterNodesResponseProto);
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto?rev=1176604&r1=1176603&r2=1176604&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto Tue Sep 27 20:36:20 2011
@@ -88,11 +88,11 @@ message SubmitApplicationRequestProto {
message SubmitApplicationResponseProto {
}
-message FinishApplicationRequestProto {
+message KillApplicationRequestProto {
optional ApplicationIdProto application_id = 1;
}
-message FinishApplicationResponseProto {
+message KillApplicationResponseProto {
}
message GetClusterMetricsRequestProto {
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientRMProtocolPBClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientRMProtocolPBClientImpl.java?rev=1176604&r1=1176603&r2=1176604&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientRMProtocolPBClientImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientRMProtocolPBClientImpl.java Tue Sep 27 20:36:20 2011
@@ -25,8 +25,6 @@ import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@@ -41,10 +39,10 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationReportRequestPBImpl;
@@ -59,21 +57,22 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueInfoResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.ProtoOverHadoopRpcEngine;
import org.apache.hadoop.yarn.proto.ClientRMProtocol.ClientRMProtocolService;
-import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAllApplicationsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationReportRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterMetricsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterNodesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNewApplicationIdRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueInfoRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto;
-import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueInfoRequestProto;
-import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueInfoResponseProto;
import com.google.protobuf.ServiceException;
@@ -88,11 +87,11 @@ public class ClientRMProtocolPBClientImp
}
@Override
- public FinishApplicationResponse finishApplication(
- FinishApplicationRequest request) throws YarnRemoteException {
- FinishApplicationRequestProto requestProto = ((FinishApplicationRequestPBImpl)request).getProto();
+ public KillApplicationResponse forceKillApplication(
+ KillApplicationRequest request) throws YarnRemoteException {
+ KillApplicationRequestProto requestProto = ((KillApplicationRequestPBImpl)request).getProto();
try {
- return new FinishApplicationResponsePBImpl(proxy.finishApplication(null, requestProto));
+ return new KillApplicationResponsePBImpl(proxy.forceKillApplication(null, requestProto));
} catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) {
throw (YarnRemoteException)e.getCause();
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientRMProtocolPBServiceImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientRMProtocolPBServiceImpl.java?rev=1176604&r1=1176603&r2=1176604&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientRMProtocolPBServiceImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientRMProtocolPBServiceImpl.java Tue Sep 27 20:36:20 2011
@@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.api.impl.pb.service;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
@@ -27,9 +26,8 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationReportRequestPBImpl;
@@ -44,12 +42,12 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueInfoResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.proto.ClientRMProtocol.ClientRMProtocolService.BlockingInterface;
-import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationRequestProto;
-import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAllApplicationsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAllApplicationsResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationReportRequestProto;
@@ -64,6 +62,8 @@ import org.apache.hadoop.yarn.proto.Yarn
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueInfoResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseProto;
@@ -79,12 +79,12 @@ public class ClientRMProtocolPBServiceIm
}
@Override
- public FinishApplicationResponseProto finishApplication(RpcController arg0,
- FinishApplicationRequestProto proto) throws ServiceException {
- FinishApplicationRequestPBImpl request = new FinishApplicationRequestPBImpl(proto);
+ public KillApplicationResponseProto forceKillApplication(RpcController arg0,
+ KillApplicationRequestProto proto) throws ServiceException {
+ KillApplicationRequestPBImpl request = new KillApplicationRequestPBImpl(proto);
try {
- FinishApplicationResponse response = real.finishApplication(request);
- return ((FinishApplicationResponsePBImpl)response).getProto();
+ KillApplicationResponse response = real.forceKillApplication(request);
+ return ((KillApplicationResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) {
throw new ServiceException(e);
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java?rev=1176604&r1=1176603&r2=1176604&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java Tue Sep 27 20:36:20 2011
@@ -90,7 +90,7 @@ public class TestRPC {
.newRecord(GetNewApplicationIdRequest.class));
Assert.fail("Excepted RPC call to fail with unknown method.");
} catch (YarnRemoteException e) {
- Assert.assertEquals("Unknown method getNewApplicationId called on "
+ Assert.assertEquals("Unknown method getNewApplicationId called on interface "
+ "org.apache.hadoop.yarn.proto.ClientRMProtocol"
+ "$ClientRMProtocolService$BlockingInterface protocol.", e
.getMessage());
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1176604&r1=1176603&r2=1176604&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Tue Sep 27 20:36:20 2011
@@ -36,8 +36,8 @@ import org.apache.hadoop.security.Securi
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@@ -228,8 +228,8 @@ public class ClientRMService extends Abs
@SuppressWarnings("unchecked")
@Override
- public FinishApplicationResponse finishApplication(
- FinishApplicationRequest request) throws YarnRemoteException {
+ public KillApplicationResponse forceKillApplication(
+ KillApplicationRequest request) throws YarnRemoteException {
ApplicationId applicationId = request.getApplicationId();
@@ -262,8 +262,8 @@ public class ClientRMService extends Abs
RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
AuditConstants.KILL_APP_REQUEST, "ClientRMService" , applicationId);
- FinishApplicationResponse response = recordFactory
- .newRecordInstance(FinishApplicationResponse.class);
+ KillApplicationResponse response = recordFactory
+ .newRecordInstance(KillApplicationResponse.class);
return response;
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1176604&r1=1176603&r2=1176604&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Tue Sep 27 20:36:20 2011
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
@@ -86,7 +87,8 @@ public class RMAppImpl implements RMApp
private long startTime;
private long finishTime;
private RMAppAttempt currentAttempt;
-
+ @SuppressWarnings("rawtypes")
+ private EventHandler handler;
private static final FinalTransition FINAL_TRANSITION = new FinalTransition();
private static final StateMachineFactory<RMAppImpl,
@@ -99,9 +101,6 @@ public class RMAppImpl implements RMApp
RMAppEvent>(RMAppState.NEW)
- // TODO - ATTEMPT_KILLED not sent right now but should handle if
- // attempt starts sending
-
// Transitions from NEW state
.addTransition(RMAppState.NEW, RMAppState.SUBMITTED,
RMAppEventType.START, new StartAppAttemptTransition())
@@ -116,7 +115,7 @@ public class RMAppImpl implements RMApp
.addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
RMAppEventType.APP_ACCEPTED)
.addTransition(RMAppState.SUBMITTED, RMAppState.KILLED,
- RMAppEventType.KILL, new AppKilledTransition())
+ RMAppEventType.KILL, new KillAppAndAttemptTransition())
// Transitions from ACCEPTED state
.addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
@@ -126,7 +125,7 @@ public class RMAppImpl implements RMApp
RMAppEventType.ATTEMPT_FAILED,
new AttemptFailedTransition(RMAppState.SUBMITTED))
.addTransition(RMAppState.ACCEPTED, RMAppState.KILLED,
- RMAppEventType.KILL, new AppKilledTransition())
+ RMAppEventType.KILL, new KillAppAndAttemptTransition())
// Transitions from RUNNING state
.addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
@@ -136,7 +135,7 @@ public class RMAppImpl implements RMApp
RMAppEventType.ATTEMPT_FAILED,
new AttemptFailedTransition(RMAppState.SUBMITTED))
.addTransition(RMAppState.RUNNING, RMAppState.KILLED,
- RMAppEventType.KILL, new AppKilledTransition())
+ RMAppEventType.KILL, new KillAppAndAttemptTransition())
// Transitions from FINISHED state
.addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
@@ -168,6 +167,7 @@ public class RMAppImpl implements RMApp
this.name = name;
this.rmContext = rmContext;
this.dispatcher = rmContext.getDispatcher();
+ this.handler = dispatcher.getEventHandler();
this.conf = config;
this.user = user;
this.queue = queue;
@@ -403,7 +403,7 @@ public class RMAppImpl implements RMApp
submissionContext);
attempts.put(appAttemptId, attempt);
currentAttempt = attempt;
- dispatcher.getEventHandler().handle(
+ handler.handle(
new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START));
}
@@ -420,13 +420,23 @@ public class RMAppImpl implements RMApp
};
}
- private static final class AppKilledTransition extends FinalTransition {
+ private static class AppKilledTransition extends FinalTransition {
+ @Override
public void transition(RMAppImpl app, RMAppEvent event) {
app.diagnostics.append("Application killed by user.");
super.transition(app, event);
};
}
+ private static class KillAppAndAttemptTransition extends AppKilledTransition {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void transition(RMAppImpl app, RMAppEvent event) {
+ app.handler.handle(new RMAppAttemptEvent(app.currentAttempt.getAppAttemptId(),
+ RMAppAttemptEventType.KILL));
+ super.transition(app, event);
+ }
+ }
private static final class AppRejectedTransition extends
FinalTransition{
public void transition(RMAppImpl app, RMAppEvent event) {
@@ -450,11 +460,11 @@ public class RMAppImpl implements RMApp
public void transition(RMAppImpl app, RMAppEvent event) {
Set<NodeId> nodes = getNodesOnWhichAttemptRan(app);
for (NodeId nodeId : nodes) {
- app.dispatcher.getEventHandler().handle(
+ app.handler.handle(
new RMNodeCleanAppEvent(nodeId, app.applicationId));
}
app.finishTime = System.currentTimeMillis();
- app.dispatcher.getEventHandler().handle(
+ app.handler.handle(
new RMAppManagerEvent(app.applicationId,
RMAppManagerEventType.APP_COMPLETED));
};
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1176604&r1=1176603&r2=1176604&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Tue Sep 27 20:36:20 2011
@@ -22,7 +22,7 @@ import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
@@ -109,9 +109,9 @@ public class MockRM extends ResourceMana
public void killApp(ApplicationId appId) throws Exception {
ClientRMProtocol client = getClientRMService();
- FinishApplicationRequest req = Records.newRecord(FinishApplicationRequest.class);
+ KillApplicationRequest req = Records.newRecord(KillApplicationRequest.class);
req.setApplicationId(appId);
- client.finishApplication(req);
+ client.forceKillApplication(req);
}
//from AMLauncher
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/InlineDispatcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/InlineDispatcher.java?rev=1176604&r1=1176603&r2=1176604&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/InlineDispatcher.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/InlineDispatcher.java Tue Sep 27 20:36:20 2011
@@ -1,52 +1,57 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
-@Private
public class InlineDispatcher extends AsyncDispatcher {
- private class InlineEventHandler implements EventHandler {
- private final InlineDispatcher dispatcher;
- public InlineEventHandler(InlineDispatcher dispatcher) {
- this.dispatcher = dispatcher;
- }
+ private static final Log LOG = LogFactory.getLog(InlineDispatcher.class);
+
+ private class TestEventHandler implements EventHandler {
@Override
public void handle(Event event) {
- this.dispatcher.dispatch(event);
+ dispatch(event);
}
}
- public void dispatch(Event event) {
- super.dispatch(event);
+ @Override
+ protected void dispatch(Event event) {
+ LOG.info("Dispatching the event " + event.getClass().getName() + "."
+ + event.toString());
+
+ Class<? extends Enum> type = event.getType().getDeclaringClass();
+ if (eventDispatchers.get(type) != null) {
+ eventDispatchers.get(type).handle(event);
+ }
}
@Override
public EventHandler getEventHandler() {
- return new InlineEventHandler(this);
+ return new TestEventHandler();
}
-
+
static class EmptyEventHandler implements EventHandler<Event> {
@Override
public void handle(Event event) {
- ; // ignore
- }
+ //do nothing
+ }
}
}
\ No newline at end of file
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java?rev=1176604&r1=1176603&r2=1176604&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java Tue Sep 27 20:36:20 2011
@@ -1,26 +1,27 @@
/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import static org.mockito.Mockito.mock;
import java.io.IOException;
+import java.util.List;
import junit.framework.Assert;
@@ -37,12 +38,14 @@ import org.apache.hadoop.yarn.security.A
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -52,27 +55,40 @@ import org.junit.Test;
public class TestRMAppTransitions {
- private static final Log LOG = LogFactory.getLog(TestRMAppTransitions.class);
-
+ static final Log LOG = LogFactory.getLog(TestRMAppTransitions.class);
+
private RMContext rmContext;
private static int maxRetries = 4;
private static int appId = 1;
+ private AsyncDispatcher rmDispatcher;
// ignore all the RM application attempt events
private static final class TestApplicationAttemptEventDispatcher implements
- EventHandler<RMAppAttemptEvent> {
+ EventHandler<RMAppAttemptEvent> {
- public TestApplicationAttemptEventDispatcher() {
+ private final RMContext rmContext;
+ public TestApplicationAttemptEventDispatcher(RMContext rmContext) {
+ this.rmContext = rmContext;
}
@Override
public void handle(RMAppAttemptEvent event) {
+ ApplicationId appId = event.getApplicationAttemptId().getApplicationId();
+ RMApp rmApp = this.rmContext.getRMApps().get(appId);
+ if (rmApp != null) {
+ try {
+ rmApp.getRMAppAttempt(event.getApplicationAttemptId()).handle(event);
+ } catch (Throwable t) {
+ LOG.error("Error in handling event type " + event.getType()
+ + " for application " + appId, t);
+ }
+ }
}
}
// handle all the RM application events - same as in ResourceManager.java
private static final class TestApplicationEventDispatcher implements
- EventHandler<RMAppEvent> {
+ EventHandler<RMAppEvent> {
private final RMContext rmContext;
public TestApplicationEventDispatcher(RMContext rmContext) {
@@ -97,18 +113,22 @@ public class TestRMAppTransitions {
@Before
public void setUp() throws Exception {
AsyncDispatcher rmDispatcher = new AsyncDispatcher();
+ Configuration conf = new Configuration();
+ rmDispatcher = new InlineDispatcher();
ContainerAllocationExpirer containerAllocationExpirer =
mock(ContainerAllocationExpirer.class);
AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class);
this.rmContext = new RMContextImpl(new MemStore(), rmDispatcher,
- containerAllocationExpirer, amLivelinessMonitor);
+ containerAllocationExpirer, amLivelinessMonitor);
rmDispatcher.register(RMAppAttemptEventType.class,
- new TestApplicationAttemptEventDispatcher());
+ new TestApplicationAttemptEventDispatcher(this.rmContext));
rmDispatcher.register(RMAppEventType.class,
new TestApplicationEventDispatcher(rmContext));
+ rmDispatcher.init(conf);
+ rmDispatcher.start();
}
protected RMApp createNewTestApp() {
@@ -128,10 +148,10 @@ public class TestRMAppTransitions {
new ApplicationTokenSecretManager(), scheduler);
RMApp application = new RMAppImpl(applicationId, rmContext,
- conf, name, user,
- queue, submissionContext, clientTokenStr,
- appStore, scheduler,
- masterService);
+ conf, name, user,
+ queue, submissionContext, clientTokenStr,
+ appStore, scheduler,
+ masterService);
testAppStartState(applicationId, user, name, queue, application);
return application;
@@ -193,6 +213,14 @@ public class TestRMAppTransitions {
"Application killed by user.", diag.toString());
}
+ private static void assertAppAndAttemptKilled(RMApp application) {
+ assertKilled(application);
+ /* also check if the attempt is killed */
+ Assert.assertEquals( RMAppAttemptState.KILLED,
+ application.getCurrentAppAttempt().getAppAttemptState()
+ );
+ }
+
private static void assertFailed(RMApp application, String regex) {
assertTimesAtFinish(application);
assertAppState(RMAppState.FAILED, application);
@@ -298,10 +326,10 @@ public class TestRMAppTransitions {
RMApp application = testCreateAppAccepted();
// SUBMITTED => KILLED event RMAppEventType.KILL
- RMAppEvent event =
- new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
+ RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
+ this.rmContext.getRMApps().putIfAbsent(application.getApplicationId(), application);
application.handle(event);
- assertKilled(application);
+ assertAppAndAttemptKilled(application);
}
@Test