You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/01/16 21:24:37 UTC

tez git commit: TEZ-1941. Memory provided by *Context.getAvailableMemory needs to be setup explicitly. (sseth)

Repository: tez
Updated Branches:
  refs/heads/master ea46f459c -> 4974fb235


TEZ-1941. Memory provided by *Context.getAvailableMemory needs to be
setup explicitly. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4974fb23
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4974fb23
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4974fb23

Branch: refs/heads/master
Commit: 4974fb235413fc67fbe48dc29cb651c4b0fe7ce3
Parents: ea46f45
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Jan 16 12:24:23 2015 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Jan 16 12:24:23 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                       |  1 +
 .../apache/tez/runtime/api/ExecutionContext.java  |  2 +-
 .../org/apache/tez/runtime/api/TaskContext.java   |  2 +-
 .../dag/app/launcher/LocalContainerLauncher.java  |  8 +++++---
 .../apache/tez/mapreduce/processor/MapUtils.java  |  3 ++-
 .../processor/reduce/TestReduceProcessor.java     |  3 ++-
 tez-runtime-internals/findbugs-exclude.xml        |  7 +++++++
 .../runtime/LogicalIOProcessorRuntimeTask.java    | 11 +++++++----
 .../tez/runtime/api/impl/TezInputContextImpl.java |  4 ++--
 .../runtime/api/impl/TezOutputContextImpl.java    |  4 ++--
 .../runtime/api/impl/TezProcessorContextImpl.java |  4 ++--
 .../tez/runtime/api/impl/TezTaskContextImpl.java  |  6 ++++--
 .../org/apache/tez/runtime/task/TaskReporter.java | 11 ++++++-----
 .../org/apache/tez/runtime/task/TezChild.java     | 18 ++++++++++--------
 .../apache/tez/runtime/task/TezTaskRunner.java    |  4 ++--
 .../TestLogicalIOProcessorRuntimeTask.java        |  4 ++--
 .../tez/runtime/task/TestTaskExecution.java       |  2 +-
 .../output/TestOnFileUnorderedKVOutput.java       |  2 +-
 18 files changed, 58 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index af4c60e..eec52dc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-1941. Memory provided by *Context.getAvailableMemory needs to be setup explicitly.
   TEZ-1879. Create local UGI instances for each task and the AM, when running in LocalMode.
   TEZ-1661. LocalTaskScheduler hangs when shutdown.
   TEZ-1934. TestAMRecovery may fail due to the execution order is not determined.

http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/tez-api/src/main/java/org/apache/tez/runtime/api/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/ExecutionContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/ExecutionContext.java
index b120ecc..fa9a47f 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/ExecutionContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/ExecutionContext.java
@@ -17,7 +17,7 @@ package org.apache.tez.runtime.api;
 import org.apache.hadoop.classification.InterfaceAudience;
 
 /**
- * Execution context for a running task
+ * The context for the executor within which a task runs. May be shared between tasks
  *
  * This interface is not meant to be implemented by users
  */

http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
index 42918b9..8e22057 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
@@ -182,7 +182,7 @@ public interface TaskContext {
   public int getVertexParallelism();
 
   /**
-   * Get the execution context for a running task
+   * Get the context for the executor. This may be shared across multiple tasks
    * @return the execution context
    */
   public ExecutionContext getExecutionContext();

http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index 2f29569..9a83d3c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -89,7 +89,7 @@ public class LocalContainerLauncher extends AbstractService implements
   private final AtomicBoolean serviceStopped = new AtomicBoolean(false);
   private final String workingDirectory;
   private final Map<String, String> localEnv = new HashMap<String, String>();
-  private final ExecutionContext ExecutionContext;
+  private final ExecutionContext executionContext;
 
   private final ConcurrentHashMap<ContainerId, ListenableFuture<TezChild.ContainerExecutionResult>>
       runningContainers =
@@ -116,7 +116,7 @@ public class LocalContainerLauncher extends AbstractService implements
     this.workingDirectory = workingDirectory;
     AuxiliaryServiceHelper.setServiceDataIntoEnv(
         ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, ByteBuffer.allocate(4).putInt(0), localEnv);
-    ExecutionContext = new ExecutionContextImpl(InetAddress.getLocalHost().getHostName());
+    executionContext = new ExecutionContextImpl(InetAddress.getLocalHost().getHostName());
     // User cannot be set here since it isn't available till a DAG is running.
   }
 
@@ -328,9 +328,11 @@ public class LocalContainerLauncher extends AbstractService implements
     containerEnv.putAll(localEnv);
     containerEnv.put(Environment.USER.name(), context.getUser());
 
+    // TODO TEZ-1482. Control the memory available based on number of executors
     TezChild tezChild =
         TezChild.newTezChild(defaultConf, null, 0, containerId.toString(), tokenIdentifier,
-            attemptNumber, localDirs, workingDirectory, containerEnv, "", ExecutionContext, credentials);
+            attemptNumber, localDirs, workingDirectory, containerEnv, "", executionContext, credentials,
+            Runtime.getRuntime().maxMemory());
     tezChild.setUmbilical(tezTaskUmbilicalProtocol);
     return tezChild;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
index 8ec17eb..40bea0f 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
@@ -231,7 +231,8 @@ public class MapUtils {
         umbilical,
         serviceConsumerMetadata,
         envMap,
-        HashMultimap.<String, String>create(), null, "", new ExecutionContextImpl("localhost"));
+        HashMultimap.<String, String>create(), null, "", new ExecutionContextImpl("localhost"),
+        Runtime.getRuntime().maxMemory());
     return task;
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index b378f37..b205a42 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -223,7 +223,8 @@ public class TestReduceProcessor {
         new TestUmbilical(),
         serviceConsumerMetadata,
         serviceProviderEnvMap,
-        HashMultimap.<String, String>create(), null, "", new ExecutionContextImpl("localhost"));
+        HashMultimap.<String, String>create(), null, "", new ExecutionContextImpl("localhost"),
+        Runtime.getRuntime().maxMemory());
 
     List<Event> destEvents = new LinkedList<Event>();
     destEvents.add(dme);

http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/tez-runtime-internals/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/findbugs-exclude.xml b/tez-runtime-internals/findbugs-exclude.xml
index bc8a2c0..2b308de 100644
--- a/tez-runtime-internals/findbugs-exclude.xml
+++ b/tez-runtime-internals/findbugs-exclude.xml
@@ -33,6 +33,13 @@
   </Match>
 
   <Match>
+    <Class name="org.apache.tez.runtime.task.TezChild"/>
+    <Method name="&lt;init&gt;"/>
+    <Field name="localDirs"/>
+    <Bug pattern="EI_EXPOSE_REP2"/>
+  </Match>
+
+  <Match>
     <Class name="~org\.apache\.tez\.runtime\.internals\.api\.events\.SystemEventProtos\$.*Proto" />
     <Field name="PARSER"/>
     <Bug pattern="MS_SHOULD_BE_FINAL"/>

http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index c3f90a5..9d9be37 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -133,12 +133,13 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   
   private final ObjectRegistry objectRegistry;
   private final ExecutionContext ExecutionContext;
+  private final long memAvailable;
 
   public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber,
       Configuration tezConf, String[] localDirs, TezUmbilical tezUmbilical,
       Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> envMap,
       Multimap<String, String> startedInputsMap, ObjectRegistry objectRegistry,
-      String pid, ExecutionContext ExecutionContext) throws IOException {
+      String pid, ExecutionContext ExecutionContext, long memAvailable) throws IOException {
     // TODO Remove jobToken from here post TEZ-421
     super(taskSpec, tezConf, tezUmbilical, pid);
     LOG.info("Initializing LogicalIOProcessorRuntimeTask with TaskSpec: "
@@ -176,6 +177,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     this.inputReadyTracker = new InputReadyTracker();
     this.objectRegistry = objectRegistry;
     this.ExecutionContext = ExecutionContext;
+    this.memAvailable = memAvailable;
   }
 
   /**
@@ -488,7 +490,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
         inputSpec.getInputDescriptor().getUserPayload(), this,
         serviceConsumerMetadata, envMap, initialMemoryDistributor,
         inputSpec.getInputDescriptor(), inputMap, inputReadyTracker, objectRegistry,
-        ExecutionContext);
+        ExecutionContext, memAvailable);
     return inputContext;
   }
 
@@ -502,7 +504,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
         tezCounters, outputIndex,
         outputSpec.getOutputDescriptor().getUserPayload(), this,
         serviceConsumerMetadata, envMap, initialMemoryDistributor,
-        outputSpec.getOutputDescriptor(), objectRegistry, ExecutionContext);
+        outputSpec.getOutputDescriptor(), objectRegistry, ExecutionContext,
+        memAvailable);
     return outputContext;
   }
 
@@ -514,7 +517,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
         taskSpec.getTaskAttemptID(),
         tezCounters, processorDescriptor.getUserPayload(), this,
         serviceConsumerMetadata, envMap, initialMemoryDistributor,
-        processorDescriptor, inputReadyTracker, objectRegistry, ExecutionContext);
+        processorDescriptor, inputReadyTracker, objectRegistry, ExecutionContext, memAvailable);
     return processorContext;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
index 460efbf..a15e072 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
@@ -68,12 +68,12 @@ public class TezInputContextImpl extends TezTaskContextImpl
                              Map<String, String> auxServiceEnv, MemoryDistributor memDist,
                              InputDescriptor inputDescriptor, Map<String, LogicalInput> inputs,
                              InputReadyTracker inputReadyTracker, ObjectRegistry objectRegistry,
-                             ExecutionContext ExecutionContext) {
+                             ExecutionContext ExecutionContext, long memAvailable) {
     super(conf, workDirs, appAttemptNumber, dagName, taskVertexName,
         vertexParallelism, taskAttemptID, wrapCounters(counters,
         taskVertexName, sourceVertexName, conf), runtimeTask, tezUmbilical,
         serviceConsumerMetadata, auxServiceEnv, memDist, inputDescriptor,
-        objectRegistry, ExecutionContext);
+        objectRegistry, ExecutionContext, memAvailable);
     checkNotNull(inputIndex, "inputIndex is null");
     checkNotNull(sourceVertexName, "sourceVertexName is null");
     checkNotNull(inputs, "input map is null");

http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
index 88e87ce..d376b88 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
@@ -63,12 +63,12 @@ public class TezOutputContextImpl extends TezTaskContextImpl
       Map<String, ByteBuffer> serviceConsumerMetadata,
       Map<String, String> auxServiceEnv, MemoryDistributor memDist,
       OutputDescriptor outputDescriptor, ObjectRegistry objectRegistry,
-      ExecutionContext ExecutionContext) {
+      ExecutionContext ExecutionContext, long memAvailable) {
     super(conf, workDirs, appAttemptNumber, dagName, taskVertexName, 
         vertexParallelism, taskAttemptID,
         wrapCounters(counters, taskVertexName, destinationVertexName, conf),
         runtimeTask, tezUmbilical, serviceConsumerMetadata,
-        auxServiceEnv, memDist, outputDescriptor, objectRegistry, ExecutionContext);
+        auxServiceEnv, memDist, outputDescriptor, objectRegistry, ExecutionContext, memAvailable);
     checkNotNull(outputIndex, "outputIndex is null");
     checkNotNull(destinationVertexName, "destinationVertexName is null");
     this.userPayload = userPayload;

http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
index 3317c80..16f9a45 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -59,10 +59,10 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce
       Map<String, ByteBuffer> serviceConsumerMetadata,
       Map<String, String> auxServiceEnv, MemoryDistributor memDist,
       ProcessorDescriptor processorDescriptor, InputReadyTracker inputReadyTracker, ObjectRegistry objectRegistry,
-      ExecutionContext ExecutionContext) {
+      ExecutionContext ExecutionContext, long memAvailable) {
     super(conf, workDirs, appAttemptNumber, dagName, vertexName, vertexParallelism, taskAttemptID,
         counters, runtimeTask, tezUmbilical, serviceConsumerMetadata,
-        auxServiceEnv, memDist, processorDescriptor, objectRegistry, ExecutionContext);
+        auxServiceEnv, memDist, processorDescriptor, objectRegistry, ExecutionContext, memAvailable);
     checkNotNull(inputReadyTracker, "inputReadyTracker is null");
     this.userPayload = userPayload;
     this.sourceInfo = new EventMetaData(EventProducerConsumerType.PROCESSOR,

http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
index d9de7b5..527b822 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
@@ -64,6 +64,7 @@ public abstract class TezTaskContextImpl implements TaskContext {
   private final ObjectRegistry objectRegistry;
   private final int vertexParallelism;
   private final ExecutionContext ExecutionContext;
+  private final long memAvailable;
 
   @Private
   public TezTaskContextImpl(Configuration conf, String[] workDirs, int appAttemptNumber,
@@ -72,7 +73,7 @@ public abstract class TezTaskContextImpl implements TaskContext {
       TezUmbilical tezUmbilical, Map<String, ByteBuffer> serviceConsumerMetadata,
       Map<String, String> auxServiceEnv, MemoryDistributor memDist,
       EntityDescriptor<?> descriptor, ObjectRegistry objectRegistry,
-      ExecutionContext ExecutionContext) {
+      ExecutionContext ExecutionContext, long memAvailable) {
     checkNotNull(conf, "conf is null");
     checkNotNull(dagName, "dagName is null");
     checkNotNull(taskVertexName, "taskVertexName is null");
@@ -103,6 +104,7 @@ public abstract class TezTaskContextImpl implements TaskContext {
     this.objectRegistry = objectRegistry;
     this.vertexParallelism = vertexParallelism;
     this.ExecutionContext = ExecutionContext;
+    this.memAvailable = memAvailable;
   }
 
   @Override
@@ -198,7 +200,7 @@ public abstract class TezTaskContextImpl implements TaskContext {
 
   @Override
   public long getTotalMemoryAvailableToTask() {
-    return Runtime.getRuntime().maxMemory();
+    return memAvailable;
   }
   
   protected void signalFatalError(Throwable t, String message, EventMetaData sourceInfo) {

http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
index 15dcbb0..0841984 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
@@ -142,7 +143,7 @@ public class TaskReporter {
      * Keeps track of regular timed heartbeats. Is primarily used as a timing mechanism to send /
      * log counters.
      */
-    private int nonOobHeartbeatCounter = 0;
+    private AtomicInteger nonOobHeartbeatCounter = new AtomicInteger(0);
     private int nextHeartbeatNumToLog = 0;
     /*
      * Tracks the last non-OOB heartbeat number at which counters were sent to the AM. 
@@ -186,7 +187,7 @@ public class TaskReporter {
             try {
               boolean interrupted = condition.await(pollInterval, TimeUnit.MILLISECONDS);
               if (!interrupted) {
-                nonOobHeartbeatCounter++;
+                nonOobHeartbeatCounter.incrementAndGet();
               }
             } finally {
               lock.unlock();
@@ -228,9 +229,9 @@ public class TaskReporter {
          * real time decisions are made based on these counters, it can be sent once per second.
          */
         // Not completely accurate, since OOB heartbeats could go out.
-        if ((nonOobHeartbeatCounter - prevCounterSendHeartbeatNum) * pollInterval >= sendCounterInterval) {
+        if ((nonOobHeartbeatCounter.get() - prevCounterSendHeartbeatNum) * pollInterval >= sendCounterInterval) {
           counters = task.getCounters();
-          prevCounterSendHeartbeatNum = nonOobHeartbeatCounter;
+          prevCounterSendHeartbeatNum = nonOobHeartbeatCounter.get();
         }
         updateEvent = new TezEvent(new TaskStatusUpdateEvent(counters, task.getProgress()),
             updateEventMetadata);
@@ -294,7 +295,7 @@ public class TaskReporter {
 
     private void maybeLogCounters() {
       if (LOG.isDebugEnabled()) {
-        if (nonOobHeartbeatCounter == nextHeartbeatNumToLog) {
+        if (nonOobHeartbeatCounter.get() == nextHeartbeatNumToLog) {
           LOG.debug("Counters: " + task.getCounters().toShortString());
           nextHeartbeatNumToLog = (int) (nextHeartbeatNumToLog * (LOG_COUNTER_BACKOFF));
         }

http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index 7a9b600..bfac476 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -99,10 +99,11 @@ public class TezChild {
   private final ListeningExecutorService executor;
   private final ObjectRegistryImpl objectRegistry;
   private final String pid;
-  private final ExecutionContext ExecutionContext;
+  private final ExecutionContext executionContext;
   private final Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
   private final Map<String, String> serviceProviderEnvMap;
   private final Credentials credentials;
+  private final long memAvailable;
   private final AtomicBoolean isShutdown = new AtomicBoolean(false);
 
   private Multimap<String, String> startedInputsMap = HashMultimap.create();
@@ -116,8 +117,8 @@ public class TezChild {
       String tokenIdentifier, int appAttemptNumber, String workingDir, String[] localDirs,
       Map<String, String> serviceProviderEnvMap,
       ObjectRegistryImpl objectRegistry, String pid,
-      ExecutionContext ExecutionContext,
-      Credentials credentials)
+      ExecutionContext executionContext,
+      Credentials credentials, long memAvailable)
       throws IOException, InterruptedException {
     this.defaultConf = conf;
     this.containerIdString = containerIdentifier;
@@ -126,8 +127,9 @@ public class TezChild {
     this.serviceProviderEnvMap = serviceProviderEnvMap;
     this.workingDir = workingDir;
     this.pid = pid;
-    this.ExecutionContext = ExecutionContext;
+    this.executionContext = executionContext;
     this.credentials = credentials;
+    this.memAvailable = memAvailable;
 
     getTaskMaxSleepTime = defaultConf.getInt(
         TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX,
@@ -233,7 +235,7 @@ public class TezChild {
         TezTaskRunner taskRunner = new TezTaskRunner(defaultConf, childUGI,
             localDirs, containerTask.getTaskSpec(), umbilical, appAttemptNumber,
             serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, taskReporter,
-            executor, objectRegistry, pid, this.ExecutionContext);
+            executor, objectRegistry, pid, executionContext, memAvailable);
         boolean shouldDie;
         try {
           shouldDie = !taskRunner.run();
@@ -409,7 +411,7 @@ public class TezChild {
   public static TezChild newTezChild(Configuration conf, String host, int port, String containerIdentifier,
       String tokenIdentifier, int attemptNumber, String[] localDirs, String workingDirectory,
       Map<String, String> serviceProviderEnvMap, @Nullable String pid,
-      ExecutionContext ExecutionContext, Credentials credentials)
+      ExecutionContext executionContext, Credentials credentials, long memAvailable)
       throws IOException, InterruptedException, TezException {
 
     // Pull in configuration specified for the session.
@@ -426,7 +428,7 @@ public class TezChild {
 
     return new TezChild(conf, host, port, containerIdentifier, tokenIdentifier,
         attemptNumber, workingDirectory, localDirs, serviceProviderEnvMap, objectRegistry, pid,
-        ExecutionContext, credentials);
+        executionContext, credentials, memAvailable);
   }
 
   public static void main(String[] args) throws IOException, InterruptedException, TezException {
@@ -459,7 +461,7 @@ public class TezChild {
     TezChild tezChild = newTezChild(defaultConf, host, port, containerIdentifier,
         tokenIdentifier, attemptNumber, localDirs, System.getenv(Environment.PWD.name()),
         System.getenv(), pid, new ExecutionContextImpl(System.getenv(Environment.NM_HOST.name())),
-        credentials);
+        credentials, Runtime.getRuntime().maxMemory());
     tezChild.run();
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
index 6e655f9..6606481 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
@@ -72,7 +72,7 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
       Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> serviceProviderEnvMap,
       Multimap<String, String> startedInputsMap, TaskReporter taskReporter,
       ListeningExecutorService executor, ObjectRegistry objectRegistry, String pid,
-      ExecutionContext ExecutionContext)
+      ExecutionContext executionContext, long memAvailable)
           throws IOException {
     this.tezConf = tezConf;
     this.ugi = ugi;
@@ -80,7 +80,7 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
     this.executor = executor;
     task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, tezConf, localDirs, this,
         serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, objectRegistry, pid,
-        ExecutionContext);
+        executionContext, memAvailable);
     taskReporter.registerTask(task, this);
     taskRunning = new AtomicBoolean(true);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
index 60f8d46..4d165b5 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
@@ -78,7 +78,7 @@ public class TestLogicalIOProcessorRuntimeTask {
 
     LogicalIOProcessorRuntimeTask lio1 = new LogicalIOProcessorRuntimeTask(task1, 0, tezConf, null,
         umbilical, serviceConsumerMetadata, new HashMap<String, String>(), startedInputsMap, null,
-        "", new ExecutionContextImpl("localhost"));
+        "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory());
 
     lio1.initialize();
     lio1.run();
@@ -96,7 +96,7 @@ public class TestLogicalIOProcessorRuntimeTask {
 
     LogicalIOProcessorRuntimeTask lio2 = new LogicalIOProcessorRuntimeTask(task2, 0, tezConf, null,
         umbilical, serviceConsumerMetadata, new HashMap<String, String>(), startedInputsMap, null,
-        "", new ExecutionContextImpl("localhost"));
+        "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory());
     lio2.initialize();
     lio2.run();
     lio2.close();

http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
index 4f94cfe..7890f89 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
@@ -717,7 +717,7 @@ public class TestTaskExecution {
 
     TezTaskRunner taskRunner = new TezTaskRunner(tezConf, ugi, localDirs, taskSpec, umbilical, 1,
         new HashMap<String, ByteBuffer>(), new HashMap<String, String>(), HashMultimap.<String, String> create(), taskReporter,
-        executor, null, "", new ExecutionContextImpl("localhost"));
+        executor, null, "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory());
     return taskRunner;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
index 91d558f..1f78cbd 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
@@ -126,7 +126,7 @@ public class TestOnFileUnorderedKVOutput {
         appAttemptNumber, tezUmbilical, dagName, taskVertexName, destinationVertexName,
         -1, taskAttemptID, counters, 0, userPayload, runtimeTask,
         null, auxEnv, new MemoryDistributor(1, 1, conf) , outputDescriptor, null,
-        new ExecutionContextImpl("localhost"));
+        new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory());
 
     UnorderedKVOutput kvOutput = new OnFileUnorderedKVOutputForTest(outputContext, 1);