You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by jl...@apache.org on 2013/01/04 20:34:09 UTC

svn commit: r1429049 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org...

Author: jlowe
Date: Fri Jan  4 19:34:09 2013
New Revision: 1429049

URL: http://svn.apache.org/viewvc?rev=1429049&view=rev
Log:
svn merge -c 1429040 FIXES: MAPREDUCE-4832. MR AM can get in a split brain situation. Contributed by Jason Lowe

Added:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMHeartbeatHandler.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/
      - copied from r1429040, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/
Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1429049&r1=1429048&r2=1429049&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Fri Jan  4 19:34:09 2013
@@ -61,6 +61,8 @@ Release 0.23.6 - UNRELEASED
     MAPREDUCE-4279. getClusterStatus() fails with null pointer exception when
     running jobs in local mode (Devaraj K via bobby)
 
+    MAPREDUCE-4832. MR AM can get in a split brain situation (jlowe)
+
 Release 0.23.5 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1429049&r1=1429048&r2=1429049&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Fri Jan  4 19:34:09 2013
@@ -47,6 +47,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.authorize.PolicyProvider;
@@ -73,6 +74,8 @@ public class TaskAttemptListenerImpl ext
   private AppContext context;
   private Server server;
   protected TaskHeartbeatHandler taskHeartbeatHandler;
+  private RMHeartbeatHandler rmHeartbeatHandler;
+  private long commitWindowMs;
   private InetSocketAddress address;
   private ConcurrentMap<WrappedJvmID, org.apache.hadoop.mapred.Task>
     jvmIDToActiveAttemptMap
@@ -83,15 +86,19 @@ public class TaskAttemptListenerImpl ext
   private JobTokenSecretManager jobTokenSecretManager = null;
   
   public TaskAttemptListenerImpl(AppContext context,
-      JobTokenSecretManager jobTokenSecretManager) {
+      JobTokenSecretManager jobTokenSecretManager,
+      RMHeartbeatHandler rmHeartbeatHandler) {
     super(TaskAttemptListenerImpl.class.getName());
     this.context = context;
     this.jobTokenSecretManager = jobTokenSecretManager;
+    this.rmHeartbeatHandler = rmHeartbeatHandler;
   }
 
   @Override
   public void init(Configuration conf) {
    registerHeartbeatHandler(conf);
+   commitWindowMs = conf.getLong(MRJobConfig.MR_AM_COMMIT_WINDOW_MS,
+       MRJobConfig.DEFAULT_MR_AM_COMMIT_WINDOW_MS);
    super.init(conf);
   }
 
@@ -172,6 +179,13 @@ public class TaskAttemptListenerImpl ext
 
     taskHeartbeatHandler.progressing(attemptID);
 
+    // tell task to retry later if AM has not heard from RM within the commit
+    // window to help avoid double-committing in a split-brain situation
+    long now = context.getClock().getTime();
+    if (now - rmHeartbeatHandler.getLastHeartbeatTime() > commitWindowMs) {
+      return false;
+    }
+
     Job job = context.getJob(attemptID.getTaskId().getJobId());
     Task task = job.getTask(attemptID.getTaskId());
     return task.canCommit(attemptID);

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1429049&r1=1429048&r2=1429049&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Fri Jan  4 19:34:09 2013
@@ -87,6 +87,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
@@ -264,18 +265,20 @@ public class MRAppMaster extends Composi
       addIfService(dispatcher);
     }
 
+    //service to handle requests from JobClient
+    clientService = createClientService(context);
+    addIfService(clientService);
+
+    containerAllocator = createContainerAllocator(clientService, context);
+
     //service to handle requests to TaskUmbilicalProtocol
     taskAttemptListener = createTaskAttemptListener(context);
     addIfService(taskAttemptListener);
 
-    //service to do the task cleanup
+    //service to handle the output committer
     committerEventHandler = createCommitterEventHandler(context, committer);
     addIfService(committerEventHandler);
 
-    //service to handle requests from JobClient
-    clientService = createClientService(context);
-    addIfService(clientService);
-
     //service to log job history events
     EventHandler<JobHistoryEvent> historyService = 
         createJobHistoryHandler(context);
@@ -303,7 +306,6 @@ public class MRAppMaster extends Composi
         speculatorEventDispatcher);
 
     // service to allocate containers from RM (if non-uber) or to fake it (uber)
-    containerAllocator = createContainerAllocator(clientService, context);
     addIfService(containerAllocator);
     dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
 
@@ -582,13 +584,15 @@ public class MRAppMaster extends Composi
 
   protected TaskAttemptListener createTaskAttemptListener(AppContext context) {
     TaskAttemptListener lis =
-        new TaskAttemptListenerImpl(context, jobTokenSecretManager);
+        new TaskAttemptListenerImpl(context, jobTokenSecretManager,
+            getRMHeartbeatHandler());
     return lis;
   }
 
   protected EventHandler<CommitterEvent> createCommitterEventHandler(
       AppContext context, OutputCommitter committer) {
-    return new CommitterEventHandler(context, committer);
+    return new CommitterEventHandler(context, committer,
+        getRMHeartbeatHandler());
   }
 
   protected ContainerAllocator createContainerAllocator(
@@ -596,6 +600,10 @@ public class MRAppMaster extends Composi
     return new ContainerAllocatorRouter(clientService, context);
   }
 
+  protected RMHeartbeatHandler getRMHeartbeatHandler() {
+    return (RMHeartbeatHandler) containerAllocator;
+  }
+
   protected ContainerLauncher
       createContainerLauncher(final AppContext context) {
     return new ContainerLauncherRouter(context);
@@ -663,7 +671,7 @@ public class MRAppMaster extends Composi
    * happened.
    */
   private final class ContainerAllocatorRouter extends AbstractService
-      implements ContainerAllocator {
+      implements ContainerAllocator, RMHeartbeatHandler {
     private final ClientService clientService;
     private final AppContext context;
     private ContainerAllocator containerAllocator;
@@ -708,6 +716,16 @@ public class MRAppMaster extends Composi
     public void setShouldUnregister(boolean shouldUnregister) {
       ((RMCommunicator) containerAllocator).setShouldUnregister(shouldUnregister);
     }
+
+    @Override
+    public long getLastHeartbeatTime() {
+      return ((RMCommunicator) containerAllocator).getLastHeartbeatTime();
+    }
+
+    @Override
+    public void runOnNextHeartbeat(Runnable callback) {
+      ((RMCommunicator) containerAllocator).runOnNextHeartbeat(callback);
+    }
   }
 
   /**

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java?rev=1429049&r1=1429048&r2=1429049&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java Fri Jan  4 19:34:09 2013
@@ -39,6 +39,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobSetupFailedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -54,6 +55,7 @@ public class CommitterEventHandler exten
 
   private final AppContext context;
   private final OutputCommitter committer;
+  private final RMHeartbeatHandler rmHeartbeatHandler;
   private ThreadPoolExecutor launcherPool;
   private Thread eventHandlingThread;
   private BlockingQueue<CommitterEvent> eventQueue =
@@ -61,11 +63,14 @@ public class CommitterEventHandler exten
   private final AtomicBoolean stopped;
   private Thread jobCommitThread = null;
   private int commitThreadCancelTimeoutMs;
+  private long commitWindowMs;
 
-  public CommitterEventHandler(AppContext context, OutputCommitter committer) {
+  public CommitterEventHandler(AppContext context, OutputCommitter committer,
+      RMHeartbeatHandler rmHeartbeatHandler) {
     super("CommitterEventHandler");
     this.context = context;
     this.committer = committer;
+    this.rmHeartbeatHandler = rmHeartbeatHandler;
     this.stopped = new AtomicBoolean(false);
   }
 
@@ -75,6 +80,8 @@ public class CommitterEventHandler exten
     commitThreadCancelTimeoutMs = conf.getInt(
         MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS,
         MRJobConfig.DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS);
+    commitWindowMs = conf.getLong(MRJobConfig.MR_AM_COMMIT_WINDOW_MS,
+        MRJobConfig.DEFAULT_MR_AM_COMMIT_WINDOW_MS);
   }
 
   @Override
@@ -210,6 +217,7 @@ public class CommitterEventHandler exten
     protected void handleJobCommit(CommitterJobCommitEvent event) {
       try {
         jobCommitStarted();
+        waitForValidCommitWindow();
         committer.commitJob(event.getJobContext());
         context.getEventHandler().handle(
             new JobCommitCompletedEvent(event.getJobID()));
@@ -248,5 +256,26 @@ public class CommitterEventHandler exten
           new TaskAttemptEvent(event.getAttemptID(),
               TaskAttemptEventType.TA_CLEANUP_DONE));
     }
+
+    private synchronized void waitForValidCommitWindow()
+        throws InterruptedException {
+      long lastHeartbeatTime = rmHeartbeatHandler.getLastHeartbeatTime();
+      long now = context.getClock().getTime();
+
+      while (now - lastHeartbeatTime > commitWindowMs) {
+        rmHeartbeatHandler.runOnNextHeartbeat(new Runnable() {
+          @Override
+          public void run() {
+            synchronized (EventProcessor.this) {
+              EventProcessor.this.notify();
+            }
+          }
+        });
+
+        wait();
+        lastHeartbeatTime = rmHeartbeatHandler.getLastHeartbeatTime();
+        now = context.getClock().getTime();
+      }
+    }
   }
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1429049&r1=1429048&r2=1429049&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Fri Jan  4 19:34:09 2013
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
 import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
@@ -61,7 +62,8 @@ import org.apache.hadoop.yarn.service.Ab
 /**
  * Registers/unregisters to RM and sends heartbeats to RM.
  */
-public abstract class RMCommunicator extends AbstractService  {
+public abstract class RMCommunicator extends AbstractService
+    implements RMHeartbeatHandler {
   private static final Log LOG = LogFactory.getLog(RMContainerAllocator.class);
   private int rmPollInterval;//millis
   protected ApplicationId applicationId;
@@ -76,6 +78,8 @@ public abstract class RMCommunicator ext
   private Resource minContainerCapability;
   private Resource maxContainerCapability;
   protected Map<ApplicationAccessType, String> applicationACLs;
+  private volatile long lastHeartbeatTime;
+  private ConcurrentLinkedQueue<Runnable> heartbeatCallbacks;
 
   private final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
@@ -94,6 +98,7 @@ public abstract class RMCommunicator ext
     this.applicationId = context.getApplicationID();
     this.applicationAttemptId = context.getApplicationAttemptId();
     this.stopped = new AtomicBoolean(false);
+    this.heartbeatCallbacks = new ConcurrentLinkedQueue<Runnable>();
   }
 
   @Override
@@ -234,8 +239,12 @@ public abstract class RMCommunicator ext
               return;
             } catch (Exception e) {
               LOG.error("ERROR IN CONTACTING RM. ", e);
+              continue;
               // TODO: for other exceptions
             }
+
+            lastHeartbeatTime = context.getClock().getTime();
+            executeHeartbeatCallbacks();
           } catch (InterruptedException e) {
             if (!stopped.get()) {
               LOG.warn("Allocated thread interrupted. Returning.");
@@ -293,6 +302,23 @@ public abstract class RMCommunicator ext
 
   protected abstract void heartbeat() throws Exception;
 
+  private void executeHeartbeatCallbacks() {
+    Runnable callback = null;
+    while ((callback = heartbeatCallbacks.poll()) != null) {
+      callback.run();
+    }
+  }
+
+  @Override
+  public long getLastHeartbeatTime() {
+    return lastHeartbeatTime;
+  }
+
+  @Override
+  public void runOnNextHeartbeat(Runnable callback) {
+    heartbeatCallbacks.add(callback);
+  }
+
   public void setShouldUnregister(boolean shouldUnregister) {
     this.shouldUnregister = shouldUnregister;
     LOG.info("RMCommunicator notified that shouldUnregistered is: " 

Added: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMHeartbeatHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMHeartbeatHandler.java?rev=1429049&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMHeartbeatHandler.java (added)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMHeartbeatHandler.java Fri Jan  4 19:34:09 2013
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.rm;
+
+public interface RMHeartbeatHandler {
+  long getLastHeartbeatTime();
+
+  void runOnNextHeartbeat(Runnable callback);
+}

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java?rev=1429049&r1=1429048&r2=1429049&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java Fri Jan  4 19:34:09 2013
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertNul
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -41,7 +43,9 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.SystemClock;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.junit.Test;
@@ -51,8 +55,9 @@ public class TestTaskAttemptListenerImpl
 
     public MockTaskAttemptListenerImpl(AppContext context,
         JobTokenSecretManager jobTokenSecretManager,
+        RMHeartbeatHandler rmHeartbeatHandler,
         TaskHeartbeatHandler hbHandler) {
-      super(context, jobTokenSecretManager);
+      super(context, jobTokenSecretManager, rmHeartbeatHandler);
       this.taskHeartbeatHandler = hbHandler;
     }
     
@@ -76,9 +81,12 @@ public class TestTaskAttemptListenerImpl
   public void testGetTask() throws IOException {
     AppContext appCtx = mock(AppContext.class);
     JobTokenSecretManager secret = mock(JobTokenSecretManager.class); 
+    RMHeartbeatHandler rmHeartbeatHandler =
+        mock(RMHeartbeatHandler.class);
     TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
     MockTaskAttemptListenerImpl listener = 
-      new MockTaskAttemptListenerImpl(appCtx, secret, hbHandler);
+      new MockTaskAttemptListenerImpl(appCtx, secret,
+          rmHeartbeatHandler, hbHandler);
     Configuration conf = new Configuration();
     listener.init(conf);
     listener.start();
@@ -152,9 +160,11 @@ public class TestTaskAttemptListenerImpl
     AppContext appCtx = mock(AppContext.class);
     when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
     JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
+    RMHeartbeatHandler rmHeartbeatHandler =
+        mock(RMHeartbeatHandler.class);
     final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
     TaskAttemptListenerImpl listener =
-        new TaskAttemptListenerImpl(appCtx, secret) {
+        new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) {
       @Override
       protected void registerHeartbeatHandler(Configuration conf) {
         taskHeartbeatHandler = hbHandler;
@@ -191,4 +201,46 @@ public class TestTaskAttemptListenerImpl
     return tce;
   }
 
+  @Test
+  public void testCommitWindow() throws IOException {
+    SystemClock clock = new SystemClock();
+
+    org.apache.hadoop.mapreduce.v2.app.job.Task mockTask =
+        mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class);
+    when(mockTask.canCommit(any(TaskAttemptId.class))).thenReturn(true);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getTask(any(TaskId.class))).thenReturn(mockTask);
+    AppContext appCtx = mock(AppContext.class);
+    when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
+    when(appCtx.getClock()).thenReturn(clock);
+    JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
+    RMHeartbeatHandler rmHeartbeatHandler =
+        mock(RMHeartbeatHandler.class);
+    final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
+    TaskAttemptListenerImpl listener =
+        new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) {
+      @Override
+      protected void registerHeartbeatHandler(Configuration conf) {
+        taskHeartbeatHandler = hbHandler;
+      }
+    };
+
+    Configuration conf = new Configuration();
+    listener.init(conf);
+    listener.start();
+
+    // verify commit not allowed when RM heartbeat has not occurred recently
+    TaskAttemptID tid = new TaskAttemptID("12345", 1, TaskType.REDUCE, 1, 0);
+    boolean canCommit = listener.canCommit(tid);
+    assertFalse(canCommit);
+    verify(mockTask, never()).canCommit(any(TaskAttemptId.class));
+
+    // verify commit allowed when RM heartbeat is recent
+    when(rmHeartbeatHandler.getLastHeartbeatTime()).thenReturn(clock.getTime());
+    canCommit = listener.canCommit(tid);
+    assertTrue(canCommit);
+    verify(mockTask, times(1)).canCommit(any(TaskAttemptId.class));
+
+    listener.stop();
+  }
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1429049&r1=1429048&r2=1429049&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Fri Jan  4 19:34:09 2013
@@ -74,6 +74,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetUtils;
@@ -489,7 +490,8 @@ public class MRApp extends MRAppMaster {
     return new MRAppContainerAllocator();
   }
 
-  protected class MRAppContainerAllocator implements ContainerAllocator {
+  protected class MRAppContainerAllocator
+      implements ContainerAllocator, RMHeartbeatHandler {
     private int containerCount;
 
      @Override
@@ -514,6 +516,16 @@ public class MRApp extends MRAppMaster {
             new TaskAttemptContainerAssignedEvent(event.getAttemptID(),
                 container, null));
       }
+
+    @Override
+    public long getLastHeartbeatTime() {
+      return getContext().getClock().getTime();
+    }
+
+    @Override
+    public void runOnNextHeartbeat(Runnable callback) {
+      callback.run();
+    }
   }
 
   @Override
@@ -566,7 +578,8 @@ public class MRApp extends MRAppMaster {
       }
     };
 
-    return new CommitterEventHandler(context, stubbedCommitter);
+    return new CommitterEventHandler(context, stubbedCommitter,
+        getRMHeartbeatHandler());
   }
 
   @Override

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java?rev=1429049&r1=1429048&r2=1429049&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java Fri Jan  4 19:34:09 2013
@@ -252,7 +252,7 @@ public class TestFail {
       //task time out is reduced
       //when attempt times out, heartbeat handler will send the lost event
       //leading to Attempt failure
-      return new TaskAttemptListenerImpl(getContext(), null) {
+      return new TaskAttemptListenerImpl(getContext(), null, null) {
         @Override
         public void startRpcServer(){};
         @Override

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1429049&r1=1429048&r2=1429049&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Fri Jan  4 19:34:09 2013
@@ -38,6 +38,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import junit.framework.Assert;
 
@@ -68,7 +69,9 @@ import org.apache.hadoop.mapreduce.v2.ut
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.ClusterInfo;
+import org.apache.hadoop.yarn.SystemClock;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.AMRMProtocol;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -1149,6 +1152,13 @@ public class TestRMContainerAllocator {
       return context;
     }
 
+    private static AppContext createAppContext(
+        ApplicationAttemptId appAttemptId, Job job, Clock clock) {
+      AppContext context = createAppContext(appAttemptId, job);
+      when(context.getClock()).thenReturn(clock);
+      return context;
+    }
+
     private static ClientService createMockClientService() {
       ClientService service = mock(ClientService.class);
       when(service.getBindAddress()).thenReturn(
@@ -1173,6 +1183,15 @@ public class TestRMContainerAllocator {
       super.start();
     }
 
+    public MyContainerAllocator(MyResourceManager rm, Configuration conf,
+        ApplicationAttemptId appAttemptId, Job job, Clock clock) {
+      super(createMockClientService(),
+          createAppContext(appAttemptId, job, clock));
+      this.rm = rm;
+      super.init(conf);
+      super.start();
+    }
+
     @Override
     protected AMRMProtocol createSchedulerProxy() {
       return this.rm.getApplicationMasterService();
@@ -1366,6 +1385,66 @@ public class TestRMContainerAllocator {
         allocator.recalculatedReduceSchedule);
   }
 
+  @Test
+  public void testHeartbeatHandler() throws Exception {
+    LOG.info("Running testHeartbeatHandler");
+
+    Configuration conf = new Configuration();
+    conf.setInt(MRJobConfig.MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS, 1);
+    ControlledClock clock = new ControlledClock(new SystemClock());
+    AppContext appContext = mock(AppContext.class);
+    when(appContext.getClock()).thenReturn(clock);
+    when(appContext.getApplicationID()).thenReturn(
+        BuilderUtils.newApplicationId(1, 1));
+
+    RMContainerAllocator allocator = new RMContainerAllocator(
+        mock(ClientService.class), appContext) {
+          @Override
+          protected void register() {
+          }
+          @Override
+          protected AMRMProtocol createSchedulerProxy() {
+            return mock(AMRMProtocol.class);
+          }
+          @Override
+          protected synchronized void heartbeat() throws Exception {
+          }
+    };
+    allocator.init(conf);
+    allocator.start();
+
+    clock.setTime(5);
+    int timeToWaitMs = 5000;
+    while (allocator.getLastHeartbeatTime() != 5 && timeToWaitMs > 0) {
+      Thread.sleep(10);
+      timeToWaitMs -= 10;
+    }
+    Assert.assertEquals(5, allocator.getLastHeartbeatTime());
+    clock.setTime(7);
+    timeToWaitMs = 5000;
+    while (allocator.getLastHeartbeatTime() != 7 && timeToWaitMs > 0) {
+      Thread.sleep(10);
+      timeToWaitMs -= 10;
+    }
+    Assert.assertEquals(7, allocator.getLastHeartbeatTime());
+
+    final AtomicBoolean callbackCalled = new AtomicBoolean(false);
+    allocator.runOnNextHeartbeat(new Runnable() {
+      @Override
+      public void run() {
+        callbackCalled.set(true);
+      }
+    });
+    clock.setTime(8);
+    timeToWaitMs = 5000;
+    while (allocator.getLastHeartbeatTime() != 8 && timeToWaitMs > 0) {
+      Thread.sleep(10);
+      timeToWaitMs -= 10;
+    }
+    Assert.assertEquals(8, allocator.getLastHeartbeatTime());
+    Assert.assertTrue(callbackCalled.get());
+  }
+
   public static void main(String[] args) throws Exception {
     TestRMContainerAllocator t = new TestRMContainerAllocator();
     t.testSimple();

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1429049&r1=1429048&r2=1429049&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Fri Jan  4 19:34:09 2013
@@ -42,6 +42,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -166,6 +167,11 @@ import org.junit.Test;
      }
 
      @Override
+     public RMHeartbeatHandler getRMHeartbeatHandler() {
+       return getStubbedHeartbeatHandler(getContext());
+     }
+
+     @Override
      protected void sysexit() {      
      }
 
@@ -177,6 +183,7 @@ import org.junit.Test;
      @Override
      protected void downloadTokensAndSetupUGI(Configuration conf) {
      }
+
    }
 
   private final class MRAppTestCleanup extends MRApp {
@@ -238,6 +245,11 @@ import org.junit.Test;
     }
 
     @Override
+    public RMHeartbeatHandler getRMHeartbeatHandler() {
+      return getStubbedHeartbeatHandler(getContext());
+    }
+
+    @Override
     public void cleanupStagingDir() throws IOException {
       cleanedBeforeContainerAllocatorStopped = !stoppedContainerAllocator;
     }
@@ -247,6 +259,20 @@ import org.junit.Test;
     }
   }
 
+  private static RMHeartbeatHandler getStubbedHeartbeatHandler(
+      final AppContext appContext) {
+    return new RMHeartbeatHandler() {
+      @Override
+      public long getLastHeartbeatTime() {
+        return appContext.getClock().getTime();
+      }
+      @Override
+      public void runOnNextHeartbeat(Runnable callback) {
+        callback.run();
+      }
+    };
+  }
+
   @Test
   public void testStagingCleanupOrder() throws Exception {
     MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true,

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java?rev=1429049&r1=1429040&r2=1429049&view=diff
==============================================================================
    (empty)

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1429049&r1=1429048&r2=1429049&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Fri Jan  4 19:34:09 2013
@@ -56,6 +56,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -502,13 +503,23 @@ public class TestJobImpl {
 
   private static CommitterEventHandler createCommitterEventHandler(
       Dispatcher dispatcher, OutputCommitter committer) {
-    SystemClock clock = new SystemClock();
+    final SystemClock clock = new SystemClock();
     AppContext appContext = mock(AppContext.class);
     when(appContext.getEventHandler()).thenReturn(
         dispatcher.getEventHandler());
     when(appContext.getClock()).thenReturn(clock);
+    RMHeartbeatHandler heartbeatHandler = new RMHeartbeatHandler() {
+      @Override
+      public long getLastHeartbeatTime() {
+        return clock.getTime();
+      }
+      @Override
+      public void runOnNextHeartbeat(Runnable callback) {
+        callback.run();
+      }
+    };
     CommitterEventHandler handler =
-        new CommitterEventHandler(appContext, committer);
+        new CommitterEventHandler(appContext, committer, heartbeatHandler);
     dispatcher.register(CommitterEventType.class, handler);
     return handler;
   }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1429049&r1=1429048&r2=1429049&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Fri Jan  4 19:34:09 2013
@@ -469,6 +469,16 @@ public interface MRJobConfig {
       60 * 1000;
 
   /**
+   * Defines a time window in milliseconds for output committer operations.
+   * If contact with the RM has occurred within this window then commit
+   * operations are allowed, otherwise the AM will not allow output committer
+   * operations until contact with the RM has been re-established.
+   */
+  public static final String MR_AM_COMMIT_WINDOW_MS =
+      MR_AM_PREFIX + "job.committer.commit-window";
+  public static final int DEFAULT_MR_AM_COMMIT_WINDOW_MS = 10 * 1000;
+
+  /**
    * Boolean. Create the base dirs in the JobHistoryEventHandler
    * Set to false for multi-user clusters.  This is an internal config that
    * is set by the MR framework and read by it too.

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1429049&r1=1429048&r2=1429049&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Fri Jan  4 19:34:09 2013
@@ -1265,6 +1265,15 @@
 </property>
 
 <property>
+  <name>yarn.app.mapreduce.am.job.committer.commit-window</name>
+  <value>10000</value>
+  <description>Defines a time window in milliseconds for output commit
+  operations.  If contact with the RM has occurred within this window then
+  commits are allowed, otherwise the AM will not allow output commits until
+  contact with the RM has been re-established.</description>
+</property>
+
+<property>
   <name>yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms</name>
   <value>1000</value>
   <description>The interval in ms at which the MR AppMaster should send