You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/02/13 01:02:26 UTC

git commit: TEZ-832. Fix a race in MemoryDistributor. (sseth)

Updated Branches:
  refs/heads/master 8d265ed46 -> c50152c71


TEZ-832. Fix a race in MemoryDistributor. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/c50152c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/c50152c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/c50152c7

Branch: refs/heads/master
Commit: c50152c718b8cac08642004a3f6378c0f53b75c5
Parents: 8d265ed
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Feb 12 16:01:49 2014 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Feb 12 16:01:49 2014 -0800

----------------------------------------------------------------------
 .../common/resources/MemoryDistributor.java      | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c50152c7/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
index df71447..51d2de8 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -54,11 +55,11 @@ public class MemoryDistributor {
   private final int numTotalInputs;
   private final int numTotalOutputs;
   
-  private int numInputsSeen = 0;
-  private int numOutputsSeen = 0;
+  private AtomicInteger numInputsSeen = new AtomicInteger(0);
+  private AtomicInteger numOutputsSeen = new AtomicInteger(0);
 
   private long totalJvmMemory;
-  private long totalAssignableMemory;
+  private volatile long totalAssignableMemory;
   private final boolean isEnabled;
   private final boolean reserveFractionConfigured;
   private float reserveFraction;
@@ -119,8 +120,8 @@ public class MemoryDistributor {
    * have made their initial requests.
    */
   public void makeInitialAllocations() {
-    Preconditions.checkState(numInputsSeen == numTotalInputs, "All inputs are expected to ask for memory");
-    Preconditions.checkState(numOutputsSeen == numTotalOutputs, "All outputs are expected to ask for memory");
+    Preconditions.checkState(numInputsSeen.get() == numTotalInputs, "All inputs are expected to ask for memory");
+    Preconditions.checkState(numOutputsSeen.get() == numTotalOutputs, "All outputs are expected to ask for memory");
     Iterable<RequestContext> requestContexts = Iterables.transform(requestList,
         new Function<RequestorInfo, RequestContext>() {
           public RequestContext apply(RequestorInfo requestInfo) {
@@ -185,14 +186,14 @@ public class MemoryDistributor {
     RequestorInfo requestInfo = new RequestorInfo(entityContext,requestSize, callback, descriptor);
     switch (requestInfo.getRequestContext().getComponentType()) {
     case INPUT:
-      numInputsSeen++;
-      Preconditions.checkState(numInputsSeen <= numTotalInputs,
+      numInputsSeen.incrementAndGet();
+      Preconditions.checkState(numInputsSeen.get() <= numTotalInputs,
           "Num Requesting Inputs higher than total # of inputs: " + numInputsSeen + ", "
               + numTotalInputs);
       break;
     case OUTPUT:
-      numOutputsSeen++;
-      Preconditions.checkState(numOutputsSeen <= numTotalOutputs,
+      numOutputsSeen.incrementAndGet();
+      Preconditions.checkState(numOutputsSeen.get() <= numTotalOutputs,
           "Num Requesting Inputs higher than total # of outputs: " + numOutputsSeen + ", "
               + numTotalOutputs);
     case PROCESSOR: