You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2016/06/27 19:58:20 UTC
tez git commit: TEZ-3302. Add a version of
processorContext.waitForAllInputsReady and waitForAnyInputReady with a
timeout. Contributed by Tsuyoshi Ozawa.
Repository: tez
Updated Branches:
refs/heads/master 2c22e23a7 -> d2b9222fb
TEZ-3302. Add a version of processorContext.waitForAllInputsReady and
waitForAnyInputReady with a timeout. Contributed by Tsuyoshi Ozawa.
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d2b9222f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d2b9222f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d2b9222f
Branch: refs/heads/master
Commit: d2b9222fb589ce85124b1e381ba199621e91b263
Parents: 2c22e23
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Jun 27 12:57:47 2016 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Jun 27 12:57:47 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../tez/runtime/api/ProcessorContext.java | 43 +++++++++++++++++++-
.../apache/tez/runtime/InputReadyTracker.java | 36 ++++++++++++++--
.../api/impl/TezProcessorContextImpl.java | 14 ++++++-
.../tez/runtime/TestInputReadyTracker.java | 16 +++++---
5 files changed, 98 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/d2b9222f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 640d957..26ff72c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3302. Add a version of processorContext.waitForAllInputsReady and waitForAnyInputReady with a timeout.
TEZ-3291. Optimize splits grouping when locality information is not available.
TEZ-3305. TestAnalyzer fails on Hadoop 2.7.
TEZ-3304. TestHistoryParser fails with Hadoop 2.7.
@@ -66,6 +67,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3302. Add a version of processorContext.waitForAllInputsReady and waitForAnyInputReady with a timeout.
TEZ-3305. TestAnalyzer fails on Hadoop 2.7.
TEZ-3304. TestHistoryParser fails with Hadoop 2.7.
TEZ-3296. Tez job can hang if two vertices at the same root distance have different task requirements
http://git-wip-us.apache.org/repos/asf/tez/blob/d2b9222f/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java
index 8b88289..acb2a57 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java
@@ -63,7 +63,30 @@ public interface ProcessorContext extends TaskContext {
* @throws InterruptedException
*/
public Input waitForAnyInputReady(Collection<Input> inputs) throws InterruptedException;
-
+
+ /**
+ * Blocking call which returns when any of the specified Inputs is ready for
+ * consumption, or when the specified timeout elapses.
+ *
+ * There can be multiple parallel invocations of this function - where each
+ * invocation blocks on the Inputs that it specifies.
+ *
+ * If multiple Inputs are ready, any one of them may be returned by this
+ * method - including an Input which may have been returned in a previous
+ * call. If invoking this method multiple times, it's recommended to remove
+ * previously completed Inputs from the invocation list.
+ *
+ * @param inputs
+ * the list of Inputs to monitor
+ * @param timeoutMillis
+ * timeout in milliseconds. If this value is negative,
+ * this function will wait indefinitely until any input becomes ready
+ * or the calling thread is interrupted.
+ * @return the Input which is ready for consumption, or null if the timeout elapses first.
+ * @throws InterruptedException
+ */
+ public Input waitForAnyInputReady(Collection<Input> inputs, long timeoutMillis) throws InterruptedException;
+
/**
* Blocking call which returns only after all of the specified Inputs are
* ready for consumption.
@@ -76,4 +99,22 @@ public interface ProcessorContext extends TaskContext {
* @throws InterruptedException
*/
public void waitForAllInputsReady(Collection<Input> inputs) throws InterruptedException;
+
+ /**
+ * Blocking call which returns once all of the specified Inputs are
+ * ready for consumption, or once the specified timeout elapses.
+ *
+ * There can be multiple parallel invocations of this function - where each
+ * invocation blocks on the Inputs that it specifies.
+ *
+ * @param inputs
+ * the list of Inputs to monitor
+ * @param timeoutMillis
+ * timeout in milliseconds. If this value is negative,
+ * this function will wait indefinitely until all inputs are ready
+ * or the calling thread is interrupted.
+ * @return true if all inputs are ready; false if the timeout elapsed first.
+ * @throws InterruptedException
+ */
+ public boolean waitForAllInputsReady(Collection<Input> inputs, long timeoutMillis) throws InterruptedException;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/d2b9222f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/InputReadyTracker.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/InputReadyTracker.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/InputReadyTracker.java
index 93035ba..ba4fe1d 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/InputReadyTracker.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/InputReadyTracker.java
@@ -25,6 +25,8 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@@ -77,17 +79,36 @@ public class InputReadyTracker {
}
public Input waitForAnyInputReady(Collection<Input> inputs) throws InterruptedException {
+ return waitForAnyInputReady(inputs, -1);
+ }
+
+ public Input waitForAnyInputReady(Collection<Input> inputs, long timeoutMillis) throws InterruptedException {
Preconditions.checkArgument(inputs != null && inputs.size() > 0,
"At least one input should be specified");
InputReadyMonitor inputReadyMonitor = new InputReadyMonitor(inputs, true);
- return inputReadyMonitor.awaitCondition();
+ try {
+ return inputReadyMonitor.awaitCondition(timeoutMillis);
+ } catch (TimeoutException e) {
+ return null;
+ }
}
public void waitForAllInputsReady(Collection<Input> inputs) throws InterruptedException {
+ waitForAllInputsReady(inputs, -1);
+ }
+
+ public boolean waitForAllInputsReady(Collection<Input> inputs, long timeoutMillis) throws InterruptedException {
Preconditions.checkArgument(inputs != null && inputs.size() > 0,
"At least one input should be specified");
+ boolean succeeded = true;
InputReadyMonitor inputReadyMonitor = new InputReadyMonitor(inputs, false);
- inputReadyMonitor.awaitCondition();
+
+ try {
+ inputReadyMonitor.awaitCondition(timeoutMillis);
+ } catch (TimeoutException e) {
+ succeeded = false;
+ }
+ return succeeded;
}
private class InputReadyMonitor {
@@ -101,7 +122,7 @@ public class InputReadyTracker {
this.selectOne = anyOne;
}
- public Input awaitCondition() throws InterruptedException {
+ public Input awaitCondition(long timeoutMillis) throws InterruptedException, TimeoutException {
lock.lock();
try {
while (pendingInputs.size() > 0) {
@@ -117,7 +138,14 @@ public class InputReadyTracker {
}
}
if (pendingInputs.size() > 0) {
- condition.await();
+ if (timeoutMillis >= 0) {
+ boolean succeeded = condition.await(timeoutMillis, TimeUnit.MILLISECONDS);
+ if (!succeeded) {
+ throw new TimeoutException("pending Inputs timeout");
+ }
+ } else { // timeout < 0
+ condition.await();
+ }
}
}
} finally {
http://git-wip-us.apache.org/repos/asf/tez/blob/d2b9222f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
index 607bbf1..d7c2d3e 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -120,12 +120,22 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce
@Override
public Input waitForAnyInputReady(Collection<Input> inputs) throws InterruptedException {
- return inputReadyTracker.waitForAnyInputReady(inputs);
+ return waitForAnyInputReady(inputs, -1);
+ }
+
+ @Override
+ public Input waitForAnyInputReady(Collection<Input> inputs, long timeoutMillis) throws InterruptedException {
+ return inputReadyTracker.waitForAnyInputReady(inputs, timeoutMillis);
}
@Override
public void waitForAllInputsReady(Collection<Input> inputs) throws InterruptedException {
- inputReadyTracker.waitForAllInputsReady(inputs);
+ waitForAllInputsReady(inputs, -1);
+ }
+
+ @Override
+ public boolean waitForAllInputsReady(Collection<Input> inputs, long timeoutMillis) throws InterruptedException {
+ return inputReadyTracker.waitForAllInputsReady(inputs, timeoutMillis);
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/d2b9222f/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
index 29c5023..1846354 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
@@ -19,6 +19,7 @@
package org.apache.tez.runtime;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
@@ -43,9 +44,9 @@ import com.google.common.collect.Sets;
public class TestInputReadyTracker {
- private static final long SLEEP_TIME = 500l;
+ private static final long SLEEP_TIME = 2000l;
- @Test(timeout = 5000)
+ @Test(timeout = 20000)
public void testWithoutGrouping1() throws InterruptedException {
InputReadyTracker inputReadyTracker = new InputReadyTracker();
@@ -66,7 +67,8 @@ public class TestInputReadyTracker {
startTime = System.nanoTime();
setDelayedInputReady(input2);
- inputReadyTracker.waitForAllInputsReady(requestList);
+ assertFalse(inputReadyTracker.waitForAllInputsReady(requestList, 0));
+ assertTrue(inputReadyTracker.waitForAllInputsReady(requestList, -1));
readyTime = System.nanoTime();
// Should have moved into ready state - only happens when the setReady function is invoked.
// Ensure the method returned only after the specific Input was told it is ready
@@ -75,7 +77,7 @@ public class TestInputReadyTracker {
assertTrue(input1.isReady);
}
- @Test(timeout = 5000)
+ @Test(timeout = 20000)
public void testWithoutGrouping2() throws InterruptedException {
InputReadyTracker inputReadyTracker = new InputReadyTracker();
@@ -124,7 +126,9 @@ public class TestInputReadyTracker {
requestList.add(input3);
startTime = System.nanoTime();
setDelayedInputReady(input3);
- readyInput = inputReadyTracker.waitForAnyInputReady(requestList);
+ readyInput = inputReadyTracker.waitForAnyInputReady(requestList, 0);
+ assertNull(readyInput);
+ readyInput = inputReadyTracker.waitForAnyInputReady(requestList, -1);
assertEquals(input3, readyInput);
readyTime = System.nanoTime();
// Should have moved into ready state - only happens when the setReady function is invoked.
@@ -135,7 +139,7 @@ public class TestInputReadyTracker {
assertTrue(input2.isReady);
}
- @Test(timeout = 5000)
+ @Test(timeout = 20000)
public void testGrouped() throws InterruptedException {
InputReadyTracker inputReadyTracker = new InputReadyTracker();