You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/05/29 19:51:11 UTC
[2/2] git commit: TEZ-1116. Refactor YarnTezDAGChild to be testable
and usable for LocalMode. (sseth)
TEZ-1116. Refactor YarnTezDAGChild to be testable and usable for
LocalMode. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/acd0a46e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/acd0a46e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/acd0a46e
Branch: refs/heads/master
Commit: acd0a46e36d01ec5d98b9ca263741c4238f0f9aa
Parents: 80b91a4
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu May 29 10:50:35 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu May 29 10:50:35 2014 -0700
----------------------------------------------------------------------
.../apache/tez/dag/api/TezConfiguration.java | 2 +-
.../apache/hadoop/mapred/YarnTezDagChild.java | 735 -------------------
.../tez/dag/utils/TezRuntimeChildJVM.java | 4 +-
.../tez/runtime/task/ContainerReporter.java | 84 +++
.../apache/tez/runtime/task/ErrorReporter.java | 8 +
.../apache/tez/runtime/task/TaskReporter.java | 383 ++++++++++
.../org/apache/tez/runtime/task/TezChild.java | 366 +++++++++
.../apache/tez/runtime/task/TezTaskRunner.java | 378 ++++++++++
.../tez/runtime/task/TestTaskExecution.java | 661 +++++++++++++++++
.../org/apache/tez/mapreduce/TestUmbilical.java | 6 +-
.../org/apache/tez/common/ContainerContext.java | 43 +-
.../runtime/LogicalIOProcessorRuntimeTask.java | 3 +-
.../org/apache/tez/runtime/RuntimeTask.java | 4 +
.../runtime/api/impl/TezTaskContextImpl.java | 18 +-
.../tez/runtime/api/impl/TezUmbilical.java | 3 +-
15 files changed, 1913 insertions(+), 785 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/acd0a46e/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 36fdd59..0b7f52a 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -180,7 +180,7 @@ public class TezConfiguration extends Configuration {
*/
public static final String TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS = TEZ_TASK_PREFIX
+ "am.heartbeat.counter.interval-ms.max";
- public static final long TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS_DEFAULT =
+ public static final int TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS_DEFAULT =
1000;
public static final String TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT = TEZ_TASK_PREFIX
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/acd0a46e/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
deleted file mode 100644
index 7c02077..0000000
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ /dev/null
@@ -1,735 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.hadoop.mapred;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URL;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSError;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.ExitUtil;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-import org.apache.tez.common.ContainerContext;
-import org.apache.tez.common.ContainerTask;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezLocalResource;
-import org.apache.tez.common.TezTaskUmbilicalProtocol;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.common.counters.Limits;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.common.security.JobTokenIdentifier;
-import org.apache.tez.common.security.TokenCache;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.dag.utils.RelocalizationUtils;
-import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
-import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
-import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
-import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
-import org.apache.tez.runtime.api.impl.EventMetaData;
-import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
-import org.apache.tez.runtime.api.impl.TaskSpec;
-import org.apache.tez.runtime.api.impl.TezEvent;
-import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
-import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
-import org.apache.tez.runtime.api.impl.TezUmbilical;
-import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle;
-import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
-import org.apache.tez.runtime.common.objectregistry.ObjectRegistryModule;
-import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
-
-import com.google.common.base.Function;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-
-
-/**
- * The main() for TEZ Task processes.
- */
-public class YarnTezDagChild {
-
- private static final Logger LOG = Logger.getLogger(YarnTezDagChild.class);
-
- private static AtomicBoolean stopped = new AtomicBoolean(false);
-
- private static String containerIdStr;
- private static int maxEventsToGet = 0;
- private static LinkedBlockingQueue<TezEvent> eventsToSend =
- new LinkedBlockingQueue<TezEvent>();
- private static AtomicLong requestCounter = new AtomicLong(0);
- private static long amPollInterval;
- private static long hbCounterInterval;
- private static TezTaskUmbilicalProtocol umbilical;
- private static ReentrantReadWriteLock taskLock = new ReentrantReadWriteLock();
- private static LogicalIOProcessorRuntimeTask currentTask = null;
- private static TezTaskAttemptID currentTaskAttemptID;
- private static AtomicBoolean heartbeatError = new AtomicBoolean(false);
- private static Throwable heartbeatErrorException = null;
- // Implies that the task is done - and the AM is being informed.
- private static AtomicBoolean currentTaskComplete = new AtomicBoolean(true);
- /**
- * Used to maintain information about which Inputs have been started by the
- * framework for the specific DAG. Makes an assumption that multiple DAGs do
- * not execute concurrently, and must be reset each time the running DAG
- * changes.
- */
- private static Multimap<String, String> startedInputsMap = HashMultimap.create();
-
- private static final int LOG_COUNTER_START_INTERVAL = 5000; // 5 seconds.
- private static final float LOG_COUNTER_BACKOFF = 1.3f;
- private static int taskNonOobHeartbeatCounter = 0;
- private static int nextHeartbeatNumToLog = 0;
- private static long prevHeartbeatTimeStamp = System.currentTimeMillis();
-
- private static Thread startHeartbeatThread() {
- Thread heartbeatThread = new Thread(new Runnable() {
- public void run() {
- while (!(stopped.get() || heartbeatError.get())) {
- try {
- try {
- if(!heartbeat()) {
- // AM asked us to die
- break;
- }
- } catch (InvalidToken e) {
- // FIXME NEWTEZ maybe send a container failed event to AM?
- // Irrecoverable error unless heartbeat sync can be re-established
- LOG.error("Heartbeat error in authenticating with AM: ", e);
- heartbeatErrorException = e;
- heartbeatError.set(true);
- break;
- } catch (Throwable e) {
- // FIXME NEWTEZ maybe send a container failed event to AM?
- // Irrecoverable error unless heartbeat sync can be re-established
- LOG.error("Heartbeat error in communicating with AM. ", e);
- if (e instanceof Error) {
- LOG.error("Exception of type Error. Exiting now", e);
- ExitUtil.terminate(-1, e);
- }
- heartbeatErrorException = e;
- heartbeatError.set(true);
- break;
- }
- Thread.sleep(amPollInterval);
- } catch (InterruptedException e) {
- // we were interrupted so that we will stop.
- LOG.info("Heartbeat thread interrupted. " +
- " stopped: " + stopped.get() + " error: " + heartbeatError.get());
- continue;
- }
- }
-
- if (currentTaskComplete.get() || stopped.get()) {
- // Don't exit. The Tez framework has control, let the container finish after cleanup etc.
- // Makes an assumption that a heartbeat shouldDie will be reported as a getTask should die.
- LOG.info("Current task marked as complete. Stopping heartbeat thread and allowing normal container shutdown");
- return;
- } else {
- // Assuming the task is still running, and we've been asked to die or an error occurred.
- // Stop the process.
- if (heartbeatErrorException != null) {
- ExitUtil.terminate(-1, heartbeatErrorException);
- } else {
- ExitUtil.terminate(-1, "Exiting Tez Child Process");
- }
- }
- }
- });
- heartbeatThread.setName("Tez Container Heartbeat Thread ["
- + containerIdStr + "]");
- heartbeatThread.setDaemon(true);
- heartbeatThread.start();
- return heartbeatThread;
- }
-
- private static synchronized boolean heartbeat() throws TezException, IOException {
- return heartbeat(null);
- }
-
- private static synchronized boolean heartbeat(
- Collection<TezEvent> outOfBandEvents)
- throws TezException, IOException {
- TezEvent updateEvent = null;
- int eventCounter = 0;
- int eventsRange = 0;
- TezTaskAttemptID taskAttemptID = null;
- List<TezEvent> events = new ArrayList<TezEvent>();
- try {
- taskLock.readLock().lock();
- if (currentTask != null) {
- eventsToSend.drainTo(events);
- taskAttemptID = currentTaskAttemptID;
- eventCounter = currentTask.getEventCounter();
- eventsRange = maxEventsToGet;
- if (!currentTask.isTaskDone() && !currentTask.hadFatalError()) {
- TezCounters counters = null;
- /**
- * Increasing the heartbeat interval can delay the delivery of events.
- * Sending just updated records would save CPU in DAG AM, but certain
- * counters are updated very frequently. Until real time decisions are made
- * based on these counters, it can be sent once per second.
- */
- if ((System.currentTimeMillis() - prevHeartbeatTimeStamp) > hbCounterInterval) {
- counters = currentTask.getCounters();
- prevHeartbeatTimeStamp = System.currentTimeMillis();
- }
- updateEvent = new TezEvent(new TaskStatusUpdateEvent(
- counters, currentTask.getProgress()),
- new EventMetaData(EventProducerConsumerType.SYSTEM,
- currentTask.getVertexName(), "", taskAttemptID));
- events.add(updateEvent);
- } else if (outOfBandEvents == null && events.isEmpty()) {
- LOG.info("Setting TaskAttemptID to null as the task has already"
- + " completed. Caused by race-condition between the normal"
- + " heartbeat and out-of-band heartbeats");
- taskAttemptID = null;
- } else {
- if (outOfBandEvents != null && !outOfBandEvents.isEmpty()) {
- events.addAll(outOfBandEvents);
- }
- }
- }
- } finally {
- taskLock.readLock().unlock();
- }
-
- if (LOG.isDebugEnabled()) {
- taskNonOobHeartbeatCounter++;
- if (taskNonOobHeartbeatCounter == nextHeartbeatNumToLog) {
- taskLock.readLock().lock();
- try {
- if (currentTask != null) {
- LOG.debug("Counters: " + currentTask.getCounters().toShortString());
- taskNonOobHeartbeatCounter = 0;
- nextHeartbeatNumToLog = (int) (nextHeartbeatNumToLog * (LOG_COUNTER_BACKOFF));
- }
- } finally {
- taskLock.readLock().unlock();
- }
- }
- }
-
- long reqId = requestCounter.incrementAndGet();
- TezHeartbeatRequest request = new TezHeartbeatRequest(reqId, events,
- containerIdStr, taskAttemptID, eventCounter, eventsRange);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Sending heartbeat to AM"
- + ", request=" + request.toString());
- }
- TezHeartbeatResponse response = umbilical.heartbeat(request);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Received heartbeat response from AM"
- + ", response=" + response);
- }
- if(response.shouldDie()) {
- LOG.info("Received should die response from AM");
- return false;
- }
- if (response.getLastRequestId() != reqId) {
- throw new TezException("AM and Task out of sync"
- + ", responseReqId=" + response.getLastRequestId()
- + ", expectedReqId=" + reqId);
- }
- try {
- taskLock.readLock().lock();
- if (taskAttemptID == null
- || !taskAttemptID.equals(currentTaskAttemptID)) {
- if (response.getEvents() != null
- && !response.getEvents().isEmpty()) {
- LOG.warn("No current assigned task, ignoring all events in"
- + " heartbeat response, eventCount="
- + response.getEvents().size());
- }
- return true;
- }
- if (currentTask != null && response.getEvents() != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Routing events from heartbeat response to task"
- + ", currentTaskAttemptId=" + currentTaskAttemptID
- + ", eventCount=" + response.getEvents().size());
- }
- currentTask.handleEvents(response.getEvents());
- }
- } finally {
- taskLock.readLock().unlock();
- }
- return true;
- }
-
- public static void main(String[] args) throws Throwable {
- Thread.setDefaultUncaughtExceptionHandler(
- new YarnUncaughtExceptionHandler());
- LOG.info("YarnTezDagChild starting");
-
- final Configuration defaultConf = new Configuration();
- TezUtils.addUserSpecifiedTezConfiguration(defaultConf);
- // Security settings will be loaded based on core-site and core-default.
- // Don't depend on the jobConf for this.
- UserGroupInformation.setConfiguration(defaultConf);
- Limits.setConfiguration(defaultConf);
-
- assert args.length == 5;
- String host = args[0];
- int port = Integer.parseInt(args[1]);
- final InetSocketAddress address =
- NetUtils.createSocketAddrForHost(host, port);
- final String containerIdentifier = args[2];
- final String tokenIdentifier = args[3];
- final int attemptNumber = Integer.parseInt(args[4]);
- if (LOG.isDebugEnabled()) {
- LOG.info("Info from cmd line: AM-host: " + host + " AM-port: " + port
- + " containerIdentifier: " + containerIdentifier + " attemptNumber: "
- + attemptNumber + " tokenIdentifier: " + tokenIdentifier);
- }
- // FIXME fix initialize metrics in child runner
- DefaultMetricsSystem.initialize("VertexTask");
- YarnTezDagChild.containerIdStr = containerIdentifier;
-
- ObjectRegistryImpl objectRegistry = new ObjectRegistryImpl();
- @SuppressWarnings("unused")
- Injector injector = Guice.createInjector(
- new ObjectRegistryModule(objectRegistry));
-
- // Security framework already loaded the tokens into current ugi
- Credentials credentials =
- UserGroupInformation.getCurrentUser().getCredentials();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Executing with tokens:");
- for (Token<?> token : credentials.getAllTokens()) {
- LOG.debug(token);
- }
- }
-
- amPollInterval = defaultConf.getLong(
- TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS,
- TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
- hbCounterInterval = defaultConf.getLong(
- TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS,
- TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS_DEFAULT);
- maxEventsToGet = defaultConf.getInt(
- TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT,
- TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT);
-
- // Create TaskUmbilicalProtocol as actual task owner.
- UserGroupInformation taskOwner =
- UserGroupInformation.createRemoteUser(tokenIdentifier);
-
- Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
- SecurityUtil.setTokenService(jobToken, address);
- taskOwner.addToken(jobToken);
- // Will jobToken change across DAGs ?
- Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
- serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,
- ShuffleUtils.convertJobTokenToBytes(jobToken));
-
- umbilical =
- taskOwner.doAs(new PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() {
- @Override
- public TezTaskUmbilicalProtocol run() throws Exception {
- return (TezTaskUmbilicalProtocol)RPC.getProxy(TezTaskUmbilicalProtocol.class,
- TezTaskUmbilicalProtocol.versionID, address, defaultConf);
- }
- });
-
- final Thread heartbeatThread = startHeartbeatThread();
-
- TezUmbilical tezUmbilical = new TezUmbilical() {
- @Override
- public void addEvents(Collection<TezEvent> events) {
- eventsToSend.addAll(events);
- }
-
- @Override
- public void signalFatalError(TezTaskAttemptID taskAttemptID,
- String diagnostics,
- EventMetaData sourceInfo) {
- currentTask.setFrameworkCounters();
- TezEvent statusUpdateEvent =
- new TezEvent(new TaskStatusUpdateEvent(
- currentTask.getCounters(), currentTask.getProgress()),
- new EventMetaData(EventProducerConsumerType.SYSTEM,
- currentTask.getVertexName(), "",
- currentTask.getTaskAttemptID()));
- TezEvent taskAttemptFailedEvent =
- new TezEvent(new TaskAttemptFailedEvent(diagnostics),
- sourceInfo);
- try {
- // Not setting taskComplete - since the main loop responsible for cleanup doesn't have
- // control yet. Getting control depends on whether the I/P/O returns correctly after
- // reporting an error.
- heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent));
- } catch (Throwable t) {
- LOG.fatal("Failed to communicate task attempt failure to AM via"
- + " umbilical", t);
- if (t instanceof Error) {
- LOG.error("Exception of type Error. Exiting now", t);
- ExitUtil.terminate(-1, t);
- }
- // FIXME NEWTEZ maybe send a container failed event to AM?
- // Irrecoverable error unless heartbeat sync can be re-established
- heartbeatErrorException = t;
- heartbeatError.set(true);
- heartbeatThread.interrupt();
- }
- }
-
- @Override
- public boolean canCommit(TezTaskAttemptID taskAttemptID)
- throws IOException {
- return umbilical.canCommit(taskAttemptID);
- }
- };
-
- // report non-pid to application master
- String pid = System.getenv().get("JVM_PID");
-
- LOG.info("PID, containerIdentifier: " + pid + ", " + containerIdentifier);
-
- ContainerTask containerTask = null;
- UserGroupInformation childUGI = null;
- ContainerContext containerContext = new ContainerContext(
- containerIdentifier, pid);
- int getTaskMaxSleepTime = defaultConf.getInt(
- TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX,
- TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX_DEFAULT);
- int taskCount = 0;
- TezVertexID lastVertexId = null;
- EventMetaData currentSourceInfo = null;
- try {
- String loggerAddend = "";
- while (true) {
- // poll for new task
- if (taskCount > 0) {
- TezUtils.updateLoggers(loggerAddend);
- }
- boolean isNewGetTask = true;
- long getTaskPollStartTime = System.currentTimeMillis();
- long nextGetTaskPrintTime = getTaskPollStartTime + 2000l;
- for (int idle = 0; null == containerTask; ++idle) {
- if (!isNewGetTask) { // Don't sleep on the first iteration.
- long sleepTimeMilliSecs = Math.min(idle * 10, getTaskMaxSleepTime);
- if (sleepTimeMilliSecs + System.currentTimeMillis() > nextGetTaskPrintTime) {
- LOG.info("Sleeping for "
- + sleepTimeMilliSecs
- + "ms before retrying getTask again. Got null now. "
- + "Next getTask sleep message after 2s");
- nextGetTaskPrintTime = System.currentTimeMillis() + sleepTimeMilliSecs + 2000l;
- }
- MILLISECONDS.sleep(sleepTimeMilliSecs);
- } else {
- LOG.info("Attempting to fetch new task");
- }
- isNewGetTask = false;
- containerTask = umbilical.getTask(containerContext);
- }
- LOG.info("Got TaskUpdate: "
- + (System.currentTimeMillis() - getTaskPollStartTime)
- + " ms after starting to poll."
- + " TaskInfo: shouldDie: " + containerTask.shouldDie()
- + (containerTask.shouldDie() == true ? "" : ", currentTaskAttemptId: "
- + containerTask.getTaskSpec().getTaskAttemptID()));
- if (containerTask.shouldDie()) {
- return;
- }
- taskCount++;
-
- // Reset FileSystem statistics
- FileSystem.clearStatistics();
-
- // Re-use the UGI only if the Credentials have not changed.
- if (containerTask.haveCredentialsChanged()) {
- LOG.info("Refreshing UGI since Credentials have changed");
- Credentials taskCreds = containerTask.getCredentials();
- if (taskCreds != null) {
- LOG.info("Credentials : #Tokens=" + taskCreds.numberOfTokens() + ", #SecretKeys="
- + taskCreds.numberOfSecretKeys());
- childUGI = UserGroupInformation.createRemoteUser(System
- .getenv(ApplicationConstants.Environment.USER.toString()));
- childUGI.addCredentials(containerTask.getCredentials());
- } else {
- LOG.info("Not loading any credentials, since no credentials provided");
- }
- }
-
- Map<String, TezLocalResource> additionalResources = containerTask.getAdditionalResources();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Additional Resources added to container: " + additionalResources);
- }
-
- LOG.info("Localizing additional local resources for Task : " + additionalResources);
- List<URL> downloadedUrls = RelocalizationUtils.processAdditionalResources(
- Maps.transformValues(additionalResources, new Function<TezLocalResource, URI>() {
- @Override
- public URI apply(TezLocalResource input) {
- return input.getUri();
- }
- }), defaultConf);
- RelocalizationUtils.addUrlsToClassPath(downloadedUrls);
-
- LOG.info("Done localizing additional resources");
- final TaskSpec taskSpec = containerTask.getTaskSpec();
- if (LOG.isDebugEnabled()) {
- LOG.debug("New container task context:"
- + taskSpec.toString());
- }
-
- try {
- taskLock.writeLock().lock();
- currentTaskAttemptID = taskSpec.getTaskAttemptID();
- TezVertexID newVertexId =
- currentTaskAttemptID.getTaskID().getVertexID();
- currentTaskComplete.set(false);
-
- if (lastVertexId != null) {
- if (!lastVertexId.equals(newVertexId)) {
- objectRegistry.clearCache(ObjectLifeCycle.VERTEX);
- }
- if (!lastVertexId.getDAGId().equals(newVertexId.getDAGId())) {
- objectRegistry.clearCache(ObjectLifeCycle.DAG);
- startedInputsMap = HashMultimap.create();
- }
- }
- lastVertexId = newVertexId;
- TezUtils.updateLoggers(currentTaskAttemptID.toString());
- loggerAddend = currentTaskAttemptID.toString() + "_post";
-
- currentTask = createLogicalTask(attemptNumber, taskSpec,
- defaultConf, tezUmbilical, serviceConsumerMetadata);
-
- taskNonOobHeartbeatCounter = 0;
- nextHeartbeatNumToLog = (Math.max(1,
- (int) (LOG_COUNTER_START_INTERVAL / (amPollInterval == 0 ? 0.000001f
- : (float) amPollInterval))));
- } finally {
- taskLock.writeLock().unlock();
- }
-
- final EventMetaData sourceInfo = new EventMetaData(
- EventProducerConsumerType.SYSTEM,
- taskSpec.getVertexName(), "", currentTaskAttemptID);
- currentSourceInfo = sourceInfo;
-
- // TODO Initiate Java VM metrics
- // JvmMetrics.initSingleton(containerId.toString(), job.getSessionId());
-
- childUGI.doAs(new PrivilegedExceptionAction<Object>() {
- @Override
- public Object run() throws Exception {
- try {
- setFileSystemWorkingDir(defaultConf);
- LOG.info("Initializing task"
- + ", taskAttemptId=" + currentTaskAttemptID);
- currentTask.initialize();
- if (!currentTask.hadFatalError()) {
- LOG.info("Running task"
- + ", taskAttemptId=" + currentTaskAttemptID);
- currentTask.run();
- LOG.info("Closing task"
- + ", taskAttemptId=" + currentTaskAttemptID);
- currentTask.close();
- }
- LOG.info("Task completed"
- + ", taskAttemptId=" + currentTaskAttemptID
- + ", fatalErrorOccurred=" + currentTask.hadFatalError());
- // Mark taskComplete - irrespective of failure, framework has control from this point.
- currentTaskComplete.set(true);
- // TODONEWTEZ Should the container continue to run if the running task reported a fatal error ?
- if (!currentTask.hadFatalError()) {
- // Set counters in case of a successful task.
- currentTask.setFrameworkCounters();
- TezEvent statusUpdateEvent =
- new TezEvent(new TaskStatusUpdateEvent(
- currentTask.getCounters(), currentTask.getProgress()),
- new EventMetaData(EventProducerConsumerType.SYSTEM,
- currentTask.getVertexName(), "",
- currentTask.getTaskAttemptID()));
- TezEvent taskCompletedEvent =
- new TezEvent(new TaskAttemptCompletedEvent(), sourceInfo);
- heartbeat(Arrays.asList(statusUpdateEvent, taskCompletedEvent));
- } // Should the fatalError be reported ?
- } finally {
- currentTask.cleanup();
- }
- try {
- taskLock.writeLock().lock();
- currentTask = null;
- currentTaskAttemptID = null;
- } finally {
- taskLock.writeLock().unlock();
- }
- return null;
- }
- });
- FileSystem.closeAllForUGI(childUGI);
- containerTask = null;
- if (heartbeatError.get()) {
- LOG.fatal("Breaking out of task loop, heartbeat error occurred",
- heartbeatErrorException);
- break;
- }
- }
- } catch (FSError e) {
- // Heartbeats controlled manually after this.
- stopped.set(true);
- heartbeatThread.interrupt();
- LOG.fatal("FSError from child", e);
- // TODO NEWTEZ this should be a container failed event?
- try {
- taskLock.readLock().lock();
- if (currentTask != null && !currentTask.hadFatalError()) {
- // TODO Is this of any use if the heartbeat thread is being interrupted first ?
- // Prevent dup failure events
- currentTask.setFrameworkCounters();
- TezEvent statusUpdateEvent =
- new TezEvent(new TaskStatusUpdateEvent(
- currentTask.getCounters(), currentTask.getProgress()),
- new EventMetaData(EventProducerConsumerType.SYSTEM,
- currentTask.getVertexName(), "",
- currentTask.getTaskAttemptID()));
- currentTask.setFatalError(e, "FS Error in Child JVM");
- TezEvent taskAttemptFailedEvent =
- new TezEvent(new TaskAttemptFailedEvent(
- StringUtils.stringifyException(e)),
- currentSourceInfo);
- currentTaskComplete.set(true);
- heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent));
- }
- } finally {
- taskLock.readLock().unlock();
- }
- } catch (Throwable throwable) {
- // Heartbeats controlled manually after this.
- if (throwable instanceof Error) {
- LOG.error("Exception of type Error. Exiting now", throwable);
- ExitUtil.terminate(-1, throwable);
- }
- stopped.set(true);
- heartbeatThread.interrupt();
- String cause = StringUtils.stringifyException(throwable);
- LOG.fatal("Error running child : " + cause);
- taskLock.readLock().lock();
- try {
- if (currentTask != null && !currentTask.hadFatalError()) {
- // TODO Is this of any use if the heartbeat thread is being interrupted first ?
- // Prevent dup failure events
- currentTask.setFatalError(throwable, "Error in Child JVM");
- currentTask.setFrameworkCounters();
- TezEvent statusUpdateEvent =
- new TezEvent(new TaskStatusUpdateEvent(
- currentTask.getCounters(), currentTask.getProgress()),
- new EventMetaData(EventProducerConsumerType.SYSTEM,
- currentTask.getVertexName(), "",
- currentTask.getTaskAttemptID()));
- TezEvent taskAttemptFailedEvent =
- new TezEvent(new TaskAttemptFailedEvent(cause),
- currentSourceInfo);
- currentTaskComplete.set(true);
- heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent));
- }
- } finally {
- taskLock.readLock().unlock();
- }
- } finally {
- stopped.set(true);
- heartbeatThread.interrupt();
- RPC.stopProxy(umbilical);
- DefaultMetricsSystem.shutdown();
- // Shutting down log4j of the child-vm...
- // This assumes that on return from Task.run()
- // there is no more logging done.
- LogManager.shutdown();
- }
- }
-
- private static LogicalIOProcessorRuntimeTask createLogicalTask(int attemptNum,
- TaskSpec taskSpec, Configuration conf, TezUmbilical tezUmbilical,
- Map<String, ByteBuffer> serviceConsumerMetadata) throws IOException {
-
- // FIXME TODONEWTEZ
- conf.setBoolean("ipc.client.tcpnodelay", true);
-
- String [] localDirs = StringUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name()));
- conf.setStrings(TezJobConfig.LOCAL_DIRS, localDirs);
- LOG.info("LocalDirs for child: " + Arrays.toString(localDirs));
-
- return new LogicalIOProcessorRuntimeTask(taskSpec, attemptNum, conf,
- tezUmbilical, serviceConsumerMetadata, startedInputsMap);
- }
-
- // TODONEWTEZ Is this really required ?
- private static void setFileSystemWorkingDir(Configuration conf) throws IOException {
- FileSystem.get(conf).setWorkingDirectory(getWorkingDirectory(conf));
- }
-
-
- private static Path getWorkingDirectory(Configuration conf) {
- String name = conf.get(JobContext.WORKING_DIR);
- if (name != null) {
- return new Path(name);
- } else {
- try {
- Path dir = FileSystem.get(conf).getWorkingDirectory();
- conf.set(JobContext.WORKING_DIR, dir.toString());
- return dir;
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/acd0a46e/tez-dag/src/main/java/org/apache/tez/dag/utils/TezRuntimeChildJVM.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/TezRuntimeChildJVM.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/TezRuntimeChildJVM.java
index f91a909..46e200e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/TezRuntimeChildJVM.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/TezRuntimeChildJVM.java
@@ -23,10 +23,10 @@ import java.util.List;
import java.util.Vector;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.YarnTezDagChild;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.runtime.task.TezChild;
public class TezRuntimeChildJVM {
@@ -90,7 +90,7 @@ public class TezRuntimeChildJVM {
}
// Add main class and its arguments
- vargs.add(YarnTezDagChild.class.getName()); // main of Child
+ vargs.add(TezChild.class.getName()); // main of Child
// pass TaskAttemptListener's address
vargs.add(taskAttemptListenerAddr.getAddress().getHostAddress());
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/acd0a46e/tez-dag/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
new file mode 100644
index 0000000..a68c7c1
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+import org.apache.tez.common.ContainerContext;
+import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+
+/**
+ * Responsible for communication between a running Container and the ApplicationMaster. The main
+ * functionality is to poll for new tasks.
+ * <p>
+ * Each {@link #call()} blocks until the AM returns a non-null {@link ContainerTask}, sleeping
+ * between polls for a linearly growing delay capped at {@code getTaskMaxSleepTime} milliseconds.
+ */
+public class ContainerReporter implements Callable<ContainerTask> {
+
+  private static final Logger LOG = Logger.getLogger(ContainerReporter.class);
+
+  /** Minimum spacing, in milliseconds, between successive "sleeping" log messages. */
+  private static final long LOG_INTERVAL = 2000L;
+
+  private final TezTaskUmbilicalProtocol umbilical;
+  private final ContainerContext containerContext;
+  private final int getTaskMaxSleepTime;
+
+  /** Wall-clock time (ms) after which the next "sleeping" message may be logged. */
+  private long nextGetTaskPrintTime;
+
+  /**
+   * @param umbilical           protocol used to ask the AM for a task
+   * @param containerContext    identifies this container in each getTask request
+   * @param getTaskMaxSleepTime upper bound, in milliseconds, for the sleep between two polls
+   */
+  ContainerReporter(TezTaskUmbilicalProtocol umbilical, ContainerContext containerContext,
+      int getTaskMaxSleepTime) {
+    this.umbilical = umbilical;
+    this.containerContext = containerContext;
+    this.getTaskMaxSleepTime = getTaskMaxSleepTime;
+  }
+
+  /**
+   * Polls the AM until it returns a task (or a shouldDie instruction).
+   *
+   * @return the first non-null {@link ContainerTask} returned by the AM
+   * @throws Exception if the umbilical call fails or the polling thread is interrupted while
+   *                   sleeping
+   */
+  @Override
+  public ContainerTask call() throws Exception {
+    LOG.info("Attempting to fetch new task");
+    ContainerTask containerTask = umbilical.getTask(containerContext);
+    long getTaskPollStartTime = System.currentTimeMillis();
+    nextGetTaskPrintTime = getTaskPollStartTime + LOG_INTERVAL;
+    for (int idle = 1; containerTask == null; idle++) {
+      // Linear back-off: 10ms, 20ms, ... capped at the configured maximum.
+      long sleepTimeMilliSecs = Math.min(idle * 10, getTaskMaxSleepTime);
+      maybeLogSleepMessage(sleepTimeMilliSecs);
+      TimeUnit.MILLISECONDS.sleep(sleepTimeMilliSecs);
+      containerTask = umbilical.getTask(containerContext);
+    }
+    boolean shouldDie = containerTask.shouldDie();
+    // The task spec is only dereferenced when the AM did not ask the container to die.
+    LOG.info("Got TaskUpdate: "
+        + (System.currentTimeMillis() - getTaskPollStartTime)
+        + " ms after starting to poll."
+        + " TaskInfo: shouldDie: "
+        + shouldDie
+        + (shouldDie ? "" : ", currentTaskAttemptId: "
+            + containerTask.getTaskSpec().getTaskAttemptID()));
+    return containerTask;
+  }
+
+  /**
+   * Logs the upcoming sleep, rate-limited so that repeated short polls do not flood the log.
+   *
+   * @param sleepTimeMilliSecs the sleep that is about to be performed, in milliseconds
+   */
+  private void maybeLogSleepMessage(long sleepTimeMilliSecs) {
+    long currentTime = System.currentTimeMillis();
+    if (sleepTimeMilliSecs + currentTime > nextGetTaskPrintTime) {
+      LOG.info("Sleeping for " + sleepTimeMilliSecs
+          + "ms before retrying getTask again. Got null now. "
+          + "Next getTask sleep message after " + LOG_INTERVAL + "ms");
+      nextGetTaskPrintTime = currentTime + sleepTimeMilliSecs + LOG_INTERVAL;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/acd0a46e/tez-dag/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java
new file mode 100644
index 0000000..8b888ff
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java
@@ -0,0 +1,8 @@
+package org.apache.tez.runtime.task;
+
+/**
+ * Callback used by asynchronous components of the task runtime to surface conditions that
+ * they cannot handle themselves. TaskReporter's heartbeat callback is one caller.
+ */
+public interface ErrorReporter {
+
+ /**
+  * Reports a failure. TaskReporter invokes this when its heartbeat future completes
+  * with an exception.
+  *
+  * @param t the failure being reported
+  */
+ void reportError(Throwable t);
+
+ /**
+  * Signals that a shutdown was requested. TaskReporter invokes this when its heartbeat
+  * future completes with {@code false}, i.e. the AM answered a heartbeat with shouldDie.
+  */
+ void shutdownRequested();
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/acd0a46e/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
new file mode 100644
index 0000000..d860a0b
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
@@ -0,0 +1,383 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
+import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
+import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.mortbay.log.Log;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Responsible for communication between tasks running in a Container and the ApplicationMaster.
+ * Takes care of sending heartbeats (regular and OOB) to the AM - to send generated events, and to
+ * retrieve events specific to this task.
+ * <p>
+ * Heartbeats run on a single daemon thread. At most one task is tracked at a time: callers are
+ * expected to {@link #unregisterTask} before registering the next one.
+ */
+public class TaskReporter {
+
+  private static final Logger LOG = Logger.getLogger(TaskReporter.class);
+
+  private final TezTaskUmbilicalProtocol umbilical;
+  private final long pollInterval;
+  private final long sendCounterInterval;
+  private final int maxEventsToGet;
+  private final AtomicLong requestCounter;
+  private final String containerIdStr;
+
+  private final ListeningExecutorService heartbeatExecutor;
+
+  @VisibleForTesting
+  HeartbeatCallable currentCallable;
+
+  /**
+   * @param umbilical           protocol used to talk to the AM
+   * @param amPollInterval      delay between two regular heartbeats, in milliseconds
+   * @param sendCounterInterval minimum delay between two counter updates, in milliseconds
+   * @param maxEventsToGet      maximum number of events requested per heartbeat
+   * @param requestCounter      shared source of heartbeat request ids
+   * @param containerIdStr      identifier of this container, sent with each heartbeat
+   */
+  public TaskReporter(TezTaskUmbilicalProtocol umbilical, long amPollInterval,
+      long sendCounterInterval, int maxEventsToGet, AtomicLong requestCounter, String containerIdStr) {
+    this.umbilical = umbilical;
+    this.pollInterval = amPollInterval;
+    this.sendCounterInterval = sendCounterInterval;
+    this.maxEventsToGet = maxEventsToGet;
+    this.requestCounter = requestCounter;
+    this.containerIdStr = containerIdStr;
+    ExecutorService executor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
+        .setDaemon(true).setNameFormat("TaskHeartbeatThread").build());
+    heartbeatExecutor = MoreExecutors.listeningDecorator(executor);
+  }
+
+  /**
+   * Register a task to be tracked. Heartbeats will be sent out for this task to fetch events, etc.
+   *
+   * @param task          the task to heartbeat for
+   * @param errorReporter notified when the heartbeat loop fails or the AM asks the task to die
+   */
+  public synchronized void registerTask(LogicalIOProcessorRuntimeTask task,
+      ErrorReporter errorReporter) {
+    currentCallable = new HeartbeatCallable(task, umbilical, pollInterval, sendCounterInterval,
+        maxEventsToGet, requestCounter, containerIdStr);
+    ListenableFuture<Boolean> future = heartbeatExecutor.submit(currentCallable);
+    Futures.addCallback(future, new HeartbeatCallback(errorReporter));
+  }
+
+  /**
+   * This method should always be invoked before setting up heartbeats for another task running in
+   * the same container. Calling it when no task is registered is a no-op.
+   *
+   * @param taskAttemptID the attempt being unregistered (currently unused)
+   */
+  public synchronized void unregisterTask(TezTaskAttemptID taskAttemptID) {
+    if (currentCallable != null) {
+      currentCallable.markComplete();
+      currentCallable = null;
+    }
+  }
+
+  /** Stops the heartbeat thread, interrupting any heartbeat that is in progress. */
+  public void shutdown() {
+    heartbeatExecutor.shutdownNow();
+  }
+
+  /**
+   * The heartbeat loop for a single task. Also used directly (via the enclosing class) to send
+   * out-of-band heartbeats carrying the final success / failure events.
+   */
+  private static class HeartbeatCallable implements Callable<Boolean> {
+
+    private static final int LOG_COUNTER_START_INTERVAL = 5000; // 5 seconds
+    private static final float LOG_COUNTER_BACKOFF = 1.3f;
+
+    private final LogicalIOProcessorRuntimeTask task;
+    private EventMetaData updateEventMetadata;
+
+    private final TezTaskUmbilicalProtocol umbilical;
+
+    private final long pollInterval;
+    private final long sendCounterInterval;
+    private final int maxEventsToGet;
+    private final String containerIdStr;
+
+    private final AtomicLong requestCounter;
+
+    private final LinkedBlockingQueue<TezEvent> eventsToSend = new LinkedBlockingQueue<TezEvent>();
+
+    private final ReentrantLock lock = new ReentrantLock();
+    private final Condition condition = lock.newCondition();
+
+    /*
+     * Keeps track of regular timed heartbeats. Is primarily used as a timing mechanism to send /
+     * log counters.
+     */
+    private int nonOobHeartbeatCounter = 0;
+    private int nextHeartbeatNumToLog = 0;
+    /*
+     * Tracks the last non-OOB heartbeat number at which counters were sent to the AM.
+     */
+    private int prevCounterSendHeartbeatNum = 0;
+
+    public HeartbeatCallable(LogicalIOProcessorRuntimeTask task,
+        TezTaskUmbilicalProtocol umbilical, long amPollInterval, long sendCounterInterval,
+        int maxEventsToGet, AtomicLong requestCounter, String containerIdStr) {
+
+      this.pollInterval = amPollInterval;
+      this.sendCounterInterval = sendCounterInterval;
+      this.maxEventsToGet = maxEventsToGet;
+      this.requestCounter = requestCounter;
+      this.containerIdStr = containerIdStr;
+
+      this.task = task;
+      this.umbilical = umbilical;
+      this.updateEventMetadata = new EventMetaData(EventProducerConsumerType.SYSTEM,
+          task.getVertexName(), "", task.getTaskAttemptID());
+
+      // First counter log after roughly LOG_COUNTER_START_INTERVAL ms worth of heartbeats; the
+      // tiny divisor avoids a division by zero when the poll interval is 0.
+      nextHeartbeatNumToLog = (Math.max(1,
+          (int) (LOG_COUNTER_START_INTERVAL / (amPollInterval == 0 ? 0.000001f
+              : (float) amPollInterval))));
+    }
+
+    /**
+     * Sends regular heartbeats until the task is done or has hit a fatal error.
+     *
+     * @return {@code false} if the AM asked the task to die, {@code true} if the loop ended
+     *         because the task finished
+     */
+    @Override
+    public Boolean call() throws Exception {
+      // Heartbeat only for active tasks. Errors, etc will be reported directly.
+      while (!task.isTaskDone() && !task.hadFatalError()) {
+        boolean result = heartbeat(null);
+        if (!result) {
+          // AM sent a shouldDie=true
+          LOG.info("Asked to die via task heartbeat");
+          return false;
+        }
+        lock.lock();
+        try {
+          // await() returns false when the full poll interval elapsed, true when the wait was
+          // cut short by markComplete(). Only full intervals count as regular heartbeats.
+          boolean signalledEarly = condition.await(pollInterval, TimeUnit.MILLISECONDS);
+          if (!signalledEarly) {
+            nonOobHeartbeatCounter++;
+          }
+        } finally {
+          lock.unlock();
+        }
+      }
+      int pendingEventCount = eventsToSend.size();
+      if (pendingEventCount > 0) {
+        LOG.warn("Exiting TaskReporter therad with pending queue size=" + pendingEventCount);
+      }
+      return true;
+    }
+
+    /**
+     * Sends one heartbeat carrying all queued events plus, for a still-active task, a status
+     * update, and routes any events in the response to the task.
+     *
+     * @param eventsArg
+     *          additional events to send with this heartbeat, may be null
+     * @return false if the AM responded with shouldDie, true otherwise
+     * @throws IOException
+     *           indicates an RPC communication failure.
+     * @throws TezException
+     *           indicates an exception somewhere in the AM.
+     */
+    private synchronized boolean heartbeat(Collection<TezEvent> eventsArg) throws IOException,
+        TezException {
+
+      if (eventsArg != null) {
+        eventsToSend.addAll(eventsArg);
+      }
+
+      TezEvent updateEvent = null;
+      List<TezEvent> events = new ArrayList<TezEvent>();
+      eventsToSend.drainTo(events);
+
+      if (!task.isTaskDone() && !task.hadFatalError()) {
+        TezCounters counters = null;
+        /**
+         * Increasing the heartbeat interval can delay the delivery of events. Sending just updated
+         * records would save CPU in DAG AM, but certain counters are updated very frequently. Until
+         * real time decisions are made based on these counters, it can be sent once per second.
+         */
+        // Not completely accurate, since OOB heartbeats could go out.
+        if ((nonOobHeartbeatCounter - prevCounterSendHeartbeatNum) * pollInterval >= sendCounterInterval) {
+          counters = task.getCounters();
+          prevCounterSendHeartbeatNum = nonOobHeartbeatCounter;
+        }
+        updateEvent = new TezEvent(new TaskStatusUpdateEvent(counters, task.getProgress()),
+            updateEventMetadata);
+        events.add(updateEvent);
+      }
+
+      long requestId = requestCounter.incrementAndGet();
+      TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, containerIdStr,
+          task.getTaskAttemptID(), task.getEventCounter(), maxEventsToGet);
+      if (LOG.isDebugEnabled()) {
+        // Use this class's log4j logger, the same one the guard above checks.
+        LOG.debug("Sending heartbeat to AM, request=" + request);
+      }
+
+      maybeLogCounters();
+
+      TezHeartbeatResponse response = umbilical.heartbeat(request);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Received heartbeat response from AM, response=" + response);
+      }
+
+      if (response.shouldDie()) {
+        LOG.info("Received should die response from AM");
+        return false;
+      }
+      if (response.getLastRequestId() != requestId) {
+        throw new TezException("AM and Task out of sync" + ", responseReqId="
+            + response.getLastRequestId() + ", expectedReqId=" + requestId);
+      }
+
+      // The same umbilical is used by multiple tasks. Problematic in the case where multiple tasks
+      // are running using the same umbilical.
+      if (task.isTaskDone() || task.hadFatalError()) {
+        if (response.getEvents() != null && !response.getEvents().isEmpty()) {
+          LOG.warn("Current task already complete, Ignoring all event in"
+              + " heartbeat response, eventCount=" + response.getEvents().size());
+        }
+      } else {
+        if (response.getEvents() != null && !response.getEvents().isEmpty()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Routing events from heartbeat response to task" + ", currentTaskAttemptId="
+                + task.getTaskAttemptID() + ", eventCount=" + response.getEvents().size());
+          }
+          // This should ideally happen in a separate thread
+          task.handleEvents(response.getEvents());
+        }
+      }
+      return true;
+
+    }
+
+    /** Wakes the heartbeat loop so it re-checks the task state without waiting a full interval. */
+    public void markComplete() {
+      // Notify to clear pending events, if any.
+      lock.lock();
+      try {
+        condition.signal();
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    /** Logs counters at debug level, with the gap between two logs growing by a fixed factor. */
+    private void maybeLogCounters() {
+      if (LOG.isDebugEnabled()) {
+        if (nonOobHeartbeatCounter == nextHeartbeatNumToLog) {
+          LOG.debug("Counters: " + task.getCounters().toShortString());
+          // Always move forward by at least one heartbeat: for small values the truncated
+          // product equals the current value and the next log would never be reached.
+          nextHeartbeatNumToLog = Math.max(nextHeartbeatNumToLog + 1,
+              (int) (nextHeartbeatNumToLog * (LOG_COUNTER_BACKOFF)));
+        }
+      }
+    }
+
+    /**
+     * Sends out final events for task success.
+     * @param taskAttemptID
+     *          the attempt that succeeded (currently unused)
+     * @return false if the AM responded with shouldDie, true otherwise
+     * @throws IOException
+     *           indicates an RPC communication failure.
+     * @throws TezException
+     *           indicates an exception somewhere in the AM.
+     */
+    private boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException {
+      TezEvent statusUpdateEvent = new TezEvent(new TaskStatusUpdateEvent(task.getCounters(),
+          task.getProgress()), updateEventMetadata);
+      TezEvent taskCompletedEvent = new TezEvent(new TaskAttemptCompletedEvent(),
+          updateEventMetadata);
+      return heartbeat(Lists.newArrayList(statusUpdateEvent, taskCompletedEvent));
+    }
+
+    /**
+     * Sends out final events for task failure.
+     * @param taskAttemptID
+     *          the attempt that failed (currently unused)
+     * @param t
+     *          the failure; used to build the diagnostics when none are supplied
+     * @param diagnostics
+     *          diagnostic message, or null to derive one from {@code t}
+     * @param srcMeta
+     *          source of the failure, or null to attribute it to the task itself
+     * @return false if the AM responded with shouldDie, true otherwise
+     * @throws IOException
+     *           indicates an RPC communication failure.
+     * @throws TezException
+     *           indicates an exception somewhere in the AM.
+     */
+    private boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics,
+        EventMetaData srcMeta) throws IOException, TezException {
+      TezEvent statusUpdateEvent = new TezEvent(new TaskStatusUpdateEvent(task.getCounters(),
+          task.getProgress()), updateEventMetadata);
+      if (diagnostics == null) {
+        diagnostics = StringUtils.stringifyException(t);
+      }
+      TezEvent taskAttemptFailedEvent = new TezEvent(new TaskAttemptFailedEvent(diagnostics),
+          srcMeta == null ? updateEventMetadata : srcMeta);
+      return heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent));
+    }
+
+    /** Queues events to be sent with the next heartbeat; null or empty input is ignored. */
+    private void addEvents(TezTaskAttemptID taskAttemptID, Collection<TezEvent> events) {
+      if (events != null && !events.isEmpty()) {
+        eventsToSend.addAll(events);
+      }
+    }
+  }
+
+  /** Translates the outcome of the heartbeat loop into {@link ErrorReporter} notifications. */
+  private static class HeartbeatCallback implements FutureCallback<Boolean> {
+
+    private final ErrorReporter errorReporter;
+
+    HeartbeatCallback(ErrorReporter errorReporter) {
+      this.errorReporter = errorReporter;
+    }
+
+    @Override
+    public void onSuccess(Boolean result) {
+      // A false result means the AM asked the task to die.
+      if (!result) {
+        errorReporter.shutdownRequested();
+      }
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      errorReporter.reportError(t);
+    }
+  }
+
+  /**
+   * Sends the final events for a successful attempt via an out-of-band heartbeat.
+   *
+   * @return false if the AM responded with shouldDie, true otherwise
+   */
+  public boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException {
+    return currentCallable.taskSucceeded(taskAttemptID);
+  }
+
+  /**
+   * Sends the final events for a failed attempt via an out-of-band heartbeat.
+   *
+   * @return false if the AM responded with shouldDie, true otherwise
+   */
+  public boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics,
+      EventMetaData srcMeta) throws IOException, TezException {
+    return currentCallable.taskFailed(taskAttemptID, t, diagnostics, srcMeta);
+  }
+
+  /** Queues events generated by the task for delivery with the next heartbeat. */
+  public void addEvents(TezTaskAttemptID taskAttemptID, Collection<TezEvent> events) {
+    currentCallable.addEvents(taskAttemptID, events);
+  }
+
+  /** Asks the AM, directly over the umbilical, whether the given attempt may commit. */
+  public boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException {
+    return umbilical.canCommit(taskAttemptID);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/acd0a46e/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
new file mode 100644
index 0000000..9e0e523
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -0,0 +1,366 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.tez.common.ContainerContext;
+import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.TezLocalResource;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.Limits;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.utils.RelocalizationUtils;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistryModule;
+import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+/**
+ * Entry point of a Tez task container. Connects to the AM over the task umbilical, then
+ * repeatedly fetches a task, runs it, and fetches the next one until the AM asks the
+ * container to die or an error occurs.
+ */
+public class TezChild {
+
+  private static final Logger LOG = Logger.getLogger(TezChild.class);
+
+  private final Configuration defaultConf;
+  private final String containerIdString;
+  private final int appAttemptNumber;
+  private final InetSocketAddress address;
+  private final String[] localDirs;
+
+  private final AtomicLong heartbeatCounter = new AtomicLong(0);
+
+  private final int getTaskMaxSleepTime;
+  private final int amHeartbeatInterval;
+  private final long sendCounterInterval;
+  private final int maxEventsToGet;
+
+  private final ListeningExecutorService executor;
+  // NOTE(review): main() creates a separate ObjectRegistryImpl for the Guice module; confirm that
+  // clearing this instance also affects the registry seen by tasks.
+  private final ObjectRegistryImpl objectRegistry = new ObjectRegistryImpl();
+  private final Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
+
+  private Multimap<String, String> startedInputsMap = HashMultimap.create();
+
+  private TaskReporter taskReporter;
+  private TezTaskUmbilicalProtocol umbilical;
+  private int taskCount = 0;
+  private TezVertexID lastVertexID;
+
+  /**
+   * Reads the task-side configuration, registers the session token against the AM address and
+   * opens the umbilical RPC proxy as the task owner.
+   *
+   * @param conf                base configuration for the container
+   * @param host                AM host for the umbilical connection
+   * @param port                AM port for the umbilical connection
+   * @param containerIdentifier identifier of this container
+   * @param tokenIdentifier     user name under which the umbilical proxy is created
+   * @param appAttemptNumber    application attempt number, passed on to each task runner
+   * @param localDirs           local directories, passed on to each task runner
+   */
+  public TezChild(Configuration conf, String host, int port, String containerIdentifier,
+      String tokenIdentifier, int appAttemptNumber, String[] localDirs) throws IOException,
+      InterruptedException {
+    this.defaultConf = conf;
+    this.containerIdString = containerIdentifier;
+    this.appAttemptNumber = appAttemptNumber;
+    this.localDirs = localDirs;
+
+    getTaskMaxSleepTime = defaultConf.getInt(
+        TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX,
+        TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX_DEFAULT);
+
+    amHeartbeatInterval = defaultConf.getInt(TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS,
+        TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
+
+    sendCounterInterval = defaultConf.getLong(
+        TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS,
+        TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS_DEFAULT);
+
+    maxEventsToGet = defaultConf.getInt(TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT,
+        TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT);
+
+    address = NetUtils.createSocketAddrForHost(host, port);
+
+    ExecutorService executor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
+        .setDaemon(true).setNameFormat("TezChild").build());
+    this.executor = MoreExecutors.listeningDecorator(executor);
+
+    // Security framework already loaded the tokens into current ugi
+    Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Executing with tokens:");
+      for (Token<?> token : credentials.getAllTokens()) {
+        LOG.debug(token);
+      }
+    }
+
+    UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(tokenIdentifier);
+    Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
+    SecurityUtil.setTokenService(jobToken, address);
+    taskOwner.addToken(jobToken);
+
+    serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,
+        ShuffleUtils.convertJobTokenToBytes(jobToken));
+
+    umbilical = taskOwner.doAs(new PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() {
+      @Override
+      public TezTaskUmbilicalProtocol run() throws Exception {
+        return (TezTaskUmbilicalProtocol) RPC.getProxy(TezTaskUmbilicalProtocol.class,
+            TezTaskUmbilicalProtocol.versionID, address, defaultConf);
+      }
+    });
+  }
+
+  /**
+   * Main loop: fetch a task from the AM, run it, repeat. Returns after shutting the container
+   * down, either because the AM requested it or because an error occurred.
+   */
+  void run() throws IOException, InterruptedException, TezException {
+
+    ContainerContext containerContext = new ContainerContext(containerIdString);
+    ContainerReporter containerReporter = new ContainerReporter(umbilical, containerContext,
+        getTaskMaxSleepTime);
+
+    taskReporter = new TaskReporter(umbilical, amHeartbeatInterval,
+        sendCounterInterval, maxEventsToGet, heartbeatCounter, containerIdString);
+
+    UserGroupInformation childUGI = null;
+
+    while (true) {
+      if (taskCount > 0) {
+        TezUtils.updateLoggers("");
+      }
+      ListenableFuture<ContainerTask> getTaskFuture = executor.submit(containerReporter);
+      ContainerTask containerTask = null;
+      try {
+        containerTask = getTaskFuture.get();
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause();
+        handleError(cause);
+        return;
+      } catch (InterruptedException e) {
+        // No task has been received at this point, so there is no attempt id to report.
+        LOG.info("Interrupted while waiting for a new task");
+        handleError(e);
+        // Preserve the interrupt status for the caller.
+        Thread.currentThread().interrupt();
+        return;
+      }
+      if (containerTask.shouldDie()) {
+        LOG.info("ContainerTask returned shouldDie=true, Exiting");
+        shutdown();
+        // The executor is terminated now; looping again would fail on the next submit().
+        return;
+      } else {
+        String loggerAddend = containerTask.getTaskSpec().getTaskAttemptID().toString();
+        taskCount++;
+        TezUtils.updateLoggers(loggerAddend);
+        FileSystem.clearStatistics();
+
+        childUGI = handleNewTaskCredentials(containerTask, childUGI);
+        handleNewTaskLocalResources(containerTask);
+        cleanupOnTaskChanged(containerTask);
+
+        // Execute the Actual Task
+        TezTaskRunner taskRunner = new TezTaskRunner(new TezConfiguration(defaultConf), childUGI,
+            localDirs, containerTask.getTaskSpec(), umbilical, appAttemptNumber,
+            serviceConsumerMetadata, startedInputsMap, taskReporter, executor);
+        boolean shouldDie = false;
+        try {
+          shouldDie = !taskRunner.run();
+          if (shouldDie) {
+            LOG.info("Got a shouldDie notification via hearbeats. Shutting down");
+            shutdown();
+            // As above: nothing more can be scheduled once shutdown() has run.
+            return;
+          }
+        } catch (IOException e) {
+          handleError(e);
+          return;
+        } catch (TezException e) {
+          handleError(e);
+          return;
+        } finally {
+          FileSystem.closeAllForUGI(childUGI);
+        }
+      }
+    }
+  }
+
+  /**
+   * Sets up the UGI for a new task.
+   *
+   * @param containerTask
+   *          the new task specification. Must be a valid task
+   * @param childUGI
+   *          the old UGI instance being used
+   * @return a new UGI carrying the task's credentials if the credentials changed and were
+   *         provided, otherwise the UGI passed in
+   */
+  UserGroupInformation handleNewTaskCredentials(ContainerTask containerTask,
+      UserGroupInformation childUGI) {
+    // Re-use the UGI only if the Credentials have not changed.
+    Preconditions.checkState(!containerTask.shouldDie());
+    Preconditions.checkState(containerTask.getTaskSpec() != null);
+    if (containerTask.haveCredentialsChanged()) {
+      LOG.info("Refreshing UGI since Credentials have changed");
+      Credentials taskCreds = containerTask.getCredentials();
+      if (taskCreds != null) {
+        LOG.info("Credentials : #Tokens=" + taskCreds.numberOfTokens() + ", #SecretKeys="
+            + taskCreds.numberOfSecretKeys());
+        childUGI = UserGroupInformation.createRemoteUser(System
+            .getenv(ApplicationConstants.Environment.USER.toString()));
+        childUGI.addCredentials(taskCreds);
+      } else {
+        LOG.info("Not loading any credentials, since no credentials provided");
+      }
+    }
+    return childUGI;
+  }
+
+  /**
+   * Handles any additional resources to be localized for the new task
+   *
+   * @param containerTask
+   *          the new task, whose additional resources are downloaded and added to the classpath
+   * @throws IOException
+   * @throws TezException
+   */
+  private void handleNewTaskLocalResources(ContainerTask containerTask) throws IOException,
+      TezException {
+    Map<String, TezLocalResource> additionalResources = containerTask.getAdditionalResources();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Additional Resources added to container: " + additionalResources);
+    }
+
+    LOG.info("Localizing additional local resources for Task : " + additionalResources);
+    List<URL> downloadedUrls = RelocalizationUtils.processAdditionalResources(
+        Maps.transformValues(additionalResources, new Function<TezLocalResource, URI>() {
+          @Override
+          public URI apply(TezLocalResource input) {
+            return input.getUri();
+          }
+        }), defaultConf);
+    RelocalizationUtils.addUrlsToClassPath(downloadedUrls);
+
+    LOG.info("Done localizing additional resources");
+    final TaskSpec taskSpec = containerTask.getTaskSpec();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("New container task context:" + taskSpec.toString());
+    }
+  }
+
+  /**
+   * Cleans entries from the object registry, and resets the startedInputsMap if required
+   *
+   * @param containerTask
+   *          the new task specification. Must be a valid task
+   */
+  private void cleanupOnTaskChanged(ContainerTask containerTask) {
+    Preconditions.checkState(!containerTask.shouldDie());
+    Preconditions.checkState(containerTask.getTaskSpec() != null);
+    TezVertexID newVertexID = containerTask.getTaskSpec().getTaskAttemptID().getTaskID()
+        .getVertexID();
+    if (lastVertexID != null) {
+      if (!lastVertexID.equals(newVertexID)) {
+        objectRegistry.clearCache(ObjectLifeCycle.VERTEX);
+      }
+      if (!lastVertexID.getDAGId().equals(newVertexID.getDAGId())) {
+        objectRegistry.clearCache(ObjectLifeCycle.DAG);
+        startedInputsMap = HashMultimap.create();
+      }
+    }
+    lastVertexID = newVertexID;
+  }
+
+  /** Stops the executors, the umbilical proxy, the metrics system and logging. */
+  private void shutdown() {
+    executor.shutdownNow();
+    if (taskReporter != null) {
+      taskReporter.shutdown();
+    }
+    RPC.stopProxy(umbilical);
+    DefaultMetricsSystem.shutdown();
+    LogManager.shutdown();
+  }
+
+  /**
+   * Expected arguments: AM host, AM port, container identifier, token identifier, application
+   * attempt number.
+   */
+  public static void main(String[] args) throws IOException, InterruptedException, TezException {
+    Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+    LOG.info("TezChild starting");
+
+    final Configuration defaultConf = new Configuration();
+    // Pull in configuration specified for the session.
+    TezUtils.addUserSpecifiedTezConfiguration(defaultConf);
+    UserGroupInformation.setConfiguration(defaultConf);
+    Limits.setConfiguration(defaultConf);
+
+    // Only enforced when the JVM runs with assertions enabled (-ea).
+    assert args.length == 5;
+    String host = args[0];
+    int port = Integer.parseInt(args[1]);
+    final String containerIdentifier = args[2];
+    final String tokenIdentifier = args[3];
+    final int attemptNumber = Integer.parseInt(args[4]);
+    final String pid = System.getenv().get("JVM_PID");
+    final String[] localDirs = StringUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS
+        .name()));
+    LOG.info("PID, containerIdentifier: " + pid + ", " + containerIdentifier);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Info from cmd line: AM-host: " + host + " AM-port: " + port
+          + " containerIdentifier: " + containerIdentifier + " appAttemptNumber: " + attemptNumber
+          + " tokenIdentifier: " + tokenIdentifier);
+    }
+
+    // Should this be part of main - Metrics and ObjectRegistry. TezTask setup should be independent
+    // of this class. Leaving it here, till there's some entity representing a running JVM.
+    DefaultMetricsSystem.initialize("TezTask");
+
+    ObjectRegistryImpl objectRegistry = new ObjectRegistryImpl();
+    @SuppressWarnings("unused")
+    Injector injector = Guice.createInjector(new ObjectRegistryModule(objectRegistry));
+
+    TezChild tezChild = new TezChild(defaultConf, host, port, containerIdentifier, tokenIdentifier,
+        attemptNumber, localDirs);
+
+    tezChild.run();
+  }
+
+  /**
+   * Logs the error and shuts the container down.
+   *
+   * @param t the error that caused the shutdown
+   */
+  private void handleError(Throwable t) {
+    // Log before shutdown(), which also shuts down the logging system.
+    LOG.error("Shutting down TezChild due to error", t);
+    shutdown();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/acd0a46e/tez-dag/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
new file mode 100644
index 0000000..0e622f9
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezUmbilical;
+
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
+public class TezTaskRunner implements TezUmbilical, ErrorReporter {
+
+ private static final Logger LOG = Logger.getLogger(TezTaskRunner.class);
+
+ // Configuration handed to the runtime task; the constructor stores the local dirs into it.
+ private final Configuration tezConf;
+ // The runtime task that this runner initializes, runs, closes and cleans up.
+ private final LogicalIOProcessorRuntimeTask task;
+ // Identity under which the task is executed (via ugi.doAs).
+ private final UserGroupInformation ugi;
+
+ // Channel used to report success, failure and events, and to ask whether the task can commit.
+ private final TaskReporter taskReporter;
+ // Executor on which the TaskRunnerCallable is submitted.
+ private final ListeningExecutorService executor;
+ // Future of the submitted TaskRunnerCallable; assigned in run() and cancelled on interrupt.
+ private volatile ListenableFuture<Void> taskFuture;
+ // Thread that invoked run() and blocks on taskFuture; interrupted to make it return control.
+ // Remains null until run() has been entered.
+ private volatile Thread waitingThread;
+ // First error registered through maybeRegisterFirstException; later errors do not replace it.
+ private volatile Throwable firstException;
+
+ // Effectively a duplicate check, since hadFatalError does the same thing.
+ // Guards taskReporter.taskFailed so that a failure is reported at most once.
+ private final AtomicBoolean fatalErrorSent = new AtomicBoolean(false);
+ // Set to true in the constructor and to false once the TaskRunnerCallable has finished.
+ private final AtomicBoolean taskRunning;
+ // Set by shutdownRequested(); makes run() return false.
+ private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
+
+ /**
+  * Creates a runner for a single task and registers it with the task reporter.
+  *
+  * @param tezConf configuration passed to the task; {@code TezJobConfig.LOCAL_DIRS} is set on it
+  * @param ugi identity under which the task is executed
+  * @param localDirs local directories stored into {@code tezConf}
+  * @param taskSpec specification of the task to run
+  * @param umbilical not used by this constructor; kept for interface compatibility
+  * @param appAttemptNumber application attempt number passed to the runtime task
+  * @param serviceConsumerMetadata passed through to the runtime task
+  * @param startedInputsMap passed through to the runtime task
+  * @param taskReporter reporter the task is registered with; also receives this runner as the
+  *        error reporter
+  * @param executor executor on which the task will be run
+  * @throws IOException if constructing the runtime task fails
+  */
+ TezTaskRunner(Configuration tezConf, UserGroupInformation ugi, String[] localDirs,
+     TaskSpec taskSpec, TezTaskUmbilicalProtocol umbilical, int appAttemptNumber,
+     Map<String, ByteBuffer> serviceConsumerMetadata, Multimap<String, String> startedInputsMap,
+     TaskReporter taskReporter, ListeningExecutorService executor) throws IOException {
+   this.tezConf = tezConf;
+   this.ugi = ugi;
+   this.tezConf.setStrings(TezJobConfig.LOCAL_DIRS, localDirs);
+   this.taskReporter = taskReporter;
+   this.executor = executor;
+   // Assign every final field before 'this' is handed to other objects below. Otherwise a
+   // callback such as addEvents() or reportError() arriving right after registration could
+   // observe a null taskRunning.
+   this.taskRunning = new AtomicBoolean(true);
+   this.task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, tezConf, this,
+       serviceConsumerMetadata, startedInputsMap);
+   taskReporter.registerTask(task, this);
+ }
+
+ /**
+ * Submits the task to the executor and blocks the calling thread until the task finishes,
+ * fails, or the calling thread is interrupted. The task is unregistered from the task reporter
+ * before this method returns or throws.
+ *
+ * An {@link FSError} is rethrown as is. Any other {@link Error} terminates the process through
+ * {@code ExitUtil.terminate(-1, ...)}. Other failures are rethrown as their own type when they
+ * are an IOException, TezException or InterruptedException, and wrapped in a TezException
+ * otherwise.
+ *
+ * @return false if a shutdown message was received during task execution, true if the task
+ * completed without a registered failure
+ * @throws InterruptedException if the waiting thread was interrupted for a reason other than a
+ * shutdown request or a registered failure
+ * @throws TezException if the task failed with a TezException, or with a failure that is not
+ * one of the other declared types
+ * @throws IOException if the task failed with an IOException
+ */
+ public boolean run() throws InterruptedException, IOException, TezException {
+ // Remember the caller so that other threads can interrupt it to make it return control.
+ waitingThread = Thread.currentThread();
+ TaskRunnerCallable callable = new TaskRunnerCallable();
+ Throwable failureCause = null;
+ taskFuture = executor.submit(callable);
+ try {
+ taskFuture.get();
+
+ // Task could signal a fatal error and return control, or a failure while registering success.
+ failureCause = firstException;
+
+ } catch (InterruptedException e) {
+ LOG.info("Interrupted while waiting for task to complete. Interrupting task");
+ taskFuture.cancel(true);
+ // A shutdown request takes precedence over any registered failure.
+ if (shutdownRequested.get()) {
+ LOG.info("Shutdown requested... returning");
+ return false;
+ }
+ if (firstException != null) {
+ failureCause = firstException;
+ } else {
+ // Interrupted for some other reason.
+ failureCause = e;
+ }
+ } catch (ExecutionException e) {
+ // Exception thrown by the run() method itself.
+ Throwable cause = e.getCause();
+ if (cause instanceof FSError) {
+ // Not immediately fatal, this is an error reported by Hadoop FileSystem
+ failureCause = cause;
+ } else if (cause instanceof Error) {
+ LOG.error("Exception of type Error. Exiting now", cause);
+ ExitUtil.terminate(-1, cause);
+ // Effectively dead code. Must return something.
+ assert(false);
+ return false;
+ } else {
+ failureCause = cause;
+ }
+ } finally {
+ // Runs on every exit path, including the early returns above.
+ taskReporter.unregisterTask(task.getTaskAttemptID());
+ // Clear the interrupted status of the blocking thread, in case it is set after the
+ // InterruptedException was thrown.
+ Thread.interrupted();
+ }
+
+ // Translate the recorded failure, if any, into the exception thrown to the caller.
+ if (failureCause != null) {
+ if (failureCause instanceof FSError) {
+ // Not immediately fatal, this is an error reported by Hadoop FileSystem
+ LOG.info("Encountered an FSError while executing task: " + task.getTaskAttemptID(),
+ failureCause);
+ throw (FSError) failureCause;
+ } else if (failureCause instanceof Error) {
+ LOG.error("Exception of type Error. Exiting now", failureCause);
+ ExitUtil.terminate(-1, failureCause);
+ // Effectively dead code. Must return something.
+ assert(false);
+ return false;
+ } else {
+ if (failureCause instanceof IOException) {
+ throw (IOException) failureCause;
+ } else if (failureCause instanceof TezException) {
+ throw (TezException) failureCause;
+ } else if (failureCause instanceof InterruptedException) {
+ throw (InterruptedException) failureCause;
+ } else {
+ throw new TezException(failureCause);
+ }
+ }
+ }
+ // No failure was recorded, but a shutdown may have been requested while the task was running.
+ if (shutdownRequested.get()) {
+ LOG.info("Shutdown requested... returning");
+ return false;
+ }
+ return true;
+ }
+
+ /**
+  * Callable that drives the task through initialize, run and close as the user represented by
+  * {@code ugi}, reports the outcome through the task reporter, and always invokes
+  * {@code task.cleanup()} afterwards.
+  *
+  * <p>Failures are registered via {@code maybeRegisterFirstException} so that the thread blocked
+  * in the runner's {@code run()} method can rethrow them. {@code taskRunning} is set to false
+  * once the callable finishes, whatever the outcome.
+  */
+ private class TaskRunnerCallable implements Callable<Void> {
+
+   @Override
+   public Void call() throws Exception {
+     try {
+       return ugi.doAs(new PrivilegedExceptionAction<Void>() {
+         @Override
+         public Void run() throws Exception {
+           try {
+             LOG.info("Initializing task" + ", taskAttemptId=" + task.getTaskAttemptID());
+             task.initialize();
+             // Only execute the task if nothing went wrong (and nobody interrupted this thread)
+             // while it was being initialized.
+             if (!Thread.currentThread().isInterrupted() && firstException == null) {
+               LOG.info("Running task, taskAttemptId=" + task.getTaskAttemptID());
+               task.run();
+               LOG.info("Closing task, taskAttemptId=" + task.getTaskAttemptID());
+               task.close();
+               task.setFrameworkCounters();
+             }
+             LOG.info("Task completed, taskAttemptId=" + task.getTaskAttemptID()
+                 + ", fatalErrorOccurred=" + (firstException != null));
+             if (firstException == null) {
+               try {
+                 taskReporter.taskSucceeded(task.getTaskAttemptID());
+               } catch (IOException e) {
+                 LOG.warn("Heartbeat failure caused by communication failure", e);
+                 maybeRegisterFirstException(e);
+                 // Falling off, since the runner thread checks for the registered exception.
+               } catch (TezException e) {
+                 LOG.warn("Heartbeat failure reported by AM", e);
+                 maybeRegisterFirstException(e);
+                 // Falling off, since the runner thread checks for the registered exception.
+               }
+             }
+             return null;
+           } catch (Throwable cause) {
+             if (cause instanceof FSError) {
+               // Not immediately fatal, this is an error reported by Hadoop FileSystem
+               maybeRegisterFirstException(cause);
+               LOG.info("Encountered an FSError while executing task: " + task.getTaskAttemptID(),
+                   cause);
+               try {
+                 sendFailure(cause, "FS Error in Child JVM");
+               } catch (Exception ignored) {
+                 // Ignored since another cause is already known
+                 LOG.info(
+                     "Ignoring the following exception since a previous exception is already registered",
+                     ignored);
+               }
+               throw (FSError) cause;
+             } else if (cause instanceof Error) {
+               LOG.error("Exception of type Error. Exiting now", cause);
+               ExitUtil.terminate(-1, cause);
+             } else {
+               // Report the underlying failure rather than the reflective wrapper.
+               if (cause instanceof UndeclaredThrowableException) {
+                 cause = ((UndeclaredThrowableException) cause).getCause();
+               }
+               maybeRegisterFirstException(cause);
+               // Log message typo fixed: previously read "Encounted".
+               LOG.info("Encountered an error while executing task: " + task.getTaskAttemptID(),
+                   cause);
+               try {
+                 sendFailure(cause, "Failure while running task");
+               } catch (Exception ignored) {
+                 // Ignored since another cause is already known
+                 LOG.info(
+                     "Ignoring the following exception since a previous exception is already registered",
+                     ignored);
+               }
+               if (cause instanceof IOException) {
+                 throw (IOException) cause;
+               } else if (cause instanceof TezException) {
+                 throw (TezException) cause;
+               } else {
+                 throw new TezException(cause);
+               }
+             }
+           } finally {
+             task.cleanup();
+           }
+           // Only reached from the Error branch above, if ExitUtil.terminate returns.
+           return null;
+         }
+       });
+     } finally {
+       taskRunning.set(false);
+     }
+   }
+ }
+
+ /**
+  * Marks the task as fatally failed and reports the failure through the task reporter, unless a
+  * failure has already been reported by this runner.
+  *
+  * @param t the failure to report
+  * @param message diagnostic message accompanying the failure
+  * @throws IOException if reporting the failure fails with an IOException; logged and rethrown
+  * @throws TezException if reporting the failure fails with a TezException; logged and rethrown
+  */
+ private void sendFailure(Throwable t, String message) throws IOException, TezException {
+   boolean alreadyReported = fatalErrorSent.getAndSet(true);
+   if (alreadyReported) {
+     LOG.warn("Ignoring fatal error since another error has already been reported", t);
+     return;
+   }
+
+   task.setFatalError(t, message);
+   task.setFrameworkCounters();
+   try {
+     taskReporter.taskFailed(task.getTaskAttemptID(), t, message, null);
+   } catch (IOException commFailure) {
+     // Logged here and propagated; the caller decides whether to ignore it.
+     LOG.warn("Heartbeat failure caused by communication failure", commFailure);
+     throw commFailure;
+   } catch (TezException amFailure) {
+     // Logged here and propagated; the caller decides whether to ignore it.
+     LOG.warn("Heartbeat failure reported by AM", amFailure);
+     throw amFailure;
+   }
+ }
+
+ /**
+  * Forwards events generated by the task to the task reporter. Events are dropped once the task
+  * is no longer running.
+  *
+  * @param events the events to forward
+  */
+ @Override
+ public void addEvents(Collection<TezEvent> events) {
+   if (!taskRunning.get()) {
+     return;
+   }
+   taskReporter.addEvents(task.getTaskAttemptID(), events);
+ }
+
+ @Override
+ public synchronized void signalFatalError(TezTaskAttemptID taskAttemptID, Throwable t,
+ String message, EventMetaData sourceInfo) {
+ // This can be called before a task throws an exception or after it.
+ // If called before a task throws an exception
+ // - ensure a heartbeat is sent with the diagnostics, and sent only once.
+ // - interrupt the waiting thread, and make it throw the reported error.
+ // If called after a task throws an exception, the waiting task has already returned, no point
+ // interrupting it.
+ // This case can be effectively ignored (log), as long as the run() method ends up throwing the
+ // exception.
+ //
+ //
+ if (!fatalErrorSent.getAndSet(true)) {
+ maybeRegisterFirstException(t);
+ try {
+ taskReporter.taskFailed(taskAttemptID, t, getTaskDiagnosticsString(t, message), sourceInfo);
+ } catch (IOException e) {
+ // HeartbeatFailed. Don't need to propagate the heartbeat exception since a task exception
+ // occurred earlier.
+ LOG.warn("Heartbeat failure caused by communication failure", e);
+ } catch (TezException e) {
+ // HeartbeatFailed. Don't need to propagate the heartbeat exception since a task exception
+ // occurred earlier.
+ LOG.warn("Heartbeat failure reported by AM", e);
+ } finally {
+ // Wake up the waiting thread so that it can return control
+ waitingThread.interrupt();
+ }
+ }
+ }
+
+ /**
+  * Asks the task reporter whether the given attempt may commit.
+  *
+  * @param taskAttemptID the attempt asking to commit
+  * @return the task reporter's answer; false if the task is no longer running or if the request
+  *         failed with an IOException
+  */
+ @Override
+ public boolean canCommit(TezTaskAttemptID taskAttemptID) {
+   if (!taskRunning.get()) {
+     return false;
+   }
+   try {
+     return taskReporter.canCommit(taskAttemptID);
+   } catch (IOException commFailure) {
+     LOG.warn("Communication failure while trying to commit", commFailure);
+     maybeRegisterFirstException(commFailure);
+     waitingThread.interrupt();
+     // Not informing the task since it will be interrupted.
+     // TODO: Should this be sent to the task as well, current Processors, etc do not handle
+     // interrupts very well.
+     return false;
+   }
+ }
+
+ @Override
+ public synchronized void reportError(Throwable t) {
+ if (t instanceof Error) {
+ LOG.error("Exception of type Error during heartbeat, Exiting Now");
+ ExitUtil.terminate(-1, t);
+ } else if (taskRunning.get()) {
+ LOG.error("TaskReporter reported error", t);
+ maybeRegisterFirstException(t);
+ waitingThread.interrupt();
+ // A race is possible between a task succeeding, and a subsequent timed heartbeat failing.
+ // These errors can be ignored, since a task can only succeed if the synchronous taskSucceeded
+ // method does not throw an exception, in which case task success is registered with the AM.
+ // Leave this handling to the next getTask / actual task.
+ } else {
+ LOG.info("Ignoring Communication failure since task with id=" + task.getTaskAttemptID()
+ + " is already complete");
+ }
+ }
+
+ @Override
+ public void shutdownRequested() {
+ shutdownRequested.set(true);
+ waitingThread.interrupt();
+ }
+
+ /**
+  * Builds the diagnostics string sent along with a task failure.
+  *
+  * @param t the failure, may be null
+  * @param message the accompanying message, may be null
+  * @return the stringified exception and/or the message, or "Unknown error" if both are null
+  */
+ private String getTaskDiagnosticsString(Throwable t, String message) {
+   if (t == null && message == null) {
+     return "Unknown error";
+   }
+   if (t == null) {
+     // Only a message is available.
+     return " errorMessage=" + message;
+   }
+   String exceptionPart = "exceptionThrown=" + StringUtils.stringifyException(t);
+   if (message == null) {
+     // Only an exception is available.
+     return exceptionPart;
+   }
+   return exceptionPart + ", errorMessage=" + message;
+ }
+
+ /**
+  * Registers the given throwable as the failure cause unless one has already been registered.
+  *
+  * @param t the candidate failure cause
+  */
+ private synchronized void maybeRegisterFirstException(Throwable t) {
+   if (firstException != null) {
+     // An earlier failure wins; keep it.
+     return;
+   }
+   firstException = t;
+ }
+
+}