You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/02/23 01:24:30 UTC
git commit: TEZ-844. Processors should have a mechanism to know when
an Input is ready for consumption. (sseth)
Repository: incubator-tez
Updated Branches:
refs/heads/master 1752d553f -> e17e750c9
TEZ-844. Processors should have a mechanism to know when an Input is
ready for consumption. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/e17e750c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/e17e750c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/e17e750c
Branch: refs/heads/master
Commit: e17e750c9527b801969e36a54ba445ab6fef737d
Parents: 1752d55
Author: Siddharth Seth <ss...@apache.org>
Authored: Sat Feb 22 16:24:11 2014 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Sat Feb 22 16:24:11 2014 -0800
----------------------------------------------------------------------
.../java/org/apache/tez/runtime/api/Input.java | 12 +-
.../tez/runtime/api/InputReadyCallback.java | 32 ++
.../tez/runtime/api/MergedLogicalInput.java | 30 +-
.../java/org/apache/tez/runtime/api/Output.java | 4 +
.../apache/tez/runtime/api/TezInputContext.java | 9 +
.../tez/runtime/api/TezProcessorContext.java | 32 ++
.../org/apache/tez/mapreduce/input/MRInput.java | 4 +-
.../processor/reduce/ReduceProcessor.java | 25 +-
.../apache/tez/runtime/InputReadyTracker.java | 161 +++++++++
.../runtime/LogicalIOProcessorRuntimeTask.java | 23 +-
.../runtime/api/impl/TezInputContextImpl.java | 13 +-
.../api/impl/TezProcessorContextImpl.java | 26 +-
.../tez/runtime/TestInputReadyTracker.java | 325 +++++++++++++++++++
.../TestLogicalIOProcessorRuntimeTask.java | 1 +
.../input/BroadcastShuffleManager.java | 6 +-
.../library/common/shuffle/impl/Shuffle.java | 2 +
.../input/ConcatenatedMergedKeyValueInput.java | 7 +-
.../input/ConcatenatedMergedKeyValuesInput.java | 7 +-
.../runtime/library/input/LocalMergedInput.java | 4 +-
.../library/input/ShuffledMergedInput.java | 7 +-
.../library/input/ShuffledUnorderedKVInput.java | 4 +-
.../java/org/apache/tez/test/TestInput.java | 2 +-
22 files changed, 693 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java b/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java
index 27f66d0..f397114 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java
@@ -23,10 +23,18 @@ import java.util.List;
/**
* Represents an input through which a TezProcessor receives data on an edge.
* </p>
- *
+ *
* <code>Input</code> classes must have a 0 argument public constructor for Tez
* to construct the <code>Input</code>. Tez will take care of initializing and
* closing the Input after a {@link Processor} completes. </p>
+ *
+ * During initialization, Inputs must specify an initial memory requirement via
+ * {@link TezInputContext}.requestInitialMemory
+ *
+ * Inputs must also inform the framework once they are ready to be consumed.
+ * This typically means that the Processor will not block when reading from the
+ * corresponding Input. This is done via {@link TezInputContext}.inputIsReady.
+ * Inputs choose the policy on when they are ready.
*/
public interface Input {
@@ -59,7 +67,7 @@ public interface Input {
* @throws Exception
*/
public void start() throws Exception;
-
+
/**
* Gets an instance of the {@link Reader} for this <code>Output</code>
*
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-api/src/main/java/org/apache/tez/runtime/api/InputReadyCallback.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputReadyCallback.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputReadyCallback.java
new file mode 100644
index 0000000..0514651
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputReadyCallback.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+/**
+ * Used temporarily until MergedInputs have access to a context. Remove after
+ * TEZ-866
+ */
+@Private
+public interface InputReadyCallback {
+
+ public void setInputReady(Input input);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
index 1e82aca..2513589 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
@@ -18,6 +18,7 @@
package org.apache.tez.runtime.api;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -29,14 +30,17 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public abstract class MergedLogicalInput implements LogicalInput {
+ // TODO Remove with TEZ-866
+ private volatile InputReadyCallback inputReadyCallback;
+ private AtomicBoolean notifiedInputReady = new AtomicBoolean(false);
private List<Input> inputs;
private final AtomicBoolean isStarted = new AtomicBoolean(false);
-
+
public final void initialize(List<Input> inputs) {
- this.inputs = inputs;
+ this.inputs = Collections.unmodifiableList(inputs);
}
- protected List<Input> getInputs() {
+ public final List<Input> getInputs() {
return inputs;
}
@@ -69,4 +73,24 @@ public abstract class MergedLogicalInput implements LogicalInput {
throw new UnsupportedOperationException();
}
+ // TODO Remove with TEZ-866
+ public void setInputReadyCallback(InputReadyCallback callback) {
+ this.inputReadyCallback = callback;
+ }
+
+ /**
+ * Used by the actual MergedInput to notify that it's ready for consumption.
+ * TBD eventually via the context.
+ */
+ protected final void informInputReady() {
+ // TODO Fix with TEZ-866
+ if (!notifiedInputReady.getAndSet(true)) {
+ inputReadyCallback.setInputReady(this);
+ }
+ }
+
+ /**
+ * Used by the framework to inform the MergedInput that one of its constituent Inputs is ready.
+ */
+ public abstract void setConstituentInputIsReady(Input input);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-api/src/main/java/org/apache/tez/runtime/api/Output.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/Output.java b/tez-api/src/main/java/org/apache/tez/runtime/api/Output.java
index e986c34..76af6f6 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/Output.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/Output.java
@@ -27,6 +27,10 @@ import java.util.List;
* <code>Output</code> implementations must have a 0 argument public constructor
* for Tez to construct the <code>Output</code>. Tez will take care of
* initializing and closing the Input after a {@link Processor} completes. </p>
+ *
+ * During initialization, Outputs must specify an initial memory requirement via
+ * {@link TezOutputContext}.requestInitialMemory
+ *
*/
public interface Output {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-api/src/main/java/org/apache/tez/runtime/api/TezInputContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TezInputContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TezInputContext.java
index 0e37030..4c9b6c8 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/TezInputContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TezInputContext.java
@@ -35,4 +35,13 @@ public interface TezInputContext extends TezTaskContext {
* @return index
*/
public int getInputIndex();
+
+ /**
+ * Inform the framework that the specific Input is ready for consumption. This
+ * method will typically be invoked as a result of an
+ * Input.inputReadyNotificationRequired invocation.
+ *
+ * This method can be invoked multiple times.
+ */
+ public void inputIsReady();
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-api/src/main/java/org/apache/tez/runtime/api/TezProcessorContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TezProcessorContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TezProcessorContext.java
index 001461b..913e6df 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/TezProcessorContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TezProcessorContext.java
@@ -19,6 +19,7 @@
package org.apache.tez.runtime.api;
import java.io.IOException;
+import java.util.Collection;
/**
* Context handle for the Processor to initialize itself.
@@ -38,4 +39,35 @@ public interface TezProcessorContext extends TezTaskContext {
*/
public boolean canCommit() throws IOException;
+ /**
+ * Blocking call which returns when any of the specified Inputs is ready for
+ * consumption.
+ *
+ * There can be multiple parallel invocations of this function - where each
+ * invocation blocks on the Inputs that it specifies.
+ *
+ * If multiple Inputs are ready, any one of them may be returned by this
+ * method - including an Input which may have been returned in a previous
+ * call. If invoking this method multiple times, it's recommended to remove
+ * previously completed Inputs from the invocation list.
+ *
+ * @param inputs
+ * the list of Inputs to monitor
+ * @return the Input which is ready for consumption
+ * @throws InterruptedException
+ */
+ public Input waitForAnyInputReady(Collection<Input> inputs) throws InterruptedException;
+
+ /**
+ * Blocking call which returns only after all of the specified Inputs are
+ * ready for consumption.
+ *
+ * There can be multiple parallel invocations of this function - where each
+ * invocation blocks on the Inputs that it specifies.
+ *
+ * @param inputs
+ * the list of Inputs to monitor
+ * @throws InterruptedException
+ */
+ public void waitForAllInputsReady(Collection<Input> inputs) throws InterruptedException;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index edd8e92..706f4fe 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -121,6 +121,7 @@ public class MRInput implements LogicalInput {
public List<Event> initialize(TezInputContext inputContext) throws IOException {
this.inputContext = inputContext;
this.inputContext.requestInitialMemory(0l, null); //mandatory call
+ this.inputContext.inputIsReady();
MRInputUserPayloadProto mrUserPayload =
MRHelpers.parseMRInputPayload(inputContext.getUserPayload());
Preconditions.checkArgument(mrUserPayload.hasSplits() == false,
@@ -576,5 +577,6 @@ public class MRInput implements LogicalInput {
return value;
}
}
- };
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index f98e34f..41dff33 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -19,6 +19,7 @@ package org.apache.tez.mapreduce.processor.reduce;
import java.io.IOException;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -41,12 +42,13 @@ import org.apache.tez.mapreduce.output.MROutputLegacy;
import org.apache.tez.mapreduce.processor.MRTask;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.Input;
import org.apache.tez.runtime.api.LogicalIOProcessor;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.TezProcessorContext;
-import org.apache.tez.runtime.library.api.KeyValuesReader;
import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
@@ -54,9 +56,7 @@ import org.apache.tez.runtime.library.output.OnFileSortedOutput;
@SuppressWarnings({ "unchecked", "rawtypes" })
-public class ReduceProcessor
-extends MRTask
-implements LogicalIOProcessor {
+public class ReduceProcessor extends MRTask implements LogicalIOProcessor {
private static final Log LOG = LogFactory.getLog(ReduceProcessor.class);
@@ -94,13 +94,6 @@ implements LogicalIOProcessor {
Map<String, LogicalOutput> outputs) throws Exception {
LOG.info("Running reduce: " + processorContext.getUniqueIdentifier());
-
- for (LogicalInput input : inputs.values()) {
- input.start();
- }
- for (LogicalOutput output : outputs.values()) {
- output.start();
- }
if (outputs.size() <= 0 || outputs.size() > 1) {
throw new IOException("Invalid number of outputs"
@@ -113,7 +106,15 @@ implements LogicalIOProcessor {
}
LogicalInput in = inputs.values().iterator().next();
+ in.start();
+
+ List<Input> pendingInputs = new LinkedList<Input>();
+ pendingInputs.add(in);
+ processorContext.waitForAllInputsReady(pendingInputs);
+ LOG.info("Input is ready for consumption. Starting Output");
+
LogicalOutput out = outputs.values().iterator().next();
+ out.start();
initTask(out);
@@ -364,4 +365,4 @@ implements LogicalIOProcessor {
return processorContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-runtime-internals/src/main/java/org/apache/tez/runtime/InputReadyTracker.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/InputReadyTracker.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/InputReadyTracker.java
new file mode 100644
index 0000000..e242ec0
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/InputReadyTracker.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.InputReadyCallback;
+import org.apache.tez.runtime.api.MergedLogicalInput;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class InputReadyTracker implements InputReadyCallback {
+
+ private final ConcurrentMap<Input, Boolean> readyInputs;
+
+ private ConcurrentMap<Input, List<MergedLogicalInput>> inputToGroupMap;
+
+ private final ReentrantLock lock = new ReentrantLock();
+ private final Condition condition = lock.newCondition();
+
+ public InputReadyTracker() {
+ readyInputs = Maps.newConcurrentMap();
+ }
+
+ // Called by the InputContext once it's ready.
+ public void setInputIsReady(Input input) {
+ lock.lock();
+ try {
+ Boolean old = readyInputs.putIfAbsent(input, true);
+ if (old == null) {
+ informGroupedInputs(input);
+ condition.signalAll();
+ } else {
+ // Ignore duplicate inputReady from the same Input
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+
+ private void informGroupedInputs(Input input) {
+ if (inputToGroupMap != null) {
+ List<MergedLogicalInput> mergedInputList = inputToGroupMap.get(input);
+ if (mergedInputList != null) {
+ for (MergedLogicalInput mergedInput : mergedInputList) {
+ mergedInput.setConstituentInputIsReady(input);
+ }
+ }
+ }
+ }
+
+ public Input waitForAnyInputReady(Collection<Input> inputs) throws InterruptedException {
+ Preconditions.checkArgument(inputs != null && inputs.size() > 0,
+ "At least one input should be specified");
+ InputReadyMonitor inputReadyMonitor = new InputReadyMonitor(inputs, true);
+ return inputReadyMonitor.awaitCondition();
+ }
+
+ public void waitForAllInputsReady(Collection<Input> inputs) throws InterruptedException {
+ Preconditions.checkArgument(inputs != null && inputs.size() > 0,
+ "At least one input should be specified");
+ InputReadyMonitor inputReadyMonitor = new InputReadyMonitor(inputs, false);
+ inputReadyMonitor.awaitCondition();
+ }
+
+ private class InputReadyMonitor {
+
+ private final Set<Input> pendingInputs;
+ private final boolean selectOne;
+
+ public InputReadyMonitor(Collection<Input> inputs, boolean anyOne) {
+ pendingInputs = Collections.newSetFromMap(new ConcurrentHashMap<Input, Boolean>());
+ pendingInputs.addAll(inputs);
+ this.selectOne = anyOne;
+ }
+
+ public Input awaitCondition() throws InterruptedException {
+ lock.lock();
+ try {
+ while (pendingInputs.size() > 0) {
+ Iterator<Input> inputIter = pendingInputs.iterator();
+ while (inputIter.hasNext()) {
+ Input input = inputIter.next();
+ if (readyInputs.containsKey(input)) {
+ inputIter.remove();
+ // Return early in case of an ANY request
+ if (selectOne) {
+ return input;
+ }
+ }
+ }
+ if (pendingInputs.size() > 0) {
+ condition.await();
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ return null;
+ }
+ }
+
+ @Override // TODO Remove with TEZ-866
+ public void setInputReady(Input input) {
+ setInputIsReady(input);
+ }
+
+ public void setGroupedInputs(Collection<MergedLogicalInput> inputGroups) {
+ lock.lock();
+ try {
+ if (inputGroups != null) {
+ inputToGroupMap = Maps.newConcurrentMap();
+ for (MergedLogicalInput mergedInput : inputGroups) {
+ mergedInput.setInputReadyCallback(this);
+ for (Input dest : mergedInput.getInputs()) {
+ // Check already ready Inputs - may have become ready during initialize
+ if (readyInputs.containsKey(dest)) {
+ mergedInput.setConstituentInputIsReady(dest);
+ }
+ List<MergedLogicalInput> mergedList = inputToGroupMap.get(dest);
+ if (mergedList == null) {
+ mergedList = Lists.newArrayList();
+ inputToGroupMap.put(dest, mergedList);
+ }
+ mergedList.add(mergedInput);
+ }
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index f5b414d..ae30b70 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -93,7 +93,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
private final ConcurrentHashMap<String, TezOutputContext> outputContextMap;
private final List<GroupInputSpec> groupInputSpecs;
- private ConcurrentHashMap<String, LogicalInput> groupInputsMap;
+ private ConcurrentHashMap<String, MergedLogicalInput> groupInputsMap;
private final ProcessorDescriptor processorDescriptor;
private final LogicalIOProcessor processor;
@@ -116,6 +116,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
private Thread eventRouterThread = null;
private final int appAttemptNumber;
+
+ private final InputReadyTracker inputReadyTracker;
public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber,
Configuration tezConf, TezUmbilical tezUmbilical,
@@ -154,6 +156,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
this.groupInputSpecs = taskSpec.getGroupInputs();
initialMemoryDistributor = new MemoryDistributor(numInputs, numOutputs, tezConf);
this.startedInputsMap = startedInputsMap;
+ this.inputReadyTracker = new InputReadyTracker();
}
/**
@@ -201,6 +204,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
LOG.info("All initializers finished");
// group inputs depend on inputs beings initialized. So must be done after.
initializeGroupInputs();
+ // Register the groups so that appropriate calls can be made.
+ this.inputReadyTracker
+ .setGroupedInputs(groupInputsMap == null ? null : groupInputsMap.values());
// Grouped input start will be controlled by the start of the GroupedInput
// Construct the set of groupedInputs up front so that start is not invoked on them.
@@ -266,6 +272,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
}
}
}
+ LOG.info("AutoStartComplete");
@@ -348,7 +355,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
LOG.info("Initializing Input using InputSpec: " + inputSpec);
String edgeName = inputSpec.getSourceVertexName();
LogicalInput input = createInput(inputSpec);
- TezInputContext inputContext = createInputContext(inputSpec, inputIndex);
+ TezInputContext inputContext = createInputContext(input, inputSpec, inputIndex);
inputsMap.put(edgeName, input);
inputContextMap.put(edgeName, inputContext);
@@ -407,7 +414,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
((LogicalOutput) output).setNumPhysicalOutputs(outputSpec
.getPhysicalEdgeCount());
}
- LOG.info("Initializing Input with dest edge: " + edgeName);
+ LOG.info("Initializing Output with dest edge: " + edgeName);
List<Event> events = output.initialize(outputContext);
sendTaskGeneratedEvents(events, EventProducerConsumerType.OUTPUT,
outputContext.getTaskVertexName(),
@@ -426,8 +433,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
}
private void initializeGroupInputs() {
- if (groupInputSpecs != null && !groupInputSpecs.isEmpty()) {
- groupInputsMap = new ConcurrentHashMap<String, LogicalInput>(groupInputSpecs.size());
+ if (groupInputSpecs != null && !groupInputSpecs.isEmpty()) {
+ groupInputsMap = new ConcurrentHashMap<String, MergedLogicalInput>(groupInputSpecs.size());
for (GroupInputSpec groupInputSpec : groupInputSpecs) {
LOG.info("Initializing GroupInput using GroupInputSpec: " + groupInputSpec);
MergedLogicalInput groupInput = (MergedLogicalInput) createInputFromDescriptor(
@@ -452,7 +459,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
+ processorDescriptor.getClassName());
}
- private TezInputContext createInputContext(InputSpec inputSpec, int inputIndex) {
+ private TezInputContext createInputContext(Input input, InputSpec inputSpec, int inputIndex) {
TezInputContext inputContext = new TezInputContextImpl(tezConf,
appAttemptNumber, tezUmbilical, taskSpec.getVertexName(),
inputSpec.getSourceVertexName(), taskSpec.getTaskAttemptID(),
@@ -461,7 +468,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
.getProcessorDescriptor().getUserPayload() : inputSpec
.getInputDescriptor().getUserPayload(), this,
serviceConsumerMetadata, System.getenv(), initialMemoryDistributor,
- inputSpec.getInputDescriptor());
+ inputSpec.getInputDescriptor(), input, inputReadyTracker);
return inputContext;
}
@@ -483,7 +490,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
appAttemptNumber, tezUmbilical, taskSpec.getVertexName(), taskSpec.getTaskAttemptID(),
tezCounters, processorDescriptor.getUserPayload(), this,
serviceConsumerMetadata, System.getenv(), initialMemoryDistributor,
- processorDescriptor);
+ processorDescriptor, inputReadyTracker);
return processorContext;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
index eeb7df0..d4b07d1 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
@@ -28,8 +28,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.InputReadyTracker;
import org.apache.tez.runtime.RuntimeTask;
import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.Input;
import org.apache.tez.runtime.api.TezInputContext;
import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
import org.apache.tez.runtime.common.resources.MemoryDistributor;
@@ -41,6 +43,8 @@ public class TezInputContextImpl extends TezTaskContextImpl
private final String sourceVertexName;
private final EventMetaData sourceInfo;
private final int inputIndex;
+ private final Input input;
+ private final InputReadyTracker inputReadyTracker;
@Private
public TezInputContextImpl(Configuration conf, int appAttemptNumber,
@@ -49,7 +53,7 @@ public class TezInputContextImpl extends TezTaskContextImpl
TezCounters counters, int inputIndex, byte[] userPayload,
RuntimeTask runtimeTask, Map<String, ByteBuffer> serviceConsumerMetadata,
Map<String, String> auxServiceEnv, MemoryDistributor memDist,
- InputDescriptor inputDescriptor) {
+ InputDescriptor inputDescriptor, Input input, InputReadyTracker inputReadyTracker) {
super(conf, appAttemptNumber, taskVertexName, taskAttemptID,
counters, runtimeTask, tezUmbilical, serviceConsumerMetadata,
auxServiceEnv, memDist, inputDescriptor);
@@ -59,6 +63,8 @@ public class TezInputContextImpl extends TezTaskContextImpl
this.sourceInfo = new EventMetaData(
EventProducerConsumerType.INPUT, taskVertexName, sourceVertexName,
taskAttemptID);
+ this.input = input;
+ this.inputReadyTracker = inputReadyTracker;
}
@Override
@@ -90,4 +96,9 @@ public class TezInputContextImpl extends TezTaskContextImpl
public void fatalError(Throwable exception, String message) {
super.signalFatalError(exception, message, sourceInfo);
}
+
+ @Override
+ public void inputIsReady() {
+ inputReadyTracker.setInputIsReady(input);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
index c25802a..106251c 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -18,27 +18,34 @@
package org.apache.tez.runtime.api.impl;
-import java.nio.ByteBuffer;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.InputReadyTracker;
import org.apache.tez.runtime.RuntimeTask;
import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.Input;
import org.apache.tez.runtime.api.TezProcessorContext;
import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
import org.apache.tez.runtime.common.resources.MemoryDistributor;
-public class TezProcessorContextImpl extends TezTaskContextImpl
- implements TezProcessorContext {
+public class TezProcessorContextImpl extends TezTaskContextImpl implements TezProcessorContext {
+ private static final Log LOG = LogFactory.getLog(TezProcessorContextImpl.class);
+
private final byte[] userPayload;
private final EventMetaData sourceInfo;
+ private final InputReadyTracker inputReadyTracker;
public TezProcessorContextImpl(Configuration conf, int appAttemptNumber,
TezUmbilical tezUmbilical, String vertexName,
@@ -46,13 +53,14 @@ public class TezProcessorContextImpl extends TezTaskContextImpl
byte[] userPayload, RuntimeTask runtimeTask,
Map<String, ByteBuffer> serviceConsumerMetadata,
Map<String, String> auxServiceEnv, MemoryDistributor memDist,
- ProcessorDescriptor processorDescriptor) {
+ ProcessorDescriptor processorDescriptor, InputReadyTracker inputReadyTracker) {
super(conf, appAttemptNumber, vertexName, taskAttemptID,
counters, runtimeTask, tezUmbilical, serviceConsumerMetadata,
auxServiceEnv, memDist, processorDescriptor);
this.userPayload = userPayload;
this.sourceInfo = new EventMetaData(EventProducerConsumerType.PROCESSOR,
taskVertexName, "", taskAttemptID);
+ this.inputReadyTracker = inputReadyTracker;
}
@Override
@@ -84,4 +92,14 @@ public class TezProcessorContextImpl extends TezTaskContextImpl
public boolean canCommit() throws IOException {
return tezUmbilical.canCommit(this.taskAttemptID);
}
+
+ /**
+ * Forwards the request to the InputReadyTracker that was handed to this context's constructor
+ * and returns whatever Input that tracker call returns.
+ *
+ * @param inputs the Inputs passed through unchanged to the tracker
+ * @return the Input returned by the tracker
+ * @throws InterruptedException propagated from the tracker call
+ */
+ @Override
+ public Input waitForAnyInputReady(Collection<Input> inputs) throws InterruptedException {
+ return inputReadyTracker.waitForAnyInputReady(inputs);
+ }
+
+ /**
+ * Forwards the request to the InputReadyTracker that was handed to this context's constructor.
+ *
+ * @param inputs the Inputs passed through unchanged to the tracker
+ * @throws InterruptedException propagated from the tracker call
+ */
+ @Override
+ public void waitForAllInputsReady(Collection<Input> inputs) throws InterruptedException {
+ inputReadyTracker.waitForAllInputsReady(inputs);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
new file mode 100644
index 0000000..6f5aa2f
--- /dev/null
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.MergedLogicalInput;
+import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+
+public class TestInputReadyTracker {
+
+ private static final long SLEEP_TIME = 500l;
+
+ /**
+ * Two ungrouped inputs: input1 registers itself as ready from its constructor, input2 only
+ * becomes ready when the helper thread started by setDelayedInputReady calls setInputIsReady.
+ */
+ @Test(timeout = 5000)
+ public void testWithoutGrouping1() throws InterruptedException {
+ InputReadyTracker inputReadyTracker = new InputReadyTracker();
+
+ ImmediatelyReadyInputForTest input1 = new ImmediatelyReadyInputForTest(inputReadyTracker);
+ ControlledReadyInputForTest input2 = new ControlledReadyInputForTest(inputReadyTracker);
+
+ // Test for simple inputs
+ List<Input> requestList;
+ long startTime = 0l;
+ long readyTime = 0l;
+ requestList = new ArrayList<Input>();
+ requestList.add(input1);
+ requestList.add(input2);
+ // Nothing has marked input2 ready at this point, so input1 is the expected answer.
+ Input readyInput = inputReadyTracker.waitForAnyInputReady(requestList);
+ assertTrue(input1.isReady);
+ assertFalse(input2.isReady);
+ assertEquals(input1, readyInput);
+
+ // startTime must be sampled before the helper thread is started so that the elapsed-time
+ // assertion below covers the whole SLEEP_TIME delay.
+ startTime = System.currentTimeMillis();
+ setDelayedInputReady(input2);
+ inputReadyTracker.waitForAllInputsReady(requestList);
+ readyTime = System.currentTimeMillis();
+ // Should have moved into ready state - only happens when setInputIsReady is invoked.
+ // Ensure the method returned only after the specific Input was told it is ready
+ assertTrue(input2.isReady);
+ assertTrue(readyTime >= startTime + SLEEP_TIME);
+ assertTrue(input1.isReady);
+ }
+
+ /**
+ * Three ungrouped inputs that are made ready one at a time (input2, then input1, then input3)
+ * by helper threads. After each step the test checks which input waitForAnyInputReady returned
+ * and that the call took at least SLEEP_TIME.
+ */
+ @Test(timeout = 5000)
+ public void testWithoutGrouping2() throws InterruptedException {
+ InputReadyTracker inputReadyTracker = new InputReadyTracker();
+
+ ControlledReadyInputForTest input1 = new ControlledReadyInputForTest(inputReadyTracker);
+ ControlledReadyInputForTest input2 = new ControlledReadyInputForTest(inputReadyTracker);
+ ControlledReadyInputForTest input3 = new ControlledReadyInputForTest(inputReadyTracker);
+
+ // Test for simple inputs
+ List<Input> requestList;
+ long startTime = 0l;
+ long readyTime = 0l;
+
+ requestList = new ArrayList<Input>();
+ requestList.add(input1);
+ requestList.add(input2);
+ requestList.add(input3);
+
+ // Step 1: wait on all three, only input2 is scheduled to become ready.
+ startTime = System.currentTimeMillis();
+ setDelayedInputReady(input2);
+ Input readyInput = inputReadyTracker.waitForAnyInputReady(requestList);
+ assertEquals(input2, readyInput);
+ readyTime = System.currentTimeMillis();
+ // Should have moved into ready state - only happens when setInputIsReady is invoked.
+ // Ensure the method returned only after the specific Input was told it is ready
+ assertTrue(input2.isReady);
+ assertTrue(readyTime >= startTime + SLEEP_TIME);
+ assertFalse(input1.isReady);
+ assertFalse(input3.isReady);
+
+ // Step 2: input2 is left out of the request; only input1 is scheduled to become ready.
+ requestList = new ArrayList<Input>();
+ requestList.add(input1);
+ requestList.add(input3);
+ startTime = System.currentTimeMillis();
+ setDelayedInputReady(input1);
+ readyInput = inputReadyTracker.waitForAnyInputReady(requestList);
+ assertEquals(input1, readyInput);
+ readyTime = System.currentTimeMillis();
+ // Should have moved into ready state - only happens when setInputIsReady is invoked.
+ // Ensure the method returned only after the specific Input was told it is ready
+ assertTrue(input1.isReady);
+ assertTrue(readyTime >= startTime + SLEEP_TIME);
+ assertTrue(input2.isReady);
+ assertFalse(input3.isReady);
+
+ // Step 3: only input3 is requested and scheduled to become ready.
+ requestList = new ArrayList<Input>();
+ requestList.add(input3);
+ startTime = System.currentTimeMillis();
+ setDelayedInputReady(input3);
+ readyInput = inputReadyTracker.waitForAnyInputReady(requestList);
+ assertEquals(input3, readyInput);
+ readyTime = System.currentTimeMillis();
+ // Should have moved into ready state - only happens when setInputIsReady is invoked.
+ // Ensure the method returned only after the specific Input was told it is ready
+ assertTrue(input3.isReady);
+ assertTrue(readyTime >= startTime + SLEEP_TIME);
+ assertTrue(input1.isReady);
+ assertTrue(input2.isReady);
+ }
+
+ /**
+ * Two merged inputs registered with the tracker through setGroupedInputs: group1 is an
+ * AnyOneMergedInputForTest over (input1, input2) and group2 is an AllMergedInputForTest over
+ * (input3, input4). input1 and input3 register as ready from their constructors; input4 is made
+ * ready by a helper thread; input2 is never made ready.
+ */
+ @Test(timeout = 5000)
+ public void testGrouped() throws InterruptedException {
+ InputReadyTracker inputReadyTracker = new InputReadyTracker();
+
+ ImmediatelyReadyInputForTest input1 = new ImmediatelyReadyInputForTest(inputReadyTracker);
+ ControlledReadyInputForTest input2 = new ControlledReadyInputForTest(inputReadyTracker);
+
+ ImmediatelyReadyInputForTest input3 = new ImmediatelyReadyInputForTest(inputReadyTracker);
+ ControlledReadyInputForTest input4 = new ControlledReadyInputForTest(inputReadyTracker);
+
+ AnyOneMergedInputForTest group1 = new AnyOneMergedInputForTest();
+ AllMergedInputForTest group2 = new AllMergedInputForTest();
+
+ List<Input> group1Inputs = new ArrayList<Input>();
+ group1Inputs.add(input1);
+ group1Inputs.add(input2);
+
+ List<Input> group2Inputs = new ArrayList<Input>();
+ group2Inputs.add(input3);
+ group2Inputs.add(input4);
+
+ group1.initialize(group1Inputs);
+ group2.initialize(group2Inputs);
+
+ // Register groups with tracker
+ List<MergedLogicalInput> groups = Lists.newArrayList(group1, group2);
+ inputReadyTracker.setGroupedInputs(groups);
+
+ // Test for grouped inputs
+ List<Input> requestList;
+ long startTime = 0l;
+ long readyTime = 0l;
+ requestList = new ArrayList<Input>();
+ requestList.add(group1);
+ // group1 is expected to be ready without any delay because input1 already reported ready.
+ Input readyInput = inputReadyTracker.waitForAnyInputReady(requestList);
+ assertTrue(group1.isReady);
+ assertTrue(input1.isReady);
+ assertFalse(input2.isReady);
+ assertEquals(group1, readyInput);
+
+
+ requestList = new ArrayList<Input>();
+ requestList.add(group2);
+
+
+ // group2 is expected to become ready only once input4 has been marked ready by the helper thread.
+ startTime = System.currentTimeMillis();
+ setDelayedInputReady(input4);
+ inputReadyTracker.waitForAllInputsReady(requestList);
+ readyTime = System.currentTimeMillis();
+ // Should have moved into ready state - only happens when setInputIsReady is invoked.
+ // Ensure the method returned only after the specific Input was told it is ready
+ assertTrue(group2.isReady);
+ assertTrue(input3.isReady);
+ assertTrue(input4.isReady);
+ assertTrue(readyTime >= startTime + SLEEP_TIME);
+
+ }
+
+ /**
+ * Starts a helper thread that sleeps for SLEEP_TIME milliseconds and then marks the given
+ * input as ready.
+ *
+ * @param input the input to mark ready once the delay has elapsed
+ * @return the System.currentTimeMillis() value sampled just before the helper thread is created
+ */
+ private long setDelayedInputReady(final ControlledReadyInputForTest input) {
+   final long launchedAt = System.currentTimeMillis();
+   Runnable delayedNotifier = new Runnable() {
+     @Override
+     public void run() {
+       try {
+         Thread.sleep(SLEEP_TIME);
+       } catch (InterruptedException interrupted) {
+         throw new RuntimeException(interrupted);
+       }
+       input.setInputIsReady();
+     }
+   };
+   Thread notifierThread = new Thread(delayedNotifier);
+   notifierThread.start();
+   return launchedAt;
+ }
+
+ /**
+ * LogicalInput stub that flags itself ready and reports that to the supplied tracker from
+ * inside its constructor. The LogicalInput methods do nothing or return null.
+ */
+ private static class ImmediatelyReadyInputForTest implements LogicalInput {
+
+   // Read directly by the assertions in the enclosing test class.
+   private volatile boolean isReady;
+
+   ImmediatelyReadyInputForTest(InputReadyTracker tracker) {
+     // Set the flag before notifying the tracker.
+     this.isReady = true;
+     tracker.setInputIsReady(this);
+   }
+
+   @Override
+   public void setNumPhysicalInputs(int numInputs) {
+   }
+
+   @Override
+   public List<Event> initialize(TezInputContext inputContext) throws Exception {
+     return null;
+   }
+
+   @Override
+   public void start() throws Exception {
+   }
+
+   @Override
+   public void handleEvents(List<Event> inputEvents) throws Exception {
+   }
+
+   @Override
+   public Reader getReader() throws Exception {
+     return null;
+   }
+
+   @Override
+   public List<Event> close() throws Exception {
+     return null;
+   }
+ }
+
+ /**
+ * LogicalInput stub that stays not-ready until the test calls setInputIsReady(), which sets
+ * the flag and reports readiness to the tracker given at construction. The LogicalInput
+ * methods do nothing or return null.
+ */
+ private static class ControlledReadyInputForTest implements LogicalInput {
+
+   // Read directly by the assertions in the enclosing test class.
+   private volatile boolean isReady;
+   private InputReadyTracker tracker;
+
+   ControlledReadyInputForTest(InputReadyTracker inputReadyTracker) {
+     tracker = inputReadyTracker;
+   }
+
+   // Used by the test to control when this input will be ready
+   public void setInputIsReady() {
+     // Set the flag before notifying the tracker.
+     this.isReady = true;
+     this.tracker.setInputIsReady(this);
+   }
+
+   @Override
+   public void setNumPhysicalInputs(int numInputs) {
+   }
+
+   @Override
+   public List<Event> initialize(TezInputContext inputContext) throws Exception {
+     return null;
+   }
+
+   @Override
+   public void start() throws Exception {
+   }
+
+   @Override
+   public void handleEvents(List<Event> inputEvents) throws Exception {
+   }
+
+   @Override
+   public Reader getReader() throws Exception {
+     return null;
+   }
+
+   @Override
+   public List<Event> close() throws Exception {
+     return null;
+   }
+ }
+
+ /**
+ * MergedLogicalInput stub that flags itself ready and calls informInputReady() every time any
+ * constituent input reports ready.
+ */
+ private static class AnyOneMergedInputForTest extends MergedLogicalInput {
+
+   // Read directly by the assertions in the enclosing test class.
+   private volatile boolean isReady;
+
+   @Override
+   public void setConstituentInputIsReady(Input input) {
+     // One ready constituent is enough; which one it was is not recorded.
+     this.isReady = true;
+     informInputReady();
+   }
+
+   @Override
+   public Reader getReader() throws Exception {
+     return null;
+   }
+ }
+
+ /**
+ * MergedLogicalInput stub that flags itself ready and calls informInputReady() once the number
+ * of distinct constituents that have reported ready equals getInputs().size().
+ */
+ private static class AllMergedInputForTest extends MergedLogicalInput {
+
+   // Read directly by the assertions in the enclosing test class.
+   private volatile boolean isReady = false;
+   // Plain HashSet: every access must happen while holding this object's monitor.
+   private final Set<Input> readyInputs = Sets.newHashSet();
+
+   @Override
+   public Reader getReader() throws Exception {
+     return null;
+   }
+
+   /**
+    * Records the constituent as ready and, if the recorded set has reached the size of
+    * getInputs(), marks this merged input ready and notifies via informInputReady().
+    *
+    * @param input the constituent input that became ready
+    */
+   @Override
+   public void setConstituentInputIsReady(Input input) {
+     final boolean allReady;
+     synchronized (this) {
+       readyInputs.add(input);
+       // Evaluate the completeness check under the same lock that guards the add, so the
+       // unsynchronized HashSet is never read while another thread may be mutating it.
+       allReady = readyInputs.size() == getInputs().size();
+     }
+     // Notify outside the lock so the callback never runs while this monitor is held.
+     if (allReady) {
+       isReady = true;
+       informInputReady();
+     }
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
index a1d2fa2..c2dd68b 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
@@ -175,6 +175,7 @@ public class TestLogicalIOProcessorRuntimeTask {
@Override
public List<Event> initialize(TezInputContext inputContext) throws Exception {
inputContext.requestInitialMemory(0, null);
+ inputContext.inputIsReady();
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
index bc17496..54f80e4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
@@ -93,6 +93,7 @@ public class BroadcastShuffleManager implements FetcherCallback, MemoryUpdateCal
private RunBroadcastShuffleCallable schedulerCallable = new RunBroadcastShuffleCallable();
private BlockingQueue<FetchedInput> completedInputs;
+ private AtomicBoolean inputReadyNotificationSent = new AtomicBoolean(false);
private Set<InputIdentifier> completedInputSet;
private ConcurrentMap<String, InputHost> knownSrcHosts;
private BlockingQueue<InputHost> pendingHosts;
@@ -127,7 +128,7 @@ public class BroadcastShuffleManager implements FetcherCallback, MemoryUpdateCal
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
private volatile long initialMemoryAvailable = -1l;
-
+
// TODO NEWTEZ Add counters.
public BroadcastShuffleManager(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
@@ -541,6 +542,9 @@ public class BroadcastShuffleManager implements FetcherCallback, MemoryUpdateCal
try {
completedInputSet.add(fetchedInput.getInputAttemptIdentifier().getInputIdentifier());
completedInputs.add(fetchedInput);
+ if (!inputReadyNotificationSent.getAndSet(true)) {
+ inputContext.inputIsReady();
+ }
numCompletedInputs.incrementAndGet();
} finally {
lock.unlock();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
index 1959533..8653b44 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
@@ -271,6 +271,8 @@ public class Shuffle implements ExceptionReporter, MemoryUpdateCallback {
throwable);
}
}
+
+ inputContext.inputIsReady();
return kvIter;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
index 004701d..f142ee9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
@@ -21,6 +21,7 @@ package org.apache.tez.runtime.library.input;
import java.io.IOException;
import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.api.Input;
import org.apache.tez.runtime.api.MergedLogicalInput;
import org.apache.tez.runtime.api.Reader;
import org.apache.tez.runtime.library.api.KeyValueReader;
@@ -69,4 +70,8 @@ public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput {
return new ConcatenatedMergedKeyValueReader();
}
-}
+ /**
+ * Calls informInputReady() each time a constituent input reports ready; no record is kept of
+ * which constituents have reported.
+ *
+ * @param input the constituent that became ready (not used by this implementation)
+ */
+ @Override
+ public void setConstituentInputIsReady(Input input) {
+ informInputReady();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
index 29cd490..8affa14 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
@@ -21,6 +21,7 @@ package org.apache.tez.runtime.library.input;
import java.io.IOException;
import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.api.Input;
import org.apache.tez.runtime.api.MergedLogicalInput;
import org.apache.tez.runtime.api.Reader;
import org.apache.tez.runtime.library.api.KeyValuesReader;
@@ -69,4 +70,8 @@ public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput {
return new ConcatenatedMergedKeyValuesReader();
}
-}
+ /**
+ * Calls informInputReady() each time a constituent input reports ready; no record is kept of
+ * which constituents have reported.
+ *
+ * @param input the constituent that became ready (not used by this implementation)
+ */
+ @Override
+ public void setConstituentInputIsReady(Input input) {
+ informInputReady();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
index bbeae52..d298e8f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
@@ -37,6 +37,7 @@ public class LocalMergedInput extends ShuffledMergedInputLegacy {
public List<Event> initialize(TezInputContext inputContext) throws IOException {
this.inputContext = inputContext;
this.inputContext.requestInitialMemory(0l, null); // mandatory call.
+ this.inputContext.inputIsReady();
this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
if (numInputs == 0) {
@@ -60,5 +61,4 @@ public class LocalMergedInput extends ShuffledMergedInputLegacy {
}
return Collections.emptyList();
}
-
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
index d2d4e56..de707a5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
@@ -84,9 +84,9 @@ public class ShuffledMergedInput implements LogicalInput {
}
this.inputKeyCounter = inputContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
- this.inputValueCounter = inputContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_RECORDS);
- this.conf.setStrings(TezJobConfig.LOCAL_DIRS,
- inputContext.getWorkDirs());
+ this.inputValueCounter = inputContext.getCounters().findCounter(
+ TaskCounter.REDUCE_INPUT_RECORDS);
+ this.conf.setStrings(TezJobConfig.LOCAL_DIRS, inputContext.getWorkDirs());
shuffle = new Shuffle(inputContext, this.conf, numInputs);
return Collections.emptyList();
@@ -239,5 +239,4 @@ public class ShuffledMergedInput implements LogicalInput {
ConfigUtils.getIntermediateInputValueClass(conf), conf, inputKeyCounter, inputValueCounter);
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
index 1241606..d216eeb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -66,8 +66,7 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
return null;
}
- this.shuffleManager = new BroadcastShuffleManager(inputContext, conf,
- numInputs);
+ this.shuffleManager = new BroadcastShuffleManager(inputContext, conf, numInputs);
return Collections.emptyList();
}
@@ -143,4 +142,5 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
public void setNumPhysicalInputs(int numInputs) {
this.numInputs = numInputs;
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
index da4ea16..90dadb6 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
@@ -223,6 +223,7 @@ public class TestInput implements LogicalInput {
public List<Event> initialize(TezInputContext inputContext) throws Exception {
this.inputContext = inputContext;
this.inputContext.requestInitialMemory(0l, null); //Mandatory call.
+ this.inputContext.inputIsReady();
if (inputContext.getUserPayload() != null) {
String vName = inputContext.getTaskVertexName();
conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
@@ -324,5 +325,4 @@ public class TestInput implements LogicalInput {
this.inputValues[i] = -1;
}
}
-
}