You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2013/09/20 00:35:13 UTC

svn commit: r1524856 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ dev-support/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resou...

Author: vinodkv
Date: Thu Sep 19 22:35:12 2013
New Revision: 1524856

URL: http://svn.apache.org/r1524856
Log:
MAPREDUCE-5488. Changed MR client to keep trying to reach the application when it sees that one attempt's AM is down. Contributed by Jian He.

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1524856&r1=1524855&r2=1524856&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Thu Sep 19 22:35:12 2013
@@ -193,6 +193,9 @@ Release 2.2.0 - UNRELEASED
     MAPREDUCE-5504. mapred queue -info inconsistent with types (Kousuke Saruta
     via tgraves)
 
+    MAPREDUCE-5488. Changed MR client to keep trying to reach the application
+    when it sees that one attempt's AM is down. (Jian He via vinodkv)
+
 Release 2.1.1-beta - 2013-09-23
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml?rev=1524856&r1=1524855&r2=1524856&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml Thu Sep 19 22:35:12 2013
@@ -496,6 +496,12 @@
       <Field name="sslFileBufferSize" />
      <Bug pattern="IS2_INCONSISTENT_SYNC" />
    </Match> 
+
+   <Match>
+     <Class name="org.apache.hadoop.mapred.ClientServiceDelegate" />
+      <Method name="invoke" />
+     <Bug pattern="SWL_SLEEP_WITH_LOCK_HELD" />
+   </Match>
    
   <Match>
     <Class name="org.apache.hadoop.mapreduce.util.ProcessTree" />

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1524856&r1=1524855&r2=1524856&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Thu Sep 19 22:35:12 2013
@@ -357,7 +357,7 @@ public interface MRJobConfig {
   public static final int DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES = 3;
   
   /**
-   * The number of client retries to the RM/HS/AM before throwing exception.
+   * The number of client retries to the RM/HS before throwing exception.
    */
   public static final String MR_CLIENT_MAX_RETRIES = 
     MR_PREFIX + "client.max-retries";

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1524856&r1=1524855&r2=1524856&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Thu Sep 19 22:35:12 2013
@@ -982,7 +982,7 @@
 
 <property>
   <name>yarn.app.mapreduce.client-am.ipc.max-retries</name>
-  <value>1</value>
+  <value>3</value>
   <description>The number of client retries to the AM - before reconnecting
     to the RM to fetch Application Status.</description>
 </property>
@@ -990,7 +990,7 @@
 <property>
   <name>yarn.app.mapreduce.client.max-retries</name>
   <value>3</value>
-  <description>The number of client retries to the RM/HS/AM before
+  <description>The number of client retries to the RM/HS before
     throwing exception. This is a layer above the ipc.</description>
 </property>
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1524856&r1=1524855&r2=1524856&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Thu Sep 19 22:35:12 2013
@@ -26,6 +26,7 @@ import java.security.PrivilegedException
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -77,6 +78,8 @@ import org.apache.hadoop.yarn.ipc.YarnRP
 import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class ClientServiceDelegate {
   private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
   private static final String UNAVAILABLE = "N/A";
@@ -93,7 +96,8 @@ public class ClientServiceDelegate {
   private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   private static String UNKNOWN_USER = "Unknown User";
   private String trackingUrl;
-
+  private AtomicBoolean usingAMProxy = new AtomicBoolean(false);
+  private int maxClientRetry;
   private boolean amAclDisabledStatusLogged = false;
 
   public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm,
@@ -287,6 +291,7 @@ public class ClientServiceDelegate {
     MRClientProtocol proxy = 
          (MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
             serviceAddr, conf);
+    usingAMProxy.set(true);
     LOG.trace("Connected to ApplicationMaster at: " + serviceAddr);
     return proxy;
   }
@@ -301,13 +306,15 @@ public class ClientServiceDelegate {
     } catch (NoSuchMethodException e) {
       throw new YarnRuntimeException("Method name mismatch", e);
     }
-    int maxRetries = this.conf.getInt(
+    maxClientRetry = this.conf.getInt(
         MRJobConfig.MR_CLIENT_MAX_RETRIES,
         MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES);
     IOException lastException = null;
-    while (maxRetries > 0) {
+    while (maxClientRetry > 0) {
+      MRClientProtocol MRClientProxy = null;
       try {
-        return methodOb.invoke(getProxy(), args);
+        MRClientProxy = getProxy();
+        return methodOb.invoke(MRClientProxy, args);
       } catch (InvocationTargetException e) {
         // Will not throw out YarnException anymore
         LOG.debug("Failed to contact AM/History for job " + jobId + 
@@ -315,22 +322,44 @@ public class ClientServiceDelegate {
         // Force reconnection by setting the proxy to null.
         realProxy = null;
         // HS/AMS shut down
-        maxRetries--;
+        // if it's AM shut down, do not decrement maxClientRetry as we wait for
+        // AM to be restarted.
+        if (!usingAMProxy.get()) {
+          maxClientRetry--;
+        }
+        usingAMProxy.set(false);
         lastException = new IOException(e.getTargetException());
-        
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException ie) {
+          LOG.warn("ClientServiceDelegate invoke call interrupted", ie);
+          throw new YarnRuntimeException(ie);
+        }
       } catch (Exception e) {
         LOG.debug("Failed to contact AM/History for job " + jobId
             + "  Will retry..", e);
         // Force reconnection by setting the proxy to null.
         realProxy = null;
         // RM shutdown
-        maxRetries--;
-        lastException = new IOException(e.getMessage());     
+        maxClientRetry--;
+        lastException = new IOException(e.getMessage());
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException ie) {
+          LOG.warn("ClientServiceDelegate invoke call interrupted", ie);
+          throw new YarnRuntimeException(ie);
+        }
       }
     }
     throw lastException;
   }
 
+  // Only for testing
+  @VisibleForTesting
+  public int getMaxClientRetry() {
+    return this.maxClientRetry;
+  }
+
   public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException,
   InterruptedException {
     org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter.toYarn(arg0);

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java?rev=1524856&r1=1524855&r2=1524856&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java Thu Sep 19 22:35:12 2013
@@ -141,6 +141,48 @@ public class TestClientServiceDelegate {
   }
 
   @Test
+  public void testRetriesOnAMConnectionFailures() throws Exception {
+    if (!isAMReachableFromClient) {
+      return;
+    }
+
+    ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
+    when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
+      .thenReturn(getRunningApplicationReport("am1", 78));
+
+    // throw exception in 1st, 2nd, 3rd and 4th call of getJobReport, and
+    // succeed in the 5th call.
+    final MRClientProtocol amProxy = mock(MRClientProtocol.class);
+    when(amProxy.getJobReport(any(GetJobReportRequest.class)))
+      .thenThrow(new RuntimeException("11"))
+      .thenThrow(new RuntimeException("22"))
+      .thenThrow(new RuntimeException("33"))
+      .thenThrow(new RuntimeException("44")).thenReturn(getJobReportResponse());
+    Configuration conf = new YarnConfiguration();
+    conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
+    conf.setBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED,
+      !isAMReachableFromClient);
+    ClientServiceDelegate clientServiceDelegate =
+        new ClientServiceDelegate(conf, rm, oldJobId, null) {
+          @Override
+          MRClientProtocol instantiateAMProxy(
+              final InetSocketAddress serviceAddr) throws IOException {
+            super.instantiateAMProxy(serviceAddr);
+            return amProxy;
+          }
+        };
+
+    JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
+
+    Assert.assertNotNull(jobStatus);
+    // assert maxClientRetry is not decremented.
+    Assert.assertEquals(conf.getInt(MRJobConfig.MR_CLIENT_MAX_RETRIES,
+      MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES), clientServiceDelegate
+      .getMaxClientRetry());
+    verify(amProxy, times(5)).getJobReport(any(GetJobReportRequest.class));
+  }
+
+  @Test
   public void testHistoryServerNotConfigured() throws Exception {
     //RM doesn't have app report and job History Server is not configured
     ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(