You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to mapreduce-commits@hadoop.apache.org by tg...@apache.org on 2012/04/19 16:55:03 UTC
svn commit: r1327978 - in
/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/res...
Author: tgraves
Date: Thu Apr 19 14:55:03 2012
New Revision: 1327978
URL: http://svn.apache.org/viewvc?rev=1327978&view=rev
Log:
merge -r 1327976:1327977 from branch-2. FIXES: MAPREDUCE-4074
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1327978&r1=1327977&r2=1327978&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Thu Apr 19 14:55:03 2012
@@ -117,6 +117,9 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4160. some mrv1 ant tests fail with timeout - due to 4156
(tgraves)
+ MAPREDUCE-4074. Client continuously retries to RM When RM goes down
+ before launching Application Master (xieguiming via tgraves)
+
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1327978&r1=1327977&r2=1327978&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Thu Apr 19 14:55:03 2012
@@ -325,6 +325,13 @@ public interface MRJobConfig {
MR_PREFIX + "client-am.ipc.max-retries";
public static final int DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES = 3;
+ /**
+ * The number of client retries to the RM/HS/AM before throwing exception.
+ */
+ public static final String MR_CLIENT_MAX_RETRIES =
+ MR_PREFIX + "client.max-retries";
+ public static final int DEFAULT_MR_CLIENT_MAX_RETRIES = 3;
+
/** The staging directory for map reduce.*/
public static final String MR_AM_STAGING_DIR =
MR_AM_PREFIX+"staging-dir";
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1327978&r1=1327977&r2=1327978&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Thu Apr 19 14:55:03 2012
@@ -1250,6 +1250,13 @@
to the RM to fetch Application Status.</description>
</property>
+<property>
+ <name>yarn.app.mapreduce.client.max-retries</name>
+ <value>3</value>
+ <description>The number of client retries to the RM/HS/AM before
+ throwing exception. This is a layer above the ipc.</description>
+</property>
+
<!-- jobhistory properties -->
<property>
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1327978&r1=1327977&r2=1327978&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Thu Apr 19 14:55:03 2012
@@ -282,7 +282,7 @@ public class ClientServiceDelegate {
}
private synchronized Object invoke(String method, Class argClass,
- Object args) throws YarnRemoteException {
+ Object args) throws IOException {
Method methodOb = null;
try {
methodOb = MRClientProtocol.class.getMethod(method, argClass);
@@ -291,7 +291,11 @@ public class ClientServiceDelegate {
} catch (NoSuchMethodException e) {
throw new YarnException("Method name mismatch", e);
}
- while (true) {
+ int maxRetries = this.conf.getInt(
+ MRJobConfig.MR_CLIENT_MAX_RETRIES,
+ MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES);
+ IOException lastException = null;
+ while (maxRetries > 0) {
try {
return methodOb.invoke(getProxy(), args);
} catch (YarnRemoteException yre) {
@@ -308,13 +312,21 @@ public class ClientServiceDelegate {
" retrying..", e.getTargetException());
// Force reconnection by setting the proxy to null.
realProxy = null;
+ // HS/AMS shut down
+ maxRetries--;
+ lastException = new IOException(e.getMessage());
+
} catch (Exception e) {
LOG.debug("Failed to contact AM/History for job " + jobId
+ " Will retry..", e);
// Force reconnection by setting the proxy to null.
realProxy = null;
+ // RM shutdown
+ maxRetries--;
+ lastException = new IOException(e.getMessage());
}
}
+ throw lastException;
}
public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException,
@@ -364,7 +376,7 @@ public class ClientServiceDelegate {
return result;
}
- public JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException {
+ public JobStatus getJobStatus(JobID oldJobID) throws IOException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
TypeConverter.toYarn(oldJobID);
GetJobReportRequest request =
@@ -386,7 +398,7 @@ public class ClientServiceDelegate {
}
public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID oldJobID, TaskType taskType)
- throws YarnRemoteException, YarnRemoteException {
+ throws IOException{
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
TypeConverter.toYarn(oldJobID);
GetTaskReportsRequest request =
@@ -403,7 +415,7 @@ public class ClientServiceDelegate {
}
public boolean killTask(TaskAttemptID taskAttemptID, boolean fail)
- throws YarnRemoteException {
+ throws IOException {
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID
= TypeConverter.toYarn(taskAttemptID);
if (fail) {
@@ -419,7 +431,7 @@ public class ClientServiceDelegate {
}
public boolean killJob(JobID oldJobID)
- throws YarnRemoteException {
+ throws IOException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId
= TypeConverter.toYarn(oldJobID);
KillJobRequest killRequest = recordFactory.newRecordInstance(KillJobRequest.class);
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java?rev=1327978&r1=1327977&r2=1327978&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java Thu Apr 19 14:55:03 2012
@@ -49,6 +49,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -122,8 +123,7 @@ public class TestClientServiceDelegate {
MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
when(historyServerProxy.getJobReport(getJobReportRequest())).thenThrow(
- new RuntimeException("1")).thenThrow(new RuntimeException("2"))
- .thenThrow(new RuntimeException("3"))
+ new RuntimeException("1")).thenThrow(new RuntimeException("2"))
.thenReturn(getJobReportResponse());
ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
@@ -135,7 +135,7 @@ public class TestClientServiceDelegate {
JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
Assert.assertNotNull(jobStatus);
- verify(historyServerProxy, times(4)).getJobReport(
+ verify(historyServerProxy, times(3)).getJobReport(
any(GetJobReportRequest.class));
}
@@ -312,6 +312,74 @@ public class TestClientServiceDelegate {
any(String.class));
}
+ @Test
+ public void testRMDownForJobStatusBeforeGetAMReport() throws IOException {
+ Configuration conf = new YarnConfiguration();
+ testRMDownForJobStatusBeforeGetAMReport(conf,
+ MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES);
+ }
+
+ @Test
+ public void testRMDownForJobStatusBeforeGetAMReportWithRetryTimes()
+ throws IOException {
+ Configuration conf = new YarnConfiguration();
+ conf.setInt(MRJobConfig.MR_CLIENT_MAX_RETRIES, 2);
+ testRMDownForJobStatusBeforeGetAMReport(conf, conf.getInt(
+ MRJobConfig.MR_CLIENT_MAX_RETRIES,
+ MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES));
+ }
+
+ @Test
+ public void testRMDownRestoreForJobStatusBeforeGetAMReport()
+ throws IOException {
+ Configuration conf = new YarnConfiguration();
+ conf.setInt(MRJobConfig.MR_CLIENT_MAX_RETRIES, 3);
+
+ conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
+ conf.setBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED,
+ !isAMReachableFromClient);
+ MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
+ when(historyServerProxy.getJobReport(any(GetJobReportRequest.class)))
+ .thenReturn(getJobReportResponse());
+ ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
+ when(rmDelegate.getApplicationReport(jobId.getAppId())).thenThrow(
+ new java.lang.reflect.UndeclaredThrowableException(new IOException(
+ "Connection refuced1"))).thenThrow(
+ new java.lang.reflect.UndeclaredThrowableException(new IOException(
+ "Connection refuced2"))).thenReturn(getFinishedApplicationReport());
+ ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
+ conf, rmDelegate, oldJobId, historyServerProxy);
+ JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
+ verify(rmDelegate, times(3)).getApplicationReport(any(ApplicationId.class));
+ Assert.assertNotNull(jobStatus);
+ }
+
+ private void testRMDownForJobStatusBeforeGetAMReport(Configuration conf,
+ int noOfRetries) throws YarnRemoteException {
+ conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
+ conf.setBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED,
+ !isAMReachableFromClient);
+ MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
+ ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
+ when(rmDelegate.getApplicationReport(jobId.getAppId())).thenThrow(
+ new java.lang.reflect.UndeclaredThrowableException(new IOException(
+ "Connection refuced1"))).thenThrow(
+ new java.lang.reflect.UndeclaredThrowableException(new IOException(
+ "Connection refuced2"))).thenThrow(
+ new java.lang.reflect.UndeclaredThrowableException(new IOException(
+ "Connection refuced3")));
+ ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
+ conf, rmDelegate, oldJobId, historyServerProxy);
+ try {
+ clientServiceDelegate.getJobStatus(oldJobId);
+ Assert.fail("It should throw exception after retries");
+ } catch (IOException e) {
+ System.out.println("fail to get job status,and e=" + e.toString());
+ }
+ verify(rmDelegate, times(noOfRetries)).getApplicationReport(
+ any(ApplicationId.class));
+ }
+
private GetJobReportRequest getJobReportRequest() {
GetJobReportRequest request = Records.newRecord(GetJobReportRequest.class);
request.setJobId(jobId);