You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zh...@apache.org on 2017/03/28 19:47:45 UTC
[04/50] [abbrv] tez git commit: TEZ-3244. Allow overlap of input and
output memory when they are not concurrent. (jlowe)
TEZ-3244. Allow overlap of input and output memory when they are not concurrent. (jlowe)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/63ae97d5
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/63ae97d5
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/63ae97d5
Branch: refs/heads/TEZ-1190
Commit: 63ae97d5f3fe6e30e3c5f7c9a892ef9902e83b39
Parents: b3a3af3
Author: Jason Lowe <jl...@yahoo-inc.com>
Authored: Tue Feb 7 13:32:37 2017 -0600
Committer: Jason Lowe <jl...@yahoo-inc.com>
Committed: Tue Feb 7 13:32:37 2017 -0600
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/tez/dag/api/TezConfiguration.java | 30 ++++
.../common/resources/MemoryDistributor.java | 12 +-
.../WeightedScalingMemoryDistributor.java | 62 ++++++-
.../TestWeightedScalingMemoryDistributor.java | 165 +++++++++++++++++++
5 files changed, 264 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/63ae97d5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 16e239f..a7cc0ce 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3244. Allow overlap of input and output memory when they are not concurrent
TEZ-3581. Add different logger to enable suppressing logs for specific lines.
TEZ-3601. Add another HistoryLogLevel to suppress TaskAttempts at specific levels
TEZ-3600. Fix flaky test: TestTokenCache
@@ -197,6 +198,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3244. Allow overlap of input and output memory when they are not concurrent
TEZ-3601. Add another HistoryLogLevel to suppress TaskAttempts at specific levels
TEZ-3462. Task attempt failure during container shutdown loses useful container diagnostics
TEZ-3574. Container reuse won't pickup extra dag level local resource.
http://git-wip-us.apache.org/repos/asf/tez/blob/63ae97d5/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index fd71b35..94f40bb 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -875,6 +875,36 @@ public class TezConfiguration extends Configuration {
public static final String TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS =
TEZ_TASK_PREFIX + "scale.memory.ratios";
+ /**
+ * Concurrent input/output memory allocation control. When enabled memory
+ * distributions assume that inputs and outputs will use their memory
+ * simultaneously. When disabled the distributions assume that outputs are not
+ * initialized until inputs release memory buffers, allowing inputs to
+ * leverage memory normally set aside for outputs and vice-versa.
+ * NOTE: This property currently is not supported by the ScalingAllocator
+ * memory distributor.
+ */
+ @Private
+ @Unstable
+ @ConfigurationScope(Scope.VERTEX)
+ public static final String TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT =
+ TEZ_TASK_PREFIX + "scale.memory.input-output-concurrent";
+ public static final boolean TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT_DEFAULT = true;
+
+ /**
+ * Controls distributing output memory to inputs when non-concurrent I/O
+ * memory allocation is being used. When enabled inputs will receive the
+ * same memory allocation as if concurrent I/O memory allocation were used.
+ * NOTE: This property currently is not supported by the ScalingAllocator
+ * memory distributor.
+ */
+ @Private
+ @Unstable
+ @ConfigurationScope(Scope.VERTEX)
+ public static final String TEZ_TASK_SCALE_MEMORY_NON_CONCURRENT_INPUTS_ENABLED =
+ TEZ_TASK_PREFIX + "scale.memory.non-concurrent-inputs.enabled";
+ public static final boolean TEZ_TASK_SCALE_MEMORY_NON_CONCURRENT_INPUTS_ENABLED_DEFAULT = false;
+
@Private
@Unstable
/**
http://git-wip-us.apache.org/repos/asf/tez/blob/63ae97d5/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
index c822357..e63a414 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
@@ -61,6 +61,7 @@ public class MemoryDistributor {
private long totalJvmMemory;
private final boolean isEnabled;
+ private final boolean isInputOutputConcurrent;
private final String allocatorClassName;
private final Set<TaskContext> dupSet = Collections
.newSetFromMap(new ConcurrentHashMap<TaskContext, Boolean>());
@@ -78,6 +79,9 @@ public class MemoryDistributor {
this.conf = conf;
isEnabled = conf.getBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ENABLED,
TezConfiguration.TEZ_TASK_SCALE_MEMORY_ENABLED_DEFAULT);
+ isInputOutputConcurrent = conf.getBoolean(
+ TezConfiguration.TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT,
+ TezConfiguration.TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT_DEFAULT);
if (isEnabled) {
allocatorClassName = conf.get(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ALLOCATOR_CLASS,
@@ -213,9 +217,11 @@ public class MemoryDistributor {
Preconditions.checkState(numAllocations == numRequestors,
"Number of allocations must match number of requestors. Allocated=" + numAllocations
+ ", Requests: " + numRequestors);
- Preconditions.checkState(totalAllocated <= totalJvmMemory,
- "Total allocation should be <= availableMem. TotalAllocated: " + totalAllocated
- + ", totalJvmMemory: " + totalJvmMemory);
+ if (isInputOutputConcurrent) {
+ Preconditions.checkState(totalAllocated <= totalJvmMemory,
+ "Total allocation should be <= availableMem. TotalAllocated: " + totalAllocated
+ + ", totalJvmMemory: " + totalJvmMemory);
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/63ae97d5/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
index 8477300..c5b4fb0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.runtime.common.resources.InitialMemoryAllocator;
import org.apache.tez.runtime.common.resources.InitialMemoryRequestContext;
+import org.apache.tez.runtime.common.resources.InitialMemoryRequestContext.ComponentType;
import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
import org.apache.tez.runtime.library.input.OrderedGroupedInputLegacy;
import org.apache.tez.runtime.library.input.UnorderedKVInput;
@@ -129,9 +130,15 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator
+ availableForAllocation + ", TotalRequested/TotalJVMHeap:"
+ new DecimalFormat("0.00").format(ratio));
+ int numInputRequestsScaled = 0;
+ int numOutputRequestsScaled = 0;
+ long totalInputAllocated = 0;
+ long totalOutputAllocated = 0;
+
// Actual scaling
List<Long> allocations = Lists.newArrayListWithCapacity(numRequests);
for (Request request : requests) {
+ long allocated = 0;
if (request.requestSize == 0) {
allocations.add(0l);
if (LOG.isDebugEnabled()) {
@@ -141,7 +148,7 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator
} else {
double requestFactor = request.requestWeight / (double) numRequestsScaled;
double scaledRequest = requestFactor * request.requestSize;
- long allocated = Math.min(
+ allocated = Math.min(
(long) ((scaledRequest / totalScaledRequest) * availableForAllocation),
request.requestSize);
// TODO Later - If requestedSize is used, the difference (allocated -
@@ -152,9 +159,52 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator
+ request.requestType + " " + request.requestSize + " to allocated: " + allocated);
}
}
+
+ if (request.componentType == ComponentType.INPUT) {
+ numInputRequestsScaled += request.requestWeight;
+ totalInputAllocated += allocated;
+ } else if (request.componentType == ComponentType.OUTPUT) {
+ numOutputRequestsScaled += request.requestWeight;
+ totalOutputAllocated += allocated;
+ }
+ }
+
+ if (!conf.getBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT,
+ TezConfiguration.TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT_DEFAULT)) {
+ adjustAllocationsForNonConcurrent(allocations, requests,
+ numInputRequestsScaled, totalInputAllocated,
+ numOutputRequestsScaled, totalOutputAllocated);
}
+
return allocations;
+ }
+ private void adjustAllocationsForNonConcurrent(List<Long> allocations,
+ List<Request> requests, int numInputsScaled, long totalInputAllocated,
+ int numOutputsScaled, long totalOutputAllocated) {
+ boolean inputsEnabled = conf.getBoolean(
+ TezConfiguration.TEZ_TASK_SCALE_MEMORY_NON_CONCURRENT_INPUTS_ENABLED,
+ TezConfiguration.TEZ_TASK_SCALE_MEMORY_NON_CONCURRENT_INPUTS_ENABLED_DEFAULT);
+ LOG.info("Adjusting scaled allocations for I/O non-concurrent."
+ + " numInputsScaled: {} InputAllocated: {} numOutputsScaled: {} outputAllocated: {} inputsEnabled: {}",
+ numInputsScaled, totalInputAllocated, numOutputsScaled, totalOutputAllocated, inputsEnabled);
+ for (int i = 0; i < requests.size(); i++) {
+ Request request = requests.get(i);
+ long additional = 0;
+ if (request.componentType == ComponentType.INPUT && inputsEnabled) {
+ double share = request.requestWeight / (double)numInputsScaled;
+ additional = (long) (totalOutputAllocated * share);
+ } else if (request.componentType == ComponentType.OUTPUT) {
+ double share = request.requestWeight / (double)numOutputsScaled;
+ additional = (long) (totalInputAllocated * share);
+ }
+ if (additional > 0) {
+ long newTotal = Math.min(allocations.get(i) + additional, request.requestSize);
+ // TODO Later - If requestedSize is used, the difference could be allocated to others.
+ allocations.set(i, newTotal);
+ LOG.debug("Adding {} to {} total={}", additional, request.componentClassname, newTotal);
+ }
+ }
}
private void initialProcessMemoryRequestContext(InitialMemoryRequestContext context) {
@@ -164,9 +214,10 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator
String className = context.getComponentClassName();
requestType = getRequestTypeForClass(className);
Integer typeScaleFactor = getScaleFactorForType(requestType);
+ ComponentType componentType = context.getComponentType();
- Request request = new Request(context.getComponentClassName(), context.getRequestedSize(),
- requestType, typeScaleFactor);
+ Request request = new Request(context.getComponentClassName(), componentType,
+ context.getRequestedSize(), requestType, typeScaleFactor);
requests.add(request);
numRequestsScaled += typeScaleFactor;
}
@@ -293,14 +344,17 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator
}
private static class Request {
- Request(String componentClassname, long requestSize, RequestType requestType, int requestWeight) {
+ Request(String componentClassname, ComponentType componentType, long requestSize,
+ RequestType requestType, int requestWeight) {
this.componentClassname = componentClassname;
+ this.componentType = componentType;
this.requestSize = requestSize;
this.requestType = requestType;
this.requestWeight = requestWeight;
}
String componentClassname;
+ ComponentType componentType;
long requestSize;
private RequestType requestType;
private int requestWeight;
http://git-wip-us.apache.org/repos/asf/tez/blob/63ae97d5/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java
index a38497c..2fbe264 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.mock;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.runtime.api.LogicalInput;
@@ -32,6 +33,7 @@ import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.MemoryUpdateCallback;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
import org.apache.tez.runtime.library.input.UnorderedKVInput;
import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
@@ -145,6 +147,169 @@ public class TestWeightedScalingMemoryDistributor extends TestMemoryDistributor
assertEquals(1500, e4Callback.assigned);
}
+ @Test(timeout = 5000)
+ public void testWeightedScalingNonConcurrent() throws TezException {
+ Configuration conf = new Configuration(this.conf);
+ conf.setBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT, false);
+ conf.setBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_NON_CONCURRENT_INPUTS_ENABLED, true);
+ conf.setDouble(TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION, 0.2);
+ conf.setStrings(TezConfiguration.TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS,
+ WeightedScalingMemoryDistributor.generateWeightStrings(0, 0, 1, 2, 3, 1, 1));
+ System.err.println(Joiner.on(",").join(conf.getStringCollection(
+ TezConfiguration.TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS)));
+
+ MemoryDistributor dist = new MemoryDistributor(2, 2, conf);
+
+ dist.setJvmMemory(10000l);
+
+ // First request - ScatterGatherShuffleInput
+ MemoryUpdateCallbackForTest e1Callback = new MemoryUpdateCallbackForTest();
+ InputContext e1InputContext1 = createTestInputContext();
+ InputDescriptor e1InDesc1 = createTestInputDescriptor(OrderedGroupedKVInput.class);
+ dist.requestMemory(10000, e1Callback, e1InputContext1, e1InDesc1);
+
+ // Second request - BroadcastInput
+ MemoryUpdateCallbackForTest e2Callback = new MemoryUpdateCallbackForTest();
+ InputContext e2InputContext2 = createTestInputContext();
+ InputDescriptor e2InDesc2 = createTestInputDescriptor(UnorderedKVInput.class);
+ dist.requestMemory(10000, e2Callback, e2InputContext2, e2InDesc2);
+
+ // Third request - randomOutput (simulates MROutput)
+ MemoryUpdateCallbackForTest e3Callback = new MemoryUpdateCallbackForTest();
+ OutputContext e3OutputContext1 = createTestOutputContext();
+ OutputDescriptor e3OutDesc1 = createTestOutputDescriptor();
+ dist.requestMemory(10000, e3Callback, e3OutputContext1, e3OutDesc1);
+
+ // Fourth request - OnFileSortedOutput
+ MemoryUpdateCallbackForTest e4Callback = new MemoryUpdateCallbackForTest();
+ OutputContext e4OutputContext2 = createTestOutputContext();
+ OutputDescriptor e4OutDesc2 = createTestOutputDescriptor(OrderedPartitionedKVOutput.class);
+ dist.requestMemory(10000, e4Callback, e4OutputContext2, e4OutDesc2);
+
+ // Fifth request - Processor
+ MemoryUpdateCallbackForTest e5Callback = new MemoryUpdateCallbackForTest();
+ ProcessorContext e5ProcContext = createTestProcessortContext();
+ ProcessorDescriptor e5ProcDesc = createTestProcessorDescriptor();
+ dist.requestMemory(10000, e5Callback, e5ProcContext, e5ProcDesc);
+
+ dist.makeInitialAllocations();
+
+ // Total available: 80% of 10K = 8000
+ // 5 requests (weight) - 10K (3), 10K(1), 10K(1), 10K(2), 10K(1)
+ // Overlap input and output memory
+ assertEquals(5250, e1Callback.assigned);
+ assertEquals(1750, e2Callback.assigned);
+ assertEquals(2333, e3Callback.assigned);
+ assertEquals(4666, e4Callback.assigned);
+ assertEquals(1000, e5Callback.assigned);
+ }
+
+ @Test(timeout = 5000)
+ public void testAdditionalReserveFractionWeightedScalingNonConcurrent() throws TezException {
+ Configuration conf = new Configuration(this.conf);
+ conf.setBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT, false);
+ conf.setBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_NON_CONCURRENT_INPUTS_ENABLED, true);
+ conf.setStrings(TezConfiguration.TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS,
+ WeightedScalingMemoryDistributor.generateWeightStrings(0, 0, 2, 3, 6, 1, 1));
+ conf.setDouble(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ADDITIONAL_RESERVATION_FRACTION_PER_IO, 0.025d);
+ conf.setDouble(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ADDITIONAL_RESERVATION_FRACTION_MAX, 0.2d);
+
+ MemoryDistributor dist = new MemoryDistributor(2, 2, conf);
+
+ dist.setJvmMemory(10000l);
+
+ // First request - ScatterGatherShuffleInput [weight 6]
+ MemoryUpdateCallbackForTest e1Callback = new MemoryUpdateCallbackForTest();
+ InputContext e1InputContext1 = createTestInputContext();
+ InputDescriptor e1InDesc1 = createTestInputDescriptor(OrderedGroupedKVInput.class);
+ dist.requestMemory(10000, e1Callback, e1InputContext1, e1InDesc1);
+
+ // Second request - BroadcastInput [weight 2]
+ MemoryUpdateCallbackForTest e2Callback = new MemoryUpdateCallbackForTest();
+ InputContext e2InputContext2 = createTestInputContext();
+ InputDescriptor e2InDesc2 = createTestInputDescriptor(UnorderedKVInput.class);
+ dist.requestMemory(10000, e2Callback, e2InputContext2, e2InDesc2);
+
+ // Third request - randomOutput (simulates MROutput) [weight 1]
+ MemoryUpdateCallbackForTest e3Callback = new MemoryUpdateCallbackForTest();
+ OutputContext e3OutputContext1 = createTestOutputContext();
+ OutputDescriptor e3OutDesc1 = createTestOutputDescriptor();
+ dist.requestMemory(10000, e3Callback, e3OutputContext1, e3OutDesc1);
+
+ // Fourth request - OnFileSortedOutput [weight 3]
+ MemoryUpdateCallbackForTest e4Callback = new MemoryUpdateCallbackForTest();
+ OutputContext e4OutputContext2 = createTestOutputContext();
+ OutputDescriptor e4OutDesc2 = createTestOutputDescriptor(OrderedPartitionedKVOutput.class);
+ dist.requestMemory(10000, e4Callback, e4OutputContext2, e4OutDesc2);
+
+ dist.makeInitialAllocations();
+
+ // Total available: 60% of 10K = 6000
+ // 4 requests (weight) - 10K (6), 10K(2), 10K(1), 10K(3)
+ // Overlap input and output memory
+ assertEquals(4500, e1Callback.assigned);
+ assertEquals(1500, e2Callback.assigned);
+ assertEquals(1500, e3Callback.assigned);
+ assertEquals(4500, e4Callback.assigned);
+ }
+
+ @Test(timeout = 5000)
+ public void testWeightedScalingNonConcurrentInputsDisabled() throws TezException {
+ Configuration conf = new Configuration(this.conf);
+ conf.setBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT, false);
+ conf.setBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_NON_CONCURRENT_INPUTS_ENABLED, false);
+ conf.setDouble(TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION, 0.2);
+ conf.setStrings(TezConfiguration.TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS,
+ WeightedScalingMemoryDistributor.generateWeightStrings(0, 0, 1, 2, 3, 1, 1));
+ System.err.println(Joiner.on(",").join(conf.getStringCollection(
+ TezConfiguration.TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS)));
+
+ MemoryDistributor dist = new MemoryDistributor(2, 2, conf);
+
+ dist.setJvmMemory(10000l);
+
+ // First request - ScatterGatherShuffleInput
+ MemoryUpdateCallbackForTest e1Callback = new MemoryUpdateCallbackForTest();
+ InputContext e1InputContext1 = createTestInputContext();
+ InputDescriptor e1InDesc1 = createTestInputDescriptor(OrderedGroupedKVInput.class);
+ dist.requestMemory(10000, e1Callback, e1InputContext1, e1InDesc1);
+
+ // Second request - BroadcastInput
+ MemoryUpdateCallbackForTest e2Callback = new MemoryUpdateCallbackForTest();
+ InputContext e2InputContext2 = createTestInputContext();
+ InputDescriptor e2InDesc2 = createTestInputDescriptor(UnorderedKVInput.class);
+ dist.requestMemory(10000, e2Callback, e2InputContext2, e2InDesc2);
+
+ // Third request - randomOutput (simulates MROutput)
+ MemoryUpdateCallbackForTest e3Callback = new MemoryUpdateCallbackForTest();
+ OutputContext e3OutputContext1 = createTestOutputContext();
+ OutputDescriptor e3OutDesc1 = createTestOutputDescriptor();
+ dist.requestMemory(10000, e3Callback, e3OutputContext1, e3OutDesc1);
+
+ // Fourth request - OnFileSortedOutput
+ MemoryUpdateCallbackForTest e4Callback = new MemoryUpdateCallbackForTest();
+ OutputContext e4OutputContext2 = createTestOutputContext();
+ OutputDescriptor e4OutDesc2 = createTestOutputDescriptor(OrderedPartitionedKVOutput.class);
+ dist.requestMemory(10000, e4Callback, e4OutputContext2, e4OutDesc2);
+
+ // Fifth request - Processor
+ MemoryUpdateCallbackForTest e5Callback = new MemoryUpdateCallbackForTest();
+ ProcessorContext e5ProcContext = createTestProcessortContext();
+ ProcessorDescriptor e5ProcDesc = createTestProcessorDescriptor();
+ dist.requestMemory(10000, e5Callback, e5ProcContext, e5ProcDesc);
+
+ dist.makeInitialAllocations();
+
+ // Total available: 80% of 10K = 8000
+ // 5 requests (weight) - 10K (3), 10K(1), 10K(1), 10K(2), 10K(1)
+ // Overlap input and output memory
+ assertEquals(3000, e1Callback.assigned);
+ assertEquals(1000, e2Callback.assigned);
+ assertEquals(2333, e3Callback.assigned);
+ assertEquals(4666, e4Callback.assigned);
+ assertEquals(1000, e5Callback.assigned);
+ }
+
private static class MemoryUpdateCallbackForTest extends MemoryUpdateCallback {
long assigned = -1000;