You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jl...@apache.org on 2015/03/18 20:31:44 UTC

hadoop git commit: MAPREDUCE-6277. Job can post multiple history files if attempt loses connection to the RM. Contributed by Chang Li (cherry picked from commit 30da99cbaf36aeef38a858251ce8ffa5eb657b38)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 a08b1b570 -> 24d8c6f35


MAPREDUCE-6277. Job can post multiple history files if attempt loses connection to the RM. Contributed by Chang Li
(cherry picked from commit 30da99cbaf36aeef38a858251ce8ffa5eb657b38)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/24d8c6f3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/24d8c6f3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/24d8c6f3

Branch: refs/heads/branch-2
Commit: 24d8c6f35506bac3ad1e8129f59df6b871b52a7d
Parents: a08b1b5
Author: Jason Lowe <jl...@apache.org>
Authored: Wed Mar 18 19:29:56 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Wed Mar 18 19:31:32 2015 +0000

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |  3 +
 .../v2/app/rm/RMContainerAllocator.java         |  2 +-
 .../v2/app/rm/TestRMContainerAllocator.java     | 61 ++++++++++++++++++++
 3 files changed, 65 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d8c6f3/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 5cd974c..8906809 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -210,6 +210,9 @@ Release 2.7.0 - UNRELEASED
 
     MAPREDUCE-4742. Fix typo in nnbench#displayUsage. (Liang Xie via ozawa)
 
+    MAPREDUCE-6277. Job can post multiple history files if attempt loses
+    connection to the RM. (Chang Li via jlowe)
+
 Release 2.6.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d8c6f3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 8d35d79..99d26a2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -698,7 +698,7 @@ public class RMContainerAllocator extends RMContainerRequestor
       if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
         LOG.error("Could not contact RM after " + retryInterval + " milliseconds.");
         eventHandler.handle(new JobEvent(this.getJob().getID(),
-                                         JobEventType.INTERNAL_ERROR));
+                                         JobEventType.JOB_AM_REBOOT));
         throw new YarnRuntimeException("Could not contact RM after " +
                                 retryInterval + " milliseconds.");
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d8c6f3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index f290037..c58cdbd 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -65,6 +65,8 @@ import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
@@ -1608,6 +1610,7 @@ public class TestRMContainerAllocator {
       = new ArrayList<TaskAttemptKillEvent>();
     static final List<JobUpdatedNodesEvent> jobUpdatedNodeEvents 
     = new ArrayList<JobUpdatedNodesEvent>();
+    static final List<JobEvent> jobEvents = new ArrayList<JobEvent>();
     private MyResourceManager rm;
     private boolean isUnregistered = false;
     private AllocateResponse allocateResponse;
@@ -1630,6 +1633,8 @@ public class TestRMContainerAllocator {
             taskAttemptKillEvents.add((TaskAttemptKillEvent)event);
           } else if (event instanceof JobUpdatedNodesEvent) {
             jobUpdatedNodeEvents.add((JobUpdatedNodesEvent)event);
+          } else if (event instanceof JobEvent) {
+            jobEvents.add((JobEvent)event);
           }
         }
       });
@@ -1782,6 +1787,18 @@ public class TestRMContainerAllocator {
     }
   }
 
+  private static class MyContainerAllocator2 extends MyContainerAllocator {
+    public MyContainerAllocator2(MyResourceManager rm, Configuration conf,
+      ApplicationAttemptId appAttemptId, Job job) {
+      super(rm, conf, appAttemptId, job); // no extra state; pure delegation
+    }
+    @Override // test stub: the remote request always fails
+    protected AllocateResponse makeRemoteRequest() throws IOException,
+      YarnException {
+      throw new YarnRuntimeException("for testing"); // unconditional failure
+    }
+  }
+
   @Test
   public void testReduceScheduling() throws Exception {
     int totalMaps = 10;
@@ -2307,6 +2324,50 @@ public class TestRMContainerAllocator {
 
   }
 
+  // MAPREDUCE-6277: an allocator that cannot reach the RM must ask the job
+  // to reboot (JOB_AM_REBOOT) rather than report INTERNAL_ERROR.
+  @Test
+  public void testRMUnavailable() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS, 0);
+    MyResourceManager rm1 = new MyResourceManager(conf);
+    rm1.start();
+    DrainDispatcher dispatcher =
+        (DrainDispatcher) rm1.getRMContext().getDispatcher();
+    RMApp app = rm1.submitApp(1024);
+    dispatcher.await();
+
+    MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    nm1.nodeHeartbeat(true);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId =
+        app.getCurrentAppAttempt().getAppAttemptId();
+    rm1.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+        0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    MyContainerAllocator2 allocator =
+        new MyContainerAllocator2(rm1, conf, appAttemptId, mockJob);
+    allocator.jobEvents.clear();
+    try {
+      allocator.schedule();
+      Assert.fail("Should Have Exception");
+    } catch (YarnRuntimeException e) {
+      Assert.assertTrue(e.getMessage().contains("Could not contact RM after"));
+    }
+    dispatcher.await();
+    Assert.assertEquals("Should Have 1 Job Event", 1,
+        allocator.jobEvents.size());
+    Assert.assertEquals("Should Reboot", JobEventType.JOB_AM_REBOOT,
+        allocator.jobEvents.get(0).getType());
+  }
+
   @Test(timeout=60000)
   public void testAMRMTokenUpdate() throws Exception {
     LOG.info("Running testAMRMTokenUpdate");