You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2015/12/13 08:27:26 UTC

[07/37] hadoop git commit: MAPREDUCE-6566. Add retry support to mapreduce CLI tool. Contributed by Varun Vasudev

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc470840/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java
index 59f5ada..db87d9d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java
@@ -26,6 +26,7 @@ import java.util.Set;
 import java.util.HashSet;
 import java.util.Arrays;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -43,6 +44,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobPriority;
 import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.TaskReport;
@@ -268,7 +270,7 @@ public class CLI extends Configured implements Tool {
         System.out.println("Created job " + job.getJobID());
         exitCode = 0;
       } else if (getStatus) {
-        Job job = cluster.getJob(JobID.forName(jobid));
+        Job job = getJob(JobID.forName(jobid));
         if (job == null) {
           System.out.println("Could not find job " + jobid);
         } else {
@@ -283,7 +285,7 @@ public class CLI extends Configured implements Tool {
           exitCode = 0;
         }
       } else if (getCounter) {
-        Job job = cluster.getJob(JobID.forName(jobid));
+        Job job = getJob(JobID.forName(jobid));
         if (job == null) {
           System.out.println("Could not find job " + jobid);
         } else {
@@ -299,7 +301,7 @@ public class CLI extends Configured implements Tool {
           }
         }
       } else if (killJob) {
-        Job job = cluster.getJob(JobID.forName(jobid));
+        Job job = getJob(JobID.forName(jobid));
         if (job == null) {
           System.out.println("Could not find job " + jobid);
         } else {
@@ -323,7 +325,7 @@ public class CLI extends Configured implements Tool {
           }
         }
       } else if (setJobPriority) {
-        Job job = cluster.getJob(JobID.forName(jobid));
+        Job job = getJob(JobID.forName(jobid));
         if (job == null) {
           System.out.println("Could not find job " + jobid);
         } else {
@@ -339,7 +341,7 @@ public class CLI extends Configured implements Tool {
         viewHistory(historyFile, viewAllHistory);
         exitCode = 0;
       } else if (listEvents) {
-        listEvents(cluster.getJob(JobID.forName(jobid)), fromEvent, nEvents);
+        listEvents(getJob(JobID.forName(jobid)), fromEvent, nEvents);
         exitCode = 0;
       } else if (listJobs) {
         listJobs(cluster);
@@ -354,11 +356,11 @@ public class CLI extends Configured implements Tool {
         listBlacklistedTrackers(cluster);
         exitCode = 0;
       } else if (displayTasks) {
-        displayTasks(cluster.getJob(JobID.forName(jobid)), taskType, taskState);
+        displayTasks(getJob(JobID.forName(jobid)), taskType, taskState);
         exitCode = 0;
       } else if(killTask) {
         TaskAttemptID taskID = TaskAttemptID.forName(taskid);
-        Job job = cluster.getJob(taskID.getJobID());
+        Job job = getJob(taskID.getJobID());
         if (job == null) {
           System.out.println("Could not find job " + jobid);
         } else if (job.killTask(taskID, false)) {
@@ -370,7 +372,7 @@ public class CLI extends Configured implements Tool {
         }
       } else if(failTask) {
         TaskAttemptID taskID = TaskAttemptID.forName(taskid);
-        Job job = cluster.getJob(taskID.getJobID());
+        Job job = getJob(taskID.getJobID());
         if (job == null) {
             System.out.println("Could not find job " + jobid);
         } else if(job.killTask(taskID, true)) {
@@ -531,6 +533,29 @@ public class CLI extends Configured implements Tool {
   protected static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
     return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId); 
   }
+
+  /**
+   * Looks up a job, re-querying (with a sleep between tries) while it is null.
+   * Retry limit and sleep time come from getConf(); may still return null.
+   */
+  @VisibleForTesting
+  Job getJob(JobID jobid) throws IOException, InterruptedException {
+    int retryCap = getConf().getInt(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES,
+        MRJobConfig.DEFAULT_MR_CLIENT_JOB_MAX_RETRIES);
+    long pauseMillis = getConf().getLong(
+        MRJobConfig.MR_CLIENT_JOB_RETRY_INTERVAL,
+        MRJobConfig.DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL);
+    Job found = cluster.getJob(jobid);
+    // Initial lookup above; each pass below is one sleep plus one re-query.
+    for (int n = 0; found == null && n < retryCap; n++) {
+      LOG.info("Could not obtain job info after " + (n + 1)
+          + " attempt(s). Sleeping for " + (pauseMillis / 1000)
+          + " seconds and retrying.");
+      Thread.sleep(pauseMillis);
+      found = cluster.getJob(jobid);
+    }
+    return found;
+  }
   
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc470840/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/tools/TestCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/tools/TestCLI.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/tools/TestCLI.java
index fdc916e..73f57d5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/tools/TestCLI.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/tools/TestCLI.java
@@ -20,14 +20,19 @@ package org.apache.hadoop.mapreduce.tools;
 import static org.junit.Assert.*;
 
 import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskReport;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.JobPriority;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.util.Time;
+import org.junit.Assert;
 import org.junit.Test;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -44,7 +49,7 @@ public class TestCLI {
     JobID jobId = JobID.forName(jobIdStr);
     Cluster mockCluster = mock(Cluster.class);
     Job job = mock(Job.class);
-    CLI cli = spy(new CLI());
+    CLI cli = spy(new CLI(new Configuration()));
 
     doReturn(mockCluster).when(cli).createCluster();
     when(job.getTaskReports(TaskType.MAP)).thenReturn(
@@ -112,7 +117,7 @@ public class TestCLI {
   @Test
   public void testJobKIll() throws Exception {
     Cluster mockCluster = mock(Cluster.class);
-    CLI cli = spy(new CLI());
+    CLI cli = spy(new CLI(new Configuration()));
     doReturn(mockCluster).when(cli).createCluster();
     String jobId1 = "job_1234654654_001";
     String jobId2 = "job_1234654654_002";
@@ -149,4 +154,26 @@ public class TestCLI {
     when(mockJob.getStatus()).thenReturn(status);
     return mockJob;
   }
+
+  /** With i retries: i + 1 lookups, elapsed time >= i sleep intervals. */
+  @Test
+  public void testGetJob() throws Exception {
+    Configuration conf = new Configuration();
+    long sleepTime = 100;
+    conf.setLong(MRJobConfig.MR_CLIENT_JOB_RETRY_INTERVAL, sleepTime);
+    JobID jobId1 = JobID.forName("job_1234654654_001");
+    for (int i = 0; i < 2; ++i) {
+      conf.setInt(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES, i);
+      Cluster mockCluster = mock(Cluster.class); // per-pass call count
+      when(mockCluster.getJob(jobId1)).thenReturn(null);
+      CLI cli = spy(new CLI(conf));
+      cli.cluster = mockCluster;
+      doReturn(mockCluster).when(cli).createCluster();
+      long start = Time.monotonicNow();
+      cli.getJob(jobId1);
+      Assert.assertTrue(Time.monotonicNow() - start >= i * sleepTime);
+      org.mockito.Mockito.verify(mockCluster,
+          org.mockito.Mockito.times(i + 1)).getJob(jobId1);
+    }
+  }
 }