You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by jl...@apache.org on 2012/12/21 23:34:42 UTC

svn commit: r1425168 - in /hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/

Author: jlowe
Date: Fri Dec 21 22:34:41 2012
New Revision: 1425168

URL: http://svn.apache.org/viewvc?rev=1425168&view=rev
Log:
svn merge -c 1425167 FIXES: MAPREDUCE-4833. Task can get stuck in FAIL_CONTAINER_CLEANUP. Contributed by Robert Parker

Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1425168&r1=1425167&r2=1425168&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Fri Dec 21 22:34:41 2012
@@ -475,6 +475,9 @@ Release 0.23.6 - UNRELEASED
 
     MAPREDUCE-4842. Shuffle race can hang reducer (Mariappan Asokan via jlowe)
 
+    MAPREDUCE-4833. Task can get stuck in FAIL_CONTAINER_CLEANUP (Robert
+    Parker via jlowe)
+
 Release 0.23.5 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java?rev=1425168&r1=1425167&r2=1425168&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java Fri Dec 21 22:34:41 2012
@@ -191,12 +191,9 @@ public class ContainerLauncherImpl exten
     @SuppressWarnings("unchecked")
     public synchronized void kill() {
 
-      if(isCompletelyDone()) { 
-        return;
-      }
       if(this.state == ContainerState.PREP) {
         this.state = ContainerState.KILLED_BEFORE_LAUNCH;
-      } else {
+      } else if (!isCompletelyDone()) {
         LOG.info("KILLING " + taskAttemptID);
 
         ContainerManager proxy = null;

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java?rev=1425168&r1=1425167&r2=1425168&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java Fri Dec 21 22:34:41 2012
@@ -6,8 +6,12 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.atLeast;
+import org.mockito.ArgumentCaptor;
 
 import java.net.InetSocketAddress;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -18,15 +22,21 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher.EventType;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -272,6 +282,150 @@ public class TestContainerLauncherImpl {
     } finally {
       ut.stop();
       verify(mockCM).stopContainer(any(StopContainerRequest.class));
-}
+    }
+  }
+  
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testContainerCleaned() throws Exception {
+    LOG.info("STARTING testContainerCleaned");
+    
+    CyclicBarrier startLaunchBarrier = new CyclicBarrier(2);
+    CyclicBarrier completeLaunchBarrier = new CyclicBarrier(2);
+
+    YarnRPC mockRpc = mock(YarnRPC.class);
+    AppContext mockContext = mock(AppContext.class);
+    
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
+
+    ContainerManager mockCM = new ContainerManagerForTest(startLaunchBarrier, completeLaunchBarrier);
+    when(mockRpc.getProxy(eq(ContainerManager.class), 
+        any(InetSocketAddress.class), any(Configuration.class)))
+        .thenReturn(mockCM);
+    
+    ContainerLauncherImplUnderTest ut = 
+      new ContainerLauncherImplUnderTest(mockContext, mockRpc);
+    
+    Configuration conf = new Configuration();
+    ut.init(conf);
+    ut.start();
+    try {
+      ContainerId contId = makeContainerId(0l, 0, 0, 1);
+      TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
+      String cmAddress = "127.0.0.1:8000";
+      StartContainerResponse startResp = 
+        recordFactory.newRecordInstance(StartContainerResponse.class);
+      startResp.setServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID, 
+          ShuffleHandler.serializeMetaData(80));
+      
+     
+      LOG.info("inserting launch event");
+      ContainerRemoteLaunchEvent mockLaunchEvent = 
+        mock(ContainerRemoteLaunchEvent.class);
+      when(mockLaunchEvent.getType())
+        .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
+      when(mockLaunchEvent.getContainerID())
+        .thenReturn(contId);
+      when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
+      when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
+      ut.handle(mockLaunchEvent);
+      
+      startLaunchBarrier.await();
+      
+           
+      LOG.info("inserting cleanup event");
+      ContainerLauncherEvent mockCleanupEvent = 
+        mock(ContainerLauncherEvent.class);
+      when(mockCleanupEvent.getType())
+        .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
+      when(mockCleanupEvent.getContainerID())
+        .thenReturn(contId);
+      when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
+      when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
+      ut.handle(mockCleanupEvent);
+
+      completeLaunchBarrier.await();
+     
+      ut.waitForPoolToIdle();
+      
+      ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+      verify(mockEventHandler, atLeast(2)).handle(arg.capture());
+      boolean containerCleaned = false;
+      
+      for (int i =0; i < arg.getAllValues().size(); i++) {
+        LOG.info(arg.getAllValues().get(i).toString());
+        Event currentEvent = arg.getAllValues().get(i);
+        if (currentEvent.getType() == TaskAttemptEventType.TA_CONTAINER_CLEANED) {
+          containerCleaned = true;
+        }
+      }
+      assert(containerCleaned);
+      
+    } finally {
+      ut.stop();
+    }
+  }
+  
+  private static class ContainerManagerForTest implements ContainerManager {
+
+    private CyclicBarrier startLaunchBarrier;
+    private CyclicBarrier completeLaunchBarrier;
+
+    ContainerManagerForTest (CyclicBarrier startLaunchBarrier, CyclicBarrier completeLaunchBarrier) {
+      this.startLaunchBarrier = startLaunchBarrier;
+      this.completeLaunchBarrier = completeLaunchBarrier;
+    }
+    @Override
+    public StartContainerResponse startContainer(StartContainerRequest request)
+        throws YarnRemoteException {
+      try {
+        startLaunchBarrier.await();
+        completeLaunchBarrier.await();
+        //To ensure the kill is started before the launch
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      } catch (BrokenBarrierException e) {
+        e.printStackTrace();
+      } 
+      
+      throw new ContainerException("Force fail CM");
+      
+    }
+
+    @Override
+    public StopContainerResponse stopContainer(StopContainerRequest request)
+        throws YarnRemoteException {
+    
+      return null;
+    }
+
+    @Override
+    public GetContainerStatusResponse getContainerStatus(
+        GetContainerStatusRequest request) throws YarnRemoteException {
+    
+      return null;
+    }
   }
+  
+  @SuppressWarnings("serial")
+  private static class ContainerException extends YarnRemoteException {
+
+    public ContainerException(String message) {
+      super(message);
+    }
+
+    @Override
+    public String getRemoteTrace() {
+      return null;
+    }
+
+    @Override
+    public YarnRemoteException getCause() {
+      return null;
+    }
+    
+  }
+  
 }