You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2011/10/28 03:40:25 UTC

svn commit: r1190122 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/or...

Author: mahadev
Date: Fri Oct 28 01:40:24 2011
New Revision: 1190122

URL: http://svn.apache.org/viewvc?rev=1190122&view=rev
Log:
MAPREDUCE-3186. User jobs are getting hanged if the Resource manager process goes down and comes up while job is getting executed. (Eric Payne via mahadev)

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1190122&r1=1190121&r2=1190122&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Fri Oct 28 01:40:24 2011
@@ -1824,6 +1824,9 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-3282. bin/mapred job -list throws exception. (acmurthy via 
     mahadev)
 
+    MAPREDUCE-3186. User jobs are getting hanged if the Resource manager process goes down 
+    and comes up while job is getting executed. (Eric Payne via mahadev)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1190122&r1=1190121&r2=1190122&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Fri Oct 28 01:40:24 2011
@@ -23,19 +23,23 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
+import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.AMResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -57,8 +61,10 @@ public class LocalContainerAllocator ext
       LogFactory.getLog(LocalContainerAllocator.class);
 
   private final EventHandler eventHandler;
-  private final ApplicationId appID;
+//  private final ApplicationId appID;
   private AtomicInteger containerCount = new AtomicInteger();
+  private long retryInterval;
+  private long retrystartTime;
 
   private final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
@@ -67,7 +73,19 @@ public class LocalContainerAllocator ext
                                  AppContext context) {
     super(clientService, context);
     this.eventHandler = context.getEventHandler();
-    this.appID = context.getApplicationID();
+//    this.appID = context.getApplicationID();
+    
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    retryInterval =
+        getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
+            MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
+    // Init startTime to current time. If all goes well, it will be reset after
+    // first attempt to contact RM.
+    retrystartTime = System.currentTimeMillis();
   }
 
   @Override
@@ -77,10 +95,32 @@ public class LocalContainerAllocator ext
             .getApplicationProgress(), new ArrayList<ResourceRequest>(),
         new ArrayList<ContainerId>());
     AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
-    AMResponse response = allocateResponse.getAMResponse();
+    AMResponse response;
+    try {
+      response = allocateResponse.getAMResponse();
+      // Reset retry count if no exception occurred.
+      retrystartTime = System.currentTimeMillis();
+    } catch (Exception e) {
+      // This can happen when the connection to the RM has gone down. Keep
+      // re-trying until the retryInterval has expired.
+      if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
+        eventHandler.handle(new JobEvent(this.getJob().getID(),
+                                         JobEventType.INTERNAL_ERROR));
+        throw new YarnException("Could not contact RM after " +
+                                retryInterval + " milliseconds.");
+      }
+      // Throw this up to the caller, which may decide to ignore it and
+      // continue to attempt to contact the RM.
+      throw e;
+    }
     if (response.getReboot()) {
-      // TODO
       LOG.info("Event from RM: shutting down Application Master");
+      // This can happen if the RM has been restarted. If it is in that state,
+      // this application must clean itself up.
+      eventHandler.handle(new JobEvent(this.getJob().getID(),
+                                       JobEventType.INTERNAL_ERROR));
+      throw new YarnException("Resource Manager doesn't recognize AttemptId: " +
+                               this.getContext().getApplicationID());
     }
   }
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1190122&r1=1190121&r2=1190122&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Fri Oct 28 01:40:24 2011
@@ -233,6 +233,9 @@ public abstract class RMCommunicator ext
             Thread.sleep(rmPollInterval);
             try {
               heartbeat();
+            } catch (YarnException e) {
+              LOG.error("Error communicating with RM: " + e.getMessage() , e);
+              return;
             } catch (Exception e) {
               LOG.error("ERROR IN CONTACTING RM. ", e);
               // TODO: for other exceptions

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1190122&r1=1190121&r2=1190122&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Fri Oct 28 01:40:24 2011
@@ -53,6 +53,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.AMResponse;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -128,6 +129,8 @@ public class RMContainerAllocator extend
   private float maxReduceRampupLimit = 0;
   private float maxReducePreemptionLimit = 0;
   private float reduceSlowStart = 0;
+  private long retryInterval;
+  private long retrystartTime;
   
   public RMContainerAllocator(ClientService clientService, AppContext context) {
     super(clientService, context);
@@ -146,6 +149,11 @@ public class RMContainerAllocator extend
         MRJobConfig.MR_AM_JOB_REDUCE_PREEMPTION_LIMIT,
         MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT);
     RackResolver.init(conf);
+    retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
+                                MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
+    // Init startTime to current time. If all goes well, it will be reset after
+    // first attempt to contact RM.
+    retrystartTime = System.currentTimeMillis();
   }
 
   @Override
@@ -429,11 +437,41 @@ public class RMContainerAllocator extend
         " rackLocalAssigned:" + rackLocalAssigned +
         " availableResources(headroom):" + getAvailableResources();
   }
-  
+
   @SuppressWarnings("unchecked")
   private List<Container> getResources() throws Exception {
     int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;//first time it would be null
-    AMResponse response = makeRemoteRequest();
+    AMResponse response;
+    /*
+     * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
+     * milliseconds before aborting. During this interval, AM will still try
+     * to contact the RM.
+     */
+    try {
+      response = makeRemoteRequest();
+      // Reset retry count if no exception occurred.
+      retrystartTime = System.currentTimeMillis();
+    } catch (Exception e) {
+      // This can happen when the connection to the RM has gone down. Keep
+      // re-trying until the retryInterval has expired.
+      if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
+        eventHandler.handle(new JobEvent(this.getJob().getID(),
+                                         JobEventType.INTERNAL_ERROR));
+        throw new YarnException("Could not contact RM after " +
+                                retryInterval + " milliseconds.");
+      }
+      // Throw this up to the caller, which may decide to ignore it and
+      // continue to attempt to contact the RM.
+      throw e;
+    }
+    if (response.getReboot()) {
+      // This can happen if the RM has been restarted. If it is in that state,
+      // this application must clean itself up.
+      eventHandler.handle(new JobEvent(this.getJob().getID(),
+                                       JobEventType.INTERNAL_ERROR));
+      throw new YarnException("Resource Manager doesn't recognize AttemptId: " +
+                               this.getContext().getApplicationID());
+    }
     int newHeadRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;
     List<Container> newContainers = response.getAllocatedContainers();
     List<ContainerStatus> finishedContainers = response.getCompletedContainersStatuses();

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1190122&r1=1190121&r2=1190122&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Fri Oct 28 01:40:24 2011
@@ -404,6 +404,15 @@ public interface MRJobConfig {
   public static final int DEFAULT_MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS = 2000;
 
   /**
+   * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
+   * milliseconds before aborting. During this interval, AM will still try
+   * to contact the RM.
+   */
+  public static final String MR_AM_TO_RM_WAIT_INTERVAL_MS =
+    MR_AM_PREFIX + "scheduler.connection.wait.interval-ms";
+  public static final int DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS = 360000;
+
+  /**
    * Boolean. Create the base dirs in the JobHistoryEventHandler
    * Set to false for multi-user clusters.  This is an internal config that
    * is set by the MR framework and read by it too.