You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2013/09/11 02:38:49 UTC

svn commit: r1521699 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apac...

Author: vinodkv
Date: Wed Sep 11 00:38:49 2013
New Revision: 1521699

URL: http://svn.apache.org/r1521699
Log:
MAPREDUCE-5497. Changed MRAppMaster to sleep only after doing everything else but just before ClientService to avoid race conditions during RM restart. Contributed by Jian He.

Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppComponentDependencies.java
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/ClientService.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1521699&r1=1521698&r2=1521699&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Wed Sep 11 00:38:49 2013
@@ -186,6 +186,10 @@ Release 2.1.1-beta - UNRELEASED
     MAPREDUCE-5478. TeraInputFormat unnecessarily defines its own FileSplit
     subclass (Sandy Ryza)
 
+    MAPREDUCE-5497. Changed MRAppMaster to sleep only after doing everything else
+    but just before ClientService to avoid race conditions during RM restart.
+    (Jian He via vinodkv)
+
   OPTIMIZATIONS
 
     MAPREDUCE-5446. TestJobHistoryEvents and TestJobHistoryParsing have race

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1521699&r1=1521698&r2=1521699&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Wed Sep 11 00:38:49 2013
@@ -362,7 +362,10 @@ public class MRAppMaster extends Composi
 
       //service to handle requests from JobClient
       clientService = createClientService(context);
-      addIfService(clientService);
+      // Init ClientService separately so that we stop it separately, since this
+      // service needs to wait some time before it stops so clients can know the
+      // final states
+      clientService.init(conf);
       
       containerAllocator = createContainerAllocator(clientService, context);
       
@@ -425,7 +428,6 @@ public class MRAppMaster extends Composi
       // queued inside the JobHistoryEventHandler 
       addIfService(historyService);
     }
-    
     super.serviceInit(conf);
   } // end of init()
   
@@ -534,14 +536,6 @@ public class MRAppMaster extends Composi
       }
     }
 
-    // TODO:currently just wait for some time so clients can know the
-    // final states. Will be removed once RM come on.
-    try {
-      Thread.sleep(5000);
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
-
     try {
       //if isLastAMRetry comes as true, should never set it to false
       if ( !isLastAMRetry){
@@ -556,6 +550,14 @@ public class MRAppMaster extends Composi
       LOG.info("Calling stop for all the services");
       MRAppMaster.this.stop();
 
+      // TODO: Stop ClientService last, since only ClientService should wait for
+      // some time so clients can know the final states. Will be removed once RM come on.
+      try {
+        Thread.sleep(5000);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+      clientService.stop();
     } catch (Throwable t) {
       LOG.warn("Graceful stop failed ", t);
     }
@@ -1019,8 +1021,10 @@ public class MRAppMaster extends Composi
         LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
             + "job " + job.getID() + ".");
       }
+      // Start ClientService here, since it's not initialized if
+      // errorHappenedShutDown is true
+      clientService.start();
     }
-
     //start all the components
     super.serviceStart();
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/ClientService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/ClientService.java?rev=1521699&r1=1521698&r2=1521699&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/ClientService.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/ClientService.java Wed Sep 11 00:38:49 2013
@@ -1,28 +1,30 @@
 /**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.hadoop.mapreduce.v2.app.client;
 
 import java.net.InetSocketAddress;
 
-public interface ClientService {
+import org.apache.hadoop.service.Service;
 
-  InetSocketAddress getBindAddress();
+public interface ClientService extends Service {
 
-  int getHttpPort();
+  public abstract InetSocketAddress getBindAddress();
+
+  public abstract int getHttpPort();
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java?rev=1521699&r1=1521698&r2=1521699&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java Wed Sep 11 00:38:49 2013
@@ -94,8 +94,7 @@ import org.apache.hadoop.yarn.webapp.Web
  * jobclient (user facing).
  *
  */
-public class MRClientService extends AbstractService 
-    implements ClientService {
+public class MRClientService extends AbstractService implements ClientService {
 
   static final Log LOG = LogFactory.getLog(MRClientService.class);
   
@@ -106,7 +105,7 @@ public class MRClientService extends Abs
   private AppContext appContext;
 
   public MRClientService(AppContext appContext) {
-    super("MRClientService");
+    super(MRClientService.class.getName());
     this.appContext = appContext;
     this.protocolHandler = new MRClientProtocolHandler();
   }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1521699&r1=1521698&r2=1521699&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Wed Sep 11 00:38:49 2013
@@ -55,6 +55,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
 import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEvent;
 import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
@@ -603,7 +604,7 @@ public class MRApp extends MRAppMaster {
 
   @Override
   protected ClientService createClientService(AppContext context) {
-    return new ClientService(){
+    return new MRClientService(context) {
       @Override
       public InetSocketAddress getBindAddress() {
         return NetUtils.createSocketAddr("localhost:9876");

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppComponentDependencies.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppComponentDependencies.java?rev=1521699&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppComponentDependencies.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppComponentDependencies.java Wed Sep 11 00:38:49 2013
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.junit.Test;
+
+public class TestMRAppComponentDependencies {
+
+  @Test(timeout = 20000)
+  public void testComponentStopOrder() throws Exception {
+    @SuppressWarnings("resource")
+    TestMRApp app = new TestMRApp(1, 1, true, this.getClass().getName(), true);
+    JobImpl job = (JobImpl) app.submit(new Configuration());
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+
+    int waitTime = 20 * 1000;
+    while (waitTime > 0 && app.numStops < 2) {
+      Thread.sleep(100);
+      waitTime -= 100;
+    }
+
+    // assert JobHistoryEventHandlerStopped and then clientServiceStopped
+    Assert.assertEquals(1, app.JobHistoryEventHandlerStopped);
+    Assert.assertEquals(2, app.clientServiceStopped);
+  }
+
+  private final class TestMRApp extends MRApp {
+    int JobHistoryEventHandlerStopped;
+    int clientServiceStopped;
+    int numStops;
+
+    public TestMRApp(int maps, int reduces, boolean autoComplete,
+        String testName, boolean cleanOnStart) {
+      super(maps, reduces, autoComplete, testName, cleanOnStart);
+      JobHistoryEventHandlerStopped = 0;
+      clientServiceStopped = 0;
+      numStops = 0;
+    }
+
+    @Override
+    protected Job createJob(Configuration conf, JobStateInternal forcedState,
+        String diagnostic) {
+      UserGroupInformation currentUser = null;
+      try {
+        currentUser = UserGroupInformation.getCurrentUser();
+      } catch (IOException e) {
+        throw new YarnRuntimeException(e);
+      }
+      Job newJob =
+          new TestJob(getJobId(), getAttemptID(), conf, getDispatcher()
+            .getEventHandler(), getTaskAttemptListener(), getContext()
+            .getClock(), getCommitter(), isNewApiCommitter(),
+            currentUser.getUserName(), getContext(), forcedState, diagnostic);
+      ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
+
+      getDispatcher().register(JobFinishEvent.Type.class,
+        createJobFinishEventHandler());
+
+      return newJob;
+    }
+
+    @Override
+    protected ClientService createClientService(AppContext context) {
+      return new MRClientService(context) {
+        @Override
+        public void serviceStop() throws Exception {
+          numStops++;
+          clientServiceStopped = numStops;
+          super.serviceStop();
+        }
+      };
+    }
+
+    @Override
+    protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+        AppContext context) {
+      return new JobHistoryEventHandler(context, getStartCount()) {
+        @Override
+        public void serviceStop() throws Exception {
+          numStops++;
+          JobHistoryEventHandlerStopped = numStops;
+          super.serviceStop();
+        }
+      };
+    }
+  }
+}

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1521699&r1=1521698&r2=1521699&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Wed Sep 11 00:38:49 2013
@@ -41,6 +41,7 @@ import org.apache.hadoop.mapreduce.jobhi
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
@@ -284,14 +285,12 @@ import org.junit.Test;
   private final class MRAppTestCleanup extends MRApp {
     int stagingDirCleanedup;
     int ContainerAllocatorStopped;
-    int JobHistoryEventHandlerStopped;
     int numStops;
     public MRAppTestCleanup(int maps, int reduces, boolean autoComplete,
         String testName, boolean cleanOnStart) {
       super(maps, reduces, autoComplete, testName, cleanOnStart);
       stagingDirCleanedup = 0;
       ContainerAllocatorStopped = 0;
-      JobHistoryEventHandlerStopped = 0;
       numStops = 0;
     }
 
@@ -319,26 +318,6 @@ import org.junit.Test;
     }
 
     @Override
-    protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
-        AppContext context) {
-      return new TestJobHistoryEventHandler(context, getStartCount());
-    }
-
-    private class TestJobHistoryEventHandler extends JobHistoryEventHandler {
-
-      public TestJobHistoryEventHandler(AppContext context, int startCount) {
-        super(context, startCount);
-      }
-
-      @Override
-      public void serviceStop() throws Exception {
-        numStops++;
-        JobHistoryEventHandlerStopped = numStops;
-        super.serviceStop();
-      }
-    }
-
-    @Override
     protected ContainerAllocator createContainerAllocator(
         ClientService clientService, AppContext context) {
       return new TestCleanupContainerAllocator();
@@ -405,15 +384,13 @@ import org.junit.Test;
     app.verifyCompleted();
 
     int waitTime = 20 * 1000;
-    while (waitTime > 0 && app.numStops < 3 ) {
+    while (waitTime > 0 && app.numStops < 2) {
       Thread.sleep(100);
       waitTime -= 100;
     }
 
-    // assert JobHistoryEventHandlerStopped first, then
-    // ContainerAllocatorStopped, and then stagingDirCleanedup
-    Assert.assertEquals(1, app.JobHistoryEventHandlerStopped);
-    Assert.assertEquals(2, app.ContainerAllocatorStopped);
-    Assert.assertEquals(3, app.stagingDirCleanedup);
+    // assert ContainerAllocatorStopped and then tagingDirCleanedup
+    Assert.assertEquals(1, app.ContainerAllocatorStopped);
+    Assert.assertEquals(2, app.stagingDirCleanedup);
   }
  }