You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/01/12 23:31:33 UTC
[1/3] tez git commit: TEZ-1933. Move TezChild and related classes
into tez-runtime-internals. (sseth)
Repository: tez
Updated Branches:
refs/heads/master fda4c0bdc -> e250983e5
http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
new file mode 100644
index 0000000..6e655f9
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.log4j.Logger;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.runtime.api.ExecutionContext;
+import org.apache.tez.runtime.api.ObjectRegistry;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezUmbilical;
+
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
+public class TezTaskRunner implements TezUmbilical, ErrorReporter {
+
+ private static final Logger LOG = Logger.getLogger(TezTaskRunner.class);
+
+ private final Configuration tezConf;
+ private final LogicalIOProcessorRuntimeTask task;
+ private final UserGroupInformation ugi;
+
+ private final TaskReporter taskReporter;
+ private final ListeningExecutorService executor;
+ private volatile ListenableFuture<Void> taskFuture;
+ private volatile Thread waitingThread;
+ private volatile Throwable firstException;
+
+ // Effectively a duplicate check, since hadFatalError does the same thing.
+ private final AtomicBoolean fatalErrorSent = new AtomicBoolean(false);
+ private final AtomicBoolean taskRunning;
+ private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
+
+ /**
+  * Builds the runtime task for {@code taskSpec} and registers it, together with this runner as
+  * the error callback, with the given {@link TaskReporter}.
+  *
+  * @param tezConf configuration handed to the runtime task
+  * @param ugi user the task is executed as (see {@code TaskRunnerCallable})
+  * @param localDirs local directories handed to the runtime task
+  * @param taskSpec specification of the task to run
+  * @param umbilical not used by this constructor; retained for signature compatibility
+  * @param appAttemptNumber application attempt number handed to the runtime task
+  * @param serviceConsumerMetadata handed to the runtime task
+  * @param serviceProviderEnvMap handed to the runtime task
+  * @param startedInputsMap handed to the runtime task
+  * @param taskReporter reporter used for success/failure/event reporting
+  * @param executor executor on which the task itself is run
+  * @param objectRegistry handed to the runtime task
+  * @param pid handed to the runtime task
+  * @param executionContext handed to the runtime task
+  * @throws IOException if constructing the runtime task fails with an IOException
+  */
+ TezTaskRunner(Configuration tezConf, UserGroupInformation ugi, String[] localDirs,
+     TaskSpec taskSpec, TezTaskUmbilicalProtocol umbilical, int appAttemptNumber,
+     Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> serviceProviderEnvMap,
+     Multimap<String, String> startedInputsMap, TaskReporter taskReporter,
+     ListeningExecutorService executor, ObjectRegistry objectRegistry, String pid,
+     ExecutionContext executionContext)
+     throws IOException {
+   this.tezConf = tezConf;
+   this.ugi = ugi;
+   this.taskReporter = taskReporter;
+   this.executor = executor;
+   this.task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, tezConf, localDirs,
+       this, serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, objectRegistry, pid,
+       executionContext);
+   // Assign every field before `this` is handed to the reporter: once registered, callbacks such
+   // as reportError() read taskRunning and would otherwise be able to observe it as null.
+   this.taskRunning = new AtomicBoolean(true);
+   taskReporter.registerTask(task, this);
+ }
+
+ /**
+ * @return false if a shutdown message was received during task execution
+ * @throws TezException
+ * @throws IOException
+ */
+ public boolean run() throws InterruptedException, IOException, TezException {
+ waitingThread = Thread.currentThread();
+ TaskRunnerCallable callable = new TaskRunnerCallable();
+ Throwable failureCause = null;
+ taskFuture = executor.submit(callable);
+ try {
+ taskFuture.get();
+
+ // Task could signal a fatal error and return control, or a failure while registering success.
+ failureCause = firstException;
+
+ } catch (InterruptedException e) {
+ LOG.info("Interrupted while waiting for task to complete. Interrupting task");
+ taskFuture.cancel(true);
+ if (shutdownRequested.get()) {
+ LOG.info("Shutdown requested... returning");
+ return false;
+ }
+ if (firstException != null) {
+ failureCause = firstException;
+ } else {
+ // Interrupted for some other reason.
+ failureCause = e;
+ }
+ } catch (ExecutionException e) {
+ // Exception thrown by the run() method itself.
+ Throwable cause = e.getCause();
+ if (cause instanceof FSError) {
+ // Not immediately fatal, this is an error reported by Hadoop FileSystem
+ failureCause = cause;
+ } else if (cause instanceof Error) {
+ LOG.error("Exception of type Error.", cause);
+ sendFailure(cause, "Fatal Error cause TezChild exit.");
+ throw new TezException("Fatal Error cause TezChild exit.", cause);
+ } else {
+ failureCause = cause;
+ }
+ } finally {
+ // Clear the interrupted status of the blocking thread, in case it is set after the
+ // InterruptedException was invoked.
+ taskReporter.unregisterTask(task.getTaskAttemptID());
+ Thread.interrupted();
+ }
+
+ if (failureCause != null) {
+ if (failureCause instanceof FSError) {
+ // Not immediately fatal, this is an error reported by Hadoop FileSystem
+ LOG.info("Encountered an FSError while executing task: " + task.getTaskAttemptID(),
+ failureCause);
+ throw (FSError) failureCause;
+ } else if (failureCause instanceof Error) {
+ LOG.error("Exception of type Error.", failureCause);
+ sendFailure(failureCause, "Fatal error cause TezChild exit.");
+ throw new TezException("Fatal error cause TezChild exit.", failureCause);
+ } else {
+ if (failureCause instanceof IOException) {
+ throw (IOException) failureCause;
+ } else if (failureCause instanceof TezException) {
+ throw (TezException) failureCause;
+ } else if (failureCause instanceof InterruptedException) {
+ throw (InterruptedException) failureCause;
+ } else {
+ throw new TezException(failureCause);
+ }
+ }
+ }
+ if (shutdownRequested.get()) {
+ LOG.info("Shutdown requested... returning");
+ return false;
+ }
+ return true;
+ }
+
+ private class TaskRunnerCallable implements Callable<Void> {
+ /*
+ * Callable submitted to the executor by run(). Inside ugi.doAs it drives the task through
+ * initialize, run and close, reports success or failure through the TaskReporter, always
+ * calls task.cleanup(), and finally flips taskRunning to false.
+ */
+ @Override
+ public Void call() throws Exception {
+ try {
+ return ugi.doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ try {
+ LOG.info("Initializing task" + ", taskAttemptId=" + task.getTaskAttemptID());
+ task.initialize();
+ if (!Thread.currentThread().isInterrupted() && firstException == null) { // skip run/close if interrupted or a failure was already registered
+ LOG.info("Running task, taskAttemptId=" + task.getTaskAttemptID());
+ task.run();
+ LOG.info("Closing task, taskAttemptId=" + task.getTaskAttemptID());
+ task.close();
+ task.setFrameworkCounters();
+ }
+ LOG.info("Task completed, taskAttemptId=" + task.getTaskAttemptID()
+ + ", fatalErrorOccurred=" + (firstException != null));
+ if (firstException == null) { // success is only reported when no failure was registered
+ try {
+ taskReporter.taskSucceeded(task.getTaskAttemptID());
+ } catch (IOException e) {
+ LOG.warn("Heartbeat failure caused by communication failure", e);
+ maybeRegisterFirstException(e);
+ // Not rethrown: the thread blocked in TezTaskRunner.run() reads firstException afterwards.
+ } catch (TezException e) {
+ LOG.warn("Heartbeat failure reported by AM", e);
+ maybeRegisterFirstException(e);
+ // Not rethrown: the thread blocked in TezTaskRunner.run() reads firstException afterwards.
+ }
+ }
+ return null;
+ } catch (Throwable cause) {
+ if (cause instanceof FSError) {
+ // Not immediately fatal, this is an error reported by Hadoop FileSystem
+ maybeRegisterFirstException(cause);
+ LOG.info("Encountered an FSError while executing task: " + task.getTaskAttemptID(),
+ cause);
+ try {
+ sendFailure(cause, "FS Error in Child JVM");
+ } catch (Exception ignored) {
+ // Ignored since another cause is already known
+ LOG.info(
+ "Ignoring the following exception since a previous exception is already registered",
+ ignored);
+ }
+ throw (FSError) cause;
+ } else if (cause instanceof Error) { // any other Error: reported, then wrapped; not registered via maybeRegisterFirstException
+ LOG.error("Exception of type Error.", cause);
+ sendFailure(cause, "Fatal Error cause TezChild exit."); // a failure thrown here propagates instead of the TezException below
+ throw new TezException("Fatal Error cause TezChild exit.", cause);
+ } else {
+ if (cause instanceof UndeclaredThrowableException) {
+ cause = ((UndeclaredThrowableException) cause).getCause(); // unwrap so the underlying cause is registered and reported
+ }
+ maybeRegisterFirstException(cause);
+ LOG.info("Encounted an error while executing task: " + task.getTaskAttemptID(),
+ cause);
+ try {
+ sendFailure(cause, "Failure while running task");
+ } catch (Exception ignored) {
+ // Ignored since another cause is already known
+ LOG.info(
+ "Ignoring the following exception since a previous exception is already registered",
+ ignored);
+ }
+ if (cause instanceof IOException) {
+ throw (IOException) cause;
+ } else if (cause instanceof TezException) {
+ throw (TezException) cause;
+ } else {
+ throw new TezException(cause); // everything else is wrapped
+ }
+ }
+ } finally {
+ task.cleanup(); // runs on success and on every failure path
+ }
+ }
+ });
+ } finally {
+ taskRunning.set(false); // from here on addEvents/canCommit/reportError treat the task as finished
+ }
+ }
+ }
+
+ /**
+  * Reports a task failure through the {@link TaskReporter}; only the first call per runner has
+  * any effect, later calls are logged and ignored.
+  *
+  * <p>The report is sent synchronously. If this ever becomes asynchronous, TezChild must wait
+  * until all messages have been sent to the AM before shutting down.
+  *
+  * @param t the failure to report
+  * @param message diagnostic message accompanying the failure
+  * @throws IOException if reporting fails because of a communication failure (logged, rethrown)
+  * @throws TezException if the reporter throws a TezException (logged, rethrown)
+  */
+ private void sendFailure(Throwable t, String message) throws IOException, TezException {
+   if (fatalErrorSent.getAndSet(true)) {
+     LOG.warn("Ignoring fatal error since another error has already been reported", t);
+     return;
+   }
+
+   task.setFatalError(t, message);
+   task.setFrameworkCounters();
+   try {
+     taskReporter.taskFailed(task.getTaskAttemptID(), t, message, null);
+   } catch (IOException commFailure) {
+     // A failure reason already exists; the communication error is logged and passed on.
+     LOG.warn("Heartbeat failure caused by communication failure", commFailure);
+     throw commFailure;
+   } catch (TezException amFailure) {
+     // A failure reason already exists; the reported error is logged and passed on.
+     LOG.warn("Heartbeat failure reported by AM", amFailure);
+     throw amFailure;
+   }
+ }
+
+ /**
+  * Forwards events to the {@link TaskReporter} while the task is still running; events arriving
+  * after the task has finished are dropped.
+  *
+  * @param events events to forward
+  */
+ @Override
+ public void addEvents(Collection<TezEvent> events) {
+   if (!taskRunning.get()) {
+     return;
+   }
+   taskReporter.addEvents(task.getTaskAttemptID(), events);
+ }
+
+ @Override
+ public synchronized void signalFatalError(TezTaskAttemptID taskAttemptID, Throwable t,
+ String message, EventMetaData sourceInfo) {
+ // This can be called before a task throws an exception or after it.
+ // If called before a task throws an exception
+ // - ensure a heartbeat is sent with the diagnostics, and sent only once.
+ // - interrupt the waiting thread, and make it throw the reported error.
+ // If called after a task throws an exception, the waiting task has already returned, no point
+ // interrupting it.
+ // This case can be effectively ignored (log), as long as the run() method ends up throwing the
+ // exception.
+ //
+ //
+ if (!fatalErrorSent.getAndSet(true)) {
+ maybeRegisterFirstException(t);
+ try {
+ taskReporter.taskFailed(taskAttemptID, t, getTaskDiagnosticsString(t, message), sourceInfo);
+ } catch (IOException e) {
+ // HeartbeatFailed. Don't need to propagate the heartbeat exception since a task exception
+ // occurred earlier.
+ LOG.warn("Heartbeat failure caused by communication failure", e);
+ } catch (TezException e) {
+ // HeartbeatFailed. Don't need to propagate the heartbeat exception since a task exception
+ // occurred earlier.
+ LOG.warn("Heartbeat failure reported by AM", e);
+ } finally {
+ // Wake up the waiting thread so that it can return control
+ waitingThread.interrupt();
+ }
+ }
+ }
+
+ /**
+  * Asks the {@link TaskReporter} whether the attempt may commit.
+  *
+  * @param taskAttemptID attempt asking to commit
+  * @return the reporter's answer; false if the task is no longer running or the request failed
+  *         with an IOException
+  */
+ @Override
+ public boolean canCommit(TezTaskAttemptID taskAttemptID) {
+   if (!taskRunning.get()) {
+     return false;
+   }
+   try {
+     return taskReporter.canCommit(taskAttemptID);
+   } catch (IOException commFailure) {
+     LOG.warn("Communication failure while trying to commit", commFailure);
+     maybeRegisterFirstException(commFailure);
+     waitingThread.interrupt();
+     // Not informing the task since it will be interrupted.
+     // TODO: Should this be sent to the task as well, current Processors, etc do not handle
+     // interrupts very well.
+     return false;
+   }
+ }
+
+ @Override
+ public synchronized void reportError(Throwable t) {
+ if (taskRunning.get()) {
+ LOG.error("TaskReporter reported error", t);
+ maybeRegisterFirstException(t);
+ waitingThread.interrupt();
+ // A race is possible between a task succeeding, and a subsequent timed heartbeat failing.
+ // These errors can be ignored, since a task can only succeed if the synchronous taskSucceeded
+ // method does not throw an exception, in which case task success is registered with the AM.
+ // Leave this handling to the next getTask / actual task.
+ } else {
+ LOG.info("Ignoring Communication failure since task with id=" + task.getTaskAttemptID()
+ + " is already complete");
+ }
+ }
+
+ @Override
+ public void shutdownRequested() {
+ shutdownRequested.set(true);
+ waitingThread.interrupt();
+ }
+
+ /**
+  * Builds the diagnostics string sent along with a failure report.
+  *
+  * @param t the failure, may be null
+  * @param message additional message, may be null
+  * @return the stack trace and/or message in {@code key=value} form, or {@code "Unknown error"}
+  *         when both arguments are null
+  */
+ private String getTaskDiagnosticsString(Throwable t, String message) {
+   if (t == null) {
+     return (message == null) ? "Unknown error" : " errorMessage=" + message;
+   }
+   String trace = "exceptionThrown=" + ExceptionUtils.getStackTrace(t);
+   return (message == null) ? trace : trace + ", errorMessage=" + message;
+ }
+
+ /**
+  * Remembers {@code t} as the first failure unless one has already been registered.
+  *
+  * @param t the failure to register
+  */
+ private synchronized void maybeRegisterFirstException(Throwable t) {
+   if (firstException != null) {
+     return;
+   }
+   firstException = t;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
new file mode 100644
index 0000000..4f94cfe
--- /dev/null
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
@@ -0,0 +1,729 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.log4j.Logger;
+import org.apache.tez.common.ContainerContext;
+import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
+import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
+import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+import org.apache.tez.runtime.common.resources.ScalingAllocator;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+// Tests in this class cannot be run in parallel.
+public class TestTaskExecution {
+
+ private static final Logger LOG = Logger.getLogger(TestTaskExecution.class);
+
+ private static final String HEARTBEAT_EXCEPTION_STRING = "HeartbeatException";
+
+ private static final Configuration defaultConf = new Configuration();
+ private static final FileSystem localFs;
+ private static final Path workDir;
+
+ private static final ExecutorService taskExecutor = Executors.newFixedThreadPool(1);
+
+ static { // one-time setup of the shared configuration, local file system and work directory
+ defaultConf.set("fs.defaultFS", "file:///"); // use the local file system as the default FS
+ defaultConf.set(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ALLOCATOR_CLASS,
+ ScalingAllocator.class.getName());
+ try {
+ localFs = FileSystem.getLocal(defaultConf);
+ Path wd = new Path(System.getProperty("test.build.data", "/tmp"), // "/tmp" when test.build.data is unset
+ TestTaskExecution.class.getSimpleName());
+ workDir = localFs.makeQualified(wd);
+ } catch (IOException e) {
+ throw new RuntimeException(e); // fails class initialization when the local FS is unavailable
+ }
+ }
+
+ @Before
+ public void reset() { // runs before every test
+ TestProcessor.reset(); // clear TestProcessor's static signalling flags left over from the previous test
+ }
+
+ @AfterClass
+ public static void shutdown() { // runs once after all tests in this class
+ taskExecutor.shutdownNow(); // stop the shared executor that runs TezTaskRunner.run() for the tests
+ }
+
+ /**
+  * Runs a single task to completion and verifies that the runner returns true, the reporter has
+  * no current callable left, and a task success event was sent.
+  */
+ @Test(timeout = 5000)
+ public void testSingleSuccessfulTask() throws IOException, InterruptedException, TezException,
+     ExecutionException {
+   ListeningExecutorService executor = null;
+   try {
+     executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
+     ApplicationId appId = ApplicationId.newInstance(10000, 1);
+     TezTaskUmbilicalForTest umbilical = new TezTaskUmbilicalForTest();
+     TaskReporter taskReporter = createTaskReporter(appId, umbilical);
+     TezTaskRunner runner = createTaskRunner(appId, umbilical, taskReporter, executor,
+         TestProcessor.CONF_EMPTY);
+
+     // Start the runner on the shared executor, then let the processor proceed.
+     Future<Boolean> pendingResult = taskExecutor.submit(new TaskRunnerCallable(runner));
+     TestProcessor.signal();
+
+     assertTrue(pendingResult.get());
+     assertNull(taskReporter.currentCallable);
+     umbilical.verifyTaskSuccessEvent();
+   } finally {
+     executor.shutdownNow();
+   }
+ }
+
+ /**
+  * Runs two tasks back to back against the same reporter and umbilical and verifies that each
+  * one completes successfully. Tracked events are reset between the two rounds.
+  */
+ @Test(timeout = 5000)
+ public void testMultipleSuccessfulTasks() throws IOException, InterruptedException, TezException,
+     ExecutionException {
+   ListeningExecutorService executor = null;
+   try {
+     executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
+     ApplicationId appId = ApplicationId.newInstance(10000, 1);
+     TezTaskUmbilicalForTest umbilical = new TezTaskUmbilicalForTest();
+     TaskReporter taskReporter = createTaskReporter(appId, umbilical);
+
+     for (int round = 0; round < 2; round++) {
+       if (round > 0) {
+         umbilical.resetTrackedEvents();
+       }
+       TezTaskRunner runner = createTaskRunner(appId, umbilical, taskReporter, executor,
+           TestProcessor.CONF_EMPTY);
+       // Start the runner on the shared executor, then let the processor proceed.
+       Future<Boolean> pendingResult = taskExecutor.submit(new TaskRunnerCallable(runner));
+       TestProcessor.signal();
+
+       assertTrue(pendingResult.get());
+       assertNull(taskReporter.currentCallable);
+       umbilical.verifyTaskSuccessEvent();
+     }
+   } finally {
+     executor.shutdownNow();
+   }
+ }
+
+ /**
+  * Tests a task that fails because the Processor throws an exception: the runner must fail with
+  * a TezException and a task failed event carrying the exception must be sent.
+  */
+ @Test(timeout = 5000)
+ public void testFailedTask() throws IOException, InterruptedException, TezException {
+   ListeningExecutorService executor = null;
+   try {
+     executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
+     ApplicationId appId = ApplicationId.newInstance(10000, 1);
+     TezTaskUmbilicalForTest umbilical = new TezTaskUmbilicalForTest();
+     TaskReporter taskReporter = createTaskReporter(appId, umbilical);
+     TezTaskRunner runner = createTaskRunner(appId, umbilical, taskReporter, executor,
+         TestProcessor.CONF_THROW_TEZ_EXCEPTION);
+
+     // Start the runner, wait for the processor to be running, then let it proceed.
+     Future<Boolean> pendingResult = taskExecutor.submit(new TaskRunnerCallable(runner));
+     TestProcessor.awaitStart();
+     TestProcessor.signal();
+
+     try {
+       pendingResult.get();
+       fail("Expecting the task to fail");
+     } catch (ExecutionException expected) {
+       Throwable cause = expected.getCause();
+       LOG.info(cause.getClass().getName());
+       assertTrue(cause instanceof TezException);
+     }
+
+     assertNull(taskReporter.currentCallable);
+     umbilical.verifyTaskFailedEvent("Failure while running task:org.apache.tez.dag.api.TezException: TezException");
+   } finally {
+     executor.shutdownNow();
+   }
+ }
+
+ /**
+  * Tests a task that fails because the Processor class cannot be loaded: the runner must fail
+  * with a TezException and a task failed event naming the missing class must be sent.
+  */
+ @Test(timeout = 5000)
+ public void testFailedTask2() throws IOException, InterruptedException, TezException {
+   ListeningExecutorService executor = null;
+   try {
+     ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
+     executor = MoreExecutors.listeningDecorator(rawExecutor);
+     ApplicationId appId = ApplicationId.newInstance(10000, 1);
+     TezTaskUmbilicalForTest umbilical = new TezTaskUmbilicalForTest();
+     TaskReporter taskReporter = createTaskReporter(appId, umbilical);
+
+     TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
+         "NotExitedProcessor", TestProcessor.CONF_THROW_TEZ_EXCEPTION);
+     // Start the runner; no processor signal is needed since the processor is never created.
+     Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
+     try {
+       taskRunnerFuture.get();
+       // Without this the test would also pass if the runner wrongly completed normally.
+       fail("Expecting the task to fail");
+     } catch (ExecutionException e) {
+       Throwable cause = e.getCause();
+       LOG.info(cause.getClass().getName());
+       assertTrue(cause instanceof TezException);
+     }
+     assertNull(taskReporter.currentCallable);
+     umbilical.verifyTaskFailedEvent("Failure while running task:org.apache.tez.dag.api.TezUncheckedException: "
+         + "Unable to load class: NotExitedProcessor");
+   } finally {
+     executor.shutdownNow();
+   }
+ }
+
+ /**
+  * Makes the umbilical throw while the processor is waiting and verifies that the runner fails
+  * with an IOException carrying the heartbeat exception text, that the processor was
+  * interrupted, and that no completion event was sent.
+  */
+ @Test(timeout = 5000)
+ public void testHeartbeatException() throws IOException, InterruptedException, TezException {
+   ListeningExecutorService executor = null;
+   try {
+     executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
+     ApplicationId appId = ApplicationId.newInstance(10000, 1);
+     TezTaskUmbilicalForTest umbilical = new TezTaskUmbilicalForTest();
+     TaskReporter taskReporter = createTaskReporter(appId, umbilical);
+     TezTaskRunner runner = createTaskRunner(appId, umbilical, taskReporter, executor,
+         TestProcessor.CONF_EMPTY);
+
+     Future<Boolean> pendingResult = taskExecutor.submit(new TaskRunnerCallable(runner));
+     TestProcessor.awaitStart();
+     umbilical.signalThrowException();
+     umbilical.awaitRegisteredEvent();
+     // The processor is deliberately never signalled, so that task interruption can be verified.
+     try {
+       pendingResult.get();
+       fail("Expecting the task to fail");
+     } catch (ExecutionException expected) {
+       Throwable cause = expected.getCause();
+       assertTrue(cause instanceof IOException);
+       assertTrue(cause.getMessage().contains(HEARTBEAT_EXCEPTION_STRING));
+     }
+
+     TestProcessor.awaitCompletion();
+     assertTrue(TestProcessor.wasInterrupted());
+     assertNull(taskReporter.currentCallable);
+     // No completion events since umbilical communication already failed.
+     umbilical.verifyNoCompletionEvents();
+   } finally {
+     executor.shutdownNow();
+   }
+ }
+
+ /**
+  * Makes the umbilical send a should-die response while the processor is waiting and verifies
+  * that the runner returns false, that the processor was interrupted, and that no completion
+  * event was sent.
+  */
+ @Test(timeout = 5000)
+ public void testHeartbeatShouldDie() throws IOException, InterruptedException, TezException,
+     ExecutionException {
+   ListeningExecutorService executor = null;
+   try {
+     executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
+     ApplicationId appId = ApplicationId.newInstance(10000, 1);
+     TezTaskUmbilicalForTest umbilical = new TezTaskUmbilicalForTest();
+     TaskReporter taskReporter = createTaskReporter(appId, umbilical);
+     TezTaskRunner runner = createTaskRunner(appId, umbilical, taskReporter, executor,
+         TestProcessor.CONF_EMPTY);
+
+     Future<Boolean> pendingResult = taskExecutor.submit(new TaskRunnerCallable(runner));
+     TestProcessor.awaitStart();
+     umbilical.signalSendShouldDie();
+     umbilical.awaitRegisteredEvent();
+     // The processor is deliberately never signalled, so that task interruption can be verified.
+
+     assertFalse(pendingResult.get());
+
+     TestProcessor.awaitCompletion();
+     assertTrue(TestProcessor.wasInterrupted());
+     assertNull(taskReporter.currentCallable);
+     // TODO Is this statement correct ?
+     // No completion events since shouldDie was requested by the AM, which should have killed the
+     // task.
+     umbilical.verifyNoCompletionEvents();
+   } finally {
+     executor.shutdownNow();
+   }
+ }
+
+ /**
+  * Runs a {@code ContainerReporter} against the test umbilical and verifies that it returns
+  * after exactly one getTask invocation.
+  */
+ @Test(timeout = 5000)
+ public void testGetTaskShouldDie() throws InterruptedException, ExecutionException {
+   ListeningExecutorService executor = null;
+   try {
+     executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
+     ApplicationId appId = ApplicationId.newInstance(10000, 1);
+     ContainerId containerId =
+         ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, 1), 1);
+
+     TezTaskUmbilicalForTest umbilical = new TezTaskUmbilicalForTest();
+     ContainerContext containerContext = new ContainerContext(containerId.toString());
+
+     ListenableFuture<ContainerTask> pendingTask =
+         executor.submit(new ContainerReporter(umbilical, containerContext, 100));
+
+     pendingTask.get();
+     assertEquals(1, umbilical.getTaskInvocations);
+   } finally {
+     executor.shutdownNow();
+   }
+ }
+
+ // Potential new tests
+ // Different states - initialization failure, close failure
+ // getTask states
+
+ private static class TaskRunnerCallable implements Callable<Boolean> { // adapts TezTaskRunner.run() to a Callable so tests can submit it to an executor
+ private final TezTaskRunner taskRunner;
+
+ public TaskRunnerCallable(TezTaskRunner taskRunner) {
+ this.taskRunner = taskRunner;
+ }
+
+ @Override
+ public Boolean call() throws Exception {
+ return taskRunner.run(); // exceptions from run() surface to the test as the ExecutionException cause
+ }
+ }
+
+ // Uses static fields for signaling. Ensure only used by one test at a time.
+ public static class TestProcessor extends AbstractLogicalIOProcessor {
+
+ public static final byte[] CONF_EMPTY = new byte[] { 0 };
+ public static final byte[] CONF_THROW_IO_EXCEPTION = new byte[] { 1 };
+ public static final byte[] CONF_THROW_TEZ_EXCEPTION = new byte[] { 2 };
+ public static final byte[] CONF_SIGNAL_FATAL_AND_THROW = new byte[] { 4 };
+ public static final byte[] CONF_SIGNAL_FATAL_AND_LOOP = new byte[] { 8 };
+ public static final byte[] CONF_SIGNAL_FATAL_AND_COMPLETE = new byte[] { 16 };
+
+ private static final Logger LOG = Logger.getLogger(TestProcessor.class);
+
+ private static final ReentrantLock processorLock = new ReentrantLock();
+ private static final Condition processorCondition = processorLock.newCondition();
+ private static final Condition completionCondition = processorLock.newCondition();
+ private static final Condition runningCondition = processorLock.newCondition();
+ private static boolean completed = false;
+ private static boolean running = false;
+ private static boolean signalled = false;
+
+ public static boolean receivedInterrupt = false;
+
+ private boolean throwIOException = false;
+ private boolean throwTezException = false;
+ private boolean signalFatalAndThrow = false;
+ private boolean signalFatalAndLoop = false;
+ private boolean signalFatalAndComplete = false;
+
+ public TestProcessor(ProcessorContext context) {
+ super(context);
+ }
+
+ @Override
+ public void initialize() throws Exception {
+ parseConf(getContext().getUserPayload().deepCopyAsArray());
+ }
+
+ @Override
+ public void handleEvents(List<Event> processorEvents) {
+
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ }
+
+ private void parseConf(byte[] bytes) {
+ byte b = bytes[0];
+ throwIOException = (b & 1) > 1;
+ throwTezException = (b & 2) > 1;
+ signalFatalAndThrow = (b & 4) > 1;
+ signalFatalAndLoop = (b & 8) > 1;
+ signalFatalAndComplete = (b & 16) > 1;
+ }
+
+ public static void reset() {
+ signalled = false;
+ receivedInterrupt = false;
+ completed = false;
+ running = false;
+ }
+
+ public static void signal() {
+ LOG.info("Signalled");
+ processorLock.lock();
+ try {
+ signalled = true;
+ processorCondition.signal();
+ } finally {
+ processorLock.unlock();
+ }
+ }
+
+ public static void awaitStart() throws InterruptedException {
+ LOG.info("Awaiting Process run");
+ processorLock.lock();
+ try {
+ if (running) {
+ return;
+ }
+ runningCondition.await();
+ } finally {
+ processorLock.unlock();
+ }
+ }
+
+ public static void awaitCompletion() throws InterruptedException {
+ LOG.info("Await completion");
+ processorLock.lock();
+ try {
+ // completed is set by run() under processorLock just before it signals.
+ // Re-check it in a loop because Condition.await() may wake spuriously.
+ while (!completed) {
+ completionCondition.await();
+ }
+ } finally {
+ processorLock.unlock();
+ }
+ }
+
+ public static boolean wasInterrupted() {
+ processorLock.lock();
+ try {
+ return receivedInterrupt;
+ } finally {
+ processorLock.unlock();
+ }
+ }
+
+ @Override
+ public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws
+ Exception {
+ processorLock.lock();
+ running = true; // lets a thread blocked in awaitStart() proceed
+ runningCondition.signal();
+ try {
+ try {
+ LOG.info("Signal is: " + signalled);
+ while (!signalled) { // loop, not if: Condition.await() may return spuriously
+ LOG.info("Waiting for processor signal");
+ processorCondition.await();
+ }
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException();
+ }
+ LOG.info("Received processor signal");
+ if (throwIOException) {
+ throw new IOException();
+ } else if (throwTezException) {
+ throw new TezException("TezException");
+ } else if (signalFatalAndThrow) {
+ IOException io = new IOException("FATALERROR");
+ getContext().fatalError(io, "FATALERROR");
+ throw io;
+ } else if (signalFatalAndComplete) {
+ IOException io = new IOException("FATALERROR");
+ getContext().fatalError(io, "FATALERROR");
+ return;
+ } else if (signalFatalAndLoop) {
+ IOException io = new IOException("FATALERROR");
+ getContext().fatalError(io, "FATALERROR");
+ LOG.info("Waiting for Processor signal again");
+ processorCondition.await();
+ LOG.info("Received second processor signal");
+ }
+ } catch (InterruptedException e) {
+ receivedInterrupt = true; // recorded for wasInterrupted(); deliberately not rethrown
+ }
+ } finally {
+ completed = true; // published on every exit path, normal or exceptional
+ completionCondition.signal();
+ processorLock.unlock();
+ }
+ }
+ }
+
+ private static class TezTaskUmbilicalForTest implements TezTaskUmbilicalProtocol {
+
+ private static final Logger LOG = Logger.getLogger(TezTaskUmbilicalForTest.class);
+
+ private final List<TezEvent> requestEvents = new LinkedList<TezEvent>();
+
+ private final ReentrantLock umbilicalLock = new ReentrantLock();
+ private final Condition eventCondition = umbilicalLock.newCondition();
+ private boolean pendingEvent = false;
+ private boolean eventEnacted = false;
+
+ volatile int getTaskInvocations = 0;
+
+ private boolean shouldThrowException = false;
+ private boolean shouldSendDieSignal = false;
+
+ public void signalThrowException() {
+ umbilicalLock.lock();
+ try {
+ shouldThrowException = true;
+ pendingEvent = true;
+ } finally {
+ umbilicalLock.unlock();
+ }
+ }
+
+ public void signalSendShouldDie() {
+ umbilicalLock.lock();
+ try {
+ shouldSendDieSignal = true;
+ pendingEvent = true;
+ } finally {
+ umbilicalLock.unlock();
+ }
+ }
+
+ public void awaitRegisteredEvent() throws InterruptedException {
+ umbilicalLock.lock();
+ try {
+ // heartbeat() sets eventEnacted under this lock; loop to ignore spurious wakeups.
+ while (!eventEnacted) {
+ LOG.info("Awaiting event");
+ eventCondition.await();
+ }
+ } finally {
+ umbilicalLock.unlock();
+ }
+ }
+
+ public void resetTrackedEvents() {
+ umbilicalLock.lock();
+ try {
+ requestEvents.clear();
+ } finally {
+ umbilicalLock.unlock();
+ }
+ }
+
+ public void verifyNoCompletionEvents() {
+ umbilicalLock.lock();
+ try {
+ for (TezEvent event : requestEvents) { // every event recorded by heartbeat() so far
+ if (event.getEvent() instanceof TaskAttemptFailedEvent) {
+ fail("Found a TaskAttemptFailedEvent when not expected");
+ }
+ if (event.getEvent() instanceof TaskAttemptCompletedEvent) {
+ fail("Found a TaskAttemptCompletedEvent when not expected");
+ }
+ }
+ } finally {
+ umbilicalLock.unlock();
+ }
+ }
+
+ public void verifyTaskFailedEvent(String diagnostics) {
+ umbilicalLock.lock();
+ try {
+ for (TezEvent event : requestEvents) {
+ if (event.getEvent() instanceof TaskAttemptFailedEvent) {
+ TaskAttemptFailedEvent failedEvent = (TaskAttemptFailedEvent)event.getEvent();
+ if(failedEvent.getDiagnostics().startsWith(diagnostics)){
+ return ;
+ } else {
+ fail("No detailed diagnostics message in TaskAttemptFailedEvent");
+ }
+ }
+ }
+ fail("No TaskAttemptFailedEvents sent over umbilical");
+ } finally {
+ umbilicalLock.unlock();
+ }
+ }
+
+ public void verifyTaskSuccessEvent() {
+ umbilicalLock.lock();
+ try {
+ for (TezEvent event : requestEvents) { // every event recorded by heartbeat() so far
+ if (event.getEvent() instanceof TaskAttemptCompletedEvent) {
+ return;
+ }
+ }
+ fail("No TaskAttemptCompletedEvent sent over umbilical"); // names the event actually missing
+ } finally {
+ umbilicalLock.unlock();
+ }
+ }
+
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
+ return 0;
+ }
+
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
+ int clientMethodsHash) throws IOException {
+ return null;
+ }
+
+ @Override
+ public ContainerTask getTask(ContainerContext containerContext) throws IOException {
+ // Return shouldDie = true
+ getTaskInvocations++;
+ return new ContainerTask(null, true, null, null, false);
+ }
+
+ @Override
+ public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
+ return true;
+ }
+
+ @Override
+ public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOException,
+ TezException {
+ umbilicalLock.lock();
+ try {
+ if (request.getEvents() != null) { // inside try so a failure here still releases the lock
+ requestEvents.addAll(request.getEvents());
+ }
+ if (shouldThrowException) {
+ LOG.info("TestUmbilical throwing Exception");
+ throw new IOException(HEARTBEAT_EXCEPTION_STRING);
+ }
+ TezHeartbeatResponse response = new TezHeartbeatResponse();
+ response.setLastRequestId(request.getRequestId());
+ if (shouldSendDieSignal) {
+ LOG.info("TestUmbilical returning shouldDie=true");
+ response.setShouldDie();
+ }
+ return response;
+ } finally {
+ if (pendingEvent) { // set by signalThrowException() / signalSendShouldDie()
+ eventEnacted = true;
+ LOG.info("Signalling Event");
+ eventCondition.signal();
+ }
+ umbilicalLock.unlock();
+ }
+ }
+ }
+
+ private TaskReporter createTaskReporter(ApplicationId appId, TezTaskUmbilicalForTest umbilical) {
+ TaskReporter taskReporter = new TaskReporter(umbilical, 100, 1000, 100, new AtomicLong(0),
+ createContainerId(appId).toString());
+ return taskReporter;
+ }
+
+ private TezTaskRunner createTaskRunner(ApplicationId appId, TezTaskUmbilicalForTest umbilical,
+ TaskReporter taskReporter, ListeningExecutorService executor, byte[] processorConf)
+ throws IOException {
+ return createTaskRunner(appId, umbilical, taskReporter, executor, TestProcessor.class.getName(),
+ processorConf);
+ }
+
+ private TezTaskRunner createTaskRunner(ApplicationId appId, TezTaskUmbilicalForTest umbilical,
+ TaskReporter taskReporter, ListeningExecutorService executor, String processorClass, byte[] processorConf) throws IOException{
+ TezConfiguration tezConf = new TezConfiguration(defaultConf);
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ Path testDir = new Path(workDir, UUID.randomUUID().toString());
+ String[] localDirs = new String[] { testDir.toString() };
+
+ TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+ TezVertexID vertexId = TezVertexID.getInstance(dagId, 1);
+ TezTaskID taskId = TezTaskID.getInstance(vertexId, 1);
+ TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 1);
+ ProcessorDescriptor processorDescriptor = ProcessorDescriptor.create(processorClass)
+ .setUserPayload(UserPayload.create(ByteBuffer.wrap(processorConf)));
+ TaskSpec taskSpec = new TaskSpec(taskAttemptId, "dagName", "vertexName", -1, processorDescriptor,
+ new ArrayList<InputSpec>(), new ArrayList<OutputSpec>(), null);
+
+ TezTaskRunner taskRunner = new TezTaskRunner(tezConf, ugi, localDirs, taskSpec, umbilical, 1,
+ new HashMap<String, ByteBuffer>(), new HashMap<String, String>(), HashMultimap.<String, String> create(), taskReporter,
+ executor, null, "", new ExecutionContextImpl("localhost"));
+ return taskRunner;
+ }
+
+ private ContainerId createContainerId(ApplicationId appId) {
+ ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+ ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
+ return containerId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
new file mode 100644
index 0000000..9add252
--- /dev/null
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.collect.Lists;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestTaskReporter {
+
+ @Test(timeout = 10000)
+ public void testContinuousHeartbeatsOnMaxEvents() throws Exception {
+
+ final Object lock = new Object();
+ final AtomicBoolean hb2Done = new AtomicBoolean(false);
+
+ TezTaskUmbilicalProtocol mockUmbilical = mock(TezTaskUmbilicalProtocol.class);
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ Object[] args = invocation.getArguments();
+ TezHeartbeatRequest request = (TezHeartbeatRequest) args[0];
+ if (request.getRequestId() == 1 || request.getRequestId() == 2) {
+ TezHeartbeatResponse response = new TezHeartbeatResponse(createEvents(5));
+ response.setLastRequestId(request.getRequestId());
+ return response;
+ } else if (request.getRequestId() == 3) {
+ TezHeartbeatResponse response = new TezHeartbeatResponse(createEvents(1));
+ response.setLastRequestId(request.getRequestId());
+ synchronized (lock) {
+ hb2Done.set(true);
+ lock.notify();
+ }
+ return response;
+ } else {
+ throw new TezUncheckedException("Invalid request id for test: " + request.getRequestId());
+ }
+ }
+ }).when(mockUmbilical).heartbeat(any(TezHeartbeatRequest.class));
+
+ TezTaskAttemptID mockTaskAttemptId = mock(TezTaskAttemptID.class);
+ LogicalIOProcessorRuntimeTask mockTask = mock(LogicalIOProcessorRuntimeTask.class);
+ doReturn("vertexName").when(mockTask).getVertexName();
+ doReturn(mockTaskAttemptId).when(mockTask).getTaskAttemptID();
+
+ // Setup the sleep time to be way higher than the test timeout
+ TaskReporter.HeartbeatCallable heartbeatCallable =
+ new TaskReporter.HeartbeatCallable(mockTask, mockUmbilical, 100000, 100000, 5,
+ new AtomicLong(0),
+ "containerIdStr");
+
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ executor.submit(heartbeatCallable);
+ try {
+ synchronized (lock) {
+ while (!hb2Done.get()) { // loop, not if: Object.wait() may return spuriously
+ lock.wait();
+ }
+ }
+ verify(mockUmbilical, times(3)).heartbeat(any(TezHeartbeatRequest.class));
+ // Sleep for 2 seconds, less than the callable sleep time. No more invocations.
+ Thread.sleep(2000L);
+ verify(mockUmbilical, times(3)).heartbeat(any(TezHeartbeatRequest.class));
+ } finally {
+ executor.shutdownNow();
+ }
+
+ }
+
+ private List<TezEvent> createEvents(int numEvents) {
+ List<TezEvent> list = Lists.newArrayListWithCapacity(numEvents);
+ for (int i = 0; i < numEvents; i++) {
+ list.add(mock(TezEvent.class));
+ }
+ return list;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index e01b985..629bab8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -32,10 +32,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.security.token.Token;
+import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
@@ -62,10 +62,7 @@ public class ShuffleUtils {
public static ByteBuffer convertJobTokenToBytes(
Token<JobTokenIdentifier> jobToken) throws IOException {
- DataOutputBuffer dob = new DataOutputBuffer();
- jobToken.write(dob);
- ByteBuffer bb = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
- return bb;
+ return TezCommonUtils.convertJobTokenToBytes(jobToken);
}
public static int deserializeShuffleProviderMetaData(ByteBuffer meta)
[3/3] tez git commit: TEZ-1933. Move TezChild and related classes
into tez-runtime-internals. (sseth)
Posted by ss...@apache.org.
TEZ-1933. Move TezChild and related classes into tez-runtime-internals.
(sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e250983e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e250983e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e250983e
Branch: refs/heads/master
Commit: e250983e57ba7fb60d71fb0b11aca331b209a876
Parents: fda4c0b
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Jan 12 14:31:15 2015 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Jan 12 14:31:15 2015 -0800
----------------------------------------------------------------------
.../org/apache/tez/common/TezCommonUtils.java | 11 +
.../tez/dag/utils/RelocalizationUtils.java | 95 +++
.../tez/dag/utils/RelocalizationUtils.java | 93 ---
.../tez/runtime/task/ContainerReporter.java | 84 ---
.../apache/tez/runtime/task/ErrorReporter.java | 26 -
.../apache/tez/runtime/task/TaskReporter.java | 401 ----------
.../org/apache/tez/runtime/task/TezChild.java | 456 ------------
.../apache/tez/runtime/task/TezTaskRunner.java | 375 ----------
.../tez/runtime/task/TestTaskExecution.java | 711 ------------------
.../tez/runtime/task/TestTaskReporter.java | 115 ---
.../tez/runtime/task/ContainerReporter.java | 84 +++
.../apache/tez/runtime/task/ErrorReporter.java | 26 +
.../apache/tez/runtime/task/TaskReporter.java | 401 ++++++++++
.../org/apache/tez/runtime/task/TezChild.java | 456 ++++++++++++
.../apache/tez/runtime/task/TezTaskRunner.java | 375 ++++++++++
.../tez/runtime/task/TestTaskExecution.java | 729 +++++++++++++++++++
.../tez/runtime/task/TestTaskReporter.java | 114 +++
.../library/common/shuffle/ShuffleUtils.java | 7 +-
18 files changed, 2293 insertions(+), 2266 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
index b7a402d..685c728 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
@@ -19,6 +19,7 @@
package org.apache.tez.common;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
@@ -39,10 +40,12 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.client.TezClient;
+import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezUncheckedException;
@@ -385,6 +388,14 @@ public class TezCommonUtils {
return sb.toString();
}
+ public static ByteBuffer convertJobTokenToBytes(
+ Token<JobTokenIdentifier> jobToken) throws IOException {
+ DataOutputBuffer dob = new DataOutputBuffer();
+ jobToken.write(dob);
+ ByteBuffer bb = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+ return bb;
+ }
+
public static void logCredentials(Log log, Credentials credentials, String identifier) {
if (log.isDebugEnabled()) {
log.debug(getCredentialsInfo(credentials, identifier));
http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-common/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java b/tez-common/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
new file mode 100644
index 0000000..de0b94c
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.api.TezException;
+
+import com.google.common.collect.Lists;
+
+@InterfaceAudience.Private
+public class RelocalizationUtils {
+
+ public static List<URL> processAdditionalResources(Map<String, URI> additionalResources,
+ Configuration conf, String destDir) throws IOException, TezException {
+ if (additionalResources == null || additionalResources.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ List<URL> urls = Lists.newArrayListWithCapacity(additionalResources.size());
+
+ for (Entry<String, URI> lrEntry : additionalResources.entrySet()) {
+ Path dFile = downloadResource(lrEntry.getKey(), lrEntry.getValue(), conf, destDir);
+ urls.add(dFile.toUri().toURL());
+ }
+ return urls;
+ }
+
+ public static void addUrlsToClassPath(List<URL> urls) {
+ ReflectionUtils.addResourcesToSystemClassLoader(urls);
+ }
+
+ private static Path downloadResource(String destName, URI uri, Configuration conf, String destDir)
+ throws IOException {
+ FileSystem fs = FileSystem.get(uri, conf);
+ Path cwd = new Path(destDir);
+ Path dFile = new Path(cwd, destName);
+ Path srcPath = new Path(uri);
+ fs.copyToLocalFile(srcPath, dFile);
+ return dFile.makeQualified(FileSystem.getLocal(conf).getUri(), cwd);
+ }
+
+ public static byte[] getLocalSha(Path path, Configuration conf) throws IOException {
+ InputStream is = null;
+ try {
+ is = FileSystem.getLocal(conf).open(path);
+ return DigestUtils.sha256(is);
+ } finally {
+ if (is != null) {
+ is.close();
+ }
+ }
+ }
+
+ public static byte[] getResourceSha(URI uri, Configuration conf) throws IOException {
+ InputStream is = null;
+ try {
+ is = FileSystem.get(uri, conf).open(new Path(uri));
+ return DigestUtils.sha256(is);
+ } finally {
+ if (is != null) {
+ is.close();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-dag/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
deleted file mode 100644
index 7b5c49c..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.utils;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.net.URL;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tez.common.ReflectionUtils;
-import org.apache.tez.dag.api.TezException;
-
-import com.google.common.collect.Lists;
-
-public class RelocalizationUtils {
-
- public static List<URL> processAdditionalResources(Map<String, URI> additionalResources,
- Configuration conf, String destDir) throws IOException, TezException {
- if (additionalResources == null || additionalResources.isEmpty()) {
- return Collections.emptyList();
- }
-
- List<URL> urls = Lists.newArrayListWithCapacity(additionalResources.size());
-
- for (Entry<String, URI> lrEntry : additionalResources.entrySet()) {
- Path dFile = downloadResource(lrEntry.getKey(), lrEntry.getValue(), conf, destDir);
- urls.add(dFile.toUri().toURL());
- }
- return urls;
- }
-
- public static void addUrlsToClassPath(List<URL> urls) {
- ReflectionUtils.addResourcesToSystemClassLoader(urls);
- }
-
- private static Path downloadResource(String destName, URI uri, Configuration conf, String destDir)
- throws IOException {
- FileSystem fs = FileSystem.get(uri, conf);
- Path cwd = new Path(destDir);
- Path dFile = new Path(cwd, destName);
- Path srcPath = new Path(uri);
- fs.copyToLocalFile(srcPath, dFile);
- return dFile.makeQualified(FileSystem.getLocal(conf).getUri(), cwd);
- }
-
- public static byte[] getLocalSha(Path path, Configuration conf) throws IOException {
- InputStream is = null;
- try {
- is = FileSystem.getLocal(conf).open(path);
- return DigestUtils.sha256(is);
- } finally {
- if (is != null) {
- is.close();
- }
- }
- }
-
- public static byte[] getResourceSha(URI uri, Configuration conf) throws IOException {
- InputStream is = null;
- try {
- is = FileSystem.get(uri, conf).open(new Path(uri));
- return DigestUtils.sha256(is);
- } finally {
- if (is != null) {
- is.close();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-dag/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
deleted file mode 100644
index a68c7c1..0000000
--- a/tez-dag/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.task;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.log4j.Logger;
-import org.apache.tez.common.ContainerContext;
-import org.apache.tez.common.ContainerTask;
-import org.apache.tez.common.TezTaskUmbilicalProtocol;
-
-/**
- * Responsible for communication between a running Container and the ApplicationMaster. The main
- * functionality is to poll for new tasks.
- *
- */
-public class ContainerReporter implements Callable<ContainerTask> {
-
- private static final Logger LOG = Logger.getLogger(ContainerReporter.class);
-
- private final TezTaskUmbilicalProtocol umbilical;
- private final ContainerContext containerContext;
- private final int getTaskMaxSleepTime;
- private final long LOG_INTERVAL = 2000l;
-
- private long nextGetTaskPrintTime;
-
- ContainerReporter(TezTaskUmbilicalProtocol umbilical, ContainerContext containerContext,
- int getTaskMaxSleepTime) {
- this.umbilical = umbilical;
- this.containerContext = containerContext;
- this.getTaskMaxSleepTime = getTaskMaxSleepTime;
- }
-
- @Override
- public ContainerTask call() throws Exception {
- ContainerTask containerTask = null;
- LOG.info("Attempting to fetch new task");
- containerTask = umbilical.getTask(containerContext);
- long getTaskPollStartTime = System.currentTimeMillis();
- nextGetTaskPrintTime = getTaskPollStartTime + LOG_INTERVAL;
- for (int idle = 1; containerTask == null; idle++) {
- long sleepTimeMilliSecs = Math.min(idle * 10, getTaskMaxSleepTime);
- maybeLogSleepMessage(sleepTimeMilliSecs);
- TimeUnit.MILLISECONDS.sleep(sleepTimeMilliSecs);
- containerTask = umbilical.getTask(containerContext);
- }
- LOG.info("Got TaskUpdate: "
- + (System.currentTimeMillis() - getTaskPollStartTime)
- + " ms after starting to poll."
- + " TaskInfo: shouldDie: "
- + containerTask.shouldDie()
- + (containerTask.shouldDie() == true ? "" : ", currentTaskAttemptId: "
- + containerTask.getTaskSpec().getTaskAttemptID()));
- return containerTask;
- }
-
- private void maybeLogSleepMessage(long sleepTimeMilliSecs) {
- long currentTime = System.currentTimeMillis();
- if (sleepTimeMilliSecs + currentTime > nextGetTaskPrintTime) {
- LOG.info("Sleeping for " + sleepTimeMilliSecs
- + "ms before retrying getTask again. Got null now. "
- + "Next getTask sleep message after " + LOG_INTERVAL + "ms");
- nextGetTaskPrintTime = currentTime + sleepTimeMilliSecs + LOG_INTERVAL;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-dag/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java
deleted file mode 100644
index 1146ce4..0000000
--- a/tez-dag/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.task;
-
-public interface ErrorReporter {
-
- void reportError(Throwable t);
-
- void shutdownRequested();
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
deleted file mode 100644
index 15dcbb0..0000000
--- a/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
+++ /dev/null
@@ -1,401 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.task;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.log4j.Logger;
-import org.apache.tez.common.TezTaskUmbilicalProtocol;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
-import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
-import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
-import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
-import org.apache.tez.runtime.api.impl.EventMetaData;
-import org.apache.tez.runtime.api.impl.TezEvent;
-import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
-import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
-import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-/**
- * Responsible for communication between tasks running in a Container and the ApplicationMaster.
- * Takes care of sending heartbeats (regular and OOB) to the AM - to send generated events, and to
- * retrieve events specific to this task.
- *
- */
-public class TaskReporter {
-
- private static final Logger LOG = Logger.getLogger(TaskReporter.class);
-
- private final TezTaskUmbilicalProtocol umbilical;
- private final long pollInterval;
- private final long sendCounterInterval;
- private final int maxEventsToGet;
- private final AtomicLong requestCounter;
- private final String containerIdStr;
-
- private final ListeningExecutorService heartbeatExecutor;
-
- @VisibleForTesting
- HeartbeatCallable currentCallable;
-
- public TaskReporter(TezTaskUmbilicalProtocol umbilical, long amPollInterval,
- long sendCounterInterval, int maxEventsToGet, AtomicLong requestCounter, String containerIdStr) {
- this.umbilical = umbilical;
- this.pollInterval = amPollInterval;
- this.sendCounterInterval = sendCounterInterval;
- this.maxEventsToGet = maxEventsToGet;
- this.requestCounter = requestCounter;
- this.containerIdStr = containerIdStr;
- ExecutorService executor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
- .setDaemon(true).setNameFormat("TaskHeartbeatThread").build());
- heartbeatExecutor = MoreExecutors.listeningDecorator(executor);
- }
-
- /**
- * Register a task to be tracked. Heartbeats will be sent out for this task to fetch events, etc.
- */
- public synchronized void registerTask(LogicalIOProcessorRuntimeTask task,
- ErrorReporter errorReporter) {
- currentCallable = new HeartbeatCallable(task, umbilical, pollInterval, sendCounterInterval,
- maxEventsToGet, requestCounter, containerIdStr);
- ListenableFuture<Boolean> future = heartbeatExecutor.submit(currentCallable);
- Futures.addCallback(future, new HeartbeatCallback(errorReporter));
- }
-
- /**
- * This method should always be invoked before setting up heartbeats for another task running in
- * the same container.
- */
- public synchronized void unregisterTask(TezTaskAttemptID taskAttemptID) {
- currentCallable.markComplete();
- currentCallable = null;
- }
-
- public void shutdown() {
- heartbeatExecutor.shutdownNow();
- }
-
- @VisibleForTesting
- static class HeartbeatCallable implements Callable<Boolean> {
-
- private static final int LOG_COUNTER_START_INTERVAL = 5000; // 5 seconds
- private static final float LOG_COUNTER_BACKOFF = 1.3f;
-
- private final LogicalIOProcessorRuntimeTask task;
- private EventMetaData updateEventMetadata;
-
- private final TezTaskUmbilicalProtocol umbilical;
-
- private final long pollInterval;
- private final long sendCounterInterval;
- private final int maxEventsToGet;
- private final String containerIdStr;
-
- private final AtomicLong requestCounter;
-
- private LinkedBlockingQueue<TezEvent> eventsToSend = new LinkedBlockingQueue<TezEvent>();
-
- private final ReentrantLock lock = new ReentrantLock();
- private final Condition condition = lock.newCondition();
-
- /*
- * Keeps track of regular timed heartbeats. Is primarily used as a timing mechanism to send /
- * log counters.
- */
- private int nonOobHeartbeatCounter = 0;
- private int nextHeartbeatNumToLog = 0;
- /*
- * Tracks the last non-OOB heartbeat number at which counters were sent to the AM.
- */
- private int prevCounterSendHeartbeatNum = 0;
-
- public HeartbeatCallable(LogicalIOProcessorRuntimeTask task,
- TezTaskUmbilicalProtocol umbilical, long amPollInterval, long sendCounterInterval,
- int maxEventsToGet, AtomicLong requestCounter, String containerIdStr) {
-
- this.pollInterval = amPollInterval;
- this.sendCounterInterval = sendCounterInterval;
- this.maxEventsToGet = maxEventsToGet;
- this.requestCounter = requestCounter;
- this.containerIdStr = containerIdStr;
-
- this.task = task;
- this.umbilical = umbilical;
- this.updateEventMetadata = new EventMetaData(EventProducerConsumerType.SYSTEM,
- task.getVertexName(), "", task.getTaskAttemptID());
-
- nextHeartbeatNumToLog = (Math.max(1,
- (int) (LOG_COUNTER_START_INTERVAL / (amPollInterval == 0 ? 0.000001f
- : (float) amPollInterval))));
- }
-
- @Override
- public Boolean call() throws Exception {
- // Heartbeat only for active tasks. Errors, etc will be reported directly.
- while (!task.isTaskDone() && !task.hadFatalError()) {
- ResponseWrapper response = heartbeat(null);
-
- if (response.shouldDie) {
- // AM sent a shouldDie=true
- LOG.info("Asked to die via task heartbeat");
- return false;
- } else {
- if (response.numEvents < maxEventsToGet) {
- // Wait before sending another heartbeat. Otherwise consider as an OOB heartbeat
- lock.lock();
- try {
- boolean interrupted = condition.await(pollInterval, TimeUnit.MILLISECONDS);
- if (!interrupted) {
- nonOobHeartbeatCounter++;
- }
- } finally {
- lock.unlock();
- }
- }
- }
- }
- int pendingEventCount = eventsToSend.size();
- if (pendingEventCount > 0) {
- LOG.warn("Exiting TaskReporter thread with pending queue size=" + pendingEventCount);
- }
- return true;
- }
-
- /**
- * @param eventsArg
- * @return
- * @throws IOException
- * indicates an RPC communication failure.
- * @throws TezException
- * indicates an exception somewhere in the AM.
- */
- private synchronized ResponseWrapper heartbeat(Collection<TezEvent> eventsArg) throws IOException,
- TezException {
-
- if (eventsArg != null) {
- eventsToSend.addAll(eventsArg);
- }
-
- TezEvent updateEvent = null;
- List<TezEvent> events = new ArrayList<TezEvent>();
- eventsToSend.drainTo(events);
-
- if (!task.isTaskDone() && !task.hadFatalError()) {
- TezCounters counters = null;
- /**
- * Increasing the heartbeat interval can delay the delivery of events. Sending just updated
- * records would save CPU in DAG AM, but certain counters are updated very frequently. Until
- * real time decisions are made based on these counters, it can be sent once per second.
- */
- // Not completely accurate, since OOB heartbeats could go out.
- if ((nonOobHeartbeatCounter - prevCounterSendHeartbeatNum) * pollInterval >= sendCounterInterval) {
- counters = task.getCounters();
- prevCounterSendHeartbeatNum = nonOobHeartbeatCounter;
- }
- updateEvent = new TezEvent(new TaskStatusUpdateEvent(counters, task.getProgress()),
- updateEventMetadata);
- events.add(updateEvent);
- }
-
- long requestId = requestCounter.incrementAndGet();
- TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, containerIdStr,
- task.getTaskAttemptID(), task.getEventCounter(), maxEventsToGet);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Sending heartbeat to AM, request=" + request);
- }
-
- maybeLogCounters();
-
- TezHeartbeatResponse response = umbilical.heartbeat(request);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Received heartbeat response from AM, response=" + response);
- }
-
- if (response.shouldDie()) {
- LOG.info("Received should die response from AM");
- return new ResponseWrapper(true, 1);
- }
- if (response.getLastRequestId() != requestId) {
- throw new TezException("AM and Task out of sync" + ", responseReqId="
- + response.getLastRequestId() + ", expectedReqId=" + requestId);
- }
-
- // The same umbilical is used by multiple tasks. Problematic in the case where multiple tasks
- // are running using the same umbilical.
- int numEventsReceived = 0;
- if (task.isTaskDone() || task.hadFatalError()) {
- if (response.getEvents() != null && !response.getEvents().isEmpty()) {
- LOG.warn("Current task already complete, Ignoring all event in"
- + " heartbeat response, eventCount=" + response.getEvents().size());
- }
- } else {
- if (response.getEvents() != null && !response.getEvents().isEmpty()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Routing events from heartbeat response to task" + ", currentTaskAttemptId="
- + task.getTaskAttemptID() + ", eventCount=" + response.getEvents().size());
- }
- // This should ideally happen in a separate thread
- numEventsReceived = response.getEvents().size();
- task.handleEvents(response.getEvents());
- }
- }
- return new ResponseWrapper(false, numEventsReceived);
- }
-
- public void markComplete() {
- // Notify to clear pending events, if any.
- lock.lock();
- try {
- condition.signal();
- } finally {
- lock.unlock();
- }
- }
-
- private void maybeLogCounters() {
- if (LOG.isDebugEnabled()) {
- if (nonOobHeartbeatCounter == nextHeartbeatNumToLog) {
- LOG.debug("Counters: " + task.getCounters().toShortString());
- nextHeartbeatNumToLog = (int) (nextHeartbeatNumToLog * (LOG_COUNTER_BACKOFF));
- }
- }
- }
-
- /**
- * Sends out final events for task success.
- * @param taskAttemptID
- * @return
- * @throws IOException
- * indicates an RPC communication failure.
- * @throws TezException
- * indicates an exception somewhere in the AM.
- */
- private boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException {
- TezEvent statusUpdateEvent = new TezEvent(new TaskStatusUpdateEvent(task.getCounters(),
- task.getProgress()), updateEventMetadata);
- TezEvent taskCompletedEvent = new TezEvent(new TaskAttemptCompletedEvent(),
- updateEventMetadata);
- return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskCompletedEvent)).shouldDie;
- }
-
- /**
- * Sends out final events for task failure.
- * @param taskAttemptID
- * @param t
- * @param diagnostics
- * @param srcMeta
- * @return
- * @throws IOException
- * indicates an RPC communication failure.
- * @throws TezException
- * indicates an exception somewhere in the AM.
- */
- private boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics,
- EventMetaData srcMeta) throws IOException, TezException {
- TezEvent statusUpdateEvent = new TezEvent(new TaskStatusUpdateEvent(task.getCounters(),
- task.getProgress()), updateEventMetadata);
- if (diagnostics == null) {
- diagnostics = ExceptionUtils.getStackTrace(t);
- } else {
- diagnostics = diagnostics + ":" + ExceptionUtils.getStackTrace(t);
- }
- TezEvent taskAttemptFailedEvent = new TezEvent(new TaskAttemptFailedEvent(diagnostics),
- srcMeta == null ? updateEventMetadata : srcMeta);
- return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent)).shouldDie;
- }
-
- private void addEvents(TezTaskAttemptID taskAttemptID, Collection<TezEvent> events) {
- if (events != null && !events.isEmpty()) {
- eventsToSend.addAll(events);
- }
- }
- }
-
- private static class HeartbeatCallback implements FutureCallback<Boolean> {
-
- private final ErrorReporter errorReporter;
-
- HeartbeatCallback(ErrorReporter errorReporter) {
- this.errorReporter = errorReporter;
- }
-
- @Override
- public void onSuccess(Boolean result) {
- if (result == false) {
- errorReporter.shutdownRequested();
- }
- }
-
- @Override
- public void onFailure(Throwable t) {
- errorReporter.reportError(t);
- }
- }
-
- public boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException {
- return currentCallable.taskSucceeded(taskAttemptID);
- }
-
- public boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics,
- EventMetaData srcMeta) throws IOException, TezException {
- return currentCallable.taskFailed(taskAttemptID, t, diagnostics, srcMeta);
- }
-
- public void addEvents(TezTaskAttemptID taskAttemptID, Collection<TezEvent> events) {
- currentCallable.addEvents(taskAttemptID, events);
- }
-
- public boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException {
- return umbilical.canCommit(taskAttemptID);
- }
-
- private static final class ResponseWrapper {
- boolean shouldDie;
- int numEvents;
-
- private ResponseWrapper(boolean shouldDie, int numEvents) {
- this.shouldDie = shouldDie;
- this.numEvents = numEvents;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
deleted file mode 100644
index ae77709..0000000
--- a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ /dev/null
@@ -1,456 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.task;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URL;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-import org.apache.tez.common.ContainerContext;
-import org.apache.tez.common.ContainerTask;
-import org.apache.tez.common.TezCommonUtils;
-import org.apache.tez.common.TezLocalResource;
-import org.apache.tez.common.TezTaskUmbilicalProtocol;
-import org.apache.tez.common.TezUtilsInternal;
-import org.apache.tez.common.counters.Limits;
-import org.apache.tez.common.security.JobTokenIdentifier;
-import org.apache.tez.common.security.TokenCache;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.dag.utils.RelocalizationUtils;
-import org.apache.tez.runtime.api.ExecutionContext;
-import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
-import org.apache.tez.runtime.api.impl.TaskSpec;
-import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
-import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-public class TezChild {
-
- private static final Logger LOG = Logger.getLogger(TezChild.class);
-
- private final Configuration defaultConf;
- private final String containerIdString;
- private final int appAttemptNumber;
- private final String[] localDirs;
-
- private final AtomicLong heartbeatCounter = new AtomicLong(0);
-
- private final int getTaskMaxSleepTime;
- private final int amHeartbeatInterval;
- private final long sendCounterInterval;
- private final int maxEventsToGet;
- private final boolean isLocal;
- private final String workingDir;
-
- private final ListeningExecutorService executor;
- private final ObjectRegistryImpl objectRegistry;
- private final String pid;
- private final ExecutionContext ExecutionContext;
- private final Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
- private final Map<String, String> serviceProviderEnvMap;
-
- private Multimap<String, String> startedInputsMap = HashMultimap.create();
-
- private TaskReporter taskReporter;
- private TezTaskUmbilicalProtocol umbilical;
- private int taskCount = 0;
- private TezVertexID lastVertexID;
-
- public TezChild(Configuration conf, String host, int port, String containerIdentifier,
- String tokenIdentifier, int appAttemptNumber, String workingDir, String[] localDirs,
- Map<String, String> serviceProviderEnvMap,
- ObjectRegistryImpl objectRegistry, String pid,
- ExecutionContext ExecutionContext)
- throws IOException, InterruptedException {
- this.defaultConf = conf;
- this.containerIdString = containerIdentifier;
- this.appAttemptNumber = appAttemptNumber;
- this.localDirs = localDirs;
- this.serviceProviderEnvMap = serviceProviderEnvMap;
- this.workingDir = workingDir;
- this.pid = pid;
- this.ExecutionContext = ExecutionContext;
-
- getTaskMaxSleepTime = defaultConf.getInt(
- TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX,
- TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX_DEFAULT);
-
- amHeartbeatInterval = defaultConf.getInt(TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS,
- TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
-
- sendCounterInterval = defaultConf.getLong(
- TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS,
- TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS_DEFAULT);
-
- maxEventsToGet = defaultConf.getInt(TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT,
- TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT);
-
- ExecutorService executor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
- .setDaemon(true).setNameFormat("TezChild").build());
- this.executor = MoreExecutors.listeningDecorator(executor);
-
- this.objectRegistry = objectRegistry;
-
- // Security framework already loaded the tokens into current ugi
- Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Executing with tokens:");
- for (Token<?> token : credentials.getAllTokens()) {
- LOG.debug(token);
- }
- }
-
- this.isLocal = defaultConf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
- TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
- UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(tokenIdentifier);
- Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
-
- serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,
- ShuffleUtils.convertJobTokenToBytes(jobToken));
-
- if (!isLocal) {
- final InetSocketAddress address = NetUtils.createSocketAddrForHost(host, port);
- SecurityUtil.setTokenService(jobToken, address);
- taskOwner.addToken(jobToken);
- umbilical = taskOwner.doAs(new PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() {
- @Override
- public TezTaskUmbilicalProtocol run() throws Exception {
- return RPC.getProxy(TezTaskUmbilicalProtocol.class,
- TezTaskUmbilicalProtocol.versionID, address, defaultConf);
- }
- });
- }
- }
-
- public ContainerExecutionResult run() throws IOException, InterruptedException, TezException {
-
- ContainerContext containerContext = new ContainerContext(containerIdString);
- ContainerReporter containerReporter = new ContainerReporter(umbilical, containerContext,
- getTaskMaxSleepTime);
-
- taskReporter = new TaskReporter(umbilical, amHeartbeatInterval,
- sendCounterInterval, maxEventsToGet, heartbeatCounter, containerIdString);
-
- UserGroupInformation childUGI = null;
-
- while (!executor.isTerminated()) {
- if (taskCount > 0) {
- TezUtilsInternal.updateLoggers("");
- }
- ListenableFuture<ContainerTask> getTaskFuture = executor.submit(containerReporter);
- ContainerTask containerTask = null;
- try {
- containerTask = getTaskFuture.get();
- } catch (ExecutionException e) {
- Throwable cause = e.getCause();
- handleError(cause);
- return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
- cause, "Execution Exception while fetching new work: " + e.getMessage());
- } catch (InterruptedException e) {
- LOG.info("Interrupted while waiting for new work:"
- + containerTask.getTaskSpec().getTaskAttemptID());
- handleError(e);
- return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.INTERRUPTED, e,
- "Interrupted while waiting for new work");
- }
- if (containerTask.shouldDie()) {
- LOG.info("ContainerTask returned shouldDie=true, Exiting");
- shutdown();
- return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
- "Asked to die by the AM");
- } else {
- String loggerAddend = containerTask.getTaskSpec().getTaskAttemptID().toString();
- taskCount++;
- TezUtilsInternal.updateLoggers(loggerAddend);
- FileSystem.clearStatistics();
-
- childUGI = handleNewTaskCredentials(containerTask, childUGI);
- handleNewTaskLocalResources(containerTask);
- cleanupOnTaskChanged(containerTask);
-
- // Execute the Actual Task
- TezTaskRunner taskRunner = new TezTaskRunner(defaultConf, childUGI,
- localDirs, containerTask.getTaskSpec(), umbilical, appAttemptNumber,
- serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, taskReporter,
- executor, objectRegistry, pid, this.ExecutionContext);
- boolean shouldDie;
- try {
- shouldDie = !taskRunner.run();
- if (shouldDie) {
- LOG.info("Got a shouldDie notification via hearbeats. Shutting down");
- shutdown();
- return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
- "Asked to die by the AM");
- }
- } catch (IOException e) {
- handleError(e);
- return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
- e, "TaskExecutionFailure: " + e.getMessage());
- } catch (TezException e) {
- handleError(e);
- return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
- e, "TaskExecutionFailure: " + e.getMessage());
- } finally {
- FileSystem.closeAllForUGI(childUGI);
- }
- }
- }
- return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
- null);
- }
-
- /**
- * Setup
- *
- * @param containerTask
- * the new task specification. Must be a valid task
- * @param childUGI
- * the old UGI instance being used
- * @return childUGI
- */
- UserGroupInformation handleNewTaskCredentials(ContainerTask containerTask,
- UserGroupInformation childUGI) {
- // Re-use the UGI only if the Credentials have not changed.
- Preconditions.checkState(!containerTask.shouldDie());
- Preconditions.checkState(containerTask.getTaskSpec() != null);
- if (containerTask.haveCredentialsChanged()) {
- LOG.info("Refreshing UGI since Credentials have changed");
- Credentials taskCreds = containerTask.getCredentials();
- if (taskCreds != null) {
- LOG.info("Credentials : #Tokens=" + taskCreds.numberOfTokens() + ", #SecretKeys="
- + taskCreds.numberOfSecretKeys());
- childUGI = UserGroupInformation.createRemoteUser(System
- .getenv(ApplicationConstants.Environment.USER.toString()));
- childUGI.addCredentials(containerTask.getCredentials());
- } else {
- LOG.info("Not loading any credentials, since no credentials provided");
- }
- }
- return childUGI;
- }
-
- /**
- * Handles any additional resources to be localized for the new task
- *
- * @param containerTask
- * @throws IOException
- * @throws TezException
- */
- private void handleNewTaskLocalResources(ContainerTask containerTask) throws IOException,
- TezException {
- Map<String, TezLocalResource> additionalResources = containerTask.getAdditionalResources();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Additional Resources added to container: " + additionalResources);
- }
-
- LOG.info("Localizing additional local resources for Task : " + additionalResources);
- List<URL> downloadedUrls = RelocalizationUtils.processAdditionalResources(
- Maps.transformValues(additionalResources, new Function<TezLocalResource, URI>() {
- @Override
- public URI apply(TezLocalResource input) {
- return input.getUri();
- }
- }), defaultConf, workingDir);
- RelocalizationUtils.addUrlsToClassPath(downloadedUrls);
-
- LOG.info("Done localizing additional resources");
- final TaskSpec taskSpec = containerTask.getTaskSpec();
- if (LOG.isDebugEnabled()) {
- LOG.debug("New container task context:" + taskSpec.toString());
- }
- }
-
- /**
- * Cleans entries from the object registry, and resets the startedInputsMap if required
- *
- * @param containerTask
- * the new task specification. Must be a valid task
- */
- private void cleanupOnTaskChanged(ContainerTask containerTask) {
- Preconditions.checkState(!containerTask.shouldDie());
- Preconditions.checkState(containerTask.getTaskSpec() != null);
- TezVertexID newVertexID = containerTask.getTaskSpec().getTaskAttemptID().getTaskID()
- .getVertexID();
- if (lastVertexID != null) {
- if (!lastVertexID.equals(newVertexID)) {
- objectRegistry.clearCache(ObjectRegistryImpl.ObjectLifeCycle.VERTEX);
- }
- if (!lastVertexID.getDAGId().equals(newVertexID.getDAGId())) {
- objectRegistry.clearCache(ObjectRegistryImpl.ObjectLifeCycle.DAG);
- startedInputsMap = HashMultimap.create();
- }
- }
- lastVertexID = newVertexID;
- }
-
- private void shutdown() {
- executor.shutdownNow();
- if (taskReporter != null) {
- taskReporter.shutdown();
- }
- RPC.stopProxy(umbilical);
- DefaultMetricsSystem.shutdown();
- if (!isLocal) {
- LogManager.shutdown();
- }
- }
-
- public void setUmbilical(TezTaskUmbilicalProtocol tezTaskUmbilicalProtocol){
- if(tezTaskUmbilicalProtocol != null){
- this.umbilical = tezTaskUmbilicalProtocol;
- }
- }
-
- public static class ContainerExecutionResult {
- public static enum ExitStatus {
- SUCCESS(0),
- EXECUTION_FAILURE(1),
- INTERRUPTED(2),
- ASKED_TO_DIE(3);
-
- private final int exitCode;
-
- ExitStatus(int code) {
- this.exitCode = code;
- }
-
- public int getExitCode() {
- return this.exitCode;
- }
- }
-
- private final ExitStatus exitStatus;
- private final Throwable throwable;
- private final String errorMessage;
-
- ContainerExecutionResult(ExitStatus exitStatus, @Nullable Throwable throwable,
- @Nullable String errorMessage) {
- this.exitStatus = exitStatus;
- this.throwable = throwable;
- this.errorMessage = errorMessage;
- }
-
- public ExitStatus getExitStatus() {
- return this.exitStatus;
- }
-
- public Throwable getThrowable() {
- return this.throwable;
- }
-
- public String getErrorMessage() {
- return this.errorMessage;
- }
- }
-
- public static TezChild newTezChild(Configuration conf, String host, int port, String containerIdentifier,
- String tokenIdentifier, int attemptNumber, String[] localDirs, String workingDirectory,
- Map<String, String> serviceProviderEnvMap, @Nullable String pid,
- ExecutionContext ExecutionContext)
- throws IOException, InterruptedException, TezException {
-
- // Pull in configuration specified for the session.
- // TODO TEZ-1233. This needs to be moved over the wire rather than localizing the file
- // for each and every task, and reading it back from disk. Also needs to be per vertex.
- TezUtilsInternal.addUserSpecifiedTezConfiguration(workingDirectory, conf);
- UserGroupInformation.setConfiguration(conf);
- Limits.setConfiguration(conf);
-
- // Should this be part of main - Metrics and ObjectRegistry. TezTask setup should be independent
- // of this class. Leaving it here, till there's some entity representing a running JVM.
- DefaultMetricsSystem.initialize("TezTask");
-
- // singleton of ObjectRegistry for this JVM
- ObjectRegistryImpl objectRegistry = new ObjectRegistryImpl();
-
- return new TezChild(conf, host, port, containerIdentifier, tokenIdentifier,
- attemptNumber, workingDirectory, localDirs, serviceProviderEnvMap, objectRegistry, pid,
- ExecutionContext);
- }
-
- public static void main(String[] args) throws IOException, InterruptedException, TezException {
-
- final Configuration defaultConf = new Configuration();
-
- Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
- LOG.info("TezChild starting");
-
- assert args.length == 5;
- String host = args[0];
- int port = Integer.parseInt(args[1]);
- final String containerIdentifier = args[2];
- final String tokenIdentifier = args[3];
- final int attemptNumber = Integer.parseInt(args[4]);
- final String[] localDirs = TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS
- .name()));
- final String pid = System.getenv().get("JVM_PID");
- LOG.info("PID, containerIdentifier: " + pid + ", " + containerIdentifier);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Info from cmd line: AM-host: " + host + " AM-port: " + port
- + " containerIdentifier: " + containerIdentifier + " appAttemptNumber: " + attemptNumber
- + " tokenIdentifier: " + tokenIdentifier);
- }
- TezChild tezChild = newTezChild(defaultConf, host, port, containerIdentifier,
- tokenIdentifier, attemptNumber, localDirs, System.getenv(Environment.PWD.name()),
- System.getenv(), pid, new ExecutionContextImpl(System.getenv(Environment.NM_HOST.name())));
- tezChild.run();
- }
-
- private void handleError(Throwable t) {
- shutdown();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-dag/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
deleted file mode 100644
index 6e655f9..0000000
--- a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
+++ /dev/null
@@ -1,375 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.task;
-
-import java.io.IOException;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSError;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.log4j.Logger;
-import org.apache.tez.common.TezTaskUmbilicalProtocol;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
-import org.apache.tez.runtime.api.ExecutionContext;
-import org.apache.tez.runtime.api.ObjectRegistry;
-import org.apache.tez.runtime.api.impl.EventMetaData;
-import org.apache.tez.runtime.api.impl.TaskSpec;
-import org.apache.tez.runtime.api.impl.TezEvent;
-import org.apache.tez.runtime.api.impl.TezUmbilical;
-
-import com.google.common.collect.Multimap;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-
-public class TezTaskRunner implements TezUmbilical, ErrorReporter {
-
- private static final Logger LOG = Logger.getLogger(TezTaskRunner.class);
-
- private final Configuration tezConf;
- private final LogicalIOProcessorRuntimeTask task;
- private final UserGroupInformation ugi;
-
- private final TaskReporter taskReporter;
- private final ListeningExecutorService executor;
- private volatile ListenableFuture<Void> taskFuture;
- private volatile Thread waitingThread;
- private volatile Throwable firstException;
-
- // Effectively a duplicate check, since hadFatalError does the same thing.
- private final AtomicBoolean fatalErrorSent = new AtomicBoolean(false);
- private final AtomicBoolean taskRunning;
- private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
-
- TezTaskRunner(Configuration tezConf, UserGroupInformation ugi, String[] localDirs,
- TaskSpec taskSpec, TezTaskUmbilicalProtocol umbilical, int appAttemptNumber,
- Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> serviceProviderEnvMap,
- Multimap<String, String> startedInputsMap, TaskReporter taskReporter,
- ListeningExecutorService executor, ObjectRegistry objectRegistry, String pid,
- ExecutionContext ExecutionContext)
- throws IOException {
- this.tezConf = tezConf;
- this.ugi = ugi;
- this.taskReporter = taskReporter;
- this.executor = executor;
- task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, tezConf, localDirs, this,
- serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, objectRegistry, pid,
- ExecutionContext);
- taskReporter.registerTask(task, this);
- taskRunning = new AtomicBoolean(true);
- }
-
- /**
- * @return false if a shutdown message was received during task execution
- * @throws TezException
- * @throws IOException
- */
- public boolean run() throws InterruptedException, IOException, TezException {
- waitingThread = Thread.currentThread();
- TaskRunnerCallable callable = new TaskRunnerCallable();
- Throwable failureCause = null;
- taskFuture = executor.submit(callable);
- try {
- taskFuture.get();
-
- // Task could signal a fatal error and return control, or a failure while registering success.
- failureCause = firstException;
-
- } catch (InterruptedException e) {
- LOG.info("Interrupted while waiting for task to complete. Interrupting task");
- taskFuture.cancel(true);
- if (shutdownRequested.get()) {
- LOG.info("Shutdown requested... returning");
- return false;
- }
- if (firstException != null) {
- failureCause = firstException;
- } else {
- // Interrupted for some other reason.
- failureCause = e;
- }
- } catch (ExecutionException e) {
- // Exception thrown by the run() method itself.
- Throwable cause = e.getCause();
- if (cause instanceof FSError) {
- // Not immediately fatal, this is an error reported by Hadoop FileSystem
- failureCause = cause;
- } else if (cause instanceof Error) {
- LOG.error("Exception of type Error.", cause);
- sendFailure(cause, "Fatal Error cause TezChild exit.");
- throw new TezException("Fatal Error cause TezChild exit.", cause);
- } else {
- failureCause = cause;
- }
- } finally {
- // Clear the interrupted status of the blocking thread, in case it is set after the
- // InterruptedException was invoked.
- taskReporter.unregisterTask(task.getTaskAttemptID());
- Thread.interrupted();
- }
-
- if (failureCause != null) {
- if (failureCause instanceof FSError) {
- // Not immediately fatal, this is an error reported by Hadoop FileSystem
- LOG.info("Encountered an FSError while executing task: " + task.getTaskAttemptID(),
- failureCause);
- throw (FSError) failureCause;
- } else if (failureCause instanceof Error) {
- LOG.error("Exception of type Error.", failureCause);
- sendFailure(failureCause, "Fatal error cause TezChild exit.");
- throw new TezException("Fatal error cause TezChild exit.", failureCause);
- } else {
- if (failureCause instanceof IOException) {
- throw (IOException) failureCause;
- } else if (failureCause instanceof TezException) {
- throw (TezException) failureCause;
- } else if (failureCause instanceof InterruptedException) {
- throw (InterruptedException) failureCause;
- } else {
- throw new TezException(failureCause);
- }
- }
- }
- if (shutdownRequested.get()) {
- LOG.info("Shutdown requested... returning");
- return false;
- }
- return true;
- }
-
- private class TaskRunnerCallable implements Callable<Void> {
-
- @Override
- public Void call() throws Exception {
- try {
- return ugi.doAs(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- try {
- LOG.info("Initializing task" + ", taskAttemptId=" + task.getTaskAttemptID());
- task.initialize();
- if (!Thread.currentThread().isInterrupted() && firstException == null) {
- LOG.info("Running task, taskAttemptId=" + task.getTaskAttemptID());
- task.run();
- LOG.info("Closing task, taskAttemptId=" + task.getTaskAttemptID());
- task.close();
- task.setFrameworkCounters();
- }
- LOG.info("Task completed, taskAttemptId=" + task.getTaskAttemptID()
- + ", fatalErrorOccurred=" + (firstException != null));
- if (firstException == null) {
- try {
- taskReporter.taskSucceeded(task.getTaskAttemptID());
- } catch (IOException e) {
- LOG.warn("Heartbeat failure caused by communication failure", e);
- maybeRegisterFirstException(e);
- // Falling off, since the runner thread checks for the registered exception.
- } catch (TezException e) {
- LOG.warn("Heartbeat failure reported by AM", e);
- maybeRegisterFirstException(e);
- // Falling off, since the runner thread checks for the registered exception.
- }
- }
- return null;
- } catch (Throwable cause) {
- if (cause instanceof FSError) {
- // Not immediately fatal, this is an error reported by Hadoop FileSystem
- maybeRegisterFirstException(cause);
- LOG.info("Encountered an FSError while executing task: " + task.getTaskAttemptID(),
- cause);
- try {
- sendFailure(cause, "FS Error in Child JVM");
- } catch (Exception ignored) {
- // Ignored since another cause is already known
- LOG.info(
- "Ignoring the following exception since a previous exception is already registered",
- ignored);
- }
- throw (FSError) cause;
- } else if (cause instanceof Error) {
- LOG.error("Exception of type Error.", cause);
- sendFailure(cause, "Fatal Error cause TezChild exit.");
- throw new TezException("Fatal Error cause TezChild exit.", cause);
- } else {
- if (cause instanceof UndeclaredThrowableException) {
- cause = ((UndeclaredThrowableException) cause).getCause();
- }
- maybeRegisterFirstException(cause);
- LOG.info("Encounted an error while executing task: " + task.getTaskAttemptID(),
- cause);
- try {
- sendFailure(cause, "Failure while running task");
- } catch (Exception ignored) {
- // Ignored since another cause is already known
- LOG.info(
- "Ignoring the following exception since a previous exception is already registered",
- ignored);
- }
- if (cause instanceof IOException) {
- throw (IOException) cause;
- } else if (cause instanceof TezException) {
- throw (TezException) cause;
- } else {
- throw new TezException(cause);
- }
- }
- } finally {
- task.cleanup();
- }
- }
- });
- } finally {
- taskRunning.set(false);
- }
- }
- }
-
- // should wait until all messages are sent to AM before TezChild shutdown
- // if this method become async in future
- private void sendFailure(Throwable t, String message) throws IOException, TezException {
- if (!fatalErrorSent.getAndSet(true)) {
- task.setFatalError(t, message);
- task.setFrameworkCounters();
- try {
- taskReporter.taskFailed(task.getTaskAttemptID(), t, message, null);
- } catch (IOException e) {
- // A failure reason already exists, Comm error just logged.
- LOG.warn("Heartbeat failure caused by communication failure", e);
- throw e;
- } catch (TezException e) {
- // A failure reason already exists, Comm error just logged.
- LOG.warn("Heartbeat failure reported by AM", e);
- throw e;
- }
- } else {
- LOG.warn("Ignoring fatal error since another error has already been reported", t);
- }
- }
-
- @Override
- public void addEvents(Collection<TezEvent> events) {
- if (taskRunning.get()) {
- taskReporter.addEvents(task.getTaskAttemptID(), events);
- }
- }
-
- @Override
- public synchronized void signalFatalError(TezTaskAttemptID taskAttemptID, Throwable t,
- String message, EventMetaData sourceInfo) {
- // This can be called before a task throws an exception or after it.
- // If called before a task throws an exception
- // - ensure a heartbeat is sent with the diagnostics, and sent only once.
- // - interrupt the waiting thread, and make it throw the reported error.
- // If called after a task throws an exception, the waiting task has already returned, no point
- // interrupting it.
- // This case can be effectively ignored (log), as long as the run() method ends up throwing the
- // exception.
- //
- //
- if (!fatalErrorSent.getAndSet(true)) {
- maybeRegisterFirstException(t);
- try {
- taskReporter.taskFailed(taskAttemptID, t, getTaskDiagnosticsString(t, message), sourceInfo);
- } catch (IOException e) {
- // HeartbeatFailed. Don't need to propagate the heartbeat exception since a task exception
- // occurred earlier.
- LOG.warn("Heartbeat failure caused by communication failure", e);
- } catch (TezException e) {
- // HeartbeatFailed. Don't need to propagate the heartbeat exception since a task exception
- // occurred earlier.
- LOG.warn("Heartbeat failure reported by AM", e);
- } finally {
- // Wake up the waiting thread so that it can return control
- waitingThread.interrupt();
- }
- }
- }
-
- @Override
- public boolean canCommit(TezTaskAttemptID taskAttemptID) {
- if (taskRunning.get()) {
- try {
- return taskReporter.canCommit(taskAttemptID);
- } catch (IOException e) {
- LOG.warn("Communication failure while trying to commit", e);
- maybeRegisterFirstException(e);
- waitingThread.interrupt();
- // Not informing the task since it will be interrupted.
- // TODO: Should this be sent to the task as well, current Processors, etc do not handle
- // interrupts very well.
- return false;
- }
- } else {
- return false;
- }
- }
-
- @Override
- public synchronized void reportError(Throwable t) {
- if (taskRunning.get()) {
- LOG.error("TaskReporter reported error", t);
- maybeRegisterFirstException(t);
- waitingThread.interrupt();
- // A race is possible between a task succeeding, and a subsequent timed heartbeat failing.
- // These errors can be ignored, since a task can only succeed if the synchronous taskSucceeded
- // method does not throw an exception, in which case task success is registered with the AM.
- // Leave this handling to the next getTask / actual task.
- } else {
- LOG.info("Ignoring Communication failure since task with id=" + task.getTaskAttemptID()
- + " is already complete");
- }
- }
-
- @Override
- public void shutdownRequested() {
- shutdownRequested.set(true);
- waitingThread.interrupt();
- }
-
- private String getTaskDiagnosticsString(Throwable t, String message) {
- String diagnostics;
- if (t != null && message != null) {
- diagnostics = "exceptionThrown=" + ExceptionUtils.getStackTrace(t) + ", errorMessage="
- + message;
- } else if (t == null && message == null) {
- diagnostics = "Unknown error";
- } else {
- diagnostics = t != null ? "exceptionThrown=" + ExceptionUtils.getStackTrace(t)
- : " errorMessage=" + message;
- }
- return diagnostics;
- }
-
- private synchronized void maybeRegisterFirstException(Throwable t) {
- if (firstException == null) {
- firstException = t;
- }
- }
-
-}
[2/3] tez git commit: TEZ-1933. Move TezChild and related classes
into tez-runtime-internals. (sseth)
Posted by ss...@apache.org.
http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java b/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
deleted file mode 100644
index b18324a..0000000
--- a/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
+++ /dev/null
@@ -1,711 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.task;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.ipc.ProtocolSignature;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.log4j.Logger;
-import org.apache.tez.common.ContainerContext;
-import org.apache.tez.common.ContainerTask;
-import org.apache.tez.common.TezTaskUmbilicalProtocol;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.runtime.api.ProcessorContext;
-import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
-import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
-import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
-import org.apache.tez.runtime.api.impl.InputSpec;
-import org.apache.tez.runtime.api.impl.OutputSpec;
-import org.apache.tez.runtime.api.impl.TaskSpec;
-import org.apache.tez.runtime.api.impl.TezEvent;
-import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
-import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
-import org.apache.tez.runtime.library.processor.SimpleProcessor;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
-// Tests in this class cannot be run in parallel.
-public class TestTaskExecution {
-
- private static final Logger LOG = Logger.getLogger(TestTaskExecution.class);
-
- private static final String HEARTBEAT_EXCEPTION_STRING = "HeartbeatException";
-
- private static final Configuration defaultConf = new Configuration();
- private static final FileSystem localFs;
- private static final Path workDir;
-
- private static final ExecutorService taskExecutor = Executors.newFixedThreadPool(1);
-
- static {
- defaultConf.set("fs.defaultFS", "file:///");
- try {
- localFs = FileSystem.getLocal(defaultConf);
- Path wd = new Path(System.getProperty("test.build.data", "/tmp"),
- TestTaskExecution.class.getSimpleName());
- workDir = localFs.makeQualified(wd);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Before
- public void reset() {
- TestProcessor.reset();
- }
-
- @AfterClass
- public static void shutdown() {
- taskExecutor.shutdownNow();
- }
-
- @Test(timeout = 5000)
- public void testSingleSuccessfulTask() throws IOException, InterruptedException, TezException,
- ExecutionException {
- ListeningExecutorService executor = null;
- try {
- ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
- executor = MoreExecutors.listeningDecorator(rawExecutor);
- ApplicationId appId = ApplicationId.newInstance(10000, 1);
- TezTaskUmbilicalForTest umbilical = new TezTaskUmbilicalForTest();
- TaskReporter taskReporter = createTaskReporter(appId, umbilical);
-
- TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
- TestProcessor.CONF_EMPTY);
- // Setup the executor
- Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
- // Signal the processor to go through
- TestProcessor.signal();
- boolean result = taskRunnerFuture.get();
- assertTrue(result);
- assertNull(taskReporter.currentCallable);
- umbilical.verifyTaskSuccessEvent();
- } finally {
- executor.shutdownNow();
- }
- }
-
- @Test(timeout = 5000)
- public void testMultipleSuccessfulTasks() throws IOException, InterruptedException, TezException,
- ExecutionException {
-
- ListeningExecutorService executor = null;
- try {
- ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
- executor = MoreExecutors.listeningDecorator(rawExecutor);
- ApplicationId appId = ApplicationId.newInstance(10000, 1);
- TezTaskUmbilicalForTest umbilical = new TezTaskUmbilicalForTest();
- TaskReporter taskReporter = createTaskReporter(appId, umbilical);
-
- TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
- TestProcessor.CONF_EMPTY);
- // Setup the executor
- Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
- // Signal the processor to go through
- TestProcessor.signal();
- boolean result = taskRunnerFuture.get();
- assertTrue(result);
- assertNull(taskReporter.currentCallable);
- umbilical.verifyTaskSuccessEvent();
- umbilical.resetTrackedEvents();
-
- taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
- TestProcessor.CONF_EMPTY);
- // Setup the executor
- taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
- // Signal the processor to go through
- TestProcessor.signal();
- result = taskRunnerFuture.get();
- assertTrue(result);
- assertNull(taskReporter.currentCallable);
- umbilical.verifyTaskSuccessEvent();
- } finally {
- executor.shutdownNow();
- }
- }
-
- // test tasked failed due to exception in Processor
- @Test(timeout = 5000)
- public void testFailedTask() throws IOException, InterruptedException, TezException {
-
- ListeningExecutorService executor = null;
- try {
- ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
- executor = MoreExecutors.listeningDecorator(rawExecutor);
- ApplicationId appId = ApplicationId.newInstance(10000, 1);
- TezTaskUmbilicalForTest umbilical = new TezTaskUmbilicalForTest();
- TaskReporter taskReporter = createTaskReporter(appId, umbilical);
-
- TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
- TestProcessor.CONF_THROW_TEZ_EXCEPTION);
- // Setup the executor
- Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
- // Signal the processor to go through
- TestProcessor.awaitStart();
- TestProcessor.signal();
- try {
- taskRunnerFuture.get();
- fail("Expecting the task to fail");
- } catch (ExecutionException e) {
- Throwable cause = e.getCause();
- LOG.info(cause.getClass().getName());
- assertTrue(cause instanceof TezException);
- }
-
- assertNull(taskReporter.currentCallable);
- umbilical.verifyTaskFailedEvent("Failure while running task:org.apache.tez.dag.api.TezException: TezException");
- } finally {
- executor.shutdownNow();
- }
- }
-
- // Test task failed due to Processor class not found
- @Test(timeout = 5000)
- public void testFailedTask2() throws IOException, InterruptedException, TezException {
-
- ListeningExecutorService executor = null;
- try {
- ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
- executor = MoreExecutors.listeningDecorator(rawExecutor);
- ApplicationId appId = ApplicationId.newInstance(10000, 1);
- TezTaskUmbilicalForTest umbilical = new TezTaskUmbilicalForTest();
- TaskReporter taskReporter = createTaskReporter(appId, umbilical);
-
- TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
- "NotExitedProcessor", TestProcessor.CONF_THROW_TEZ_EXCEPTION);
- // Setup the executor
- Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
- try {
- taskRunnerFuture.get();
- } catch (ExecutionException e) {
- Throwable cause = e.getCause();
- LOG.info(cause.getClass().getName());
- assertTrue(cause instanceof TezException);
- }
- assertNull(taskReporter.currentCallable);
- umbilical.verifyTaskFailedEvent("Failure while running task:org.apache.tez.dag.api.TezUncheckedException: "
- + "Unable to load class: NotExitedProcessor");
- } finally {
- executor.shutdownNow();
- }
- }
-
- @Test(timeout = 5000)
- public void testHeartbeatException() throws IOException, InterruptedException, TezException {
-
- ListeningExecutorService executor = null;
- try {
- ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
- executor = MoreExecutors.listeningDecorator(rawExecutor);
- ApplicationId appId = ApplicationId.newInstance(10000, 1);
- TezTaskUmbilicalForTest umbilical = new TezTaskUmbilicalForTest();
- TaskReporter taskReporter = createTaskReporter(appId, umbilical);
-
- TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
- TestProcessor.CONF_EMPTY);
- // Setup the executor
- Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
- // Signal the processor to go through
- TestProcessor.awaitStart();
- umbilical.signalThrowException();
- umbilical.awaitRegisteredEvent();
- // Not signaling an actual start to verify task interruption
- try {
- taskRunnerFuture.get();
- fail("Expecting the task to fail");
- } catch (ExecutionException e) {
- Throwable cause = e.getCause();
- assertTrue(cause instanceof IOException);
- assertTrue(cause.getMessage().contains(HEARTBEAT_EXCEPTION_STRING));
- }
- TestProcessor.awaitCompletion();
- assertTrue(TestProcessor.wasInterrupted());
- assertNull(taskReporter.currentCallable);
- // No completion events since umbilical communication already failed.
- umbilical.verifyNoCompletionEvents();
- } finally {
- executor.shutdownNow();
- }
- }
-
- @Test(timeout = 5000)
- public void testHeartbeatShouldDie() throws IOException, InterruptedException, TezException,
- ExecutionException {
-
- ListeningExecutorService executor = null;
- try {
- ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
- executor = MoreExecutors.listeningDecorator(rawExecutor);
- ApplicationId appId = ApplicationId.newInstance(10000, 1);
- TezTaskUmbilicalForTest umbilical = new TezTaskUmbilicalForTest();
- TaskReporter taskReporter = createTaskReporter(appId, umbilical);
-
- TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
- TestProcessor.CONF_EMPTY);
- // Setup the executor
- Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
- // Signal the processor to go through
- TestProcessor.awaitStart();
- umbilical.signalSendShouldDie();
- umbilical.awaitRegisteredEvent();
- // Not signaling an actual start to verify task interruption
-
- boolean result = taskRunnerFuture.get();
- assertFalse(result);
-
- TestProcessor.awaitCompletion();
- assertTrue(TestProcessor.wasInterrupted());
- assertNull(taskReporter.currentCallable);
- // TODO Is this statement correct ?
- // No completion events since shouldDie was requested by the AM, which should have killed the
- // task.
- umbilical.verifyNoCompletionEvents();
- } finally {
- executor.shutdownNow();
- }
- }
-
- @Test(timeout = 5000)
- public void testGetTaskShouldDie() throws InterruptedException, ExecutionException {
- ListeningExecutorService executor = null;
- try {
- ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
- executor = MoreExecutors.listeningDecorator(rawExecutor);
- ApplicationId appId = ApplicationId.newInstance(10000, 1);
- ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
- ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
-
- TezTaskUmbilicalForTest umbilical = new TezTaskUmbilicalForTest();
- ContainerContext containerContext = new ContainerContext(containerId.toString());
-
- ContainerReporter containerReporter = new ContainerReporter(umbilical, containerContext, 100);
- ListenableFuture<ContainerTask> getTaskFuture = executor.submit(containerReporter);
-
- getTaskFuture.get();
- assertEquals(1, umbilical.getTaskInvocations);
-
- } finally {
- executor.shutdownNow();
- }
- }
-
- // Potential new tests
- // Different states - initialization failure, close failure
- // getTask states
-
- private static class TaskRunnerCallable implements Callable<Boolean> {
- private final TezTaskRunner taskRunner;
-
- public TaskRunnerCallable(TezTaskRunner taskRunner) {
- this.taskRunner = taskRunner;
- }
-
- @Override
- public Boolean call() throws Exception {
- return taskRunner.run();
- }
- }
-
- // Uses static fields for signaling. Ensure only used by one test at a time.
- public static class TestProcessor extends SimpleProcessor {
-
- public static final byte[] CONF_EMPTY = new byte[] { 0 };
- public static final byte[] CONF_THROW_IO_EXCEPTION = new byte[] { 1 };
- public static final byte[] CONF_THROW_TEZ_EXCEPTION = new byte[] { 2 };
- public static final byte[] CONF_SIGNAL_FATAL_AND_THROW = new byte[] { 4 };
- public static final byte[] CONF_SIGNAL_FATAL_AND_LOOP = new byte[] { 8 };
- public static final byte[] CONF_SIGNAL_FATAL_AND_COMPLETE = new byte[] { 16 };
-
- private static final Logger LOG = Logger.getLogger(TestProcessor.class);
-
- private static final ReentrantLock processorLock = new ReentrantLock();
- private static final Condition processorCondition = processorLock.newCondition();
- private static final Condition completionCondition = processorLock.newCondition();
- private static final Condition runningCondition = processorLock.newCondition();
- private static boolean completed = false;
- private static boolean running = false;
- private static boolean signalled = false;
-
- public static boolean receivedInterrupt = false;
-
- private boolean throwIOException = false;
- private boolean throwTezException = false;
- private boolean signalFatalAndThrow = false;
- private boolean signalFatalAndLoop = false;
- private boolean signalFatalAndComplete = false;
-
- public TestProcessor(ProcessorContext context) {
- super(context);
- }
-
- @Override
- public void initialize() throws Exception {
- parseConf(getContext().getUserPayload().deepCopyAsArray());
- }
-
- private void parseConf(byte[] bytes) {
- byte b = bytes[0];
- throwIOException = (b & 1) > 1;
- throwTezException = (b & 2) > 1;
- signalFatalAndThrow = (b & 4) > 1;
- signalFatalAndLoop = (b & 8) > 1;
- signalFatalAndComplete = (b & 16) > 1;
- }
-
- public static void reset() {
- signalled = false;
- receivedInterrupt = false;
- completed = false;
- running = false;
- }
-
- public static void signal() {
- LOG.info("Signalled");
- processorLock.lock();
- try {
- signalled = true;
- processorCondition.signal();
- } finally {
- processorLock.unlock();
- }
- }
-
- public static void awaitStart() throws InterruptedException {
- LOG.info("Awaiting Process run");
- processorLock.lock();
- try {
- if (running) {
- return;
- }
- runningCondition.await();
- } finally {
- processorLock.unlock();
- }
- }
-
- public static void awaitCompletion() throws InterruptedException {
- LOG.info("Await completion");
- processorLock.lock();
- try {
- if (completed) {
- return;
- } else {
- completionCondition.await();
- }
- } finally {
- processorLock.unlock();
- }
- }
-
- public static boolean wasInterrupted() {
- processorLock.lock();
- try {
- return receivedInterrupt;
- } finally {
- processorLock.unlock();
- }
- }
-
-    /**
-     * Test processor body. Marks the processor as running, waits until signal() has been called
-     * (or the thread is interrupted), and then acts on the configured failure flag: throw an
-     * IOException, throw a TezException, report a fatal error and then throw / return / wait for a
-     * second signal, or fall through and return normally.
-     *
-     * <p>An InterruptedException is not propagated; it is recorded in receivedInterrupt. The
-     * completed flag is set and completionCondition is signalled on every exit path.
-     */
-    @Override
-    public void run() throws Exception {
-      processorLock.lock();
-      running = true;
-      runningCondition.signal();
-      try {
-        try {
-          LOG.info("Signal is: " + signalled);
-          // Loop instead of a single if-check: Condition.await() may wake up spuriously, and the
-          // processor must not proceed before signal() has really been called.
-          while (!signalled) {
-            LOG.info("Waiting for processor signal");
-            processorCondition.await();
-          }
-          if (Thread.currentThread().isInterrupted()) {
-            throw new InterruptedException();
-          }
-          LOG.info("Received processor signal");
-          if (throwIOException) {
-            throw new IOException();
-          } else if (throwTezException) {
-            throw new TezException("TezException");
-          } else if (signalFatalAndThrow) {
-            IOException io = new IOException("FATALERROR");
-            getContext().fatalError(io, "FATALERROR");
-            throw io;
-          } else if (signalFatalAndComplete) {
-            IOException io = new IOException("FATALERROR");
-            getContext().fatalError(io, "FATALERROR");
-            return;
-          } else if (signalFatalAndLoop) {
-            IOException io = new IOException("FATALERROR");
-            getContext().fatalError(io, "FATALERROR");
-            LOG.info("Waiting for Processor signal again");
-            // No predicate is available for the second signal (signalled is already true), so
-            // this remains a single await().
-            processorCondition.await();
-            LOG.info("Received second processor signal");
-          }
-        } catch (InterruptedException e) {
-          // Deliberately swallowed: tests observe the interrupt through wasInterrupted().
-          receivedInterrupt = true;
-        }
-      } finally {
-        completed = true;
-        completionCondition.signal();
-        processorLock.unlock();
-      }
-    }
- }
-
-  /**
-   * In-memory {@link TezTaskUmbilicalProtocol} for the tests. It records every event received
-   * through heartbeat() and can be told to throw from heartbeat() or to answer with shouldDie.
-   * All mutable state except getTaskInvocations is read and written under umbilicalLock.
-   */
-  private static class TezTaskUmbilicalForTest implements TezTaskUmbilicalProtocol {
-
-    private static final Logger LOG = Logger.getLogger(TezTaskUmbilicalForTest.class);
-
-    /** Events received over heartbeat since the last resetTrackedEvents(). */
-    private final List<TezEvent> requestEvents = new LinkedList<TezEvent>();
-
-    private final ReentrantLock umbilicalLock = new ReentrantLock();
-    private final Condition eventCondition = umbilicalLock.newCondition();
-    /** Set once a test has requested an exception or a die signal. */
-    private boolean pendingEvent = false;
-    /** Set by heartbeat() once it has run with pendingEvent set. */
-    private boolean eventEnacted = false;
-
-    volatile int getTaskInvocations = 0;
-
-    private boolean shouldThrowException = false;
-    private boolean shouldSendDieSignal = false;
-
-    /** Makes subsequent heartbeat() calls throw an IOException. */
-    public void signalThrowException() {
-      umbilicalLock.lock();
-      try {
-        shouldThrowException = true;
-        pendingEvent = true;
-      } finally {
-        umbilicalLock.unlock();
-      }
-    }
-
-    /** Makes subsequent heartbeat() calls return a response with shouldDie set. */
-    public void signalSendShouldDie() {
-      umbilicalLock.lock();
-      try {
-        shouldSendDieSignal = true;
-        pendingEvent = true;
-      } finally {
-        umbilicalLock.unlock();
-      }
-    }
-
-    /**
-     * Blocks until a heartbeat() call has run with a pending exception / die request.
-     *
-     * @throws InterruptedException if the calling thread is interrupted while waiting
-     */
-    public void awaitRegisteredEvent() throws InterruptedException {
-      umbilicalLock.lock();
-      try {
-        // Wait on the predicate so that a spurious wakeup cannot end the wait early.
-        while (!eventEnacted) {
-          LOG.info("Awaiting event");
-          eventCondition.await();
-        }
-      } finally {
-        umbilicalLock.unlock();
-      }
-    }
-
-    /** Forgets all events recorded so far. */
-    public void resetTrackedEvents() {
-      umbilicalLock.lock();
-      try {
-        requestEvents.clear();
-      } finally {
-        umbilicalLock.unlock();
-      }
-    }
-
-    /** Fails the test if any recorded event is a task-attempt failed or completed event. */
-    public void verifyNoCompletionEvents() {
-      umbilicalLock.lock();
-      try {
-        for (TezEvent event : requestEvents) {
-          if (event.getEvent() instanceof TaskAttemptFailedEvent) {
-            fail("Found a TaskAttemptFailedEvent when not expected");
-          }
-          if (event.getEvent() instanceof TaskAttemptCompletedEvent) {
-            fail("Found a TaskAttemptCompletedEvent when not expected");
-          }
-        }
-      } finally {
-        umbilicalLock.unlock();
-      }
-    }
-
-    /**
-     * Fails the test unless the first recorded TaskAttemptFailedEvent carries diagnostics that
-     * start with the given prefix.
-     *
-     * @param diagnostics expected prefix of the failed event's diagnostics
-     */
-    public void verifyTaskFailedEvent(String diagnostics) {
-      umbilicalLock.lock();
-      try {
-        for (TezEvent event : requestEvents) {
-          if (event.getEvent() instanceof TaskAttemptFailedEvent) {
-            TaskAttemptFailedEvent failedEvent = (TaskAttemptFailedEvent) event.getEvent();
-            if (failedEvent.getDiagnostics().startsWith(diagnostics)) {
-              return;
-            } else {
-              fail("No detailed diagnostics message in TaskAttemptFailedEvent");
-            }
-          }
-        }
-        fail("No TaskAttemptFailedEvents sent over umbilical");
-      } finally {
-        umbilicalLock.unlock();
-      }
-    }
-
-    /** Fails the test unless a TaskAttemptCompletedEvent has been recorded. */
-    public void verifyTaskSuccessEvent() {
-      umbilicalLock.lock();
-      try {
-        for (TezEvent event : requestEvents) {
-          if (event.getEvent() instanceof TaskAttemptCompletedEvent) {
-            return;
-          }
-        }
-        // The message used to name TaskAttemptFailedEvents, which pointed at the wrong event.
-        fail("No TaskAttemptCompletedEvents sent over umbilical");
-      } finally {
-        umbilicalLock.unlock();
-      }
-    }
-
-    @Override
-    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
-      return 0;
-    }
-
-    @Override
-    public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
-        int clientMethodsHash) throws IOException {
-      return null;
-    }
-
-    @Override
-    public ContainerTask getTask(ContainerContext containerContext) throws IOException {
-      // Return shouldDie = true
-      getTaskInvocations++;
-      return new ContainerTask(null, true, null, null, false);
-    }
-
-    @Override
-    public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
-      return true;
-    }
-
-    /**
-     * Records the request's events, then throws, answers with shouldDie, or answers normally,
-     * depending on what the test requested. If a request was pending, eventEnacted is set and
-     * eventCondition is signalled before the lock is released.
-     */
-    @Override
-    public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOException,
-        TezException {
-      umbilicalLock.lock();
-      try {
-        // Kept inside the try so the lock is released even if reading the request fails.
-        if (request.getEvents() != null) {
-          requestEvents.addAll(request.getEvents());
-        }
-        if (shouldThrowException) {
-          LOG.info("TestUmbilical throwing Exception");
-          throw new IOException(HEARTBEAT_EXCEPTION_STRING);
-        }
-        TezHeartbeatResponse response = new TezHeartbeatResponse();
-        response.setLastRequestId(request.getRequestId());
-        if (shouldSendDieSignal) {
-          LOG.info("TestUmbilical returning shouldDie=true");
-          response.setShouldDie();
-        }
-        return response;
-      } finally {
-        if (pendingEvent) {
-          eventEnacted = true;
-          LOG.info("Signalling Event");
-          eventCondition.signal();
-        }
-        umbilicalLock.unlock();
-      }
-    }
-  }
-
-  /**
-   * Builds a TaskReporter on top of the given test umbilical, using a container id derived from
-   * appId and the fixed arguments 100, 1000, 100 and a fresh AtomicLong(0).
-   */
-  private TaskReporter createTaskReporter(ApplicationId appId, TezTaskUmbilicalForTest umbilical) {
-    String containerIdStr = createContainerId(appId).toString();
-    return new TaskReporter(umbilical, 100, 1000, 100, new AtomicLong(0), containerIdStr);
-  }
-
-  /**
-   * Convenience overload that creates a task runner whose processor is TestProcessor.
-   */
-  private TezTaskRunner createTaskRunner(ApplicationId appId, TezTaskUmbilicalForTest umbilical,
-      TaskReporter taskReporter, ListeningExecutorService executor, byte[] processorConf)
-      throws IOException {
-    String processorClassName = TestProcessor.class.getName();
-    return createTaskRunner(appId, umbilical, taskReporter, executor, processorClassName,
-        processorConf);
-  }
-
-  /**
-   * Creates a TezTaskRunner for a single task attempt (all ids use index 1) that runs the given
-   * processor class with processorConf as its user payload. The task has no inputs or outputs
-   * and uses a fresh, randomly named directory under workDir as its only local dir.
-   */
-  private TezTaskRunner createTaskRunner(ApplicationId appId, TezTaskUmbilicalForTest umbilical,
-      TaskReporter taskReporter, ListeningExecutorService executor, String processorClass,
-      byte[] processorConf) throws IOException {
-    TezConfiguration conf = new TezConfiguration(defaultConf);
-    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
-    String[] localDirs =
-        new String[] { new Path(workDir, UUID.randomUUID().toString()).toString() };
-
-    // DAG -> vertex -> task -> attempt, each with index 1.
-    TezTaskAttemptID attemptId = TezTaskAttemptID.getInstance(
-        TezTaskID.getInstance(
-            TezVertexID.getInstance(TezDAGID.getInstance(appId, 1), 1), 1), 1);
-
-    UserPayload payload = UserPayload.create(ByteBuffer.wrap(processorConf));
-    ProcessorDescriptor descriptor =
-        ProcessorDescriptor.create(processorClass).setUserPayload(payload);
-    TaskSpec spec = new TaskSpec(attemptId, "dagName", "vertexName", -1, descriptor,
-        new ArrayList<InputSpec>(), new ArrayList<OutputSpec>(), null);
-
-    return new TezTaskRunner(conf, currentUser, localDirs, spec, umbilical, 1,
-        new HashMap<String, ByteBuffer>(), new HashMap<String, String>(),
-        HashMultimap.<String, String> create(), taskReporter, executor, null, "",
-        new ExecutionContextImpl("localhost"));
-  }
-
-  /** Returns container 1 of application attempt 1 of the given application. */
-  private ContainerId createContainerId(ApplicationId appId) {
-    return ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, 1), 1);
-  }
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java b/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
deleted file mode 100644
index de03307..0000000
--- a/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.task;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.google.common.collect.Lists;
-import org.apache.tez.common.TezTaskUmbilicalProtocol;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
-import org.apache.tez.runtime.api.impl.TezEvent;
-import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
-import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-/**
- * Tests for {@link TaskReporter.HeartbeatCallable}.
- */
-public class TestTaskReporter {
-
-  /**
-   * Verifies that a heartbeat response carrying maxEventsToGet events (5 here) triggers the next
-   * heartbeat immediately, and that a response with fewer events sends the callable into its
-   * regular poll sleep: the mock answers requests 1 and 2 with 5 events and request 3 with 1 event,
-   * so exactly three heartbeats are expected.
-   */
-  @Test(timeout = 10000)
-  public void testContinuousHeartbeatsOnMaxEvents() throws Exception {
-
-    final Object lock = new Object();
-    final AtomicBoolean hb2Done = new AtomicBoolean(false);
-
-    TezTaskUmbilicalProtocol mockUmbilical = mock(TezTaskUmbilicalProtocol.class);
-    doAnswer(new Answer<Object>() {
-      @Override
-      public Object answer(InvocationOnMock invocation) throws Throwable {
-        Object[] args = invocation.getArguments();
-        TezHeartbeatRequest request = (TezHeartbeatRequest) args[0];
-        if (request.getRequestId() == 1 || request.getRequestId() == 2) {
-          TezHeartbeatResponse response = new TezHeartbeatResponse(createEvents(5));
-          response.setLastRequestId(request.getRequestId());
-          return response;
-        } else if (request.getRequestId() == 3) {
-          TezHeartbeatResponse response = new TezHeartbeatResponse(createEvents(1));
-          response.setLastRequestId(request.getRequestId());
-          synchronized (lock) {
-            hb2Done.set(true);
-            lock.notify();
-          }
-          return response;
-        } else {
-          throw new TezUncheckedException("Invalid request id for test: " + request.getRequestId());
-        }
-      }
-    }).when(mockUmbilical).heartbeat(any(TezHeartbeatRequest.class));
-
-    TezTaskAttemptID mockTaskAttemptId = mock(TezTaskAttemptID.class);
-    LogicalIOProcessorRuntimeTask mockTask = mock(LogicalIOProcessorRuntimeTask.class);
-    doReturn("vertexName").when(mockTask).getVertexName();
-    doReturn(mockTaskAttemptId).when(mockTask).getTaskAttemptID();
-
-    // Setup the sleep time to be way higher than the test timeout
-    TaskReporter.HeartbeatCallable heartbeatCallable =
-        new TaskReporter.HeartbeatCallable(mockTask, mockUmbilical, 100000, 100000, 5,
-            new AtomicLong(0),
-            "containerIdStr");
-
-    ExecutorService executor = Executors.newSingleThreadExecutor();
-    executor.submit(heartbeatCallable);
-    try {
-      synchronized (lock) {
-        // Loop on the flag: Object.wait() may return without a notify (spurious wakeup).
-        while (!hb2Done.get()) {
-          lock.wait();
-        }
-      }
-      verify(mockUmbilical, times(3)).heartbeat(any(TezHeartbeatRequest.class));
-      // Sleep for 2 seconds, less than the callable sleep time. No more invocations.
-      Thread.sleep(2000L);
-      verify(mockUmbilical, times(3)).heartbeat(any(TezHeartbeatRequest.class));
-    } finally {
-      executor.shutdownNow();
-    }
-
-  }
-
-  /** Returns a list holding numEvents mocked TezEvents. */
-  private List<TezEvent> createEvents(int numEvents) {
-    List<TezEvent> list = Lists.newArrayListWithCapacity(numEvents);
-    for (int i = 0; i < numEvents; i++) {
-      list.add(mock(TezEvent.class));
-    }
-    return list;
-  }
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
new file mode 100644
index 0000000..a68c7c1
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+import org.apache.tez.common.ContainerContext;
+import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+
+/**
+ * Responsible for communication between a running Container and the ApplicationMaster. The main
+ * functionality is to poll for new tasks.
+ *
+ * <p>{@link #call()} keeps invoking getTask on the umbilical until it returns a non-null
+ * {@link ContainerTask}, sleeping between attempts with a linearly growing, capped delay.
+ */
+public class ContainerReporter implements Callable<ContainerTask> {
+
+  private static final Logger LOG = Logger.getLogger(ContainerReporter.class);
+
+  /** Minimum gap, in milliseconds, between two "sleeping" log messages. */
+  private static final long LOG_INTERVAL = 2000L;
+
+  private final TezTaskUmbilicalProtocol umbilical;
+  private final ContainerContext containerContext;
+  /** Upper bound, in milliseconds, for the sleep between two getTask attempts. */
+  private final int getTaskMaxSleepTime;
+
+  /** Earliest wall-clock time (ms) at which the next "sleeping" message may be logged. */
+  private long nextGetTaskPrintTime;
+
+  ContainerReporter(TezTaskUmbilicalProtocol umbilical, ContainerContext containerContext,
+      int getTaskMaxSleepTime) {
+    this.umbilical = umbilical;
+    this.containerContext = containerContext;
+    this.getTaskMaxSleepTime = getTaskMaxSleepTime;
+  }
+
+  /**
+   * Polls the umbilical until it hands out a task (or a shouldDie instruction).
+   *
+   * @return the first non-null ContainerTask returned by getTask
+   * @throws Exception whatever getTask throws, or InterruptedException if interrupted while
+   *         sleeping between attempts
+   */
+  @Override
+  public ContainerTask call() throws Exception {
+    LOG.info("Attempting to fetch new task");
+    ContainerTask containerTask = umbilical.getTask(containerContext);
+    long getTaskPollStartTime = System.currentTimeMillis();
+    nextGetTaskPrintTime = getTaskPollStartTime + LOG_INTERVAL;
+    for (int idle = 1; containerTask == null; idle++) {
+      // Back off by 10 ms per idle round, capped at getTaskMaxSleepTime.
+      long sleepTimeMilliSecs = Math.min(idle * 10, getTaskMaxSleepTime);
+      maybeLogSleepMessage(sleepTimeMilliSecs);
+      TimeUnit.MILLISECONDS.sleep(sleepTimeMilliSecs);
+      containerTask = umbilical.getTask(containerContext);
+    }
+    LOG.info("Got TaskUpdate: "
+        + (System.currentTimeMillis() - getTaskPollStartTime)
+        + " ms after starting to poll."
+        + " TaskInfo: shouldDie: "
+        + containerTask.shouldDie()
+        + (containerTask.shouldDie() ? "" : ", currentTaskAttemptId: "
+            + containerTask.getTaskSpec().getTaskAttemptID()));
+    return containerTask;
+  }
+
+  /**
+   * Logs that the reporter is about to sleep, unless a message was logged recently: nothing is
+   * logged while the end of the upcoming sleep still lies before nextGetTaskPrintTime.
+   */
+  private void maybeLogSleepMessage(long sleepTimeMilliSecs) {
+    long currentTime = System.currentTimeMillis();
+    if (sleepTimeMilliSecs + currentTime > nextGetTaskPrintTime) {
+      LOG.info("Sleeping for " + sleepTimeMilliSecs
+          + "ms before retrying getTask again. Got null now. "
+          + "Next getTask sleep message after " + LOG_INTERVAL + "ms");
+      nextGetTaskPrintTime = currentTime + sleepTimeMilliSecs + LOG_INTERVAL;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java
new file mode 100644
index 0000000..1146ce4
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+/**
+ * Callback through which the outcome of a task's heartbeat loop is reported. TaskReporter takes
+ * an instance in registerTask and notifies it from the callback attached to the heartbeat future.
+ */
+public interface ErrorReporter {
+
+ /**
+ * Reports a failure. TaskReporter invokes this with the Throwable that made the heartbeat
+ * future fail.
+ */
+ void reportError(Throwable t);
+
+ /**
+ * Signals that a shutdown was requested. TaskReporter invokes this when the heartbeat callable
+ * returns false, which it does after a heartbeat response with shouldDie set.
+ */
+ void shutdownRequested();
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
new file mode 100644
index 0000000..15dcbb0
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
@@ -0,0 +1,401 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.log4j.Logger;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
+import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
+import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Responsible for communication between tasks running in a Container and the ApplicationMaster.
+ * Takes care of sending heartbeats (regular and OOB) to the AM - to send generated events, and to
+ * retrieve events specific to this task.
+ *
+ * <p>Heartbeats for the currently registered task are sent by a {@link HeartbeatCallable} running
+ * on a single daemon thread named "TaskHeartbeatThread".
+ */
+public class TaskReporter {
+
+  private static final Logger LOG = Logger.getLogger(TaskReporter.class);
+
+  private final TezTaskUmbilicalProtocol umbilical;
+  private final long pollInterval;
+  private final long sendCounterInterval;
+  private final int maxEventsToGet;
+  private final AtomicLong requestCounter;
+  private final String containerIdStr;
+
+  private final ListeningExecutorService heartbeatExecutor;
+
+  @VisibleForTesting
+  HeartbeatCallable currentCallable;
+
+  /**
+   * @param umbilical channel to the AM
+   * @param amPollInterval time in milliseconds to wait between two regular heartbeats
+   * @param sendCounterInterval minimum time in milliseconds between two heartbeats that carry
+   *        counters
+   * @param maxEventsToGet maximum number of events requested per heartbeat
+   * @param requestCounter source of heartbeat request ids
+   * @param containerIdStr container id sent along with every heartbeat
+   */
+  public TaskReporter(TezTaskUmbilicalProtocol umbilical, long amPollInterval,
+      long sendCounterInterval, int maxEventsToGet, AtomicLong requestCounter,
+      String containerIdStr) {
+    this.umbilical = umbilical;
+    this.pollInterval = amPollInterval;
+    this.sendCounterInterval = sendCounterInterval;
+    this.maxEventsToGet = maxEventsToGet;
+    this.requestCounter = requestCounter;
+    this.containerIdStr = containerIdStr;
+    ExecutorService executor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
+        .setDaemon(true).setNameFormat("TaskHeartbeatThread").build());
+    heartbeatExecutor = MoreExecutors.listeningDecorator(executor);
+  }
+
+  /**
+   * Register a task to be tracked. Heartbeats will be sent out for this task to fetch events, etc.
+   *
+   * @param task the task to heartbeat for
+   * @param errorReporter notified when the heartbeat loop fails or the AM asks the task to die
+   */
+  public synchronized void registerTask(LogicalIOProcessorRuntimeTask task,
+      ErrorReporter errorReporter) {
+    currentCallable = new HeartbeatCallable(task, umbilical, pollInterval, sendCounterInterval,
+        maxEventsToGet, requestCounter, containerIdStr);
+    ListenableFuture<Boolean> future = heartbeatExecutor.submit(currentCallable);
+    Futures.addCallback(future, new HeartbeatCallback(errorReporter));
+  }
+
+  /**
+   * This method should always be invoked before setting up heartbeats for another task running in
+   * the same container.
+   */
+  public synchronized void unregisterTask(TezTaskAttemptID taskAttemptID) {
+    // Tolerate an unregister without a preceding register instead of failing with an NPE.
+    if (currentCallable != null) {
+      currentCallable.markComplete();
+      currentCallable = null;
+    }
+  }
+
+  /** Stops the heartbeat thread via shutdownNow() on the heartbeat executor. */
+  public void shutdown() {
+    heartbeatExecutor.shutdownNow();
+  }
+
+  /**
+   * Heartbeat loop for a single task. Returns true when the loop ends because the task is done
+   * or had a fatal error, and false when the AM asked the task to die.
+   */
+  @VisibleForTesting
+  static class HeartbeatCallable implements Callable<Boolean> {
+
+    private static final int LOG_COUNTER_START_INTERVAL = 5000; // 5 seconds
+    private static final float LOG_COUNTER_BACKOFF = 1.3f;
+
+    private final LogicalIOProcessorRuntimeTask task;
+    private final EventMetaData updateEventMetadata;
+
+    private final TezTaskUmbilicalProtocol umbilical;
+
+    private final long pollInterval;
+    private final long sendCounterInterval;
+    private final int maxEventsToGet;
+    private final String containerIdStr;
+
+    private final AtomicLong requestCounter;
+
+    private final LinkedBlockingQueue<TezEvent> eventsToSend = new LinkedBlockingQueue<TezEvent>();
+
+    private final ReentrantLock lock = new ReentrantLock();
+    private final Condition condition = lock.newCondition();
+
+    /*
+     * Keeps track of regular timed heartbeats. Is primarily used as a timing mechanism to send /
+     * log counters.
+     */
+    private int nonOobHeartbeatCounter = 0;
+    private int nextHeartbeatNumToLog = 0;
+    /*
+     * Tracks the last non-OOB heartbeat number at which counters were sent to the AM.
+     */
+    private int prevCounterSendHeartbeatNum = 0;
+
+    public HeartbeatCallable(LogicalIOProcessorRuntimeTask task,
+        TezTaskUmbilicalProtocol umbilical, long amPollInterval, long sendCounterInterval,
+        int maxEventsToGet, AtomicLong requestCounter, String containerIdStr) {
+
+      this.pollInterval = amPollInterval;
+      this.sendCounterInterval = sendCounterInterval;
+      this.maxEventsToGet = maxEventsToGet;
+      this.requestCounter = requestCounter;
+      this.containerIdStr = containerIdStr;
+
+      this.task = task;
+      this.umbilical = umbilical;
+      this.updateEventMetadata = new EventMetaData(EventProducerConsumerType.SYSTEM,
+          task.getVertexName(), "", task.getTaskAttemptID());
+
+      // First counter log after roughly LOG_COUNTER_START_INTERVAL ms worth of regular
+      // heartbeats, but never before the first one.
+      nextHeartbeatNumToLog = (Math.max(1,
+          (int) (LOG_COUNTER_START_INTERVAL / (amPollInterval == 0 ? 0.000001f
+              : (float) amPollInterval))));
+    }
+
+    @Override
+    public Boolean call() throws Exception {
+      // Heartbeat only for active tasks. Errors, etc will be reported directly.
+      while (!task.isTaskDone() && !task.hadFatalError()) {
+        ResponseWrapper response = heartbeat(null);
+
+        if (response.shouldDie) {
+          // AM sent a shouldDie=true
+          LOG.info("Asked to die via task heartbeat");
+          return false;
+        } else {
+          if (response.numEvents < maxEventsToGet) {
+            // Wait before sending another heartbeat. Otherwise consider as an OOB heartbeat
+            lock.lock();
+            try {
+              // await(time, unit) returns false when the full wait time elapsed, and true when
+              // the wait ended earlier (e.g. through markComplete()).
+              boolean wokenBeforeTimeout = condition.await(pollInterval, TimeUnit.MILLISECONDS);
+              if (!wokenBeforeTimeout) {
+                nonOobHeartbeatCounter++;
+              }
+            } finally {
+              lock.unlock();
+            }
+          }
+        }
+      }
+      int pendingEventCount = eventsToSend.size();
+      if (pendingEventCount > 0) {
+        LOG.warn("Exiting TaskReporter thread with pending queue size=" + pendingEventCount);
+      }
+      return true;
+    }
+
+    /**
+     * Sends one heartbeat carrying all queued events plus, while the task is still active, a
+     * status update, and routes the events of the response to the task. The method is
+     * synchronized; it is called from the heartbeat loop as well as from taskSucceeded and
+     * taskFailed.
+     *
+     * @param eventsArg
+     *          additional events to send with this heartbeat, may be null
+     * @return whether the AM asked the task to die, and the number of events received
+     * @throws IOException
+     *           indicates an RPC communication failure.
+     * @throws TezException
+     *           indicates an exception somewhere in the AM.
+     */
+    private synchronized ResponseWrapper heartbeat(Collection<TezEvent> eventsArg)
+        throws IOException, TezException {
+
+      if (eventsArg != null) {
+        eventsToSend.addAll(eventsArg);
+      }
+
+      TezEvent updateEvent = null;
+      List<TezEvent> events = new ArrayList<TezEvent>();
+      eventsToSend.drainTo(events);
+
+      if (!task.isTaskDone() && !task.hadFatalError()) {
+        TezCounters counters = null;
+        /**
+         * Increasing the heartbeat interval can delay the delivery of events. Sending just updated
+         * records would save CPU in DAG AM, but certain counters are updated very frequently. Until
+         * real time decisions are made based on these counters, it can be sent once per second.
+         */
+        // Not completely accurate, since OOB heartbeats could go out.
+        if ((nonOobHeartbeatCounter - prevCounterSendHeartbeatNum) * pollInterval
+            >= sendCounterInterval) {
+          counters = task.getCounters();
+          prevCounterSendHeartbeatNum = nonOobHeartbeatCounter;
+        }
+        updateEvent = new TezEvent(new TaskStatusUpdateEvent(counters, task.getProgress()),
+            updateEventMetadata);
+        events.add(updateEvent);
+      }
+
+      long requestId = requestCounter.incrementAndGet();
+      TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, containerIdStr,
+          task.getTaskAttemptID(), task.getEventCounter(), maxEventsToGet);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Sending heartbeat to AM, request=" + request);
+      }
+
+      maybeLogCounters();
+
+      TezHeartbeatResponse response = umbilical.heartbeat(request);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Received heartbeat response from AM, response=" + response);
+      }
+
+      if (response.shouldDie()) {
+        LOG.info("Received should die response from AM");
+        return new ResponseWrapper(true, 1);
+      }
+      if (response.getLastRequestId() != requestId) {
+        throw new TezException("AM and Task out of sync" + ", responseReqId="
+            + response.getLastRequestId() + ", expectedReqId=" + requestId);
+      }
+
+      // The same umbilical is used by multiple tasks. Problematic in the case where multiple tasks
+      // are running using the same umbilical.
+      int numEventsReceived = 0;
+      if (task.isTaskDone() || task.hadFatalError()) {
+        if (response.getEvents() != null && !response.getEvents().isEmpty()) {
+          LOG.warn("Current task already complete, Ignoring all event in"
+              + " heartbeat response, eventCount=" + response.getEvents().size());
+        }
+      } else {
+        if (response.getEvents() != null && !response.getEvents().isEmpty()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Routing events from heartbeat response to task" + ", currentTaskAttemptId="
+                + task.getTaskAttemptID() + ", eventCount=" + response.getEvents().size());
+          }
+          // This should ideally happen in a separate thread
+          numEventsReceived = response.getEvents().size();
+          task.handleEvents(response.getEvents());
+        }
+      }
+      return new ResponseWrapper(false, numEventsReceived);
+    }
+
+    /** Wakes up the heartbeat loop if it is currently waiting for the next poll interval. */
+    public void markComplete() {
+      // Notify to clear pending events, if any.
+      lock.lock();
+      try {
+        condition.signal();
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    /**
+     * Logs the task counters at debug level when the regular-heartbeat counter reaches the next
+     * scheduled value, then pushes that value out by LOG_COUNTER_BACKOFF.
+     */
+    private void maybeLogCounters() {
+      if (LOG.isDebugEnabled()) {
+        if (nonOobHeartbeatCounter == nextHeartbeatNumToLog) {
+          LOG.debug("Counters: " + task.getCounters().toShortString());
+          // (int) (n * 1.3f) equals n for n <= 3, which used to freeze the schedule at its
+          // current value; always advance by at least one heartbeat.
+          nextHeartbeatNumToLog = Math.max(nextHeartbeatNumToLog + 1,
+              (int) (nextHeartbeatNumToLog * (LOG_COUNTER_BACKOFF)));
+        }
+      }
+    }
+
+    /**
+     * Sends out final events for task success.
+     * @param taskAttemptID
+     *          not used by this method
+     * @return false if the AM answered with shouldDie, true otherwise
+     * @throws IOException
+     *           indicates an RPC communication failure.
+     * @throws TezException
+     *           indicates an exception somewhere in the AM.
+     */
+    private boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException {
+      TezEvent statusUpdateEvent = new TezEvent(new TaskStatusUpdateEvent(task.getCounters(),
+          task.getProgress()), updateEventMetadata);
+      TezEvent taskCompletedEvent = new TezEvent(new TaskAttemptCompletedEvent(),
+          updateEventMetadata);
+      return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskCompletedEvent)).shouldDie;
+    }
+
+    /**
+     * Sends out final events for task failure.
+     * @param taskAttemptID
+     *          not used by this method
+     * @param t
+     *          the failure; its stack trace becomes part of the diagnostics
+     * @param diagnostics
+     *          optional message placed in front of the stack trace, may be null
+     * @param srcMeta
+     *          metadata to attach to the failed event; when null, the task's own
+     *          update metadata is used
+     * @return false if the AM answered with shouldDie, true otherwise
+     * @throws IOException
+     *           indicates an RPC communication failure.
+     * @throws TezException
+     *           indicates an exception somewhere in the AM.
+     */
+    private boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics,
+        EventMetaData srcMeta) throws IOException, TezException {
+      TezEvent statusUpdateEvent = new TezEvent(new TaskStatusUpdateEvent(task.getCounters(),
+          task.getProgress()), updateEventMetadata);
+      if (diagnostics == null) {
+        diagnostics = ExceptionUtils.getStackTrace(t);
+      } else {
+        diagnostics = diagnostics + ":" + ExceptionUtils.getStackTrace(t);
+      }
+      TezEvent taskAttemptFailedEvent = new TezEvent(new TaskAttemptFailedEvent(diagnostics),
+          srcMeta == null ? updateEventMetadata : srcMeta);
+      return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent)).shouldDie;
+    }
+
+    /** Queues the given events so that they are sent with the next heartbeat. */
+    private void addEvents(TezTaskAttemptID taskAttemptID, Collection<TezEvent> events) {
+      if (events != null && !events.isEmpty()) {
+        eventsToSend.addAll(events);
+      }
+    }
+  }
+
+  /** Forwards the outcome of the heartbeat future to the ErrorReporter. */
+  private static class HeartbeatCallback implements FutureCallback<Boolean> {
+
+    private final ErrorReporter errorReporter;
+
+    HeartbeatCallback(ErrorReporter errorReporter) {
+      this.errorReporter = errorReporter;
+    }
+
+    @Override
+    public void onSuccess(Boolean result) {
+      // false means the heartbeat loop stopped because the AM asked the task to die.
+      if (!result) {
+        errorReporter.shutdownRequested();
+      }
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      errorReporter.reportError(t);
+    }
+  }
+
+  /** Sends the final events for a successful task; see HeartbeatCallable#taskSucceeded. */
+  public boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException {
+    return currentCallable.taskSucceeded(taskAttemptID);
+  }
+
+  /** Sends the final events for a failed task; see HeartbeatCallable#taskFailed. */
+  public boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics,
+      EventMetaData srcMeta) throws IOException, TezException {
+    return currentCallable.taskFailed(taskAttemptID, t, diagnostics, srcMeta);
+  }
+
+  /** Queues events to be sent with the next heartbeat of the registered task. */
+  public void addEvents(TezTaskAttemptID taskAttemptID, Collection<TezEvent> events) {
+    currentCallable.addEvents(taskAttemptID, events);
+  }
+
+  /** Asks the AM, via the umbilical, whether the given task attempt may commit. */
+  public boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException {
+    return umbilical.canCommit(taskAttemptID);
+  }
+
+  /** Result of a single heartbeat: the die flag and the number of events received. */
+  private static final class ResponseWrapper {
+    final boolean shouldDie;
+    final int numEvents;
+
+    private ResponseWrapper(boolean shouldDie, int numEvents) {
+      this.shouldDie = shouldDie;
+      this.numEvents = numEvents;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
new file mode 100644
index 0000000..c0843a0
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -0,0 +1,456 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.tez.common.ContainerContext;
+import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezLocalResource;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.common.counters.Limits;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.utils.RelocalizationUtils;
+import org.apache.tez.runtime.api.ExecutionContext;
+import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+ /**
+  * Container-side driver: {@link #run()} repeatedly fetches a {@code ContainerTask} through
+  * the umbilical and executes it with a {@code TezTaskRunner} until it is asked to die or an
+  * error occurs.
+  */
+ public class TezChild {
+
+ private static final Logger LOG = Logger.getLogger(TezChild.class);
+
+ // Configuration handed to the constructor; source of the settings below and passed on to
+ // the RPC proxy, the task runner and resource localization.
+ private final Configuration defaultConf;
+ // Used to build the ContainerContext and handed to the TaskReporter in run().
+ private final String containerIdString;
+ // Passed through to each TezTaskRunner.
+ private final int appAttemptNumber;
+ // Passed through to each TezTaskRunner.
+ private final String[] localDirs;
+
+ // Handed to the TaskReporter created in run().
+ private final AtomicLong heartbeatCounter = new AtomicLong(0);
+
+ // Values read from TezConfiguration in the constructor.
+ private final int getTaskMaxSleepTime;
+ private final int amHeartbeatInterval;
+ private final long sendCounterInterval;
+ private final int maxEventsToGet;
+ // True when TEZ_LOCAL_MODE is set; in that case no RPC proxy is created by the constructor.
+ private final boolean isLocal;
+ // Target directory handed to RelocalizationUtils for additional resources.
+ private final String workingDir;
+
+ // Single daemon thread named "TezChild"; runs the ContainerReporter and is shared with the
+ // task runner.
+ private final ListeningExecutorService executor;
+ private final ObjectRegistryImpl objectRegistry;
+ private final String pid;
+ // NOTE(review): the field name is identical to its type name; consider lowerCamelCase.
+ private final ExecutionContext ExecutionContext;
+ // Populated in the constructor with the job token bytes under the shuffle handler service id.
+ private final Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
+ private final Map<String, String> serviceProviderEnvMap;
+
+ // Replaced with a fresh map when a task from a different DAG arrives.
+ private Multimap<String, String> startedInputsMap = HashMultimap.create();
+
+ // Created at the start of run(); may still be null when shutdown() is called.
+ private TaskReporter taskReporter;
+ // RPC proxy created by the constructor when not in local mode, or set via setUmbilical().
+ private TezTaskUmbilicalProtocol umbilical;
+ // Number of tasks accepted so far by run().
+ private int taskCount = 0;
+ // Vertex of the most recently accepted task; null until the first task.
+ private TezVertexID lastVertexID;
+
+ /**
+  * Stores the supplied settings, reads the polling and heartbeat intervals from {@code conf},
+  * creates the single-threaded executor and registers the session token bytes as metadata for
+  * the shuffle handler service. Unless local mode is enabled, it also opens the umbilical RPC
+  * proxy to {@code host:port} as the user named by {@code tokenIdentifier}.
+  *
+  * @throws IOException if reading the current user's credentials or creating the proxy fails
+  * @throws InterruptedException if interrupted while creating the proxy
+  */
+ public TezChild(Configuration conf, String host, int port, String containerIdentifier,
+     String tokenIdentifier, int appAttemptNumber, String workingDir, String[] localDirs,
+     Map<String, String> serviceProviderEnvMap,
+     ObjectRegistryImpl objectRegistry, String pid,
+     ExecutionContext ExecutionContext)
+     throws IOException, InterruptedException {
+   // Values handed in by the caller.
+   this.defaultConf = conf;
+   this.containerIdString = containerIdentifier;
+   this.appAttemptNumber = appAttemptNumber;
+   this.localDirs = localDirs;
+   this.serviceProviderEnvMap = serviceProviderEnvMap;
+   this.workingDir = workingDir;
+   this.pid = pid;
+   this.ExecutionContext = ExecutionContext;
+
+   // Tunables read from the configuration.
+   this.getTaskMaxSleepTime = conf.getInt(
+       TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX,
+       TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX_DEFAULT);
+   this.amHeartbeatInterval = conf.getInt(
+       TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS,
+       TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
+   this.sendCounterInterval = conf.getLong(
+       TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS,
+       TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS_DEFAULT);
+   this.maxEventsToGet = conf.getInt(
+       TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT,
+       TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT);
+
+   // One daemon worker thread, wrapped so that submissions return ListenableFutures.
+   ExecutorService singleWorker = Executors.newFixedThreadPool(1,
+       new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TezChild").build());
+   this.executor = MoreExecutors.listeningDecorator(singleWorker);
+
+   this.objectRegistry = objectRegistry;
+
+   // Security framework already loaded the tokens into current ugi
+   Credentials currentCredentials = UserGroupInformation.getCurrentUser().getCredentials();
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("Executing with tokens:");
+     for (Token<?> currentToken : currentCredentials.getAllTokens()) {
+       LOG.debug(currentToken);
+     }
+   }
+
+   this.isLocal = conf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
+       TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
+   UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(tokenIdentifier);
+   Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(currentCredentials);
+
+   serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
+       TezCommonUtils.convertJobTokenToBytes(jobToken));
+
+   if (isLocal) {
+     // No proxy is created here in local mode; the umbilical stays unset.
+     return;
+   }
+
+   final InetSocketAddress amAddress = NetUtils.createSocketAddrForHost(host, port);
+   SecurityUtil.setTokenService(jobToken, amAddress);
+   taskOwner.addToken(jobToken);
+   PrivilegedExceptionAction<TezTaskUmbilicalProtocol> openProxy =
+       new PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() {
+         @Override
+         public TezTaskUmbilicalProtocol run() throws Exception {
+           return RPC.getProxy(TezTaskUmbilicalProtocol.class,
+               TezTaskUmbilicalProtocol.versionID, amAddress, defaultConf);
+         }
+       };
+   umbilical = taskOwner.doAs(openProxy);
+ }
+
+ /**
+  * Main loop of the container. Until the executor has terminated, it fetches the next
+  * {@code ContainerTask} through the {@code ContainerReporter}, prepares credentials and
+  * local resources for it, and executes it with a {@code TezTaskRunner}.
+  *
+  * @return {@code SUCCESS} when asked to die (by the fetched task or via heartbeats) or when
+  *         the executor has terminated; {@code EXECUTION_FAILURE} when fetching or running a
+  *         task fails; {@code INTERRUPTED} when interrupted while waiting for new work
+  * @throws IOException if preparing a task (e.g. localizing resources) fails
+  * @throws InterruptedException declared for callers; an interrupt while waiting for work is
+  *         reported through the returned result instead
+  * @throws TezException if preparing a task fails
+  */
+ public ContainerExecutionResult run() throws IOException, InterruptedException, TezException {
+
+   ContainerContext containerContext = new ContainerContext(containerIdString);
+   ContainerReporter containerReporter = new ContainerReporter(umbilical, containerContext,
+       getTaskMaxSleepTime);
+
+   taskReporter = new TaskReporter(umbilical, amHeartbeatInterval,
+       sendCounterInterval, maxEventsToGet, heartbeatCounter, containerIdString);
+
+   UserGroupInformation childUGI = null;
+
+   while (!executor.isTerminated()) {
+     if (taskCount > 0) {
+       // A previous task has run; reset the logger addend while waiting for new work.
+       TezUtilsInternal.updateLoggers("");
+     }
+     ListenableFuture<ContainerTask> getTaskFuture = executor.submit(containerReporter);
+     // Deliberately left unassigned: both catch blocks return, so the compiler guarantees
+     // the task is only used after a successful get().
+     ContainerTask containerTask;
+     try {
+       containerTask = getTaskFuture.get();
+     } catch (ExecutionException e) {
+       Throwable cause = e.getCause();
+       handleError(cause);
+       return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
+           cause, "Execution Exception while fetching new work: " + e.getMessage());
+     } catch (InterruptedException e) {
+       // No task has been received at this point, so there is no attempt id to log.
+       LOG.info("Interrupted while waiting for new work");
+       handleError(e);
+       return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.INTERRUPTED, e,
+           "Interrupted while waiting for new work");
+     }
+     if (containerTask.shouldDie()) {
+       LOG.info("ContainerTask returned shouldDie=true, Exiting");
+       shutdown();
+       return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
+           "Asked to die by the AM");
+     } else {
+       String loggerAddend = containerTask.getTaskSpec().getTaskAttemptID().toString();
+       taskCount++;
+       TezUtilsInternal.updateLoggers(loggerAddend);
+       FileSystem.clearStatistics();
+
+       childUGI = handleNewTaskCredentials(containerTask, childUGI);
+       handleNewTaskLocalResources(containerTask);
+       cleanupOnTaskChanged(containerTask);
+
+       // Execute the Actual Task
+       TezTaskRunner taskRunner = new TezTaskRunner(defaultConf, childUGI,
+           localDirs, containerTask.getTaskSpec(), umbilical, appAttemptNumber,
+           serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, taskReporter,
+           executor, objectRegistry, pid, this.ExecutionContext);
+       boolean shouldDie;
+       try {
+         // The runner returns false when the container should stop.
+         shouldDie = !taskRunner.run();
+         if (shouldDie) {
+           LOG.info("Got a shouldDie notification via hearbeats. Shutting down");
+           shutdown();
+           return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
+               "Asked to die by the AM");
+         }
+       } catch (IOException e) {
+         handleError(e);
+         return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
+             e, "TaskExecutionFailure: " + e.getMessage());
+       } catch (TezException e) {
+         handleError(e);
+         return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
+             e, "TaskExecutionFailure: " + e.getMessage());
+       } finally {
+         // Runs after every task, whether it succeeded, failed or requested shutdown.
+         FileSystem.closeAllForUGI(childUGI);
+       }
+     }
+   }
+   return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
+       null);
+ }
+
+ /**
+  * Decides which UGI the new task runs under. The existing UGI is kept unless the task
+  * reports changed credentials and actually carries some; in that case a new remote-user UGI
+  * is created for the user named by the {@code USER} environment variable and the task's
+  * credentials are added to it.
+  *
+  * @param containerTask
+  *          the new task specification. Must be a valid task
+  * @param childUGI
+  *          the old UGI instance being used
+  * @return the UGI to use for the task: either {@code childUGI} or a newly created one
+  */
+ UserGroupInformation handleNewTaskCredentials(ContainerTask containerTask,
+     UserGroupInformation childUGI) {
+   Preconditions.checkState(!containerTask.shouldDie());
+   Preconditions.checkState(containerTask.getTaskSpec() != null);
+
+   // Re-use the UGI only if the Credentials have not changed.
+   if (!containerTask.haveCredentialsChanged()) {
+     return childUGI;
+   }
+
+   LOG.info("Refreshing UGI since Credentials have changed");
+   Credentials taskCreds = containerTask.getCredentials();
+   if (taskCreds == null) {
+     LOG.info("Not loading any credentials, since no credentials provided");
+     return childUGI;
+   }
+
+   LOG.info("Credentials : #Tokens=" + taskCreds.numberOfTokens() + ", #SecretKeys="
+       + taskCreds.numberOfSecretKeys());
+   String userName = System.getenv(ApplicationConstants.Environment.USER.toString());
+   UserGroupInformation refreshedUgi = UserGroupInformation.createRemoteUser(userName);
+   refreshedUgi.addCredentials(containerTask.getCredentials());
+   return refreshedUgi;
+ }
+
+ /**
+  * Localizes the additional resources that accompany the new task and adds the downloaded
+  * URLs to the classpath.
+  *
+  * @param containerTask the task whose additional resources are processed
+  * @throws IOException propagated from the relocalization helpers
+  * @throws TezException propagated from the relocalization helpers
+  */
+ private void handleNewTaskLocalResources(ContainerTask containerTask) throws IOException,
+     TezException {
+   Map<String, TezLocalResource> additionalResources = containerTask.getAdditionalResources();
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("Additional Resources added to container: " + additionalResources);
+   }
+   LOG.info("Localizing additional local resources for Task : " + additionalResources);
+
+   // View of the resources keyed by the same names, exposing only each resource's URI.
+   Function<TezLocalResource, URI> toUri = new Function<TezLocalResource, URI>() {
+     @Override
+     public URI apply(TezLocalResource input) {
+       return input.getUri();
+     }
+   };
+   Map<String, URI> resourceUris = Maps.transformValues(additionalResources, toUri);
+
+   List<URL> downloadedUrls =
+       RelocalizationUtils.processAdditionalResources(resourceUris, defaultConf, workingDir);
+   RelocalizationUtils.addUrlsToClassPath(downloadedUrls);
+   LOG.info("Done localizing additional resources");
+
+   final TaskSpec taskSpec = containerTask.getTaskSpec();
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("New container task context:" + taskSpec.toString());
+   }
+ }
+
+ /**
+ * Cleans entries from the object registry, and resets the startedInputsMap if required
+ *
+ * @param containerTask
+ * the new task specification. Must be a valid task
+ */
+ private void cleanupOnTaskChanged(ContainerTask containerTask) {
+ Preconditions.checkState(!containerTask.shouldDie());
+ Preconditions.checkState(containerTask.getTaskSpec() != null);
+ TezVertexID newVertexID = containerTask.getTaskSpec().getTaskAttemptID().getTaskID()
+ .getVertexID();
+ if (lastVertexID != null) {
+ if (!lastVertexID.equals(newVertexID)) {
+ objectRegistry.clearCache(ObjectRegistryImpl.ObjectLifeCycle.VERTEX);
+ }
+ if (!lastVertexID.getDAGId().equals(newVertexID.getDAGId())) {
+ objectRegistry.clearCache(ObjectRegistryImpl.ObjectLifeCycle.DAG);
+ startedInputsMap = HashMultimap.create();
+ }
+ }
+ lastVertexID = newVertexID;
+ }
+
+ /**
+  * Stops the executor, the task reporter (when one exists), the umbilical proxy and the
+  * metrics system, in that order. Outside local mode the log4j {@code LogManager} is shut
+  * down as well.
+  */
+ private void shutdown() {
+   executor.shutdownNow();
+   // The reporter only exists once run() has started.
+   if (taskReporter != null) {
+     taskReporter.shutdown();
+   }
+   RPC.stopProxy(umbilical);
+   DefaultMetricsSystem.shutdown();
+   if (isLocal) {
+     // Logging is left running in local mode.
+     return;
+   }
+   LogManager.shutdown();
+ }
+
+ /**
+  * Replaces the umbilical used by this child. A {@code null} argument is ignored and leaves
+  * the current umbilical in place.
+  *
+  * @param tezTaskUmbilicalProtocol the umbilical to use, or {@code null} for no change
+  */
+ public void setUmbilical(TezTaskUmbilicalProtocol tezTaskUmbilicalProtocol) {
+   if (tezTaskUmbilicalProtocol == null) {
+     return;
+   }
+   umbilical = tezTaskUmbilicalProtocol;
+ }
+
+ /**
+  * Immutable outcome of {@code run()}: an exit status plus an optional throwable and
+  * message.
+  */
+ public static class ContainerExecutionResult {
+
+   /** Exit statuses, each carrying a numeric exit code. */
+   public enum ExitStatus {
+     SUCCESS(0),
+     EXECUTION_FAILURE(1),
+     INTERRUPTED(2),
+     ASKED_TO_DIE(3);
+
+     private final int exitCode;
+
+     ExitStatus(int code) {
+       exitCode = code;
+     }
+
+     /** @return the numeric code associated with this status */
+     public int getExitCode() {
+       return exitCode;
+     }
+   }
+
+   private final ExitStatus exitStatus;
+   private final Throwable throwable;
+   private final String errorMessage;
+
+   /**
+    * @param exitStatus the status to report
+    * @param throwable the associated failure, may be {@code null}
+    * @param errorMessage a descriptive message, may be {@code null}
+    */
+   ContainerExecutionResult(ExitStatus exitStatus, @Nullable Throwable throwable,
+       @Nullable String errorMessage) {
+     this.exitStatus = exitStatus;
+     this.throwable = throwable;
+     this.errorMessage = errorMessage;
+   }
+
+   /** @return the exit status supplied at construction */
+   public ExitStatus getExitStatus() {
+     return exitStatus;
+   }
+
+   /** @return the throwable supplied at construction, possibly {@code null} */
+   public Throwable getThrowable() {
+     return throwable;
+   }
+
+   /** @return the message supplied at construction, possibly {@code null} */
+   public String getErrorMessage() {
+     return errorMessage;
+   }
+ }
+
+ /**
+  * Factory for {@code TezChild}. Before constructing the instance it merges the
+  * user-specified Tez configuration found under {@code workingDirectory} into {@code conf},
+  * applies that configuration to {@code UserGroupInformation} and {@code Limits}, and
+  * initializes the metrics system under the name "TezTask".
+  *
+  * @return a new {@code TezChild} backed by a fresh {@code ObjectRegistryImpl}
+  * @throws IOException propagated from configuration loading or the constructor
+  * @throws InterruptedException propagated from the constructor
+  * @throws TezException declared for callers
+  */
+ public static TezChild newTezChild(Configuration conf, String host, int port, String containerIdentifier,
+     String tokenIdentifier, int attemptNumber, String[] localDirs, String workingDirectory,
+     Map<String, String> serviceProviderEnvMap, @Nullable String pid,
+     ExecutionContext ExecutionContext)
+     throws IOException, InterruptedException, TezException {
+
+   // Pull in configuration specified for the session.
+   // TODO TEZ-1233. This needs to be moved over the wire rather than localizing the file
+   // for each and every task, and reading it back from disk. Also needs to be per vertex.
+   TezUtilsInternal.addUserSpecifiedTezConfiguration(workingDirectory, conf);
+   UserGroupInformation.setConfiguration(conf);
+   Limits.setConfiguration(conf);
+
+   // Should this be part of main - Metrics and ObjectRegistry. TezTask setup should be independent
+   // of this class. Leaving it here, till there's some entity representing a running JVM.
+   DefaultMetricsSystem.initialize("TezTask");
+
+   // singleton of ObjectRegistry for this JVM, created inline as a constructor argument
+   return new TezChild(conf, host, port, containerIdentifier, tokenIdentifier,
+       attemptNumber, workingDirectory, localDirs, serviceProviderEnvMap,
+       new ObjectRegistryImpl(), pid, ExecutionContext);
+ }
+
+ /**
+  * Process entry point. Expects five arguments: AM host, AM port, container identifier,
+  * token identifier and application attempt number. The remaining inputs (local dirs,
+  * working directory, JVM pid, NM host) are read from the environment.
+  *
+  * @param args command-line arguments as described above
+  * @throws IOException propagated from creating or running the child
+  * @throws InterruptedException propagated from creating or running the child
+  * @throws TezException propagated from creating or running the child
+  */
+ public static void main(String[] args) throws IOException, InterruptedException, TezException {
+
+   final Configuration defaultConf = new Configuration();
+
+   Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+   LOG.info("TezChild starting");
+
+   assert args.length == 5;
+   final String host = args[0];
+   final int port = Integer.parseInt(args[1]);
+   final String containerIdentifier = args[2];
+   final String tokenIdentifier = args[3];
+   final int attemptNumber = Integer.parseInt(args[4]);
+
+   final String localDirsValue = System.getenv(Environment.LOCAL_DIRS.name());
+   final String[] localDirs = TezCommonUtils.getTrimmedStrings(localDirsValue);
+   final String pid = System.getenv().get("JVM_PID");
+   LOG.info("PID, containerIdentifier: " + pid + ", " + containerIdentifier);
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("Info from cmd line: AM-host: " + host + " AM-port: " + port
+         + " containerIdentifier: " + containerIdentifier + " appAttemptNumber: " + attemptNumber
+         + " tokenIdentifier: " + tokenIdentifier);
+   }
+
+   // Remaining inputs come from the process environment.
+   final String workingDirectory = System.getenv(Environment.PWD.name());
+   final Map<String, String> processEnv = System.getenv();
+   final ExecutionContext executionContext =
+       new ExecutionContextImpl(System.getenv(Environment.NM_HOST.name()));
+
+   TezChild tezChild = newTezChild(defaultConf, host, port, containerIdentifier,
+       tokenIdentifier, attemptNumber, localDirs, workingDirectory,
+       processEnv, pid, executionContext);
+   tezChild.run();
+ }
+
+ /**
+  * Reacts to a failure by shutting this child down.
+  *
+  * @param t the failure that triggered the shutdown; not inspected by this method
+  */
+ private void handleError(Throwable t) {
+   // The throwable itself is returned to the caller of run() by the call sites.
+   shutdown();
+ }
+
+}