You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2011/10/28 03:42:21 UTC
svn commit: r1190123 - in
/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/s...
Author: mahadev
Date: Fri Oct 28 01:42:21 2011
New Revision: 1190123
URL: http://svn.apache.org/viewvc?rev=1190123&view=rev
Log:
MAPREDUCE-3186. User jobs are getting hanged if the Resource manager process goes down and comes up while job is getting executed. (Eric Payne via mahadev) - Merging r1190122 from trunk
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1190123&r1=1190122&r2=1190123&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Fri Oct 28 01:42:21 2011
@@ -1763,6 +1763,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3282. bin/mapred job -list throws exception. (acmurthy via
mahadev)
+ MAPREDUCE-3186. User jobs are getting hanged if the Resource manager process goes down
+ and comes up while job is getting executed. (Eric Payne via mahadev)
+
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1190123&r1=1190122&r2=1190123&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Fri Oct 28 01:42:21 2011
@@ -23,19 +23,23 @@ import java.util.concurrent.atomic.Atomi
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
+import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AMResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -57,8 +61,10 @@ public class LocalContainerAllocator ext
LogFactory.getLog(LocalContainerAllocator.class);
private final EventHandler eventHandler;
- private final ApplicationId appID;
+// private final ApplicationId appID;
private AtomicInteger containerCount = new AtomicInteger();
+ private long retryInterval;
+ private long retrystartTime;
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
@@ -67,7 +73,19 @@ public class LocalContainerAllocator ext
AppContext context) {
super(clientService, context);
this.eventHandler = context.getEventHandler();
- this.appID = context.getApplicationID();
+// this.appID = context.getApplicationID();
+
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ super.init(conf);
+ retryInterval =
+ getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
+ MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
+ // Init retrystartTime to current time. If all goes well, it will be reset after
+ // first attempt to contact RM.
+ retrystartTime = System.currentTimeMillis();
}
@Override
@@ -77,10 +95,32 @@ public class LocalContainerAllocator ext
.getApplicationProgress(), new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>());
AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
- AMResponse response = allocateResponse.getAMResponse();
+ AMResponse response;
+ try {
+ response = allocateResponse.getAMResponse();
+ // Reset the retry start time if no exception occurred.
+ retrystartTime = System.currentTimeMillis();
+ } catch (Exception e) {
+ // This can happen when the connection to the RM has gone down. Keep
+ // re-trying until the retryInterval has expired.
+ if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
+ eventHandler.handle(new JobEvent(this.getJob().getID(),
+ JobEventType.INTERNAL_ERROR));
+ throw new YarnException("Could not contact RM after " +
+ retryInterval + " milliseconds.");
+ }
+ // Throw this up to the caller, which may decide to ignore it and
+ // continue to attempt to contact the RM.
+ throw e;
+ }
if (response.getReboot()) {
- // TODO
LOG.info("Event from RM: shutting down Application Master");
+ // This can happen if the RM has been restarted. If it is in that state,
+ // this application must clean itself up.
+ eventHandler.handle(new JobEvent(this.getJob().getID(),
+ JobEventType.INTERNAL_ERROR));
+ throw new YarnException("Resource Manager doesn't recognize AttemptId: " +
+ this.getContext().getApplicationID());
}
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1190123&r1=1190122&r2=1190123&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Fri Oct 28 01:42:21 2011
@@ -233,6 +233,9 @@ public abstract class RMCommunicator ext
Thread.sleep(rmPollInterval);
try {
heartbeat();
+ } catch (YarnException e) {
+ LOG.error("Error communicating with RM: " + e.getMessage() , e);
+ return;
} catch (Exception e) {
LOG.error("ERROR IN CONTACTING RM. ", e);
// TODO: for other exceptions
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1190123&r1=1190122&r2=1190123&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Fri Oct 28 01:42:21 2011
@@ -53,6 +53,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -128,6 +129,8 @@ public class RMContainerAllocator extend
private float maxReduceRampupLimit = 0;
private float maxReducePreemptionLimit = 0;
private float reduceSlowStart = 0;
+ private long retryInterval;
+ private long retrystartTime;
public RMContainerAllocator(ClientService clientService, AppContext context) {
super(clientService, context);
@@ -146,6 +149,11 @@ public class RMContainerAllocator extend
MRJobConfig.MR_AM_JOB_REDUCE_PREEMPTION_LIMIT,
MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT);
RackResolver.init(conf);
+ retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
+ MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
+ // Init retrystartTime to current time. If all goes well, it will be reset after
+ // first attempt to contact RM.
+ retrystartTime = System.currentTimeMillis();
}
@Override
@@ -429,11 +437,41 @@ public class RMContainerAllocator extend
" rackLocalAssigned:" + rackLocalAssigned +
" availableResources(headroom):" + getAvailableResources();
}
-
+
@SuppressWarnings("unchecked")
private List<Container> getResources() throws Exception {
int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;//first time it would be null
- AMResponse response = makeRemoteRequest();
+ AMResponse response;
+ /*
+ * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
+ * milliseconds before aborting. During this interval, AM will still try
+ * to contact the RM.
+ */
+ try {
+ response = makeRemoteRequest();
+ // Reset the retry start time if no exception occurred.
+ retrystartTime = System.currentTimeMillis();
+ } catch (Exception e) {
+ // This can happen when the connection to the RM has gone down. Keep
+ // re-trying until the retryInterval has expired.
+ if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
+ eventHandler.handle(new JobEvent(this.getJob().getID(),
+ JobEventType.INTERNAL_ERROR));
+ throw new YarnException("Could not contact RM after " +
+ retryInterval + " milliseconds.");
+ }
+ // Throw this up to the caller, which may decide to ignore it and
+ // continue to attempt to contact the RM.
+ throw e;
+ }
+ if (response.getReboot()) {
+ // This can happen if the RM has been restarted. If it is in that state,
+ // this application must clean itself up.
+ eventHandler.handle(new JobEvent(this.getJob().getID(),
+ JobEventType.INTERNAL_ERROR));
+ throw new YarnException("Resource Manager doesn't recognize AttemptId: " +
+ this.getContext().getApplicationID());
+ }
int newHeadRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;
List<Container> newContainers = response.getAllocatedContainers();
List<ContainerStatus> finishedContainers = response.getCompletedContainersStatuses();
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1190123&r1=1190122&r2=1190123&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Fri Oct 28 01:42:21 2011
@@ -404,6 +404,15 @@ public interface MRJobConfig {
public static final int DEFAULT_MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS = 2000;
/**
+ * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
+ * milliseconds before aborting. During this interval, AM will still try
+ * to contact the RM.
+ */
+ public static final String MR_AM_TO_RM_WAIT_INTERVAL_MS =
+ MR_AM_PREFIX + "scheduler.connection.wait.interval-ms";
+ public static final int DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS = 360000;
+
+ /**
* Boolean. Create the base dirs in the JobHistoryEventHandler
* Set to false for multi-user clusters. This is an internal config that
* is set by the MR framework and read by it too.