You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/05/14 22:12:12 UTC

[11/37] hadoop git commit: MAPREDUCE-6251. Added a new config for JobClient to retry JobStatus calls so that they don't fail on history-server backed by DFSes with not so strong guarantees. Contributed by Craig Welch.

MAPREDUCE-6251. Added a new config for JobClient to retry JobStatus calls so that they don't fail on history-server backed by DFSes with not so strong guarantees. Contributed by Craig Welch.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f24452d1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f24452d1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f24452d1

Branch: refs/heads/HDFS-7240
Commit: f24452d14e9ba48cdb82e5e6e5c10ce5b1407308
Parents: fe0df59
Author: Vinod Kumar Vavilapalli <vi...@apache.org>
Authored: Tue May 12 12:11:42 2015 -0700
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Tue May 12 12:11:42 2015 -0700

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |  5 ++
 .../org/apache/hadoop/mapred/JobClient.java     | 51 +++++++++++----
 .../apache/hadoop/mapreduce/MRJobConfig.java    | 15 +++++
 .../src/main/resources/mapred-default.xml       | 17 +++++
 .../apache/hadoop/mapred/JobClientUnitTest.java | 65 ++++++++++++++++++++
 5 files changed, 142 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f24452d1/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 15cdf90..fc98376 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -328,6 +328,7 @@ Release 2.8.0 - UNRELEASED
   OPTIMIZATIONS
 
   BUG FIXES
+
     MAPREDUCE-6314. TestPipeApplication fails on trunk.
     (Varun Vasudev via harsh)
 
@@ -450,6 +451,10 @@ Release 2.7.1 - UNRELEASED
     MAPREDUCE-6259. IllegalArgumentException due to missing job submit time
     (zhihai xu via jlowe)
 
+    MAPREDUCE-6251. Added a new config for JobClient to retry JobStatus calls so
+    that they don't fail on history-server backed by DFSes with not so strong
+    guarantees. (Craig Welch via vinodkv)
+
 Release 2.7.0 - 2015-04-20
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f24452d1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java
index e91fbfe..cf123c7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
 import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.ClusterMetrics;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.QueueInfo;
 import org.apache.hadoop.mapreduce.TaskTrackerInfo;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -154,6 +155,10 @@ public class JobClient extends CLI {
   public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
   private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED; 
   
+  private int maxRetry = MRJobConfig.DEFAULT_MR_CLIENT_JOB_MAX_RETRIES;
+  private long retryInterval =
+      MRJobConfig.DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL;
+
   static{
     ConfigUtil.loadResources();
   }
@@ -469,6 +474,14 @@ public class JobClient extends CLI {
     setConf(conf);
     cluster = new Cluster(conf);
     clientUgi = UserGroupInformation.getCurrentUser();
+
+    maxRetry = conf.getInt(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES,
+      MRJobConfig.DEFAULT_MR_CLIENT_JOB_MAX_RETRIES);
+
+    retryInterval =
+      conf.getLong(MRJobConfig.MR_CLIENT_JOB_RETRY_INTERVAL,
+        MRJobConfig.DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL);
+
   }
 
   /**
@@ -581,16 +594,8 @@ public class JobClient extends CLI {
       }
     });
   }
-  /**
-   * Get an {@link RunningJob} object to track an ongoing job.  Returns
-   * null if the id does not correspond to any known job.
-   * 
-   * @param jobid the jobid of the job.
-   * @return the {@link RunningJob} handle to track the job, null if the 
-   *         <code>jobid</code> doesn't correspond to any known job.
-   * @throws IOException
-   */
-  public RunningJob getJob(final JobID jobid) throws IOException {
+
+  protected RunningJob getJobInner(final JobID jobid) throws IOException {
     try {
       
       Job job = getJobUsingCluster(jobid);
@@ -607,7 +612,31 @@ public class JobClient extends CLI {
     return null;
   }
 
-  /**@deprecated Applications should rather use {@link #getJob(JobID)}. 
+  /**
+   * Get an {@link RunningJob} object to track an ongoing job.  Returns
+   * null if the id does not correspond to any known job.
+   *
+   * @param jobid the jobid of the job.
+   * @return the {@link RunningJob} handle to track the job, null if the
+   *         <code>jobid</code> doesn't correspond to any known job.
+   * @throws IOException
+   */
+  public RunningJob getJob(final JobID jobid) throws IOException {
+     for (int i = 0;i <= maxRetry;i++) {
+       if (i > 0) {
+         try {
+           Thread.sleep(retryInterval);
+         } catch (Exception e) { }
+       }
+       RunningJob job = getJobInner(jobid);
+       if (job != null) {
+         return job;
+       }
+     }
+     return null;
+  }
+
+  /**@deprecated Applications should rather use {@link #getJob(JobID)}.
    */
   @Deprecated
   public RunningJob getJob(String jobid) throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f24452d1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 49345cd..fb4064c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -469,6 +469,21 @@ public interface MRJobConfig {
     MR_PREFIX + "client.max-retries";
   public static final int DEFAULT_MR_CLIENT_MAX_RETRIES = 3;
   
+  /**
+   * Maximum number of times JobClient.getJob retries when no job is returned.
+   */
+  public static final String MR_CLIENT_JOB_MAX_RETRIES =
+      MR_PREFIX + "client.job.max-retries";
+  public static final int DEFAULT_MR_CLIENT_JOB_MAX_RETRIES = 0;
+
+  /**
+   * Delay, in milliseconds, between JobClient.getJob retries.
+   */
+  public static final String MR_CLIENT_JOB_RETRY_INTERVAL =
+      MR_PREFIX + "client.job.retry-interval";
+  public static final long DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL =
+      2000;
+
   /** The staging directory for map reduce.*/
   public static final String MR_AM_STAGING_DIR = 
     MR_AM_PREFIX+"staging-dir";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f24452d1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index a5e76b3..5daf66d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1391,6 +1391,23 @@
 </property>
 
 <property>
+  <name>yarn.app.mapreduce.client.job.max-retries</name>
+  <value>0</value>
+  <description>The number of retries the client will make for getJob and
+  dependent calls.  The default is 0 as this is generally only needed for
+  non-HDFS DFS where additional, high level retries are required to avoid
+  spurious failures during the getJob call.  30 is a good value for
+  WASB</description>
+</property>
+
+<property>
+  <name>yarn.app.mapreduce.client.job.retry-interval</name>
+  <value>2000</value>
+  <description>The delay between getJob retries in ms for retries configured
+  with yarn.app.mapreduce.client.job.max-retries.</description>
+</property>
+
+<property>
   <description>CLASSPATH for MR applications. A comma-separated list
   of CLASSPATH entries. If mapreduce.application.framework is set then this
   must specify the appropriate classpath for that archive, and the name of

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f24452d1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobClientUnitTest.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobClientUnitTest.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobClientUnitTest.java
index 8dfac89..84b76bf 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobClientUnitTest.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobClientUnitTest.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapred;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.mock;
@@ -35,6 +36,7 @@ import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobPriority;
 import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskReport;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.junit.Assert;
@@ -52,6 +54,42 @@ public class JobClientUnitTest {
     void setCluster(Cluster cluster) {
       this.cluster = cluster;
     }
+
+  }
+
+  public class TestJobClientGetJob extends TestJobClient {
+
+    int lastGetJobRetriesCounter = 0;
+    int getJobRetriesCounter = 0;
+    int getJobRetries = 0;
+    RunningJob runningJob;
+
+    TestJobClientGetJob(JobConf jobConf) throws IOException {
+      super(jobConf);
+    }
+
+    public int getLastGetJobRetriesCounter() {
+      return lastGetJobRetriesCounter;
+    }
+
+    public void setGetJobRetries(int getJobRetries) {
+      this.getJobRetries = getJobRetries;
+    }
+
+    public void setRunningJob(RunningJob runningJob) {
+      this.runningJob = runningJob;
+    }
+
+    protected RunningJob getJobInner(final JobID jobid) throws IOException {
+      if (getJobRetriesCounter >= getJobRetries) {
+        lastGetJobRetriesCounter = getJobRetriesCounter;
+        getJobRetriesCounter = 0;
+        return runningJob;
+      }
+      getJobRetriesCounter++;
+      return null;
+    }
+
   }
 
   @Test
@@ -124,6 +162,7 @@ public class JobClientUnitTest {
 
     JobStatus mockJobStatus = mock(JobStatus.class);
     when(mockJobStatus.getJobID()).thenReturn(jobID);
+    when(mockJobStatus.getJobName()).thenReturn(jobID.toString());
     when(mockJobStatus.getState()).thenReturn(JobStatus.State.RUNNING);
     when(mockJobStatus.getStartTime()).thenReturn(startTime);
     when(mockJobStatus.getUsername()).thenReturn("mockuser");
@@ -181,4 +220,30 @@ public class JobClientUnitTest {
     assertNull(client.getJob(id));
   }
 
+  @Test
+  public void testGetJobRetry() throws Exception {
+
+    //Allow up to 3 retries; the default of 0 would disable retrying
+    JobConf conf = new JobConf();
+    conf.set(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES, "3");
+
+    TestJobClientGetJob client = new TestJobClientGetJob(conf);
+    JobID id = new JobID("ajob",1);
+    RunningJob rj = mock(RunningJob.class);
+    client.setRunningJob(rj);
+
+    //no retry
+    assertNotNull(client.getJob(id));
+    assertEquals(client.getLastGetJobRetriesCounter(), 0);
+
+    //3 retries
+    client.setGetJobRetries(3);
+    assertNotNull(client.getJob(id));
+    assertEquals(client.getLastGetJobRetriesCounter(), 3);
+
+    //beyond MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES, will get null
+    client.setGetJobRetries(5);
+    assertNull(client.getJob(id));
+  }
+
 }