You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by tg...@apache.org on 2012/04/19 16:55:03 UTC

svn commit: r1327978 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/res...

Author: tgraves
Date: Thu Apr 19 14:55:03 2012
New Revision: 1327978

URL: http://svn.apache.org/viewvc?rev=1327978&view=rev
Log:
merge -r 1327976:1327977 from branch-2. FIXES: MAPREDUCE-4074

Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1327978&r1=1327977&r2=1327978&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Thu Apr 19 14:55:03 2012
@@ -117,6 +117,9 @@ Release 0.23.3 - UNRELEASED
     MAPREDUCE-4160. some mrv1 ant tests fail with timeout - due to 4156 
     (tgraves)
 
+    MAPREDUCE-4074. Client continuously retries to RM When RM goes down 
+    before launching Application Master (xieguiming via tgraves)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1327978&r1=1327977&r2=1327978&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Thu Apr 19 14:55:03 2012
@@ -325,6 +325,13 @@ public interface MRJobConfig {
     MR_PREFIX + "client-am.ipc.max-retries";
   public static final int DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES = 3;
   
+  /**
+   * The number of client retries to the RM/HS/AM before throwing an exception.
+   */
+  public static final String MR_CLIENT_MAX_RETRIES = 
+    MR_PREFIX + "client.max-retries";
+  public static final int DEFAULT_MR_CLIENT_MAX_RETRIES = 3;
+  
   /** The staging directory for map reduce.*/
   public static final String MR_AM_STAGING_DIR = 
     MR_AM_PREFIX+"staging-dir";

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1327978&r1=1327977&r2=1327978&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Thu Apr 19 14:55:03 2012
@@ -1250,6 +1250,13 @@
     to the RM to fetch Application Status.</description>
 </property>
 
+<property>
+  <name>yarn.app.mapreduce.client.max-retries</name>
+  <value>3</value>
+  <description>The number of client retries to the RM/HS/AM before
+    throwing an exception. This is a layer above the IPC.</description>
+</property>
+
 <!-- jobhistory properties -->
 
 <property>

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1327978&r1=1327977&r2=1327978&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Thu Apr 19 14:55:03 2012
@@ -282,7 +282,7 @@ public class ClientServiceDelegate {
   }
 
   private synchronized Object invoke(String method, Class argClass,
-      Object args) throws YarnRemoteException {
+      Object args) throws IOException {
     Method methodOb = null;
     try {
       methodOb = MRClientProtocol.class.getMethod(method, argClass);
@@ -291,7 +291,11 @@ public class ClientServiceDelegate {
     } catch (NoSuchMethodException e) {
       throw new YarnException("Method name mismatch", e);
     }
-    while (true) {
+    int maxRetries = this.conf.getInt(
+        MRJobConfig.MR_CLIENT_MAX_RETRIES,
+        MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES);
+    IOException lastException = null;
+    while (maxRetries > 0) {
       try {
         return methodOb.invoke(getProxy(), args);
       } catch (YarnRemoteException yre) {
@@ -308,13 +312,21 @@ public class ClientServiceDelegate {
             " retrying..", e.getTargetException());
         // Force reconnection by setting the proxy to null.
         realProxy = null;
+        // HS/AMS shut down
+        maxRetries--;
+        lastException = new IOException(e.getMessage());
+        
       } catch (Exception e) {
         LOG.debug("Failed to contact AM/History for job " + jobId
             + "  Will retry..", e);
         // Force reconnection by setting the proxy to null.
         realProxy = null;
+        // RM shutdown
+        maxRetries--;
+        lastException = new IOException(e.getMessage());     
       }
     }
+    throw lastException;
   }
 
   public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException,
@@ -364,7 +376,7 @@ public class ClientServiceDelegate {
     return result;
   }
   
-  public JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException {
+  public JobStatus getJobStatus(JobID oldJobID) throws IOException {
     org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
       TypeConverter.toYarn(oldJobID);
     GetJobReportRequest request =
@@ -386,7 +398,7 @@ public class ClientServiceDelegate {
   }
 
   public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID oldJobID, TaskType taskType)
-       throws YarnRemoteException, YarnRemoteException {
+       throws IOException{
     org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
       TypeConverter.toYarn(oldJobID);
     GetTaskReportsRequest request =
@@ -403,7 +415,7 @@ public class ClientServiceDelegate {
   }
 
   public boolean killTask(TaskAttemptID taskAttemptID, boolean fail)
-       throws YarnRemoteException {
+       throws IOException {
     org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID
       = TypeConverter.toYarn(taskAttemptID);
     if (fail) {
@@ -419,7 +431,7 @@ public class ClientServiceDelegate {
   }
 
   public boolean killJob(JobID oldJobID)
-       throws YarnRemoteException {
+       throws IOException {
     org.apache.hadoop.mapreduce.v2.api.records.JobId jobId
     = TypeConverter.toYarn(oldJobID);
     KillJobRequest killRequest = recordFactory.newRecordInstance(KillJobRequest.class);

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java?rev=1327978&r1=1327977&r2=1327978&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java Thu Apr 19 14:55:03 2012
@@ -49,6 +49,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -122,8 +123,7 @@ public class TestClientServiceDelegate {
 
     MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
     when(historyServerProxy.getJobReport(getJobReportRequest())).thenThrow(
-        new RuntimeException("1")).thenThrow(new RuntimeException("2"))
-        .thenThrow(new RuntimeException("3"))
+        new RuntimeException("1")).thenThrow(new RuntimeException("2"))       
         .thenReturn(getJobReportResponse());
 
     ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
@@ -135,7 +135,7 @@ public class TestClientServiceDelegate {
 
     JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
     Assert.assertNotNull(jobStatus);
-    verify(historyServerProxy, times(4)).getJobReport(
+    verify(historyServerProxy, times(3)).getJobReport(
         any(GetJobReportRequest.class));
   }
 
@@ -312,6 +312,74 @@ public class TestClientServiceDelegate {
         any(String.class));
   }
   
+  @Test
+  public void testRMDownForJobStatusBeforeGetAMReport() throws IOException {
+    Configuration conf = new YarnConfiguration();
+    testRMDownForJobStatusBeforeGetAMReport(conf,
+        MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES);
+  }
+
+  @Test
+  public void testRMDownForJobStatusBeforeGetAMReportWithRetryTimes()
+      throws IOException {
+    Configuration conf = new YarnConfiguration();
+    conf.setInt(MRJobConfig.MR_CLIENT_MAX_RETRIES, 2);
+    testRMDownForJobStatusBeforeGetAMReport(conf, conf.getInt(
+        MRJobConfig.MR_CLIENT_MAX_RETRIES,
+        MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES));
+  }
+  
+  @Test
+  public void testRMDownRestoreForJobStatusBeforeGetAMReport()
+      throws IOException {
+    Configuration conf = new YarnConfiguration();
+    conf.setInt(MRJobConfig.MR_CLIENT_MAX_RETRIES, 3);
+
+    conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
+    conf.setBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED,
+        !isAMReachableFromClient);
+    MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
+    when(historyServerProxy.getJobReport(any(GetJobReportRequest.class)))
+        .thenReturn(getJobReportResponse());
+    ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
+    when(rmDelegate.getApplicationReport(jobId.getAppId())).thenThrow(
+        new java.lang.reflect.UndeclaredThrowableException(new IOException(
+            "Connection refuced1"))).thenThrow(
+        new java.lang.reflect.UndeclaredThrowableException(new IOException(
+            "Connection refuced2"))).thenReturn(getFinishedApplicationReport());
+    ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
+        conf, rmDelegate, oldJobId, historyServerProxy);
+    JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
+    verify(rmDelegate, times(3)).getApplicationReport(any(ApplicationId.class));
+    Assert.assertNotNull(jobStatus);
+  }
+
+  private void testRMDownForJobStatusBeforeGetAMReport(Configuration conf,
+      int noOfRetries) throws YarnRemoteException {
+    conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
+    conf.setBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED,
+        !isAMReachableFromClient);
+    MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
+    ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
+    when(rmDelegate.getApplicationReport(jobId.getAppId())).thenThrow(
+        new java.lang.reflect.UndeclaredThrowableException(new IOException(
+            "Connection refuced1"))).thenThrow(
+        new java.lang.reflect.UndeclaredThrowableException(new IOException(
+            "Connection refuced2"))).thenThrow(
+        new java.lang.reflect.UndeclaredThrowableException(new IOException(
+            "Connection refuced3")));
+    ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
+        conf, rmDelegate, oldJobId, historyServerProxy);
+    try {
+      clientServiceDelegate.getJobStatus(oldJobId);
+      Assert.fail("It should throw exception after retries");
+    } catch (IOException e) {
+      System.out.println("fail to get job status,and e=" + e.toString());
+    }
+    verify(rmDelegate, times(noOfRetries)).getApplicationReport(
+        any(ApplicationId.class));
+  }  
+
   private GetJobReportRequest getJobReportRequest() {
     GetJobReportRequest request = Records.newRecord(GetJobReportRequest.class);
     request.setJobId(jobId);