You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/05/14 22:12:12 UTC
[11/37] hadoop git commit: MAPREDUCE-6251. Added a new config for
JobClient to retry JobStatus calls so that they don't fail on history-server
backed by DFSes with not so strong guarantees. Contributed by Craig Welch.
MAPREDUCE-6251. Added a new config for JobClient to retry JobStatus calls so that they don't fail on history-server backed by DFSes with not so strong guarantees. Contributed by Craig Welch.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f24452d1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f24452d1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f24452d1
Branch: refs/heads/HDFS-7240
Commit: f24452d14e9ba48cdb82e5e6e5c10ce5b1407308
Parents: fe0df59
Author: Vinod Kumar Vavilapalli <vi...@apache.org>
Authored: Tue May 12 12:11:42 2015 -0700
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Tue May 12 12:11:42 2015 -0700
----------------------------------------------------------------------
hadoop-mapreduce-project/CHANGES.txt | 5 ++
.../org/apache/hadoop/mapred/JobClient.java | 51 +++++++++++----
.../apache/hadoop/mapreduce/MRJobConfig.java | 15 +++++
.../src/main/resources/mapred-default.xml | 17 +++++
.../apache/hadoop/mapred/JobClientUnitTest.java | 65 ++++++++++++++++++++
5 files changed, 142 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f24452d1/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 15cdf90..fc98376 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -328,6 +328,7 @@ Release 2.8.0 - UNRELEASED
OPTIMIZATIONS
BUG FIXES
+
MAPREDUCE-6314. TestPipeApplication fails on trunk.
(Varun Vasudev via harsh)
@@ -450,6 +451,10 @@ Release 2.7.1 - UNRELEASED
MAPREDUCE-6259. IllegalArgumentException due to missing job submit time
(zhihai xu via jlowe)
+ MAPREDUCE-6251. Added a new config for JobClient to retry JobStatus calls so
+ that they don't fail on history-server backed by DFSes with not so strong
+ guarantees. (Craig Welch via vinodkv)
+
Release 2.7.0 - 2015-04-20
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f24452d1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java
index e91fbfe..cf123c7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
@@ -154,6 +155,10 @@ public class JobClient extends CLI {
public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED;
+ private int maxRetry = MRJobConfig.DEFAULT_MR_CLIENT_JOB_MAX_RETRIES;
+ private long retryInterval =
+ MRJobConfig.DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL;
+
static{
ConfigUtil.loadResources();
}
@@ -469,6 +474,14 @@ public class JobClient extends CLI {
setConf(conf);
cluster = new Cluster(conf);
clientUgi = UserGroupInformation.getCurrentUser();
+
+ maxRetry = conf.getInt(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES,
+ MRJobConfig.DEFAULT_MR_CLIENT_JOB_MAX_RETRIES);
+
+ retryInterval =
+ conf.getLong(MRJobConfig.MR_CLIENT_JOB_RETRY_INTERVAL,
+ MRJobConfig.DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL);
+
}
/**
@@ -581,16 +594,8 @@ public class JobClient extends CLI {
}
});
}
- /**
- * Get an {@link RunningJob} object to track an ongoing job. Returns
- * null if the id does not correspond to any known job.
- *
- * @param jobid the jobid of the job.
- * @return the {@link RunningJob} handle to track the job, null if the
- * <code>jobid</code> doesn't correspond to any known job.
- * @throws IOException
- */
- public RunningJob getJob(final JobID jobid) throws IOException {
+
+ protected RunningJob getJobInner(final JobID jobid) throws IOException {
try {
Job job = getJobUsingCluster(jobid);
@@ -607,7 +612,31 @@ public class JobClient extends CLI {
return null;
}
- /**@deprecated Applications should rather use {@link #getJob(JobID)}.
+ /**
+  * Get an {@link RunningJob} object to track an ongoing job. Returns
+  * null if the id does not correspond to any known job.
+  *
+  * The lookup is attempted once and, while it yields no job, retried up to
+  * <code>maxRetry</code> more times, sleeping <code>retryInterval</code>
+  * milliseconds before each retry.
+  *
+  * @param jobid the jobid of the job.
+  * @return the {@link RunningJob} handle to track the job, null if the
+  *         <code>jobid</code> doesn't correspond to any known job.
+  * @throws IOException if the lookup fails, or (as a
+  *         {@link java.io.InterruptedIOException}) if the calling thread is
+  *         interrupted while waiting between retries.
+  */
+ public RunningJob getJob(final JobID jobid) throws IOException {
+   // Always make the initial attempt, even if maxRetry is configured negative.
+   final int retries = Math.max(0, maxRetry);
+   for (int attempt = 0; attempt <= retries; attempt++) {
+     // Only wait before a retry; a non-positive interval means no delay
+     // (Thread.sleep rejects negative values).
+     if (attempt > 0 && retryInterval > 0) {
+       try {
+         Thread.sleep(retryInterval);
+       } catch (InterruptedException e) {
+         // Keep the interrupt visible to the caller and stop retrying
+         // instead of silently looping on with the flag cleared.
+         Thread.currentThread().interrupt();
+         throw (IOException) new java.io.InterruptedIOException(
+             "Interrupted while waiting to retry getJob for " + jobid)
+             .initCause(e);
+       }
+     }
+     RunningJob job = getJobInner(jobid);
+     if (job != null) {
+       return job;
+     }
+   }
+   return null;
+ }
+
+ /**@deprecated Applications should rather use {@link #getJob(JobID)}.
*/
@Deprecated
public RunningJob getJob(String jobid) throws IOException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f24452d1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 49345cd..fb4064c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -469,6 +469,21 @@ public interface MRJobConfig {
MR_PREFIX + "client.max-retries";
public static final int DEFAULT_MR_CLIENT_MAX_RETRIES = 3;
+ /**
+ * Configuration key for how many times JobClient retries a getJob lookup
+ * that returned no job. The default of 0 means a single attempt, no retries.
+ */
+ public static final String MR_CLIENT_JOB_MAX_RETRIES =
+ MR_PREFIX + "client.job.max-retries";
+ public static final int DEFAULT_MR_CLIENT_JOB_MAX_RETRIES = 0;
+
+ /**
+ * Configuration key for how long JobClient waits, in milliseconds, before
+ * each getJob retry.
+ */
+ public static final String MR_CLIENT_JOB_RETRY_INTERVAL =
+ MR_PREFIX + "client.job.retry-interval";
+ public static final long DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL =
+ 2000;
+
/** The staging directory for map reduce.*/
public static final String MR_AM_STAGING_DIR =
MR_AM_PREFIX+"staging-dir";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f24452d1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index a5e76b3..5daf66d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1391,6 +1391,23 @@
</property>
<property>
+ <name>yarn.app.mapreduce.client.job.max-retries</name>
+ <value>0</value>
+ <description>The number of times the client will retry getJob and
+ dependent calls. The default is 0, since retries are generally only needed
+ on non-HDFS file systems, where additional high-level retries are required
+ to avoid spurious failures during the getJob call. 30 is a good value for
+ WASB.</description>
+</property>
+
+<property>
+ <name>yarn.app.mapreduce.client.job.retry-interval</name>
+ <value>2000</value>
+ <description>The delay between getJob retries in ms for retries configured
+ with yarn.app.mapreduce.client.job.max-retries.</description>
+</property>
+
+<property>
<description>CLASSPATH for MR applications. A comma-separated list
of CLASSPATH entries. If mapreduce.application.framework is set then this
must specify the appropriate classpath for that archive, and the name of
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f24452d1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobClientUnitTest.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobClientUnitTest.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobClientUnitTest.java
index 8dfac89..84b76bf 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobClientUnitTest.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobClientUnitTest.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapred;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
@@ -35,6 +36,7 @@ import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobPriority;
import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapreduce.TaskType;
import org.junit.Assert;
@@ -52,6 +54,42 @@ public class JobClientUnitTest {
void setCluster(Cluster cluster) {
this.cluster = cluster;
}
+
+ }
+
+ /**
+  * A {@link TestJobClient} whose inner job lookup is stubbed: it returns
+  * null for a configurable number of consecutive calls before handing back
+  * a preset {@link RunningJob}.
+  */
+ public class TestJobClientGetJob extends TestJobClient {
+
+   // Null-returning calls counted before the most recent successful lookup.
+   int lastGetJobRetriesCounter = 0;
+   // Null-returning calls made since the last successful lookup.
+   int getJobRetriesCounter = 0;
+   // How many calls must return null before a lookup succeeds.
+   int getJobRetries = 0;
+   // The job handed back once a lookup succeeds.
+   RunningJob runningJob;
+
+   TestJobClientGetJob(JobConf jobConf) throws IOException {
+     super(jobConf);
+   }
+
+   public void setRunningJob(RunningJob runningJob) {
+     this.runningJob = runningJob;
+   }
+
+   public void setGetJobRetries(int getJobRetries) {
+     this.getJobRetries = getJobRetries;
+   }
+
+   public int getLastGetJobRetriesCounter() {
+     return lastGetJobRetriesCounter;
+   }
+
+   protected RunningJob getJobInner(final JobID jobid) throws IOException {
+     // Still within the failure budget: count the call and report no job.
+     if (getJobRetriesCounter < getJobRetries) {
+       getJobRetriesCounter++;
+       return null;
+     }
+     // Budget spent: remember the count, reset for the next lookup, succeed.
+     lastGetJobRetriesCounter = getJobRetriesCounter;
+     getJobRetriesCounter = 0;
+     return runningJob;
+   }
+
}
@Test
@@ -124,6 +162,7 @@ public class JobClientUnitTest {
JobStatus mockJobStatus = mock(JobStatus.class);
when(mockJobStatus.getJobID()).thenReturn(jobID);
+ when(mockJobStatus.getJobName()).thenReturn(jobID.toString());
when(mockJobStatus.getState()).thenReturn(JobStatus.State.RUNNING);
when(mockJobStatus.getStartTime()).thenReturn(startTime);
when(mockJobStatus.getUsername()).thenReturn("mockuser");
@@ -181,4 +220,30 @@ public class JobClientUnitTest {
assertNull(client.getJob(id));
}
+ /**
+  * Verifies that getJob keeps retrying while the inner lookup returns null,
+  * and returns null once the configured maximum number of retries is
+  * exhausted.
+  */
+ @Test
+ public void testGetJobRetry() throws Exception {
+
+   // Allow 3 retries, and keep the delay between them tiny so the test does
+   // not spend seconds sleeping at the default retry interval.
+   JobConf conf = new JobConf();
+   conf.set(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES, "3");
+   conf.set(MRJobConfig.MR_CLIENT_JOB_RETRY_INTERVAL, "1");
+
+   TestJobClientGetJob client = new TestJobClientGetJob(conf);
+   JobID id = new JobID("ajob",1);
+   RunningJob rj = mock(RunningJob.class);
+   client.setRunningJob(rj);
+
+   // no retry: the first lookup succeeds
+   assertNotNull(client.getJob(id));
+   assertEquals(0, client.getLastGetJobRetriesCounter());
+
+   // 3 retries: the first three lookups return null, the fourth succeeds
+   client.setGetJobRetries(3);
+   assertNotNull(client.getJob(id));
+   assertEquals(3, client.getLastGetJobRetriesCounter());
+
+   // more failures than MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES allows: null
+   client.setGetJobRetries(5);
+   assertNull(client.getJob(id));
+ }
+
}