You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/10/11 16:26:18 UTC

svn commit: r1181803 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/...

Author: vinodkv
Date: Tue Oct 11 14:26:17 2011
New Revision: 1181803

URL: http://svn.apache.org/viewvc?rev=1181803&view=rev
Log:
MAPREDUCE-2668. Fixed AuxServices to send a signal on application-finish to all the services. Contributed by Thomas Graves.

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1181803&r1=1181802&r2=1181803&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Tue Oct 11 14:26:17 2011
@@ -1573,6 +1573,9 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-3020. Fixed TaskAttemptImpl to log the correct node-address for
     a finished Reduce task. (Chackaravarthy via vinodkv)
 
+    MAPREDUCE-2668. Fixed AuxServices to send a signal on application-finish
+    to all the services. (Thomas Graves via vinodkv)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1181803&r1=1181802&r2=1181803&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Tue Oct 11 14:26:17 2011
@@ -223,6 +223,7 @@ public class ShuffleHandler extends Abst
   public void stopApp(ApplicationId appId) {
     JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId());
     secretManager.removeTokenForJob(jobId.toString());
+    userRsrc.remove(jobId.toString());
   }
 
   @Override

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java?rev=1181803&r1=1181802&r2=1181803&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java Tue Oct 11 14:26:17 2011
@@ -42,8 +42,8 @@ public class AuxServices extends Abstrac
 
   private static final Log LOG = LogFactory.getLog(AuxServices.class);
 
-  public final Map<String,AuxiliaryService> serviceMap;
-  public final Map<String,ByteBuffer> serviceMeta;
+  protected final Map<String,AuxiliaryService> serviceMap;
+  protected final Map<String,ByteBuffer> serviceMeta;
 
   public AuxServices() {
     super(AuxServices.class.getName());
@@ -157,20 +157,24 @@ public class AuxServices extends Abstrac
 
   @Override
   public void handle(AuxServicesEvent event) {
-    LOG.info("Got event " + event.getType() + " for service "
-        + event.getServiceID());
-    AuxiliaryService service = serviceMap.get(event.getServiceID());
-    if (null == service) {
-      // TODO kill all containers waiting on Application
-      return;
-    }
+    LOG.info("Got event " + event.getType() + " for appId "
+        + event.getApplicationID());
     switch (event.getType()) {
     case APPLICATION_INIT:
+      LOG.info("Got APPLICATION_INIT for service " + event.getServiceID());
+      AuxiliaryService service = serviceMap.get(event.getServiceID());
+      if (null == service) {
+        LOG.info("service is null");
+        // TODO kill all containers waiting on Application
+        return;
+      }
       service.initApp(event.getUser(), event.getApplicationID(),
           event.getServiceData());
       break;
     case APPLICATION_STOP:
-      service.stopApp(event.getApplicationID());
+      for (AuxiliaryService serv : serviceMap.values()) {
+        serv.stopApp(event.getApplicationID());
+      }
       break;
     default:
       throw new RuntimeException("Unknown type: " + event.getType());

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java?rev=1181803&r1=1181802&r2=1181803&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java Tue Oct 11 14:26:17 2011
@@ -28,6 +28,8 @@ import org.apache.hadoop.security.Creden
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerInitEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
@@ -247,6 +249,10 @@ public class ApplicationImpl implements 
         new ApplicationLocalizationEvent(
             LocalizationEventType.DESTROY_APPLICATION_RESOURCES, this));
 
+    // tell any auxiliary services that the app is done 
+    this.dispatcher.getEventHandler().handle(
+        new AuxServicesEvent(AuxServicesEventType.APPLICATION_STOP, appId));
+
     // TODO: Trigger the LogsManager
   }
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java?rev=1181803&r1=1181802&r2=1181803&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java Tue Oct 11 14:26:17 2011
@@ -22,8 +22,12 @@ import org.junit.Test;
 import static org.junit.Assert.*;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -39,6 +43,7 @@ import org.apache.hadoop.yarn.service.Se
 import static org.apache.hadoop.yarn.service.Service.STATE.*;
 
 public class TestAuxServices {
+  private static final Log LOG = LogFactory.getLog(TestAuxServices.class);
 
   static class LightService extends AbstractService
       implements AuxServices.AuxiliaryService {
@@ -47,6 +52,7 @@ public class TestAuxServices {
     private int remaining_init;
     private int remaining_stop;
     private ByteBuffer meta = null;
+    private ArrayList<Integer> stoppedApps;
 
     LightService(String name, char idef, int expected_appId) {
       this(name, idef, expected_appId, null);
@@ -56,7 +62,13 @@ public class TestAuxServices {
       this.idef = idef;
       this.expected_appId = expected_appId;
       this.meta = meta;
+      this.stoppedApps = new ArrayList<Integer>();
     }
+
+    public ArrayList<Integer> getAppIdsStopped() {
+      return (ArrayList)this.stoppedApps.clone();
+    }
+
     @Override
     public void init(Configuration conf) {
       remaining_init = conf.getInt(idef + ".expected.init", 0);
@@ -77,7 +89,7 @@ public class TestAuxServices {
     }
     @Override
     public void stopApp(ApplicationId appId) {
-      assertEquals(expected_appId, appId.getId());
+      stoppedApps.add(appId.getId());
     }
     @Override
     public ByteBuffer getMeta() {
@@ -86,11 +98,15 @@ public class TestAuxServices {
   }
 
   static class ServiceA extends LightService {
-    public ServiceA() { super("A", 'A', 65, ByteBuffer.wrap("A".getBytes())); }
+    public ServiceA() { 
+      super("A", 'A', 65, ByteBuffer.wrap("A".getBytes()));
+    }
   }
 
   static class ServiceB extends LightService {
-    public ServiceB() { super("B", 'B', 66, ByteBuffer.wrap("B".getBytes())); }
+    public ServiceB() { 
+      super("B", 'B', 66, ByteBuffer.wrap("B".getBytes()));
+    }
   }
 
   @Test
@@ -119,6 +135,14 @@ public class TestAuxServices {
     appId.setId(66);
     event = new AuxServicesEvent(
         AuxServicesEventType.APPLICATION_STOP, "user0", appId, "Bsrv", null);
+    // verify all services got the stop event 
+    aux.handle(event);
+    Collection<AuxServices.AuxiliaryService> servs = aux.getServices();
+    for (AuxServices.AuxiliaryService serv: servs) {
+      ArrayList<Integer> appIds = ((LightService)serv).getAppIdsStopped();
+      assertEquals("app not properly stopped", 1, appIds.size());
+      assertTrue("wrong app stopped", appIds.contains((Integer)66));
+    }
   }
 
   @Test

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java?rev=1181803&r1=1181802&r2=1181803&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java Tue Oct 11 14:26:17 2011
@@ -166,6 +166,10 @@ public class TestApplication {
           refEq(new ApplicationLocalizationEvent(
               LocalizationEventType.DESTROY_APPLICATION_RESOURCES, wa.app)));
 
+      verify(wa.auxBus).handle(
+          refEq(new AuxServicesEvent(
+              AuxServicesEventType.APPLICATION_STOP, wa.appId)));
+
       wa.appResourcesCleanedup();
       assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());