You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sh...@apache.org on 2011/05/09 09:48:06 UTC
svn commit: r1100907 - in /hadoop/mapreduce/branches/MR-279: CHANGES.txt
mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
Author: sharad
Date: Mon May 9 07:48:06 2011
New Revision: 1100907
URL: http://svn.apache.org/viewvc?rev=1100907&view=rev
Log:
Client reconnect to restarted AM.
Modified:
hadoop/mapreduce/branches/MR-279/CHANGES.txt
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1100907&r1=1100906&r2=1100907&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Mon May 9 07:48:06 2011
@@ -3,6 +3,7 @@ Hadoop MapReduce Change Log
Trunk (unreleased changes)
MAPREDUCE-279
+ Client reconnect to restarted AM. (sharad)
Replacing FileContext usage with FileSystem to work around security authentication
issues with FileContext against a secure DFS. (vinodkv)
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1100907&r1=1100906&r2=1100907&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Mon May 9 07:48:06 2011
@@ -94,6 +94,44 @@ public class ClientServiceDelegate {
private void refreshProxy() throws YarnRemoteException {
ApplicationMaster appMaster = rm.getApplicationMaster(currentAppId);
+ while (!ApplicationState.COMPLETED.equals(appMaster.getState()) ||
+ !ApplicationState.FAILED.equals(appMaster.getState()) ||
+ !ApplicationState.KILLED.equals(appMaster.getState())) {
+ try {
+ if (appMaster.getHost() == null || "".equals(appMaster.getHost())) {
+ LOG.info("AM not assigned to Job. Waiting to get the AM ...");
+ Thread.sleep(2000);
+ appMaster = rm.getApplicationMaster(currentAppId);
+ continue;
+ }
+ serviceAddr = appMaster.getHost() + ":" + appMaster.getRpcPort();
+ serviceHttpAddr = appMaster.getHost() + ":" + appMaster.getHttpPort();
+ if (UserGroupInformation.isSecurityEnabled()) {
+ String clientTokenEncoded = appMaster.getClientToken();
+ Token<ApplicationTokenIdentifier> clientToken =
+ new Token<ApplicationTokenIdentifier>();
+ clientToken.decodeFromUrlString(clientTokenEncoded);
+ clientToken.setService(new Text(appMaster.getHost() + ":"
+ + appMaster.getRpcPort()));
+ UserGroupInformation.getCurrentUser().addToken(clientToken);
+ }
+ LOG.info("Connecting to " + serviceAddr);
+ instantiateProxy(serviceAddr);
+ return;
+ } catch (Exception e) {
+ //could not connect to the AM:
+ //possibly the AM has crashed
+ //there may be some time before AM is restarted
+ //keep retrying by getting the address from RM
+ LOG.info("Could not connect to " + serviceAddr +
+ ". Waiting for getting the latest AM address...");
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e1) {
+ }
+ appMaster = rm.getApplicationMaster(currentAppId);
+ }
+ }
if (ApplicationState.COMPLETED.equals(appMaster.getState())) {
serviceAddr = conf.get(YarnMRJobConfig.HS_BIND_ADDRESS,
YarnMRJobConfig.DEFAULT_HS_BIND_ADDRESS);
@@ -101,32 +139,16 @@ public class ClientServiceDelegate {
"Redirecting to job history server " + serviceAddr);
//TODO:
serviceHttpAddr = "";
- } else if (ApplicationState.RUNNING.equals(appMaster.getState())){
- serviceAddr = appMaster.getHost() + ":" + appMaster.getRpcPort();
- serviceHttpAddr = appMaster.getHost() + ":" + appMaster.getHttpPort();
- if (UserGroupInformation.isSecurityEnabled()) {
- String clientTokenEncoded = appMaster.getClientToken();
- Token<ApplicationTokenIdentifier> clientToken =
- new Token<ApplicationTokenIdentifier>();
- try {
- clientToken.decodeFromUrlString(clientTokenEncoded);
- clientToken.setService(new Text(appMaster.getHost() + ":"
- + appMaster.getRpcPort()));
- UserGroupInformation.getCurrentUser().addToken(clientToken);
- } catch (IOException e) {
- throw new YarnException(e);
- }
+ try {
+ instantiateProxy(serviceAddr);
+ return;
+ } catch (IOException e) {
+ throw new YarnException(e);
}
- } else {
- LOG.warn("Cannot connect to Application with state " + appMaster.getState());
- throw new YarnException(
- "Cannot connect to Application with state " + appMaster.getState());
- }
- try {
- instantiateProxy(serviceAddr);
- } catch (IOException e) {
- throw new YarnException(e);
}
+ LOG.warn("Cannot connect to Application with state " + appMaster.getState());
+ throw new YarnException(
+ "Cannot connect to Application with state " + appMaster.getState());
}
private void instantiateProxy(final String serviceAddr) throws IOException {