You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/02/13 01:02:26 UTC
git commit: TEZ-832. Fix a race in MemoryDistributor. (sseth)
Updated Branches:
refs/heads/master 8d265ed46 -> c50152c71
TEZ-832. Fix a race in MemoryDistributor. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/c50152c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/c50152c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/c50152c7
Branch: refs/heads/master
Commit: c50152c718b8cac08642004a3f6378c0f53b75c5
Parents: 8d265ed
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Feb 12 16:01:49 2014 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Feb 12 16:01:49 2014 -0800
----------------------------------------------------------------------
.../common/resources/MemoryDistributor.java | 19 ++++++++++---------
1 file changed, 10 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c50152c7/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
index df71447..51d2de8 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -54,11 +55,11 @@ public class MemoryDistributor {
private final int numTotalInputs;
private final int numTotalOutputs;
- private int numInputsSeen = 0;
- private int numOutputsSeen = 0;
+ private AtomicInteger numInputsSeen = new AtomicInteger(0);
+ private AtomicInteger numOutputsSeen = new AtomicInteger(0);
private long totalJvmMemory;
- private long totalAssignableMemory;
+ private volatile long totalAssignableMemory;
private final boolean isEnabled;
private final boolean reserveFractionConfigured;
private float reserveFraction;
@@ -119,8 +120,8 @@ public class MemoryDistributor {
* have made their initial requests.
*/
public void makeInitialAllocations() {
- Preconditions.checkState(numInputsSeen == numTotalInputs, "All inputs are expected to ask for memory");
- Preconditions.checkState(numOutputsSeen == numTotalOutputs, "All outputs are expected to ask for memory");
+ Preconditions.checkState(numInputsSeen.get() == numTotalInputs, "All inputs are expected to ask for memory");
+ Preconditions.checkState(numOutputsSeen.get() == numTotalOutputs, "All outputs are expected to ask for memory");
Iterable<RequestContext> requestContexts = Iterables.transform(requestList,
new Function<RequestorInfo, RequestContext>() {
public RequestContext apply(RequestorInfo requestInfo) {
@@ -185,14 +186,14 @@ public class MemoryDistributor {
RequestorInfo requestInfo = new RequestorInfo(entityContext,requestSize, callback, descriptor);
switch (requestInfo.getRequestContext().getComponentType()) {
case INPUT:
- numInputsSeen++;
- Preconditions.checkState(numInputsSeen <= numTotalInputs,
+ numInputsSeen.incrementAndGet();
+ Preconditions.checkState(numInputsSeen.get() <= numTotalInputs,
"Num Requesting Inputs higher than total # of inputs: " + numInputsSeen + ", "
+ numTotalInputs);
break;
case OUTPUT:
- numOutputsSeen++;
- Preconditions.checkState(numOutputsSeen <= numTotalOutputs,
+ numOutputsSeen.incrementAndGet();
+ Preconditions.checkState(numOutputsSeen.get() <= numTotalOutputs,
"Num Requesting Inputs higher than total # of outputs: " + numOutputsSeen + ", "
+ numTotalOutputs);
case PROCESSOR: