You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/21 03:36:42 UTC
[34/50] [abbrv] tez git commit: TEZ-2441. Add tests for
TezTaskRunner2. (sseth)
TEZ-2441. Add tests for TezTaskRunner2. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/acac8dc0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/acac8dc0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/acac8dc0
Branch: refs/heads/TEZ-2003
Commit: acac8dc034c93529e3526c7c2223b2a4bfd08450
Parents: bea4a58
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Jul 29 18:25:18 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Aug 20 18:23:02 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
.../org/apache/tez/runtime/task/TezChild.java | 5 +-
.../apache/tez/runtime/task/TezTaskRunner.java | 2 +-
.../apache/tez/runtime/task/TezTaskRunner2.java | 42 +-
.../runtime/task/TaskExecutionTestHelpers.java | 451 +++++++++++++
.../runtime/task/TestContainerExecution.java | 59 ++
.../tez/runtime/task/TestTaskExecution.java | 400 +-----------
.../tez/runtime/task/TestTaskExecution2.java | 638 +++++++++++++++++++
.../src/test/resources/log4j.properties | 19 +
9 files changed, 1213 insertions(+), 404 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/acac8dc0/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index b88044b..9d72d92 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -39,5 +39,6 @@ ALL CHANGES:
TEZ-2651. Pluggable services should not extend AbstractService.
TEZ-2652. Cleanup the way services are specified for an AM and vertices.
TEZ-2653. Change service contexts to expose a user specified payload instead of the AM configuration.
+ TEZ-2441. Add tests for TezTaskRunner2.
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/acac8dc0/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index 353fe23..b64ec37 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.log4j.LogManager;
import org.apache.tez.common.ContainerContext;
import org.apache.tez.common.ContainerTask;
import org.apache.tez.common.TezCommonUtils;
@@ -68,7 +67,6 @@ import org.apache.tez.dag.utils.RelocalizationUtils;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.api.impl.TaskSpec;
-import org.apache.tez.runtime.api.impl.TezUmbilical;
import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
import org.apache.tez.runtime.internals.api.TaskReporterInterface;
import org.slf4j.Logger;
@@ -256,6 +254,7 @@ public class TezChild {
boolean shouldDie;
try {
TaskRunner2Result result = taskRunner.run();
+ LOG.info("TaskRunner2Result: {}", result);
shouldDie = result.isContainerShutdownRequested();
if (shouldDie) {
LOG.info("Got a shouldDie notification via heartbeats for container {}. Shutting down", containerIdString);
@@ -377,8 +376,6 @@ public class TezChild {
}
if (ownUmbilical) {
RPC.stopProxy(umbilical);
- // TODO Temporary change. Revert. Ideally, move this over to the main method in TezChild if possible.
-// LogManager.shutdown();
}
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/acac8dc0/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
index a82d87b..aebf6a9 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
@@ -250,7 +250,7 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
cause = ((UndeclaredThrowableException) cause).getCause();
}
maybeRegisterFirstException(cause);
- LOG.info("Encounted an error while executing task: " + task.getTaskAttemptID(),
+ LOG.info("Encountered an error while executing task: " + task.getTaskAttemptID(),
cause);
try {
sendFailure(cause, "Failure while running task");
http://git-wip-us.apache.org/repos/asf/tez/blob/acac8dc0/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
index a5fabb5..1a8828d 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
@@ -24,6 +24,7 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.ListenableFuture;
@@ -48,6 +49,9 @@ import org.slf4j.LoggerFactory;
public class TezTaskRunner2 {
+ // Behaviour changes as compared to TezTaskRunner
+ // - Exception not thrown. Instead returned in the result.
+ // - The actual exception is part of the result, instead of requiring a getCause().
private static final Logger LOG = LoggerFactory.getLogger(TezTaskRunner2.class);
@@ -156,19 +160,7 @@ public class TezTaskRunner2 {
}
}
}
- if (executionResult != null) {
- synchronized (this) {
- if (isRunningState()) {
- if (executionResult.error != null) {
- trySettingEndReason(EndReason.TASK_ERROR);
- registerFirstException(executionResult.error, null);
- } else {
- trySettingEndReason(EndReason.SUCCESS);
- taskComplete.set(true);
- }
- }
- }
- }
+ processCallableResult(executionResult);
switch (firstEndReason) {
case SUCCESS:
@@ -249,6 +241,26 @@ public class TezTaskRunner2 {
}
}
+ // It's possible for the task to actually complete, and an alternate signal such as killTask/killContainer
+ // come in before the future has been processed by this thread. That condition is not handled - and
+ // the result of the execution will be determind by the thread order.
+ @VisibleForTesting
+ void processCallableResult(TaskRunner2CallableResult executionResult) {
+ if (executionResult != null) {
+ synchronized (this) {
+ if (isRunningState()) {
+ if (executionResult.error != null) {
+ trySettingEndReason(EndReason.TASK_ERROR);
+ registerFirstException(executionResult.error, null);
+ } else {
+ trySettingEndReason(EndReason.SUCCESS);
+ taskComplete.set(true);
+ }
+ }
+ }
+ }
+ }
+
/**
* Attempt to kill the running task, if it hasn't already completed for some other reason.
* @return true if the task kill was honored, false otherwise
@@ -438,12 +450,12 @@ public class TezTaskRunner2 {
private String getTaskDiagnosticsString(Throwable t, String message) {
String diagnostics;
if (t != null && message != null) {
- diagnostics = "exceptionThrown=" + ExceptionUtils.getStackTrace(t) + ", errorMessage="
+ diagnostics = "Failure while running task: " + ExceptionUtils.getStackTrace(t) + ", errorMessage="
+ message;
} else if (t == null && message == null) {
diagnostics = "Unknown error";
} else {
- diagnostics = t != null ? "exceptionThrown=" + ExceptionUtils.getStackTrace(t)
+ diagnostics = t != null ? "Failure while running task: " + ExceptionUtils.getStackTrace(t)
: " errorMessage=" + message;
}
return diagnostics;
http://git-wip-us.apache.org/repos/asf/tez/blob/acac8dc0/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java
new file mode 100644
index 0000000..fc42da3
--- /dev/null
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.common.ContainerContext;
+import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
+import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TaskExecutionTestHelpers {
+
+ public static final String HEARTBEAT_EXCEPTION_STRING = "HeartbeatException";
+
+ // Uses static fields for signaling. Ensure only used by one test at a time.
+ public static class TestProcessor extends AbstractLogicalIOProcessor {
+
+ public static final byte[] CONF_EMPTY = new byte[] { 0 };
+ public static final byte[] CONF_THROW_IO_EXCEPTION = new byte[] { 1 };
+ public static final byte[] CONF_THROW_TEZ_EXCEPTION = new byte[] { 2 };
+ public static final byte[] CONF_SIGNAL_FATAL_AND_THROW = new byte[] { 4 };
+ public static final byte[] CONF_SIGNAL_FATAL_AND_LOOP = new byte[] { 8 };
+ public static final byte[] CONF_SIGNAL_FATAL_AND_COMPLETE = new byte[] { 16 };
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestProcessor.class);
+
+ private static final ReentrantLock processorLock = new ReentrantLock();
+ private static final Condition processorCondition = processorLock.newCondition();
+ private static final Condition loopCondition = processorLock.newCondition();
+ private static final Condition completionCondition = processorLock.newCondition();
+ private static final Condition runningCondition = processorLock.newCondition();
+ private static volatile boolean completed = false;
+ private static volatile boolean running = false;
+ private static volatile boolean looping = false;
+ private static volatile boolean signalled = false;
+
+ private static boolean receivedInterrupt = false;
+ private static volatile boolean wasAborted = false;
+
+ private boolean throwIOException = false;
+ private boolean throwTezException = false;
+ private boolean signalFatalAndThrow = false;
+ private boolean signalFatalAndLoop = false;
+ private boolean signalFatalAndComplete = false;
+
+ public TestProcessor(ProcessorContext context) {
+ super(context);
+ }
+
+ @Override
+ public void initialize() throws Exception {
+ parseConf(getContext().getUserPayload().deepCopyAsArray());
+ }
+
+ @Override
+ public void handleEvents(List<Event> processorEvents) {
+
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ }
+
+ private void parseConf(byte[] bytes) {
+ byte b = bytes[0];
+ throwIOException = (b & 1) > 0;
+ throwTezException = (b & 2) > 0;
+ signalFatalAndThrow = (b & 4) > 0;
+ signalFatalAndLoop = (b & 8) > 0;
+ signalFatalAndComplete = (b & 16) > 0;
+ }
+
+ public static void reset() {
+ signalled = false;
+ receivedInterrupt = false;
+ completed = false;
+ running = false;
+ wasAborted = false;
+ }
+
+ public static void signal() {
+ LOG.info("Signalled");
+ processorLock.lock();
+ try {
+ signalled = true;
+ processorCondition.signal();
+ } finally {
+ processorLock.unlock();
+ }
+ }
+
+ public static void awaitStart() throws InterruptedException {
+ LOG.info("Awaiting Process run");
+ processorLock.lock();
+ try {
+ if (running) {
+ return;
+ }
+ runningCondition.await();
+ } finally {
+ processorLock.unlock();
+ }
+ }
+
+ public static void awaitLoop() throws InterruptedException {
+ LOG.info("Awaiting loop after signalling error");
+ processorLock.lock();
+ try {
+ if (looping) {
+ return;
+ }
+ loopCondition.await();
+ } finally {
+ processorLock.unlock();
+ }
+ }
+
+ public static void awaitCompletion() throws InterruptedException {
+ LOG.info("Await completion");
+ processorLock.lock();
+ try {
+ if (completed) {
+ return;
+ } else {
+ completionCondition.await();
+ }
+ } finally {
+ processorLock.unlock();
+ }
+ }
+
+ public static boolean wasInterrupted() {
+ processorLock.lock();
+ try {
+ return receivedInterrupt;
+ } finally {
+ processorLock.unlock();
+ }
+ }
+
+ public static boolean wasAborted() {
+ processorLock.lock();
+ try {
+ return wasAborted;
+ } finally {
+ processorLock.unlock();
+ }
+ }
+
+ @Override
+ public void abort() {
+ wasAborted = true;
+ }
+
+ @Override
+ public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws
+ Exception {
+ processorLock.lock();
+ running = true;
+ runningCondition.signal();
+ try {
+ try {
+ LOG.info("Signal is: " + signalled);
+ if (!signalled) {
+ LOG.info("Waiting for processor signal");
+ processorCondition.await();
+ }
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException();
+ }
+ LOG.info("Received processor signal");
+ if (throwIOException) {
+ throw createProcessorIOException();
+ } else if (throwTezException) {
+ throw createProcessorTezException();
+ } else if (signalFatalAndThrow) {
+ IOException io = new IOException("FATALERROR");
+ getContext().fatalError(io, "FATALERROR");
+ throw io;
+ } else if (signalFatalAndComplete) {
+ IOException io = new IOException("FATALERROR");
+ getContext().fatalError(io, "FATALERROR");
+ return;
+ } else if (signalFatalAndLoop) {
+ IOException io = createProcessorIOException();
+ getContext().fatalError(io, "FATALERROR");
+ LOG.info("looping");
+ looping = true;
+ loopCondition.signal();
+ LOG.info("Waiting for Processor signal again");
+ processorCondition.await();
+ LOG.info("Received second processor signal");
+ }
+ } catch (InterruptedException e) {
+ receivedInterrupt = true;
+ }
+ } finally {
+ completed = true;
+ completionCondition.signal();
+ processorLock.unlock();
+ }
+ }
+ }
+
+ public static TezException createProcessorTezException() {
+ return new TezException("TezException");
+ }
+
+ public static IOException createProcessorIOException() {
+ return new IOException("IOException");
+ }
+
+ public static class TezTaskUmbilicalForTest implements TezTaskUmbilicalProtocol {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TezTaskUmbilicalForTest.class);
+
+ private final List<TezEvent> requestEvents = new LinkedList<TezEvent>();
+
+ private final ReentrantLock umbilicalLock = new ReentrantLock();
+ private final Condition eventCondition = umbilicalLock.newCondition();
+ private boolean pendingEvent = false;
+ private boolean eventEnacted = false;
+
+ volatile int getTaskInvocations = 0;
+
+ private boolean shouldThrowException = false;
+ private boolean shouldSendDieSignal = false;
+
+ public void signalThrowException() {
+ umbilicalLock.lock();
+ try {
+ shouldThrowException = true;
+ pendingEvent = true;
+ } finally {
+ umbilicalLock.unlock();
+ }
+ }
+
+ public void signalSendShouldDie() {
+ umbilicalLock.lock();
+ try {
+ shouldSendDieSignal = true;
+ pendingEvent = true;
+ } finally {
+ umbilicalLock.unlock();
+ }
+ }
+
+ public void awaitRegisteredEvent() throws InterruptedException {
+ umbilicalLock.lock();
+ try {
+ if (eventEnacted) {
+ return;
+ }
+ LOG.info("Awaiting event");
+ eventCondition.await();
+ } finally {
+ umbilicalLock.unlock();
+ }
+ }
+
+ public void resetTrackedEvents() {
+ umbilicalLock.lock();
+ try {
+ requestEvents.clear();
+ } finally {
+ umbilicalLock.unlock();
+ }
+ }
+
+ public void verifyNoCompletionEvents() {
+ umbilicalLock.lock();
+ try {
+ for (TezEvent event : requestEvents) {
+ if (event.getEvent() instanceof TaskAttemptFailedEvent) {
+ fail("Found a TaskAttemptFailedEvent when not expected");
+ }
+ if (event.getEvent() instanceof TaskAttemptCompletedEvent) {
+ fail("Found a TaskAttemptCompletedvent when not expected");
+ }
+ }
+ } finally {
+ umbilicalLock.unlock();
+ }
+ }
+
+ public void verifyTaskFailedEvent(String diagnostics) {
+ umbilicalLock.lock();
+ try {
+ for (TezEvent event : requestEvents) {
+ if (event.getEvent() instanceof TaskAttemptFailedEvent) {
+ TaskAttemptFailedEvent failedEvent = (TaskAttemptFailedEvent) event.getEvent();
+ if (failedEvent.getDiagnostics().startsWith(diagnostics)) {
+ return;
+ } else {
+ fail("Diagnostic message does not match expected message. Found [" +
+ failedEvent.getDiagnostics() + "], Expected: [" + diagnostics + "]");
+ }
+ }
+ }
+ fail("No TaskAttemptFailedEvents sent over umbilical");
+ } finally {
+ umbilicalLock.unlock();
+ }
+ }
+
+ public void verifyTaskFailedEvent(String diagStart, String diagContains) {
+ umbilicalLock.lock();
+ try {
+ for (TezEvent event : requestEvents) {
+ if (event.getEvent() instanceof TaskAttemptFailedEvent) {
+ TaskAttemptFailedEvent failedEvent = (TaskAttemptFailedEvent) event.getEvent();
+ if (failedEvent.getDiagnostics().startsWith(diagStart)) {
+ if (diagContains != null) {
+ if (failedEvent.getDiagnostics().contains(diagContains)) {
+ return;
+ } else {
+ fail("Diagnostic message does not contain expected message. Found [" +
+ failedEvent.getDiagnostics() + "], Expected: [" + diagContains + "]");
+ }
+ }
+ } else {
+ fail("Diagnostic message does not start with expected message. Found [" +
+ failedEvent.getDiagnostics() + "], Expected: [" + diagStart + "]");
+ }
+ }
+ }
+ fail("No TaskAttemptFailedEvents sent over umbilical");
+ } finally {
+ umbilicalLock.unlock();
+ }
+ }
+
+ public void verifyTaskSuccessEvent() {
+ umbilicalLock.lock();
+ try {
+ for (TezEvent event : requestEvents) {
+ if (event.getEvent() instanceof TaskAttemptCompletedEvent) {
+ return;
+ }
+ }
+ fail("No TaskAttemptFailedEvents sent over umbilical");
+ } finally {
+ umbilicalLock.unlock();
+ }
+ }
+
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
+ return 0;
+ }
+
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
+ int clientMethodsHash) throws IOException {
+ return null;
+ }
+
+ @Override
+ public ContainerTask getTask(ContainerContext containerContext) throws IOException {
+ // Return shouldDie = true
+ getTaskInvocations++;
+ return new ContainerTask(null, true, null, null, false);
+ }
+
+ @Override
+ public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
+ return true;
+ }
+
+ @Override
+ public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOException,
+ TezException {
+ umbilicalLock.lock();
+ if (request.getEvents() != null) {
+ requestEvents.addAll(request.getEvents());
+ }
+ try {
+ if (shouldThrowException) {
+ LOG.info("TestUmbilical throwing Exception");
+ throw new IOException(HEARTBEAT_EXCEPTION_STRING);
+ }
+ TezHeartbeatResponse response = new TezHeartbeatResponse();
+ response.setLastRequestId(request.getRequestId());
+ if (shouldSendDieSignal) {
+ LOG.info("TestUmbilical returning shouldDie=true");
+ response.setShouldDie();
+ }
+ return response;
+ } finally {
+ if (pendingEvent) {
+ eventEnacted = true;
+ LOG.info("Signalling Event");
+ eventCondition.signal();
+ }
+ umbilicalLock.unlock();
+ }
+ }
+ }
+
+ public static ContainerId createContainerId(ApplicationId appId) {
+ ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+ ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
+ return containerId;
+ }
+
+ public static TaskReporter createTaskReporter(ApplicationId appId, TezTaskUmbilicalForTest umbilical) {
+ TaskReporter taskReporter = new TaskReporter(umbilical, 100, 1000, 100, new AtomicLong(0),
+ createContainerId(appId).toString());
+ return taskReporter;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/acac8dc0/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestContainerExecution.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestContainerExecution.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestContainerExecution.java
new file mode 100644
index 0000000..c1616af
--- /dev/null
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestContainerExecution.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.common.ContainerContext;
+import org.apache.tez.common.ContainerTask;
+import org.junit.Test;
+
+public class TestContainerExecution {
+
+ @Test(timeout = 5000)
+ public void testGetTaskShouldDie() throws InterruptedException, ExecutionException {
+ ListeningExecutorService executor = null;
+ try {
+ ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
+ executor = MoreExecutors.listeningDecorator(rawExecutor);
+ ApplicationId appId = ApplicationId.newInstance(10000, 1);
+ ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+ ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
+
+ TaskExecutionTestHelpers.TezTaskUmbilicalForTest
+ umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
+ ContainerContext containerContext = new ContainerContext(containerId.toString());
+
+ ContainerReporter containerReporter = new ContainerReporter(umbilical, containerContext, 100);
+ ListenableFuture<ContainerTask> getTaskFuture = executor.submit(containerReporter);
+
+ getTaskFuture.get();
+ assertEquals(1, umbilical.getTaskInvocations);
+
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/acac8dc0/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
index 1bcb337..a99416a 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
@@ -18,8 +18,8 @@
package org.apache.tez.runtime.task;
+import static org.apache.tez.runtime.task.TaskExecutionTestHelpers.createTaskReporter;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -28,30 +28,18 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.tez.common.ContainerContext;
-import org.apache.tez.common.ContainerTask;
-import org.apache.tez.common.TezTaskUmbilicalProtocol;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
@@ -60,21 +48,13 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalInput;
-import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.ProcessorContext;
-import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
-import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TaskSpec;
-import org.apache.tez.runtime.api.impl.TezEvent;
-import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
-import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
import org.apache.tez.runtime.common.resources.ScalingAllocator;
+import org.apache.tez.runtime.task.TaskExecutionTestHelpers.TestProcessor;
+import org.apache.tez.runtime.task.TaskExecutionTestHelpers.TezTaskUmbilicalForTest;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
@@ -82,7 +62,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.HashMultimap;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
@@ -91,7 +70,7 @@ public class TestTaskExecution {
private static final Logger LOG = LoggerFactory.getLogger(TestTaskExecution.class);
- private static final String HEARTBEAT_EXCEPTION_STRING = "HeartbeatException";
+
private static final Configuration defaultConf = new Configuration();
private static final FileSystem localFs;
@@ -137,7 +116,7 @@ public class TestTaskExecution {
TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
TestProcessor.CONF_EMPTY);
// Setup the executor
- Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
+ Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable1ForTest(taskRunner));
// Signal the processor to go through
TestProcessor.signal();
boolean result = taskRunnerFuture.get();
@@ -164,7 +143,7 @@ public class TestTaskExecution {
TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
TestProcessor.CONF_EMPTY);
// Setup the executor
- Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
+ Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable1ForTest(taskRunner));
// Signal the processor to go through
TestProcessor.signal();
boolean result = taskRunnerFuture.get();
@@ -176,7 +155,7 @@ public class TestTaskExecution {
taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
TestProcessor.CONF_EMPTY);
// Setup the executor
- taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
+ taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable1ForTest(taskRunner));
// Signal the processor to go through
TestProcessor.signal();
result = taskRunnerFuture.get();
@@ -188,7 +167,7 @@ public class TestTaskExecution {
}
}
- // test tasked failed due to exception in Processor
+ // test task failed due to exception in Processor
@Test(timeout = 5000)
public void testFailedTask() throws IOException, InterruptedException, TezException {
@@ -203,7 +182,7 @@ public class TestTaskExecution {
TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
TestProcessor.CONF_THROW_TEZ_EXCEPTION);
// Setup the executor
- Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
+ Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable1ForTest(taskRunner));
// Signal the processor to go through
TestProcessor.awaitStart();
TestProcessor.signal();
@@ -238,7 +217,7 @@ public class TestTaskExecution {
TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
"NotExitedProcessor", TestProcessor.CONF_THROW_TEZ_EXCEPTION);
// Setup the executor
- Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
+ Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable1ForTest(taskRunner));
try {
taskRunnerFuture.get();
} catch (ExecutionException e) {
@@ -268,7 +247,7 @@ public class TestTaskExecution {
TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
TestProcessor.CONF_EMPTY);
// Setup the executor
- Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
+ Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable1ForTest(taskRunner));
// Signal the processor to go through
TestProcessor.awaitStart();
umbilical.signalThrowException();
@@ -280,7 +259,7 @@ public class TestTaskExecution {
} catch (ExecutionException e) {
Throwable cause = e.getCause();
assertTrue(cause instanceof IOException);
- assertTrue(cause.getMessage().contains(HEARTBEAT_EXCEPTION_STRING));
+ assertTrue(cause.getMessage().contains(TaskExecutionTestHelpers.HEARTBEAT_EXCEPTION_STRING));
}
TestProcessor.awaitCompletion();
assertTrue(TestProcessor.wasInterrupted());
@@ -307,7 +286,7 @@ public class TestTaskExecution {
TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
TestProcessor.CONF_EMPTY);
// Setup the executor
- Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
+ Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable1ForTest(taskRunner));
// Signal the processor to go through
TestProcessor.awaitStart();
umbilical.signalSendShouldDie();
@@ -329,38 +308,14 @@ public class TestTaskExecution {
}
}
- @Test(timeout = 5000)
- public void testGetTaskShouldDie() throws InterruptedException, ExecutionException {
- ListeningExecutorService executor = null;
- try {
- ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
- executor = MoreExecutors.listeningDecorator(rawExecutor);
- ApplicationId appId = ApplicationId.newInstance(10000, 1);
- ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
- ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
-
- TezTaskUmbilicalForTest umbilical = new TezTaskUmbilicalForTest();
- ContainerContext containerContext = new ContainerContext(containerId.toString());
-
- ContainerReporter containerReporter = new ContainerReporter(umbilical, containerContext, 100);
- ListenableFuture<ContainerTask> getTaskFuture = executor.submit(containerReporter);
-
- getTaskFuture.get();
- assertEquals(1, umbilical.getTaskInvocations);
-
- } finally {
- executor.shutdownNow();
- }
- }
-
// Potential new tests
// Different states - initialization failure, close failure
// getTask states
- private static class TaskRunnerCallable implements Callable<Boolean> {
+ private static class TaskRunnerCallable1ForTest implements Callable<Boolean> {
private final TezTaskRunner taskRunner;
- public TaskRunnerCallable(TezTaskRunner taskRunner) {
+ public TaskRunnerCallable1ForTest(TezTaskRunner taskRunner) {
this.taskRunner = taskRunner;
}
@@ -370,328 +325,9 @@ public class TestTaskExecution {
}
}
- // Uses static fields for signaling. Ensure only used by one test at a time.
- public static class TestProcessor extends AbstractLogicalIOProcessor {
-
- public static final byte[] CONF_EMPTY = new byte[] { 0 };
- public static final byte[] CONF_THROW_IO_EXCEPTION = new byte[] { 1 };
- public static final byte[] CONF_THROW_TEZ_EXCEPTION = new byte[] { 2 };
- public static final byte[] CONF_SIGNAL_FATAL_AND_THROW = new byte[] { 4 };
- public static final byte[] CONF_SIGNAL_FATAL_AND_LOOP = new byte[] { 8 };
- public static final byte[] CONF_SIGNAL_FATAL_AND_COMPLETE = new byte[] { 16 };
-
- private static final Logger LOG = LoggerFactory.getLogger(TestProcessor.class);
-
- private static final ReentrantLock processorLock = new ReentrantLock();
- private static final Condition processorCondition = processorLock.newCondition();
- private static final Condition completionCondition = processorLock.newCondition();
- private static final Condition runningCondition = processorLock.newCondition();
- private static boolean completed = false;
- private static boolean running = false;
- private static boolean signalled = false;
-
- public static boolean receivedInterrupt = false;
-
- private boolean throwIOException = false;
- private boolean throwTezException = false;
- private boolean signalFatalAndThrow = false;
- private boolean signalFatalAndLoop = false;
- private boolean signalFatalAndComplete = false;
-
- public TestProcessor(ProcessorContext context) {
- super(context);
- }
-
- @Override
- public void initialize() throws Exception {
- parseConf(getContext().getUserPayload().deepCopyAsArray());
- }
-
- @Override
- public void handleEvents(List<Event> processorEvents) {
-
- }
-
- @Override
- public void close() throws Exception {
-
- }
- private void parseConf(byte[] bytes) {
- byte b = bytes[0];
- throwIOException = (b & 1) > 1;
- throwTezException = (b & 2) > 1;
- signalFatalAndThrow = (b & 4) > 1;
- signalFatalAndLoop = (b & 8) > 1;
- signalFatalAndComplete = (b & 16) > 1;
- }
- public static void reset() {
- signalled = false;
- receivedInterrupt = false;
- completed = false;
- running = false;
- }
- public static void signal() {
- LOG.info("Signalled");
- processorLock.lock();
- try {
- signalled = true;
- processorCondition.signal();
- } finally {
- processorLock.unlock();
- }
- }
-
- public static void awaitStart() throws InterruptedException {
- LOG.info("Awaiting Process run");
- processorLock.lock();
- try {
- if (running) {
- return;
- }
- runningCondition.await();
- } finally {
- processorLock.unlock();
- }
- }
-
- public static void awaitCompletion() throws InterruptedException {
- LOG.info("Await completion");
- processorLock.lock();
- try {
- if (completed) {
- return;
- } else {
- completionCondition.await();
- }
- } finally {
- processorLock.unlock();
- }
- }
-
- public static boolean wasInterrupted() {
- processorLock.lock();
- try {
- return receivedInterrupt;
- } finally {
- processorLock.unlock();
- }
- }
-
- @Override
- public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws
- Exception {
- processorLock.lock();
- running = true;
- runningCondition.signal();
- try {
- try {
- LOG.info("Signal is: " + signalled);
- if (!signalled) {
- LOG.info("Waiting for processor signal");
- processorCondition.await();
- }
- if (Thread.currentThread().isInterrupted()) {
- throw new InterruptedException();
- }
- LOG.info("Received processor signal");
- if (throwIOException) {
- throw new IOException();
- } else if (throwTezException) {
- throw new TezException("TezException");
- } else if (signalFatalAndThrow) {
- IOException io = new IOException("FATALERROR");
- getContext().fatalError(io, "FATALERROR");
- throw io;
- } else if (signalFatalAndComplete) {
- IOException io = new IOException("FATALERROR");
- getContext().fatalError(io, "FATALERROR");
- return;
- } else if (signalFatalAndLoop) {
- IOException io = new IOException("FATALERROR");
- getContext().fatalError(io, "FATALERROR");
- LOG.info("Waiting for Processor signal again");
- processorCondition.await();
- LOG.info("Received second processor signal");
- }
- } catch (InterruptedException e) {
- receivedInterrupt = true;
- }
- } finally {
- completed = true;
- completionCondition.signal();
- processorLock.unlock();
- }
- }
- }
-
- private static class TezTaskUmbilicalForTest implements TezTaskUmbilicalProtocol {
-
- private static final Logger LOG = LoggerFactory.getLogger(TezTaskUmbilicalForTest.class);
-
- private final List<TezEvent> requestEvents = new LinkedList<TezEvent>();
-
- private final ReentrantLock umbilicalLock = new ReentrantLock();
- private final Condition eventCondition = umbilicalLock.newCondition();
- private boolean pendingEvent = false;
- private boolean eventEnacted = false;
-
- volatile int getTaskInvocations = 0;
-
- private boolean shouldThrowException = false;
- private boolean shouldSendDieSignal = false;
-
- public void signalThrowException() {
- umbilicalLock.lock();
- try {
- shouldThrowException = true;
- pendingEvent = true;
- } finally {
- umbilicalLock.unlock();
- }
- }
-
- public void signalSendShouldDie() {
- umbilicalLock.lock();
- try {
- shouldSendDieSignal = true;
- pendingEvent = true;
- } finally {
- umbilicalLock.unlock();
- }
- }
-
- public void awaitRegisteredEvent() throws InterruptedException {
- umbilicalLock.lock();
- try {
- if (eventEnacted) {
- return;
- }
- LOG.info("Awaiting event");
- eventCondition.await();
- } finally {
- umbilicalLock.unlock();
- }
- }
-
- public void resetTrackedEvents() {
- umbilicalLock.lock();
- try {
- requestEvents.clear();
- } finally {
- umbilicalLock.unlock();
- }
- }
-
- public void verifyNoCompletionEvents() {
- umbilicalLock.lock();
- try {
- for (TezEvent event : requestEvents) {
- if (event.getEvent() instanceof TaskAttemptFailedEvent) {
- fail("Found a TaskAttemptFailedEvent when not expected");
- }
- if (event.getEvent() instanceof TaskAttemptCompletedEvent) {
- fail("Found a TaskAttemptCompletedvent when not expected");
- }
- }
- } finally {
- umbilicalLock.unlock();
- }
- }
-
- public void verifyTaskFailedEvent(String diagnostics) {
- umbilicalLock.lock();
- try {
- for (TezEvent event : requestEvents) {
- if (event.getEvent() instanceof TaskAttemptFailedEvent) {
- TaskAttemptFailedEvent failedEvent = (TaskAttemptFailedEvent)event.getEvent();
- if(failedEvent.getDiagnostics().startsWith(diagnostics)){
- return ;
- } else {
- fail("No detailed diagnostics message in TaskAttemptFailedEvent");
- }
- }
- }
- fail("No TaskAttemptFailedEvents sent over umbilical");
- } finally {
- umbilicalLock.unlock();
- }
- }
-
- public void verifyTaskSuccessEvent() {
- umbilicalLock.lock();
- try {
- for (TezEvent event : requestEvents) {
- if (event.getEvent() instanceof TaskAttemptCompletedEvent) {
- return;
- }
- }
- fail("No TaskAttemptFailedEvents sent over umbilical");
- } finally {
- umbilicalLock.unlock();
- }
- }
-
- @Override
- public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
- return 0;
- }
-
- @Override
- public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
- int clientMethodsHash) throws IOException {
- return null;
- }
-
- @Override
- public ContainerTask getTask(ContainerContext containerContext) throws IOException {
- // Return shouldDie = true
- getTaskInvocations++;
- return new ContainerTask(null, true, null, null, false);
- }
-
- @Override
- public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
- return true;
- }
-
- @Override
- public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOException,
- TezException {
- umbilicalLock.lock();
- if (request.getEvents() != null) {
- requestEvents.addAll(request.getEvents());
- }
- try {
- if (shouldThrowException) {
- LOG.info("TestUmbilical throwing Exception");
- throw new IOException(HEARTBEAT_EXCEPTION_STRING);
- }
- TezHeartbeatResponse response = new TezHeartbeatResponse();
- response.setLastRequestId(request.getRequestId());
- if (shouldSendDieSignal) {
- LOG.info("TestUmbilical returning shouldDie=true");
- response.setShouldDie();
- }
- return response;
- } finally {
- if (pendingEvent) {
- eventEnacted = true;
- LOG.info("Signalling Event");
- eventCondition.signal();
- }
- umbilicalLock.unlock();
- }
- }
- }
-
- private TaskReporter createTaskReporter(ApplicationId appId, TezTaskUmbilicalForTest umbilical) {
- TaskReporter taskReporter = new TaskReporter(umbilical, 100, 1000, 100, new AtomicLong(0),
- createContainerId(appId).toString());
- return taskReporter;
- }
private TezTaskRunner createTaskRunner(ApplicationId appId, TezTaskUmbilicalForTest umbilical,
TaskReporter taskReporter, ListeningExecutorService executor, byte[] processorConf)
@@ -722,9 +358,5 @@ public class TestTaskExecution {
return taskRunner;
}
- private ContainerId createContainerId(ApplicationId appId) {
- ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
- ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
- return containerId;
- }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/acac8dc0/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
new file mode 100644
index 0000000..12d9d3f
--- /dev/null
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
@@ -0,0 +1,638 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import static org.apache.tez.runtime.task.TaskExecutionTestHelpers.createProcessorIOException;
+import static org.apache.tez.runtime.task.TaskExecutionTestHelpers.createProcessorTezException;
+import static org.apache.tez.runtime.task.TaskExecutionTestHelpers.createTaskReporter;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.ExecutionContext;
+import org.apache.tez.runtime.api.ObjectRegistry;
+import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.common.resources.ScalingAllocator;
+import org.apache.tez.runtime.internals.api.TaskReporterInterface;
+import org.apache.tez.runtime.task.TaskExecutionTestHelpers.TestProcessor;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestTaskExecution2 {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestTaskExecution2.class);
+
+ private static final Configuration defaultConf = new Configuration();
+ private static final FileSystem localFs;
+ private static final Path workDir;
+
+ private static final ExecutorService taskExecutor = Executors.newFixedThreadPool(1);
+
+ static {
+ defaultConf.set("fs.defaultFS", "file:///");
+ defaultConf.set(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ALLOCATOR_CLASS,
+ ScalingAllocator.class.getName());
+ try {
+ localFs = FileSystem.getLocal(defaultConf);
+ Path wd = new Path(System.getProperty("test.build.data", "/tmp"),
+ TestTaskExecution.class.getSimpleName());
+ workDir = localFs.makeQualified(wd);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Before
+ public void reset() {
+ TestProcessor.reset();
+ }
+
+ @AfterClass
+ public static void shutdown() {
+ taskExecutor.shutdownNow();
+ }
+
+ @Test(timeout = 5000)
+ public void testSingleSuccessfulTask() throws IOException, InterruptedException, TezException,
+ ExecutionException {
+ ListeningExecutorService executor = null;
+ try {
+ ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
+ executor = MoreExecutors.listeningDecorator(rawExecutor);
+ ApplicationId appId = ApplicationId.newInstance(10000, 1);
+ TaskExecutionTestHelpers.TezTaskUmbilicalForTest
+ umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
+ TaskReporter taskReporter = createTaskReporter(appId, umbilical);
+
+ TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
+ TestProcessor.CONF_EMPTY);
+ // Setup the executor
+ Future<TaskRunner2Result> taskRunnerFuture = taskExecutor.submit(
+ new TaskRunnerCallable2ForTest(taskRunner));
+ // Signal the processor to go through
+ TestProcessor.signal();
+ TaskRunner2Result result = taskRunnerFuture.get();
+ verifyTaskRunnerResult(result, EndReason.SUCCESS, null, false);
+ assertNull(taskReporter.currentCallable);
+ umbilical.verifyTaskSuccessEvent();
+ assertFalse(TestProcessor.wasAborted());
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
+ @Test(timeout = 5000)
+ public void testMultipleSuccessfulTasks() throws IOException, InterruptedException, TezException,
+ ExecutionException {
+
+ ListeningExecutorService executor = null;
+ try {
+ ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
+ executor = MoreExecutors.listeningDecorator(rawExecutor);
+ ApplicationId appId = ApplicationId.newInstance(10000, 1);
+ TaskExecutionTestHelpers.TezTaskUmbilicalForTest
+ umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
+ TaskReporter taskReporter = createTaskReporter(appId, umbilical);
+
+ TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
+ TestProcessor.CONF_EMPTY);
+ // Setup the executor
+ Future<TaskRunner2Result> taskRunnerFuture = taskExecutor.submit(
+ new TaskRunnerCallable2ForTest(taskRunner));
+ // Signal the processor to go through
+ TestProcessor.signal();
+ TaskRunner2Result result = taskRunnerFuture.get();
+ verifyTaskRunnerResult(result, EndReason.SUCCESS, null, false);
+ assertNull(taskReporter.currentCallable);
+ umbilical.verifyTaskSuccessEvent();
+ assertFalse(TestProcessor.wasAborted());
+ umbilical.resetTrackedEvents();
+
+ taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
+ TestProcessor.CONF_EMPTY);
+ // Setup the executor
+ taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
+ // Signal the processor to go through
+ TestProcessor.signal();
+ result = taskRunnerFuture.get();
+ verifyTaskRunnerResult(result, EndReason.SUCCESS, null, false);
+ assertNull(taskReporter.currentCallable);
+ umbilical.verifyTaskSuccessEvent();
+ assertFalse(TestProcessor.wasAborted());
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
+ // test task failed due to exception in Processor
+ @Test(timeout = 5000)
+ public void testFailedTaskTezException() throws IOException, InterruptedException, TezException,
+ ExecutionException {
+
+ ListeningExecutorService executor = null;
+ try {
+ ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
+ executor = MoreExecutors.listeningDecorator(rawExecutor);
+ ApplicationId appId = ApplicationId.newInstance(10000, 1);
+ TaskExecutionTestHelpers.TezTaskUmbilicalForTest
+ umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
+ TaskReporter taskReporter = createTaskReporter(appId, umbilical);
+
+ TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
+ TestProcessor.CONF_THROW_TEZ_EXCEPTION);
+ // Setup the executor
+ Future<TaskRunner2Result> taskRunnerFuture =
+ taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
+ // Signal the processor to go through
+ TestProcessor.awaitStart();
+ TestProcessor.signal();
+ TaskRunner2Result result = taskRunnerFuture.get();
+ verifyTaskRunnerResult(result, EndReason.TASK_ERROR, createProcessorTezException(), false);
+
+ assertNull(taskReporter.currentCallable);
+ umbilical.verifyTaskFailedEvent(
+ "Failure while running task",
+ TezException.class.getName() + ": " + TezException.class.getSimpleName());
+ // Failure detected as a result of fall off from the run method. abort isn't required.
+ assertFalse(TestProcessor.wasAborted());
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
+
+ // Test task failed due to Processor class not found
+ @Test(timeout = 5000)
+ public void testFailedTask2() throws IOException, InterruptedException, TezException,
+ ExecutionException {
+
+ ListeningExecutorService executor = null;
+ try {
+ ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
+ executor = MoreExecutors.listeningDecorator(rawExecutor);
+ ApplicationId appId = ApplicationId.newInstance(10000, 1);
+ TaskExecutionTestHelpers.TezTaskUmbilicalForTest
+ umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
+ TaskReporter taskReporter = createTaskReporter(appId, umbilical);
+
+ TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
+ "NotExitedProcessor", TestProcessor.CONF_EMPTY, false);
+ // Setup the executor
+ Future<TaskRunner2Result> taskRunnerFuture =
+ taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
+
+ TaskRunner2Result result = taskRunnerFuture.get();
+ verifyTaskRunnerResult(result, EndReason.TASK_ERROR,
+ new TezUncheckedException("Unchecked exception"), false);
+
+ assertNull(taskReporter.currentCallable);
+ umbilical.verifyTaskFailedEvent("Failure while running task",
+ ":org.apache.tez.dag.api.TezUncheckedException: "
+ + "Unable to load class: NotExitedProcessor");
+ // Failure detected as a result of fall off from the run method. abort isn't required.
+ assertFalse(TestProcessor.wasAborted());
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
+ // test task failed due to exception in Processor
+ @Test(timeout = 5000)
+ public void testFailedTaskIOException() throws IOException, InterruptedException, TezException,
+ ExecutionException {
+
+ ListeningExecutorService executor = null;
+ try {
+ ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
+ executor = MoreExecutors.listeningDecorator(rawExecutor);
+ ApplicationId appId = ApplicationId.newInstance(10000, 1);
+ TaskExecutionTestHelpers.TezTaskUmbilicalForTest
+ umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
+ TaskReporter taskReporter = createTaskReporter(appId, umbilical);
+
+ TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
+ TestProcessor.CONF_THROW_IO_EXCEPTION);
+ // Setup the executor
+ Future<TaskRunner2Result> taskRunnerFuture =
+ taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
+ // Signal the processor to go through
+ TestProcessor.awaitStart();
+ TestProcessor.signal();
+ TaskRunner2Result result = taskRunnerFuture.get();
+ verifyTaskRunnerResult(result, EndReason.TASK_ERROR, createProcessorIOException(), false);
+
+
+ assertNull(taskReporter.currentCallable);
+ umbilical.verifyTaskFailedEvent(
+ "Failure while running task",
+ IOException.class.getName() + ": " + IOException.class.getSimpleName());
+ // Failure detected as a result of fall off from the run method. abort isn't required.
+ assertFalse(TestProcessor.wasAborted());
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
+ @Test(timeout = 5000)
+ public void testHeartbeatException() throws IOException, InterruptedException, TezException,
+ ExecutionException {
+
+ ListeningExecutorService executor = null;
+ try {
+ ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
+ executor = MoreExecutors.listeningDecorator(rawExecutor);
+ ApplicationId appId = ApplicationId.newInstance(10000, 1);
+ TaskExecutionTestHelpers.TezTaskUmbilicalForTest
+ umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
+ TaskReporter taskReporter = createTaskReporter(appId, umbilical);
+
+ TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
+ TestProcessor.CONF_EMPTY);
+ // Setup the executor
+ Future<TaskRunner2Result> taskRunnerFuture =
+ taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
+ // Signal the processor to go through
+ TestProcessor.awaitStart();
+ umbilical.signalThrowException();
+ umbilical.awaitRegisteredEvent();
+ // Not signaling an actual start to verify task interruption
+
+ TaskRunner2Result result = taskRunnerFuture.get();
+ verifyTaskRunnerResult(result, EndReason.COMMUNICATION_FAILURE,
+ new IOException("IOException"),
+ TaskExecutionTestHelpers.HEARTBEAT_EXCEPTION_STRING, false);
+
+ TestProcessor.awaitCompletion();
+ assertTrue(TestProcessor.wasInterrupted());
+ assertNull(taskReporter.currentCallable);
+ // No completion events since umbilical communication already failed.
+ umbilical.verifyNoCompletionEvents();
+ assertTrue(TestProcessor.wasAborted());
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
+ @Test(timeout = 5000)
+ public void testHeartbeatShouldDie() throws IOException, InterruptedException, TezException,
+ ExecutionException {
+
+ ListeningExecutorService executor = null;
+ try {
+ ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
+ executor = MoreExecutors.listeningDecorator(rawExecutor);
+ ApplicationId appId = ApplicationId.newInstance(10000, 1);
+ TaskExecutionTestHelpers.TezTaskUmbilicalForTest
+ umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
+ TaskReporter taskReporter = createTaskReporter(appId, umbilical);
+
+ TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
+ TestProcessor.CONF_EMPTY);
+ // Setup the executor
+ Future<TaskRunner2Result> taskRunnerFuture =
+ taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
+ // Signal the processor to go through
+ TestProcessor.awaitStart();
+ umbilical.signalSendShouldDie();
+ umbilical.awaitRegisteredEvent();
+ // Not signaling an actual start to verify task interruption
+
+ TaskRunner2Result result = taskRunnerFuture.get();
+ verifyTaskRunnerResult(result, EndReason.CONTAINER_STOP_REQUESTED, null, true);
+
+
+ TestProcessor.awaitCompletion();
+ assertTrue(TestProcessor.wasInterrupted());
+ assertNull(taskReporter.currentCallable);
+ // TODO Is this statement correct ?
+ // No completion events since shouldDie was requested by the AM, which should have killed the
+ // task.
+ umbilical.verifyNoCompletionEvents();
+ assertTrue(TestProcessor.wasAborted());
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
+ @Test(timeout = 5000)
+ public void testSignalFatalErrorAndLoop() throws IOException, InterruptedException, TezException,
+ ExecutionException {
+
+ ListeningExecutorService executor = null;
+ try {
+ ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
+ executor = MoreExecutors.listeningDecorator(rawExecutor);
+ ApplicationId appId = ApplicationId.newInstance(10000, 1);
+ TaskExecutionTestHelpers.TezTaskUmbilicalForTest
+ umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
+ TaskReporter taskReporter = createTaskReporter(appId, umbilical);
+
+ TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
+ TestProcessor.CONF_SIGNAL_FATAL_AND_LOOP);
+ // Setup the executor
+ Future<TaskRunner2Result> taskRunnerFuture =
+ taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
+ // Signal the processor to go through
+ TestProcessor.awaitStart();
+ TestProcessor.signal();
+
+ TestProcessor.awaitLoop();
+ // The fatal error should have caused an interrupt.
+
+ TaskRunner2Result result = taskRunnerFuture.get();
+ verifyTaskRunnerResult(result, EndReason.TASK_ERROR, createProcessorIOException(), false);
+
+ TestProcessor.awaitCompletion();
+ assertTrue(TestProcessor.wasInterrupted());
+ assertNull(taskReporter.currentCallable);
+ umbilical.verifyTaskFailedEvent(
+ "Failure while running task",
+ IOException.class.getName() + ": " + IOException.class.getSimpleName());
+ // Signal fatal error should cause the processor to fail.
+ assertTrue(TestProcessor.wasAborted());
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
+ @Test(timeout = 5000)
+ public void testTaskKilled() throws IOException, InterruptedException, TezException,
+ ExecutionException {
+
+ ListeningExecutorService executor = null;
+ try {
+ ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
+ executor = MoreExecutors.listeningDecorator(rawExecutor);
+ ApplicationId appId = ApplicationId.newInstance(10000, 1);
+ TaskExecutionTestHelpers.TezTaskUmbilicalForTest
+ umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
+ TaskReporter taskReporter = createTaskReporter(appId, umbilical);
+
+ TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
+ TestProcessor.CONF_EMPTY);
+ // Setup the executor
+ Future<TaskRunner2Result> taskRunnerFuture =
+ taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
+ // Signal the processor to go through
+ TestProcessor.awaitStart();
+
+ taskRunner.killTask();
+
+ TaskRunner2Result result = taskRunnerFuture.get();
+ verifyTaskRunnerResult(result, EndReason.KILL_REQUESTED, null, false);
+
+ TestProcessor.awaitCompletion();
+ assertTrue(TestProcessor.wasInterrupted());
+ assertNull(taskReporter.currentCallable);
+ // Kill events are not sent over the umbilical at the moment.
+ umbilical.verifyNoCompletionEvents();
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
+ @Test(timeout = 5000)
+ public void testKilledAfterComplete() throws IOException, InterruptedException, TezException,
+ ExecutionException {
+
+ ListeningExecutorService executor = null;
+ try {
+ ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
+ executor = MoreExecutors.listeningDecorator(rawExecutor);
+ ApplicationId appId = ApplicationId.newInstance(10000, 1);
+ TaskExecutionTestHelpers.TezTaskUmbilicalForTest
+ umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
+ TaskReporter taskReporter = createTaskReporter(appId, umbilical);
+
+ TezTaskRunner2ForTest taskRunner =
+ createTaskRunnerForTest(appId, umbilical, taskReporter, executor,
+ TestProcessor.CONF_EMPTY);
+ // Setup the executor
+ Future<TaskRunner2Result> taskRunnerFuture =
+ taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
+ // Signal the processor to go through
+ TestProcessor.awaitStart();
+ TestProcessor.signal();
+ TestProcessor.awaitCompletion();
+
+ taskRunner.awaitCallableCompletion();
+
+ taskRunner.killTask();
+ TaskRunner2Result result = taskRunnerFuture.get();
+ verifyTaskRunnerResult(result, EndReason.SUCCESS, null, false);
+
+ assertFalse(TestProcessor.wasInterrupted());
+ assertNull(taskReporter.currentCallable);
+ umbilical.verifyTaskSuccessEvent();
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
+
+ private void verifyTaskRunnerResult(TaskRunner2Result taskRunner2Result,
+ EndReason expectedEndReason, Throwable expectedThrowable,
+ boolean wasShutdownRequested) {
+ verifyTaskRunnerResult(taskRunner2Result, expectedEndReason, expectedThrowable, null,
+ wasShutdownRequested);
+ }
+
+ private void verifyTaskRunnerResult(TaskRunner2Result taskRunner2Result,
+ EndReason expectedEndReason, Throwable expectedThrowable,
+ String expectedExceptionMessage,
+ boolean wasShutdownRequested) {
+ assertEquals(expectedEndReason, taskRunner2Result.getEndReason());
+ if (expectedThrowable == null) {
+ assertNull(taskRunner2Result.getError());
+ } else {
+ assertNotNull(taskRunner2Result.getError());
+ Throwable cause = taskRunner2Result.getError();
+ LOG.info(cause.getClass().getName());
+ assertTrue(cause.getClass().isAssignableFrom(expectedThrowable.getClass()));
+
+ if (expectedExceptionMessage != null) {
+ assertTrue(cause.getMessage().contains(expectedExceptionMessage));
+ }
+
+ }
+ assertEquals(wasShutdownRequested, taskRunner2Result.isContainerShutdownRequested());
+ }
+
+
+ private static class TaskRunnerCallable2ForTest implements Callable<TaskRunner2Result> {
+ private final TezTaskRunner2 taskRunner;
+
+ public TaskRunnerCallable2ForTest(TezTaskRunner2 taskRunner) {
+ this.taskRunner = taskRunner;
+ }
+
+ @Override
+ public TaskRunner2Result call() throws Exception {
+ return taskRunner.run();
+ }
+ }
+
+ private TezTaskRunner2 createTaskRunner(ApplicationId appId,
+ TaskExecutionTestHelpers.TezTaskUmbilicalForTest umbilical,
+ TaskReporter taskReporter,
+ ListeningExecutorService executor, byte[] processorConf)
+ throws IOException {
+ return createTaskRunner(appId, umbilical, taskReporter, executor, TestProcessor.class.getName(),
+ processorConf, false);
+ }
+
+ private TezTaskRunner2ForTest createTaskRunnerForTest(ApplicationId appId,
+ TaskExecutionTestHelpers.TezTaskUmbilicalForTest umbilical,
+ TaskReporter taskReporter,
+ ListeningExecutorService executor,
+ byte[] processorConf)
+ throws IOException {
+ return (TezTaskRunner2ForTest) createTaskRunner(appId, umbilical, taskReporter, executor,
+ TestProcessor.class.getName(),
+ processorConf, true);
+ }
+
+ private TezTaskRunner2 createTaskRunner(ApplicationId appId,
+ TaskExecutionTestHelpers.TezTaskUmbilicalForTest umbilical,
+ TaskReporter taskReporter,
+ ListeningExecutorService executor, String processorClass,
+ byte[] processorConf, boolean testRunner) throws
+ IOException {
+ TezConfiguration tezConf = new TezConfiguration(defaultConf);
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ Path testDir = new Path(workDir, UUID.randomUUID().toString());
+ String[] localDirs = new String[]{testDir.toString()};
+
+ TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+ TezVertexID vertexId = TezVertexID.getInstance(dagId, 1);
+ TezTaskID taskId = TezTaskID.getInstance(vertexId, 1);
+ TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 1);
+ ProcessorDescriptor processorDescriptor = ProcessorDescriptor.create(processorClass)
+ .setUserPayload(UserPayload.create(ByteBuffer.wrap(processorConf)));
+ TaskSpec taskSpec =
+ new TaskSpec(taskAttemptId, "dagName", "vertexName", -1, processorDescriptor,
+ new ArrayList<InputSpec>(), new ArrayList<OutputSpec>(), null);
+
+ TezTaskRunner2 taskRunner;
+ if (testRunner) {
+ taskRunner = new TezTaskRunner2ForTest(tezConf, ugi, localDirs, taskSpec, 1,
+ new HashMap<String, ByteBuffer>(), new HashMap<String, String>(),
+ HashMultimap.<String, String>create(), taskReporter,
+ executor, null, "", new ExecutionContextImpl("localhost"),
+ Runtime.getRuntime().maxMemory());
+ } else {
+ taskRunner = new TezTaskRunner2(tezConf, ugi, localDirs, taskSpec, 1,
+ new HashMap<String, ByteBuffer>(), new HashMap<String, String>(),
+ HashMultimap.<String, String>create(), taskReporter,
+ executor, null, "", new ExecutionContextImpl("localhost"),
+ Runtime.getRuntime().maxMemory());
+ }
+
+ return taskRunner;
+ }
+
+ public static class TezTaskRunner2ForTest extends TezTaskRunner2 {
+
+ private final ReentrantLock testLock = new ReentrantLock();
+ private final Condition callableCompletionCondition = testLock.newCondition();
+
+ private final AtomicBoolean isCallableComplete = new AtomicBoolean(false);
+
+ public TezTaskRunner2ForTest(Configuration tezConf, UserGroupInformation ugi,
+ String[] localDirs,
+ TaskSpec taskSpec, int appAttemptNumber,
+ Map<String, ByteBuffer> serviceConsumerMetadata,
+ Map<String, String> serviceProviderEnvMap,
+ Multimap<String, String> startedInputsMap,
+ TaskReporterInterface taskReporter,
+ ListeningExecutorService executor,
+ ObjectRegistry objectRegistry,
+ String pid,
+ ExecutionContext executionContext,
+ long memAvailable) throws IOException {
+ super(tezConf, ugi, localDirs, taskSpec, appAttemptNumber, serviceConsumerMetadata,
+ serviceProviderEnvMap, startedInputsMap, taskReporter, executor, objectRegistry, pid,
+ executionContext, memAvailable);
+ }
+
+
+ @Override
+ @VisibleForTesting
+ void processCallableResult(TaskRunner2Callable.TaskRunner2CallableResult executionResult) {
+ testLock.lock();
+ try {
+ super.processCallableResult(executionResult);
+ isCallableComplete.set(true);
+ callableCompletionCondition.signal();
+ } finally {
+ testLock.unlock();
+ }
+ }
+
+ void awaitCallableCompletion() throws InterruptedException {
+ testLock.lock();
+ try {
+ while (!isCallableComplete.get()) {
+ callableCompletionCondition.await();
+ }
+ } finally {
+ testLock.unlock();
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/acac8dc0/tez-runtime-internals/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/resources/log4j.properties b/tez-runtime-internals/src/test/resources/log4j.properties
new file mode 100644
index 0000000..531b68b
--- /dev/null
+++ b/tez-runtime-internals/src/test/resources/log4j.properties
@@ -0,0 +1,19 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# log4j configuration used during build and unit tests
+
+log4j.rootLogger=info,stdout
+log4j.threshhold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n