You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2013/09/20 00:35:13 UTC
svn commit: r1524856 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
dev-support/
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resou...
Author: vinodkv
Date: Thu Sep 19 22:35:12 2013
New Revision: 1524856
URL: http://svn.apache.org/r1524856
Log:
MAPREDUCE-5488. Changed MR client to keep trying to reach the application when it sees that one attempt's AM is down. Contributed by Jian He.
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1524856&r1=1524855&r2=1524856&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Thu Sep 19 22:35:12 2013
@@ -193,6 +193,9 @@ Release 2.2.0 - UNRELEASED
MAPREDUCE-5504. mapred queue -info inconsistent with types (Kousuke Saruta
via tgraves)
+ MAPREDUCE-5488. Changed MR client to keep trying to reach the application
+ when it sees that one attempt's AM is down. (Jian He via vinodkv)
+
Release 2.1.1-beta - 2013-09-23
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml?rev=1524856&r1=1524855&r2=1524856&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml Thu Sep 19 22:35:12 2013
@@ -496,6 +496,12 @@
<Field name="sslFileBufferSize" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
+
+ <Match>
+ <Class name="org.apache.hadoop.mapred.ClientServiceDelegate" />
+ <Method name="invoke" />
+ <Bug pattern="SWL_SLEEP_WITH_LOCK_HELD" />
+ </Match>
<Match>
<Class name="org.apache.hadoop.mapreduce.util.ProcessTree" />
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1524856&r1=1524855&r2=1524856&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Thu Sep 19 22:35:12 2013
@@ -357,7 +357,7 @@ public interface MRJobConfig {
public static final int DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES = 3;
/**
- * The number of client retries to the RM/HS/AM before throwing exception.
+ * The number of client retries to the RM/HS before throwing exception.
*/
public static final String MR_CLIENT_MAX_RETRIES =
MR_PREFIX + "client.max-retries";
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1524856&r1=1524855&r2=1524856&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Thu Sep 19 22:35:12 2013
@@ -982,7 +982,7 @@
<property>
<name>yarn.app.mapreduce.client-am.ipc.max-retries</name>
- <value>1</value>
+ <value>3</value>
<description>The number of client retries to the AM - before reconnecting
to the RM to fetch Application Status.</description>
</property>
@@ -990,7 +990,7 @@
<property>
<name>yarn.app.mapreduce.client.max-retries</name>
<value>3</value>
- <description>The number of client retries to the RM/HS/AM before
+ <description>The number of client retries to the RM/HS before
throwing exception. This is a layer above the ipc.</description>
</property>
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1524856&r1=1524855&r2=1524856&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Thu Sep 19 22:35:12 2013
@@ -26,6 +26,7 @@ import java.security.PrivilegedException
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@@ -77,6 +78,8 @@ import org.apache.hadoop.yarn.ipc.YarnRP
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import com.google.common.annotations.VisibleForTesting;
+
public class ClientServiceDelegate {
private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
private static final String UNAVAILABLE = "N/A";
@@ -93,7 +96,8 @@ public class ClientServiceDelegate {
private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private static String UNKNOWN_USER = "Unknown User";
private String trackingUrl;
-
+ private AtomicBoolean usingAMProxy = new AtomicBoolean(false);
+ private int maxClientRetry;
private boolean amAclDisabledStatusLogged = false;
public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm,
@@ -287,6 +291,7 @@ public class ClientServiceDelegate {
MRClientProtocol proxy =
(MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
serviceAddr, conf);
+ usingAMProxy.set(true);
LOG.trace("Connected to ApplicationMaster at: " + serviceAddr);
return proxy;
}
@@ -301,13 +306,15 @@ public class ClientServiceDelegate {
} catch (NoSuchMethodException e) {
throw new YarnRuntimeException("Method name mismatch", e);
}
- int maxRetries = this.conf.getInt(
+ maxClientRetry = this.conf.getInt(
MRJobConfig.MR_CLIENT_MAX_RETRIES,
MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES);
IOException lastException = null;
- while (maxRetries > 0) {
+ while (maxClientRetry > 0) {
+ MRClientProtocol MRClientProxy = null;
try {
- return methodOb.invoke(getProxy(), args);
+ MRClientProxy = getProxy();
+ return methodOb.invoke(MRClientProxy, args);
} catch (InvocationTargetException e) {
// Will not throw out YarnException anymore
LOG.debug("Failed to contact AM/History for job " + jobId +
@@ -315,22 +322,44 @@ public class ClientServiceDelegate {
// Force reconnection by setting the proxy to null.
realProxy = null;
// HS/AMS shut down
- maxRetries--;
+ // if it's AM shut down, do not decrement maxClientRetry as we wait for
+ // AM to be restarted.
+ if (!usingAMProxy.get()) {
+ maxClientRetry--;
+ }
+ usingAMProxy.set(false);
lastException = new IOException(e.getTargetException());
-
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ie) {
+ LOG.warn("ClientServiceDelegate invoke call interrupted", ie);
+ throw new YarnRuntimeException(ie);
+ }
} catch (Exception e) {
LOG.debug("Failed to contact AM/History for job " + jobId
+ " Will retry..", e);
// Force reconnection by setting the proxy to null.
realProxy = null;
// RM shutdown
- maxRetries--;
- lastException = new IOException(e.getMessage());
+ maxClientRetry--;
+ lastException = new IOException(e.getMessage());
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ie) {
+ LOG.warn("ClientServiceDelegate invoke call interrupted", ie);
+ throw new YarnRuntimeException(ie);
+ }
}
}
throw lastException;
}
+ // Only for testing
+ @VisibleForTesting
+ public int getMaxClientRetry() {
+ return this.maxClientRetry;
+ }
+
public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException,
InterruptedException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter.toYarn(arg0);
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java?rev=1524856&r1=1524855&r2=1524856&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java Thu Sep 19 22:35:12 2013
@@ -141,6 +141,48 @@ public class TestClientServiceDelegate {
}
@Test
+ public void testRetriesOnAMConnectionFailures() throws Exception {
+ if (!isAMReachableFromClient) {
+ return;
+ }
+
+ ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
+ when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
+ .thenReturn(getRunningApplicationReport("am1", 78));
+
+ // throw exception in 1st, 2nd, 3rd and 4th call of getJobReport, and
+ // succeed in the 5th call.
+ final MRClientProtocol amProxy = mock(MRClientProtocol.class);
+ when(amProxy.getJobReport(any(GetJobReportRequest.class)))
+ .thenThrow(new RuntimeException("11"))
+ .thenThrow(new RuntimeException("22"))
+ .thenThrow(new RuntimeException("33"))
+ .thenThrow(new RuntimeException("44")).thenReturn(getJobReportResponse());
+ Configuration conf = new YarnConfiguration();
+ conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
+ conf.setBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED,
+ !isAMReachableFromClient);
+ ClientServiceDelegate clientServiceDelegate =
+ new ClientServiceDelegate(conf, rm, oldJobId, null) {
+ @Override
+ MRClientProtocol instantiateAMProxy(
+ final InetSocketAddress serviceAddr) throws IOException {
+ super.instantiateAMProxy(serviceAddr);
+ return amProxy;
+ }
+ };
+
+ JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
+
+ Assert.assertNotNull(jobStatus);
+ // assert maxClientRetry is not decremented.
+ Assert.assertEquals(conf.getInt(MRJobConfig.MR_CLIENT_MAX_RETRIES,
+ MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES), clientServiceDelegate
+ .getMaxClientRetry());
+ verify(amProxy, times(5)).getJobReport(any(GetJobReportRequest.class));
+ }
+
+ @Test
public void testHistoryServerNotConfigured() throws Exception {
//RM doesn't have app report and job History Server is not configured
ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(