You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sh...@apache.org on 2011/05/09 09:48:06 UTC

svn commit: r1100907 - in /hadoop/mapreduce/branches/MR-279: CHANGES.txt mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java

Author: sharad
Date: Mon May  9 07:48:06 2011
New Revision: 1100907

URL: http://svn.apache.org/viewvc?rev=1100907&view=rev
Log:
Client reconnect to restarted AM.

Modified:
    hadoop/mapreduce/branches/MR-279/CHANGES.txt
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java

Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1100907&r1=1100906&r2=1100907&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Mon May  9 07:48:06 2011
@@ -3,6 +3,7 @@ Hadoop MapReduce Change Log
 Trunk (unreleased changes)
 
   MAPREDUCE-279
+    Client reconnect to restarted AM. (sharad)
 
     Replacing FileContext usage with FileSystem to work around security authentication
     issues with FileContext against a secure DFS. (vinodkv)

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1100907&r1=1100906&r2=1100907&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Mon May  9 07:48:06 2011
@@ -94,6 +94,44 @@ public class ClientServiceDelegate {
 
   private void refreshProxy() throws YarnRemoteException {
     ApplicationMaster appMaster = rm.getApplicationMaster(currentAppId);
+    while (!ApplicationState.COMPLETED.equals(appMaster.getState()) ||
+           !ApplicationState.FAILED.equals(appMaster.getState()) || 
+           !ApplicationState.KILLED.equals(appMaster.getState())) {
+      try {
+        if (appMaster.getHost() == null || "".equals(appMaster.getHost())) {
+          LOG.info("AM not assigned to Job. Waiting to get the AM ...");
+          Thread.sleep(2000);
+          appMaster = rm.getApplicationMaster(currentAppId);
+          continue;
+        }
+        serviceAddr = appMaster.getHost() + ":" + appMaster.getRpcPort();
+        serviceHttpAddr = appMaster.getHost() + ":" + appMaster.getHttpPort();
+        if (UserGroupInformation.isSecurityEnabled()) {
+          String clientTokenEncoded = appMaster.getClientToken();
+          Token<ApplicationTokenIdentifier> clientToken =
+            new Token<ApplicationTokenIdentifier>();
+          clientToken.decodeFromUrlString(clientTokenEncoded);
+            clientToken.setService(new Text(appMaster.getHost() + ":"
+                + appMaster.getRpcPort()));
+            UserGroupInformation.getCurrentUser().addToken(clientToken);
+        }
+        LOG.info("Connecting to " + serviceAddr);
+        instantiateProxy(serviceAddr);
+        return;
+      } catch (Exception e) {
+        //could not connect:
+        //possibly the AM has crashed
+        //there may be some time before AM is restarted
+        //keep retrying by getting the address from RM
+        LOG.info("Could not connect to " + serviceAddr + 
+            ". Waiting for getting the latest AM address...");
+        try {
+          Thread.sleep(2000);
+        } catch (InterruptedException e1) {
+        }
+        appMaster = rm.getApplicationMaster(currentAppId);
+      }
+    }
     if (ApplicationState.COMPLETED.equals(appMaster.getState())) {
       serviceAddr = conf.get(YarnMRJobConfig.HS_BIND_ADDRESS,
           YarnMRJobConfig.DEFAULT_HS_BIND_ADDRESS);
@@ -101,32 +139,16 @@ public class ClientServiceDelegate {
             "Redirecting to job history server " + serviceAddr);
       //TODO:
       serviceHttpAddr = "";
-    } else if (ApplicationState.RUNNING.equals(appMaster.getState())){
-      serviceAddr = appMaster.getHost() + ":" + appMaster.getRpcPort();
-      serviceHttpAddr = appMaster.getHost() + ":" + appMaster.getHttpPort();
-      if (UserGroupInformation.isSecurityEnabled()) {
-        String clientTokenEncoded = appMaster.getClientToken();
-        Token<ApplicationTokenIdentifier> clientToken =
-            new Token<ApplicationTokenIdentifier>();
-        try {
-          clientToken.decodeFromUrlString(clientTokenEncoded);
-          clientToken.setService(new Text(appMaster.getHost() + ":"
-              + appMaster.getRpcPort()));
-          UserGroupInformation.getCurrentUser().addToken(clientToken);
-        } catch (IOException e) {
-          throw new YarnException(e);
-        }
+      try {
+        instantiateProxy(serviceAddr);
+        return;
+      } catch (IOException e) {
+        throw new YarnException(e);
       }
-    } else {
-      LOG.warn("Cannot connect to Application with state " + appMaster.getState());
-      throw new YarnException(
-          "Cannot connect to Application with state " + appMaster.getState());
-    }
-    try {
-      instantiateProxy(serviceAddr);
-    } catch (IOException e) {
-      throw new YarnException(e);
     }
+    LOG.warn("Cannot connect to Application with state " + appMaster.getState());
+      throw new YarnException(
+        "Cannot connect to Application with state " + appMaster.getState());
   }
 
   private void instantiateProxy(final String serviceAddr) throws IOException {