You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by jl...@apache.org on 2013/01/04 20:15:22 UTC
svn commit: r1429040 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/...
Author: jlowe
Date: Fri Jan 4 19:15:21 2013
New Revision: 1429040
URL: http://svn.apache.org/viewvc?rev=1429040&view=rev
Log:
MAPREDUCE-4832. MR AM can get in a split brain situation. Contributed by Jason Lowe
Added:
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMHeartbeatHandler.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1429040&r1=1429039&r2=1429040&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Fri Jan 4 19:15:21 2013
@@ -659,6 +659,8 @@ Release 0.23.6 - UNRELEASED
MAPREDUCE-4279. getClusterStatus() fails with null pointer exception when
running jobs in local mode (Devaraj K via bobby)
+ MAPREDUCE-4832. MR AM can get in a split brain situation (jlowe)
+
Release 0.23.5 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1429040&r1=1429039&r2=1429040&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Fri Jan 4 19:15:21 2013
@@ -47,6 +47,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.authorize.PolicyProvider;
@@ -73,6 +74,8 @@ public class TaskAttemptListenerImpl ext
private AppContext context;
private Server server;
protected TaskHeartbeatHandler taskHeartbeatHandler;
+ private RMHeartbeatHandler rmHeartbeatHandler;
+ private long commitWindowMs;
private InetSocketAddress address;
private ConcurrentMap<WrappedJvmID, org.apache.hadoop.mapred.Task>
jvmIDToActiveAttemptMap
@@ -83,15 +86,19 @@ public class TaskAttemptListenerImpl ext
private JobTokenSecretManager jobTokenSecretManager = null;
public TaskAttemptListenerImpl(AppContext context,
- JobTokenSecretManager jobTokenSecretManager) {
+ JobTokenSecretManager jobTokenSecretManager,
+ RMHeartbeatHandler rmHeartbeatHandler) {
super(TaskAttemptListenerImpl.class.getName());
this.context = context;
this.jobTokenSecretManager = jobTokenSecretManager;
+ this.rmHeartbeatHandler = rmHeartbeatHandler;
}
@Override
public void init(Configuration conf) {
registerHeartbeatHandler(conf);
+ commitWindowMs = conf.getLong(MRJobConfig.MR_AM_COMMIT_WINDOW_MS,
+ MRJobConfig.DEFAULT_MR_AM_COMMIT_WINDOW_MS);
super.init(conf);
}
@@ -172,6 +179,13 @@ public class TaskAttemptListenerImpl ext
taskHeartbeatHandler.progressing(attemptID);
+ // tell task to retry later if AM has not heard from RM within the commit
+ // window to help avoid double-committing in a split-brain situation
+ long now = context.getClock().getTime();
+ if (now - rmHeartbeatHandler.getLastHeartbeatTime() > commitWindowMs) {
+ return false;
+ }
+
Job job = context.getJob(attemptID.getTaskId().getJobId());
Task task = job.getTask(attemptID.getTaskId());
return task.canCommit(attemptID);
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1429040&r1=1429039&r2=1429040&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Fri Jan 4 19:15:21 2013
@@ -87,6 +87,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
@@ -264,18 +265,20 @@ public class MRAppMaster extends Composi
addIfService(dispatcher);
}
+ //service to handle requests from JobClient
+ clientService = createClientService(context);
+ addIfService(clientService);
+
+ containerAllocator = createContainerAllocator(clientService, context);
+
//service to handle requests to TaskUmbilicalProtocol
taskAttemptListener = createTaskAttemptListener(context);
addIfService(taskAttemptListener);
- //service to do the task cleanup
+ //service to handle the output committer
committerEventHandler = createCommitterEventHandler(context, committer);
addIfService(committerEventHandler);
- //service to handle requests from JobClient
- clientService = createClientService(context);
- addIfService(clientService);
-
//service to log job history events
EventHandler<JobHistoryEvent> historyService =
createJobHistoryHandler(context);
@@ -303,7 +306,6 @@ public class MRAppMaster extends Composi
speculatorEventDispatcher);
// service to allocate containers from RM (if non-uber) or to fake it (uber)
- containerAllocator = createContainerAllocator(clientService, context);
addIfService(containerAllocator);
dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
@@ -582,13 +584,15 @@ public class MRAppMaster extends Composi
protected TaskAttemptListener createTaskAttemptListener(AppContext context) {
TaskAttemptListener lis =
- new TaskAttemptListenerImpl(context, jobTokenSecretManager);
+ new TaskAttemptListenerImpl(context, jobTokenSecretManager,
+ getRMHeartbeatHandler());
return lis;
}
protected EventHandler<CommitterEvent> createCommitterEventHandler(
AppContext context, OutputCommitter committer) {
- return new CommitterEventHandler(context, committer);
+ return new CommitterEventHandler(context, committer,
+ getRMHeartbeatHandler());
}
protected ContainerAllocator createContainerAllocator(
@@ -596,6 +600,10 @@ public class MRAppMaster extends Composi
return new ContainerAllocatorRouter(clientService, context);
}
+ protected RMHeartbeatHandler getRMHeartbeatHandler() {
+ return (RMHeartbeatHandler) containerAllocator;
+ }
+
protected ContainerLauncher
createContainerLauncher(final AppContext context) {
return new ContainerLauncherRouter(context);
@@ -663,7 +671,7 @@ public class MRAppMaster extends Composi
* happened.
*/
private final class ContainerAllocatorRouter extends AbstractService
- implements ContainerAllocator {
+ implements ContainerAllocator, RMHeartbeatHandler {
private final ClientService clientService;
private final AppContext context;
private ContainerAllocator containerAllocator;
@@ -708,6 +716,16 @@ public class MRAppMaster extends Composi
public void setShouldUnregister(boolean shouldUnregister) {
((RMCommunicator) containerAllocator).setShouldUnregister(shouldUnregister);
}
+
+ @Override
+ public long getLastHeartbeatTime() {
+ return ((RMCommunicator) containerAllocator).getLastHeartbeatTime();
+ }
+
+ @Override
+ public void runOnNextHeartbeat(Runnable callback) {
+ ((RMCommunicator) containerAllocator).runOnNextHeartbeat(callback);
+ }
}
/**
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java?rev=1429040&r1=1429039&r2=1429040&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java Fri Jan 4 19:15:21 2013
@@ -39,6 +39,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.JobSetupFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -54,6 +55,7 @@ public class CommitterEventHandler exten
private final AppContext context;
private final OutputCommitter committer;
+ private final RMHeartbeatHandler rmHeartbeatHandler;
private ThreadPoolExecutor launcherPool;
private Thread eventHandlingThread;
private BlockingQueue<CommitterEvent> eventQueue =
@@ -61,11 +63,14 @@ public class CommitterEventHandler exten
private final AtomicBoolean stopped;
private Thread jobCommitThread = null;
private int commitThreadCancelTimeoutMs;
+ private long commitWindowMs;
- public CommitterEventHandler(AppContext context, OutputCommitter committer) {
+ public CommitterEventHandler(AppContext context, OutputCommitter committer,
+ RMHeartbeatHandler rmHeartbeatHandler) {
super("CommitterEventHandler");
this.context = context;
this.committer = committer;
+ this.rmHeartbeatHandler = rmHeartbeatHandler;
this.stopped = new AtomicBoolean(false);
}
@@ -75,6 +80,8 @@ public class CommitterEventHandler exten
commitThreadCancelTimeoutMs = conf.getInt(
MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS,
MRJobConfig.DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS);
+ commitWindowMs = conf.getLong(MRJobConfig.MR_AM_COMMIT_WINDOW_MS,
+ MRJobConfig.DEFAULT_MR_AM_COMMIT_WINDOW_MS);
}
@Override
@@ -210,6 +217,7 @@ public class CommitterEventHandler exten
protected void handleJobCommit(CommitterJobCommitEvent event) {
try {
jobCommitStarted();
+ waitForValidCommitWindow();
committer.commitJob(event.getJobContext());
context.getEventHandler().handle(
new JobCommitCompletedEvent(event.getJobID()));
@@ -248,5 +256,26 @@ public class CommitterEventHandler exten
new TaskAttemptEvent(event.getAttemptID(),
TaskAttemptEventType.TA_CLEANUP_DONE));
}
+
+ private synchronized void waitForValidCommitWindow()
+ throws InterruptedException {
+ long lastHeartbeatTime = rmHeartbeatHandler.getLastHeartbeatTime();
+ long now = context.getClock().getTime();
+
+ while (now - lastHeartbeatTime > commitWindowMs) {
+ rmHeartbeatHandler.runOnNextHeartbeat(new Runnable() {
+ @Override
+ public void run() {
+ synchronized (EventProcessor.this) {
+ EventProcessor.this.notify();
+ }
+ }
+ });
+
+ wait();
+ lastHeartbeatTime = rmHeartbeatHandler.getLastHeartbeatTime();
+ now = context.getClock().getTime();
+ }
+ }
}
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1429040&r1=1429039&r2=1429040&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Fri Jan 4 19:15:21 2013
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
@@ -62,7 +63,8 @@ import org.apache.hadoop.yarn.service.Ab
/**
* Registers/unregisters to RM and sends heartbeats to RM.
*/
-public abstract class RMCommunicator extends AbstractService {
+public abstract class RMCommunicator extends AbstractService
+ implements RMHeartbeatHandler {
private static final Log LOG = LogFactory.getLog(RMContainerAllocator.class);
private int rmPollInterval;//millis
protected ApplicationId applicationId;
@@ -77,6 +79,8 @@ public abstract class RMCommunicator ext
private Resource minContainerCapability;
private Resource maxContainerCapability;
protected Map<ApplicationAccessType, String> applicationACLs;
+ private volatile long lastHeartbeatTime;
+ private ConcurrentLinkedQueue<Runnable> heartbeatCallbacks;
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
@@ -95,6 +99,7 @@ public abstract class RMCommunicator ext
this.applicationId = context.getApplicationID();
this.applicationAttemptId = context.getApplicationAttemptId();
this.stopped = new AtomicBoolean(false);
+ this.heartbeatCallbacks = new ConcurrentLinkedQueue<Runnable>();
}
@Override
@@ -236,8 +241,12 @@ public abstract class RMCommunicator ext
return;
} catch (Exception e) {
LOG.error("ERROR IN CONTACTING RM. ", e);
+ continue;
// TODO: for other exceptions
}
+
+ lastHeartbeatTime = context.getClock().getTime();
+ executeHeartbeatCallbacks();
} catch (InterruptedException e) {
if (!stopped.get()) {
LOG.warn("Allocated thread interrupted. Returning.");
@@ -295,6 +304,23 @@ public abstract class RMCommunicator ext
protected abstract void heartbeat() throws Exception;
+ private void executeHeartbeatCallbacks() {
+ Runnable callback = null;
+ while ((callback = heartbeatCallbacks.poll()) != null) {
+ callback.run();
+ }
+ }
+
+ @Override
+ public long getLastHeartbeatTime() {
+ return lastHeartbeatTime;
+ }
+
+ @Override
+ public void runOnNextHeartbeat(Runnable callback) {
+ heartbeatCallbacks.add(callback);
+ }
+
public void setShouldUnregister(boolean shouldUnregister) {
this.shouldUnregister = shouldUnregister;
LOG.info("RMCommunicator notified that shouldUnregistered is: "
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMHeartbeatHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMHeartbeatHandler.java?rev=1429040&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMHeartbeatHandler.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMHeartbeatHandler.java Fri Jan 4 19:15:21 2013
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.rm;
+
+public interface RMHeartbeatHandler {
+ long getLastHeartbeatTime();
+
+ void runOnNextHeartbeat(Runnable callback);
+}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java?rev=1429040&r1=1429039&r2=1429040&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java Fri Jan 4 19:15:21 2013
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertNul
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -41,7 +43,9 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.junit.Test;
@@ -51,8 +55,9 @@ public class TestTaskAttemptListenerImpl
public MockTaskAttemptListenerImpl(AppContext context,
JobTokenSecretManager jobTokenSecretManager,
+ RMHeartbeatHandler rmHeartbeatHandler,
TaskHeartbeatHandler hbHandler) {
- super(context, jobTokenSecretManager);
+ super(context, jobTokenSecretManager, rmHeartbeatHandler);
this.taskHeartbeatHandler = hbHandler;
}
@@ -76,9 +81,12 @@ public class TestTaskAttemptListenerImpl
public void testGetTask() throws IOException {
AppContext appCtx = mock(AppContext.class);
JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
+ RMHeartbeatHandler rmHeartbeatHandler =
+ mock(RMHeartbeatHandler.class);
TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
MockTaskAttemptListenerImpl listener =
- new MockTaskAttemptListenerImpl(appCtx, secret, hbHandler);
+ new MockTaskAttemptListenerImpl(appCtx, secret,
+ rmHeartbeatHandler, hbHandler);
Configuration conf = new Configuration();
listener.init(conf);
listener.start();
@@ -152,9 +160,11 @@ public class TestTaskAttemptListenerImpl
AppContext appCtx = mock(AppContext.class);
when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
+ RMHeartbeatHandler rmHeartbeatHandler =
+ mock(RMHeartbeatHandler.class);
final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
TaskAttemptListenerImpl listener =
- new TaskAttemptListenerImpl(appCtx, secret) {
+ new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) {
@Override
protected void registerHeartbeatHandler(Configuration conf) {
taskHeartbeatHandler = hbHandler;
@@ -191,4 +201,46 @@ public class TestTaskAttemptListenerImpl
return tce;
}
+ @Test
+ public void testCommitWindow() throws IOException {
+ SystemClock clock = new SystemClock();
+
+ org.apache.hadoop.mapreduce.v2.app.job.Task mockTask =
+ mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class);
+ when(mockTask.canCommit(any(TaskAttemptId.class))).thenReturn(true);
+ Job mockJob = mock(Job.class);
+ when(mockJob.getTask(any(TaskId.class))).thenReturn(mockTask);
+ AppContext appCtx = mock(AppContext.class);
+ when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
+ when(appCtx.getClock()).thenReturn(clock);
+ JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
+ RMHeartbeatHandler rmHeartbeatHandler =
+ mock(RMHeartbeatHandler.class);
+ final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
+ TaskAttemptListenerImpl listener =
+ new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) {
+ @Override
+ protected void registerHeartbeatHandler(Configuration conf) {
+ taskHeartbeatHandler = hbHandler;
+ }
+ };
+
+ Configuration conf = new Configuration();
+ listener.init(conf);
+ listener.start();
+
+ // verify commit not allowed when RM heartbeat has not occurred recently
+ TaskAttemptID tid = new TaskAttemptID("12345", 1, TaskType.REDUCE, 1, 0);
+ boolean canCommit = listener.canCommit(tid);
+ assertFalse(canCommit);
+ verify(mockTask, never()).canCommit(any(TaskAttemptId.class));
+
+ // verify commit allowed when RM heartbeat is recent
+ when(rmHeartbeatHandler.getLastHeartbeatTime()).thenReturn(clock.getTime());
+ canCommit = listener.canCommit(tid);
+ assertTrue(canCommit);
+ verify(mockTask, times(1)).canCommit(any(TaskAttemptId.class));
+
+ listener.stop();
+ }
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1429040&r1=1429039&r2=1429040&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Fri Jan 4 19:15:21 2013
@@ -74,6 +74,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
@@ -489,7 +490,8 @@ public class MRApp extends MRAppMaster {
return new MRAppContainerAllocator();
}
- protected class MRAppContainerAllocator implements ContainerAllocator {
+ protected class MRAppContainerAllocator
+ implements ContainerAllocator, RMHeartbeatHandler {
private int containerCount;
@Override
@@ -514,6 +516,16 @@ public class MRApp extends MRAppMaster {
new TaskAttemptContainerAssignedEvent(event.getAttemptID(),
container, null));
}
+
+ @Override
+ public long getLastHeartbeatTime() {
+ return getContext().getClock().getTime();
+ }
+
+ @Override
+ public void runOnNextHeartbeat(Runnable callback) {
+ callback.run();
+ }
}
@Override
@@ -566,7 +578,8 @@ public class MRApp extends MRAppMaster {
}
};
- return new CommitterEventHandler(context, stubbedCommitter);
+ return new CommitterEventHandler(context, stubbedCommitter,
+ getRMHeartbeatHandler());
}
@Override
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java?rev=1429040&r1=1429039&r2=1429040&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java Fri Jan 4 19:15:21 2013
@@ -252,7 +252,7 @@ public class TestFail {
//task time out is reduced
//when attempt times out, heartbeat handler will send the lost event
//leading to Attempt failure
- return new TaskAttemptListenerImpl(getContext(), null) {
+ return new TaskAttemptListenerImpl(getContext(), null, null) {
@Override
public void startRpcServer(){};
@Override
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1429040&r1=1429039&r2=1429040&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Fri Jan 4 19:15:21 2013
@@ -38,6 +38,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import junit.framework.Assert;
@@ -70,7 +71,9 @@ import org.apache.hadoop.mapreduce.v2.ut
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
+import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -1240,6 +1243,13 @@ public class TestRMContainerAllocator {
return context;
}
+ private static AppContext createAppContext(
+ ApplicationAttemptId appAttemptId, Job job, Clock clock) {
+ AppContext context = createAppContext(appAttemptId, job);
+ when(context.getClock()).thenReturn(clock);
+ return context;
+ }
+
private static ClientService createMockClientService() {
ClientService service = mock(ClientService.class);
when(service.getBindAddress()).thenReturn(
@@ -1264,6 +1274,15 @@ public class TestRMContainerAllocator {
super.start();
}
+ public MyContainerAllocator(MyResourceManager rm, Configuration conf,
+ ApplicationAttemptId appAttemptId, Job job, Clock clock) {
+ super(createMockClientService(),
+ createAppContext(appAttemptId, job, clock));
+ this.rm = rm;
+ super.init(conf);
+ super.start();
+ }
+
@Override
protected AMRMProtocol createSchedulerProxy() {
return this.rm.getApplicationMasterService();
@@ -1465,6 +1484,66 @@ public class TestRMContainerAllocator {
allocator.recalculatedReduceSchedule);
}
+ /**
+ * Checks that the allocator reports the clock reading taken at each
+ * heartbeat and that a callback queued with runOnNextHeartbeat gets run.
+ */
+ @Test
+ public void testHeartbeatHandler() throws Exception {
+ LOG.info("Running testHeartbeatHandler");
+
+ final ControlledClock testClock = new ControlledClock(new SystemClock());
+ final AppContext mockContext = mock(AppContext.class);
+ when(mockContext.getClock()).thenReturn(testClock);
+ when(mockContext.getApplicationID()).thenReturn(
+ BuilderUtils.newApplicationId(1, 1));
+
+ // use a 1 ms heartbeat interval so each polling loop below ends quickly
+ final Configuration heartbeatConf = new Configuration();
+ heartbeatConf.setInt(MRJobConfig.MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS, 1);
+
+ // allocator with registration, scheduler proxy and heartbeat stubbed out
+ final RMContainerAllocator allocator = new RMContainerAllocator(
+ mock(ClientService.class), mockContext) {
+ @Override
+ protected void register() {
+ }
+ @Override
+ protected AMRMProtocol createSchedulerProxy() {
+ return mock(AMRMProtocol.class);
+ }
+ @Override
+ protected synchronized void heartbeat() throws Exception {
+ }
+ };
+ allocator.init(heartbeatConf);
+ allocator.start();
+
+ // each wait polls in 10 ms steps for at most 5 seconds
+ testClock.setTime(5);
+ for (int waitedMs = 0;
+ allocator.getLastHeartbeatTime() != 5 && waitedMs < 5000;
+ waitedMs += 10) {
+ Thread.sleep(10);
+ }
+ Assert.assertEquals(5, allocator.getLastHeartbeatTime());
+
+ testClock.setTime(7);
+ for (int waitedMs = 0;
+ allocator.getLastHeartbeatTime() != 7 && waitedMs < 5000;
+ waitedMs += 10) {
+ Thread.sleep(10);
+ }
+ Assert.assertEquals(7, allocator.getLastHeartbeatTime());
+
+ // queue a callback, advance the clock and expect it to have run once the
+ // new time is reported
+ final AtomicBoolean callbackRan = new AtomicBoolean(false);
+ allocator.runOnNextHeartbeat(new Runnable() {
+ @Override
+ public void run() {
+ callbackRan.set(true);
+ }
+ });
+ testClock.setTime(8);
+ for (int waitedMs = 0;
+ allocator.getLastHeartbeatTime() != 8 && waitedMs < 5000;
+ waitedMs += 10) {
+ Thread.sleep(10);
+ }
+ Assert.assertEquals(8, allocator.getLastHeartbeatTime());
+ Assert.assertTrue(callbackRan.get());
+ }
+
public static void main(String[] args) throws Exception {
TestRMContainerAllocator t = new TestRMContainerAllocator();
t.testSimple();
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1429040&r1=1429039&r2=1429040&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Fri Jan 4 19:15:21 2013
@@ -42,6 +42,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -166,6 +167,11 @@ import org.junit.Test;
}
@Override
+ public RMHeartbeatHandler getRMHeartbeatHandler() {
+ // hand out the stub that getStubbedHeartbeatHandler builds from this
+ // app's context
+ return getStubbedHeartbeatHandler(getContext());
+ }
+
+ @Override
protected void sysexit() {
}
@@ -177,6 +183,7 @@ import org.junit.Test;
@Override
protected void downloadTokensAndSetupUGI(Configuration conf) {
}
+
}
private final class MRAppTestCleanup extends MRApp {
@@ -238,6 +245,11 @@ import org.junit.Test;
}
@Override
+ public RMHeartbeatHandler getRMHeartbeatHandler() {
+ // hand out the stub that getStubbedHeartbeatHandler builds from this
+ // app's context
+ return getStubbedHeartbeatHandler(getContext());
+ }
+
+ @Override
public void cleanupStagingDir() throws IOException {
cleanedBeforeContainerAllocatorStopped = !stoppedContainerAllocator;
}
@@ -247,6 +259,20 @@ import org.junit.Test;
}
}
+ private static RMHeartbeatHandler getStubbedHeartbeatHandler(
+ final AppContext appContext) {
+ return new RMHeartbeatHandler() {
+ @Override
+ public long getLastHeartbeatTime() {
+ return appContext.getClock().getTime();
+ }
+ @Override
+ public void runOnNextHeartbeat(Runnable callback) {
+ callback.run();
+ }
+ };
+ }
+
@Test
public void testStagingCleanupOrder() throws Exception {
MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true,
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java?rev=1429040&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java Fri Jan 4 19:15:21 2013
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.commit;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.junit.Test;
+
+/**
+ * Tests that CommitterEventHandler holds back job commits until the
+ * RMHeartbeatHandler reports recent contact with the RM.
+ */
+public class TestCommitterEventHandler {
+
+ /**
+ * Submits a commit while the last heartbeat is stale, then refreshes the
+ * heartbeat and checks the commit completes, then commits once more while
+ * the heartbeat is still fresh.
+ */
+ @Test
+ public void testCommitWindow() throws Exception {
+ Configuration conf = new Configuration();
+ AsyncDispatcher dispatcher = new AsyncDispatcher();
+ dispatcher.init(conf);
+ dispatcher.start();
+
+ TestingJobEventHandler jeh = new TestingJobEventHandler();
+ dispatcher.register(JobEventType.class, jeh);
+
+ SystemClock clock = new SystemClock();
+ AppContext appContext = mock(AppContext.class);
+ when(appContext.getEventHandler()).thenReturn(
+ dispatcher.getEventHandler());
+ when(appContext.getClock()).thenReturn(clock);
+ OutputCommitter committer = mock(OutputCommitter.class);
+ TestingRMHeartbeatHandler rmhh =
+ new TestingRMHeartbeatHandler();
+
+ CommitterEventHandler ceh = new CommitterEventHandler(appContext,
+ committer, rmhh);
+ ceh.init(conf);
+ ceh.start();
+
+ // stop both services even when an assertion fails so that nothing they
+ // started is left running after the test
+ try {
+ // verify trying to commit when RM heartbeats are stale does not commit
+ ceh.handle(new CommitterJobCommitEvent(null, null));
+ long timeToWaitMs = 5000;
+ while (rmhh.getNumCallbacks() != 1 && timeToWaitMs > 0) {
+ Thread.sleep(10);
+ timeToWaitMs -= 10;
+ }
+ Assert.assertEquals("committer did not register a heartbeat callback",
+ 1, rmhh.getNumCallbacks());
+ verify(committer, never()).commitJob(any(JobContext.class));
+ Assert.assertEquals("committer should not have committed",
+ 0, jeh.numCommitCompletedEvents);
+
+ // set a fresh heartbeat and verify commit completes
+ rmhh.setLastHeartbeatTime(clock.getTime());
+ timeToWaitMs = 5000;
+ while (jeh.numCommitCompletedEvents != 1 && timeToWaitMs > 0) {
+ Thread.sleep(10);
+ timeToWaitMs -= 10;
+ }
+ Assert.assertEquals("committer did not complete commit after RM heartbeat",
+ 1, jeh.numCommitCompletedEvents);
+ verify(committer, times(1)).commitJob(any(JobContext.class));
+
+ // try to commit again and verify it goes through since the heartbeat
+ // is still fresh
+ ceh.handle(new CommitterJobCommitEvent(null, null));
+ timeToWaitMs = 5000;
+ while (jeh.numCommitCompletedEvents != 2 && timeToWaitMs > 0) {
+ Thread.sleep(10);
+ timeToWaitMs -= 10;
+ }
+ Assert.assertEquals("committer did not commit",
+ 2, jeh.numCommitCompletedEvents);
+ verify(committer, times(2)).commitJob(any(JobContext.class));
+ } finally {
+ ceh.stop();
+ dispatcher.stop();
+ }
+ }
+
+ /**
+ * Heartbeat handler whose last heartbeat time is controlled by the test.
+ * Callbacks are queued by runOnNextHeartbeat and only run when the test
+ * sets a new heartbeat time.
+ */
+ private static class TestingRMHeartbeatHandler
+ implements RMHeartbeatHandler {
+ // volatile: written by the test thread in setLastHeartbeatTime;
+ // NOTE(review): presumably read from the committer's own thread - confirm
+ private volatile long lastHeartbeatTime = 0;
+ private final ConcurrentLinkedQueue<Runnable> callbacks =
+ new ConcurrentLinkedQueue<Runnable>();
+
+ @Override
+ public long getLastHeartbeatTime() {
+ return lastHeartbeatTime;
+ }
+
+ @Override
+ public void runOnNextHeartbeat(Runnable callback) {
+ callbacks.add(callback);
+ }
+
+ /**
+ * Records the given time as the last heartbeat and then drains the queue,
+ * running every pending callback on the calling thread.
+ */
+ public void setLastHeartbeatTime(long timestamp) {
+ lastHeartbeatTime = timestamp;
+ Runnable callback = null;
+ while ((callback = callbacks.poll()) != null) {
+ callback.run();
+ }
+ }
+
+ /** Returns the number of callbacks still waiting for a heartbeat. */
+ public int getNumCallbacks() {
+ return callbacks.size();
+ }
+ }
+
+ /** Counts the JOB_COMMIT_COMPLETED events passed to handle(). */
+ private static class TestingJobEventHandler
+ implements EventHandler<JobEvent> {
+ // volatile so the polling loops in testCommitWindow observe increments
+ // made in handle(); NOTE(review): assumes the dispatcher delivers events
+ // from a single thread, so the non-atomic ++ has only one writer
+ volatile int numCommitCompletedEvents = 0;
+
+ @Override
+ public void handle(JobEvent event) {
+ if (event.getType() == JobEventType.JOB_COMMIT_COMPLETED) {
+ ++numCommitCompletedEvents;
+ }
+ }
+ }
+}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1429040&r1=1429039&r2=1429040&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Fri Jan 4 19:15:21 2013
@@ -56,6 +56,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
@@ -502,13 +503,23 @@ public class TestJobImpl {
private static CommitterEventHandler createCommitterEventHandler(
Dispatcher dispatcher, OutputCommitter committer) {
- SystemClock clock = new SystemClock();
+ final SystemClock clock = new SystemClock();
AppContext appContext = mock(AppContext.class);
when(appContext.getEventHandler()).thenReturn(
dispatcher.getEventHandler());
when(appContext.getClock()).thenReturn(clock);
+ // stub handler: reports the current clock time as the last heartbeat and
+ // runs callbacks immediately on the calling thread;
+ // NOTE(review): presumably so commits are never deferred here - confirm
+ RMHeartbeatHandler heartbeatHandler = new RMHeartbeatHandler() {
+ @Override
+ public long getLastHeartbeatTime() {
+ return clock.getTime();
+ }
+ @Override
+ public void runOnNextHeartbeat(Runnable callback) {
+ callback.run();
+ }
+ };
CommitterEventHandler handler =
- new CommitterEventHandler(appContext, committer);
+ new CommitterEventHandler(appContext, committer, heartbeatHandler);
dispatcher.register(CommitterEventType.class, handler);
return handler;
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1429040&r1=1429039&r2=1429040&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Fri Jan 4 19:15:21 2013
@@ -474,6 +474,16 @@ public interface MRJobConfig {
60 * 1000;
/**
+ * Defines a time window in milliseconds for output committer operations.
+ * If contact with the RM has occurred within this window then commit
+ * operations are allowed, otherwise the AM will not allow output committer
+ * operations until contact with the RM has been re-established.
+ */
+ public static final String MR_AM_COMMIT_WINDOW_MS =
+ MR_AM_PREFIX + "job.committer.commit-window";
+ /** Default commit window in milliseconds (10 seconds). */
+ public static final int DEFAULT_MR_AM_COMMIT_WINDOW_MS = 10 * 1000;
+
+ /**
* Boolean. Create the base dirs in the JobHistoryEventHandler
* Set to false for multi-user clusters. This is an internal config that
* is set by the MR framework and read by it too.
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1429040&r1=1429039&r2=1429040&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Fri Jan 4 19:15:21 2013
@@ -881,6 +881,15 @@
</property>
<property>
+ <name>yarn.app.mapreduce.am.job.committer.commit-window</name>
+ <value>10000</value>
+ <description>Defines a time window in milliseconds for output commit
+ operations. If contact with the RM has occurred within this window then
+ commits are allowed, otherwise the AM will not allow output commits until
+ contact with the RM has been re-established.</description>
+</property>
+
+<property>
<name>yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms</name>
<value>1000</value>
<description>The interval in ms at which the MR AppMaster should send