You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/09/15 22:21:26 UTC
[2/2] tez git commit: TEZ-2774. Improvements and cleanup of logging
for the AM and parts of the runtime. Contributed by Siddharth Seth and Bikas
Saha.
TEZ-2774. Improvements and cleanup of logging for the AM and parts of
the runtime. Contributed by Siddharth Seth and Bikas Saha.
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f785ce8d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f785ce8d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f785ce8d
Branch: refs/heads/master
Commit: f785ce8d8653a469c8c6e6a9bbcfcff40c6e1289
Parents: d93bdc7
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Sep 15 13:20:37 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue Sep 15 13:20:37 2015 -0700
----------------------------------------------------------------------
.../org/apache/tez/common/AsyncDispatcher.java | 14 +-
.../tez/common/AsyncDispatcherConcurrent.java | 9 +-
.../org/apache/tez/common/TezUtilsInternal.java | 14 +-
.../org/apache/tez/dag/app/DAGAppMaster.java | 27 +++-
.../tez/dag/app/TaskCommunicatorManager.java | 7 +-
.../app/dag/RootInputInitializerManager.java | 2 +-
.../java/org/apache/tez/dag/app/dag/Vertex.java | 2 +
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 14 +-
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 34 +++--
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 128 +++++++++---------
.../app/launcher/TezContainerLauncherImpl.java | 8 +-
.../tez/dag/app/rm/TaskSchedulerManager.java | 19 +--
.../dag/app/rm/YarnTaskSchedulerService.java | 132 +++++++++++--------
.../app/rm/container/AMContainerHelpers.java | 10 +-
.../dag/app/rm/container/AMContainerImpl.java | 18 +--
.../dag/app/rm/node/PerSourceNodeTracker.java | 2 +
.../tez/dag/history/HistoryEventHandler.java | 8 +-
.../events/TaskAttemptFinishedEvent.java | 12 +-
.../history/events/TaskAttemptStartedEvent.java | 4 +-
.../impl/SimpleHistoryLoggingService.java | 4 +-
.../dag/history/recovery/RecoveryService.java | 8 +-
.../resources/tez-container-log4j.properties | 2 +-
.../mapreduce/committer/MROutputCommitter.java | 3 +-
.../common/MRInputAMSplitGenerator.java | 31 ++---
.../common/MRInputSplitDistributor.java | 7 +-
.../tez/mapreduce/hadoop/MRInputHelpers.java | 14 +-
.../tez/mapreduce/partition/MRPartitioner.java | 18 ++-
.../logging/ats/ATSHistoryLoggingService.java | 9 +-
.../runtime/LogicalIOProcessorRuntimeTask.java | 50 ++++---
.../runtime/api/impl/TezInputContextImpl.java | 4 +-
.../runtime/api/impl/TezOutputContextImpl.java | 4 +-
.../api/impl/TezProcessorContextImpl.java | 4 +-
.../common/resources/MemoryDistributor.java | 76 +++++++++--
.../tez/runtime/metrics/TaskCounterUpdater.java | 4 +-
.../tez/runtime/task/ContainerReporter.java | 2 +-
.../org/apache/tez/runtime/task/TezChild.java | 38 +++---
.../runtime/library/common/TezRuntimeUtils.java | 1 -
.../WeightedScalingMemoryDistributor.java | 8 +-
.../tez/mapreduce/examples/RPCLoadGen.java | 2 -
39 files changed, 449 insertions(+), 304 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
index 4319f4f..159ccd9 100644
--- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
+++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
@@ -130,7 +130,7 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
@Override
protected void serviceStart() throws Exception {
eventHandlingThread = new Thread(createThread());
- eventHandlingThread.setName("Dispatcher thread: " + name);
+ eventHandlingThread.setName("Dispatcher thread {" + name + "}");
eventHandlingThread.start();
//start all the components
@@ -211,7 +211,7 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
private void checkForExistingConcurrentDispatcher(Class<? extends Enum> eventType) {
AsyncDispatcherConcurrent concurrentDispatcher = concurrentEventDispatchers.get(eventType);
- Preconditions.checkState(concurrentDispatcher == null,
+ Preconditions.checkState(concurrentDispatcher == null,
"Multiple concurrent dispatchers cannot be registered for: " + eventType.getName());
}
@@ -259,7 +259,8 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
/* check to see if we have a listener registered */
checkForExistingDispatchers(true, eventType);
- LOG.info("Registering " + eventType + " for independent dispatch using: " + handler.getClass());
+ LOG.info(
+ "Registering " + eventType + " for independent dispatch using: " + handler.getClass());
AsyncDispatcher dispatcher = new AsyncDispatcher(dispatcherName);
dispatcher.register(eventType, handler);
eventDispatchers.put(eventType, dispatcher);
@@ -272,7 +273,8 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
/* check to see if we have a listener registered */
checkForExistingDispatchers(true, eventType);
- LOG.info("Registering " + eventType + " for concurrent dispatch using: " + handler.getClass());
+ LOG.info(
+ "Registering " + eventType + " for concurrent dispatch using: " + handler.getClass());
AsyncDispatcherConcurrent dispatcher = new AsyncDispatcherConcurrent(dispatcherName, numThreads);
dispatcher.register(eventType, handler);
concurrentEventDispatchers.put(eventType, dispatcher);
@@ -286,8 +288,8 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
/* check to see if we have a listener registered */
checkForExistingDispatchers(true, eventType);
- LOG.info("Registering " + eventType + " wit existing concurrent dispatch using: "
- + handler.getClass());
+ LOG.info("Registering " + eventType + " with existing concurrent dispatch using: "
+ + handler.getClass());
dispatcher.register(eventType, handler);
concurrentEventDispatchers.put(eventType, dispatcher);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java
index d19bf9e..321ea8b 100644
--- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java
+++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java
@@ -136,7 +136,7 @@ public class AsyncDispatcherConcurrent extends CompositeService implements Dispa
@Override
protected void serviceStart() throws Exception {
execService = Executors.newFixedThreadPool(numThreads, new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("Dispatcher [" + this.name + "] #%d").build());
+ .setNameFormat("Dispatcher {" + this.name + "} #%d").build());
for (int i=0; i<numThreads; ++i) {
eventQueues.add(new LinkedBlockingQueue<Event>());
}
@@ -215,7 +215,7 @@ public class AsyncDispatcherConcurrent extends CompositeService implements Dispa
private void checkForExistingDispatcher(Class<? extends Enum> eventType) {
AsyncDispatcherConcurrent registeredDispatcher = eventDispatchers.get(eventType);
- Preconditions.checkState(registeredDispatcher == null,
+ Preconditions.checkState(registeredDispatcher == null,
"Multiple dispatchers cannot be registered for: " + eventType.getName());
}
@@ -263,7 +263,8 @@ public class AsyncDispatcherConcurrent extends CompositeService implements Dispa
/* check to see if we have a listener registered */
checkForExistingDispatchers(true, eventType);
- LOG.info("Registering " + eventType + " for independent dispatch using: " + handler.getClass());
+ LOG.info(
+ "Registering " + eventType + " for independent dispatch using: " + handler.getClass());
AsyncDispatcherConcurrent dispatcher = new AsyncDispatcherConcurrent(dispatcherName, numThreads);
dispatcher.register(eventType, handler);
eventDispatchers.put(eventType, dispatcher);
@@ -278,7 +279,7 @@ public class AsyncDispatcherConcurrent extends CompositeService implements Dispa
/* check to see if we have a listener registered */
checkForExistingDispatchers(true, eventType);
LOG.info("Registering " + eventType + " wit existing concurrent dispatch using: "
- + handler.getClass());
+ + handler.getClass());
dispatcher.register(eventType, handler);
eventDispatchers.put(eventType, dispatcher);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
index d6ef901..c2a50f5 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
@@ -109,13 +109,10 @@ public class TezUtilsInternal {
public static byte[] compressBytes(byte[] inBytes) throws IOException {
- Stopwatch sw = null;
- if (LOG.isDebugEnabled()) {
- sw = new Stopwatch().start();
- }
+ Stopwatch sw = new Stopwatch().start();
byte[] compressed = compressBytesInflateDeflate(inBytes);
+ sw.stop();
if (LOG.isDebugEnabled()) {
- sw.stop();
LOG.debug("UncompressedSize: " + inBytes.length + ", CompressedSize: " + compressed.length
+ ", CompressTime: " + sw.elapsedMillis());
}
@@ -123,13 +120,10 @@ public class TezUtilsInternal {
}
public static byte[] uncompressBytes(byte[] inBytes) throws IOException {
- Stopwatch sw = null;
- if (LOG.isDebugEnabled()) {
- sw = new Stopwatch().start();
- }
+ Stopwatch sw = new Stopwatch().start();
byte[] uncompressed = uncompressBytesInflateDeflate(inBytes);
+ sw.stop();
if (LOG.isDebugEnabled()) {
- sw.stop();
LOG.debug("CompressedSize: " + inBytes.length + ", UncompressedSize: " + uncompressed.length
+ ", UncompressTimeTaken: " + sw.elapsedMillis());
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index fee13c1..c713435 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -467,7 +467,6 @@ public class DAGAppMaster extends AbstractService {
// Prepare the TaskAttemptListener server for authentication of Containers
// TaskAttemptListener gets the information via jobTokenSecretManager.
- LOG.info("Adding session token to jobTokenSecretManager for application");
jobTokenSecretManager.addTokenForJob(
appAttemptID.getApplicationId().toString(), sessionToken);
@@ -495,8 +494,11 @@ public class DAGAppMaster extends AbstractService {
dispatcher.register(DAGAppMasterEventType.class, new DAGAppMasterEventHandler());
dispatcher.register(DAGEventType.class, dagEventDispatcher);
dispatcher.register(VertexEventType.class, vertexEventDispatcher);
- if (!conf.getBoolean(TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER,
- TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER_DEFAULT)) {
+ boolean useConcurrentDispatcher =
+ conf.getBoolean(TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER,
+ TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER_DEFAULT);
+ LOG.info("Using concurrent dispatcher: " + useConcurrentDispatcher);
+ if (!useConcurrentDispatcher) {
dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
} else {
@@ -560,7 +562,7 @@ public class DAGAppMaster extends AbstractService {
currentRecoveryDataDir = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir,
appAttemptID.getAttemptId());
if (LOG.isDebugEnabled()) {
- LOG.info("Stage directory information for AppAttemptId :" + this.appAttemptID
+ LOG.debug("Stage directory information for AppAttemptId :" + this.appAttemptID
+ " tezSystemStagingDir :" + tezSystemStagingDir + " recoveryDataDir :" + recoveryDataDir
+ " recoveryAttemptDir :" + currentRecoveryDataDir);
}
@@ -926,7 +928,7 @@ public class DAGAppMaster extends AbstractService {
try {
if (LOG.isDebugEnabled()) {
- LOG.info("JSON dump for submitted DAG, dagId=" + dagId.toString()
+ LOG.debug("JSON dump for submitted DAG, dagId=" + dagId.toString()
+ ", json="
+ DAGUtils.generateSimpleJSONPlan(dagPB).toString());
}
@@ -2102,6 +2104,7 @@ public class DAGAppMaster extends AbstractService {
public static void main(String[] args) {
try {
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+ final String pid = System.getenv().get("JVM_PID");
String containerIdStr =
System.getenv(Environment.CONTAINER_ID.name());
String nodeHostString = System.getenv(Environment.NM_HOST.name());
@@ -2141,6 +2144,18 @@ public class DAGAppMaster extends AbstractService {
false, "Run Tez Application Master in Session mode");
CommandLine cliParser = new GnuParser().parse(opts, args);
+ boolean sessionModeCliOption = cliParser.hasOption(TezConstants.TEZ_SESSION_MODE_CLI_OPTION);
+
+ LOG.info("Creating DAGAppMaster for "
+ + "applicationId=" + applicationAttemptId.getApplicationId()
+ + ", attemptNum=" + applicationAttemptId.getAttemptId()
+ + ", AMContainerId=" + containerId
+ + ", jvmPid=" + pid
+ + ", userFromEnv=" + jobUserName
+ + ", cliSessionOption=" + sessionModeCliOption
+ + ", pwd=" + System.getenv(Environment.PWD.name())
+ + ", localDirs=" + System.getenv(Environment.LOCAL_DIRS.name())
+ + ", logDirs=" + System.getenv(Environment.LOG_DIRS.name()));
// TODO Does this really need to be a YarnConfiguration ?
Configuration conf = new Configuration(new YarnConfiguration());
@@ -2161,7 +2176,7 @@ public class DAGAppMaster extends AbstractService {
new DAGAppMaster(applicationAttemptId, containerId, nodeHostString,
Integer.parseInt(nodePortString),
Integer.parseInt(nodeHttpPortString), new SystemClock(), appSubmitTime,
- cliParser.hasOption(TezConstants.TEZ_SESSION_MODE_CLI_OPTION),
+ sessionModeCliOption,
System.getenv(Environment.PWD.name()),
TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name())),
TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOG_DIRS.name())),
http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
index 2cc6ae2..0bc02dc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
@@ -155,13 +155,13 @@ public class TaskCommunicatorManager extends AbstractService implements
@VisibleForTesting
TaskCommunicator createDefaultTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
- LOG.info("Using Default Task Communicator");
+ LOG.info("Creating Default Task Communicator");
return new TezTaskCommunicatorImpl(taskCommunicatorContext);
}
@VisibleForTesting
TaskCommunicator createUberTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
- LOG.info("Using Default Local Task Communicator");
+ LOG.info("Creating Default Local Task Communicator");
return new TezLocalTaskCommunicatorImpl(taskCommunicatorContext);
}
@@ -169,7 +169,7 @@ public class TaskCommunicatorManager extends AbstractService implements
TaskCommunicator createCustomTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext,
NamedEntityDescriptor taskCommDescriptor)
throws TezException {
- LOG.info("Using TaskCommunicator {}:{} " + taskCommDescriptor.getEntityName(),
+ LOG.info("Creating TaskCommunicator {}:{} " + taskCommDescriptor.getEntityName(),
taskCommDescriptor.getClassName());
Class<? extends TaskCommunicator> taskCommClazz =
(Class<? extends TaskCommunicator>) ReflectionUtils
@@ -322,7 +322,6 @@ public class TaskCommunicatorManager extends AbstractService implements
*/
// @Override
public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException {
- LOG.info("Commit go/no-go request from " + taskAttemptId.toString());
// An attempt is asking if it can commit its output. This can be decided
// only by the task which is managing the multiple attempts. So redirect the
// request there.
http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index 4a8a286..13128f8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -100,7 +100,7 @@ public class RootInputInitializerManager {
this.vertex = vertex;
this.eventHandler = appContext.getEventHandler();
this.rawExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
- .setDaemon(true).setNameFormat("InputInitializer [" + this.vertex.getName() + "] #%d").build());
+ .setDaemon(true).setNameFormat("InputInitializer {" + this.vertex.getName() + "} #%d").build());
this.executor = MoreExecutors.listeningDecorator(rawExecutor);
this.dagUgi = dagUgi;
this.entityStateTracker = stateTracker;
http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index c5a3c35..d2801e6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -182,6 +182,8 @@ public interface Vertex extends Comparable<Vertex> {
public Configuration getConf();
+ public boolean isSpeculationEnabled();
+
public int getTaskSchedulerIdentifier();
public int getContainerLauncherIdentifier();
public int getTaskCommunicatorIdentifier();
http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index abcd98d..2f228bd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -782,10 +782,12 @@ public class TaskAttemptImpl implements TaskAttempt,
);
}
if (oldState != getInternalState()) {
- LOG.info(attemptId + " TaskAttempt Transitioned from "
- + oldState + " to "
- + getInternalState() + " due to event "
- + event.getType());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(attemptId + " TaskAttempt Transitioned from "
+ + oldState + " to "
+ + getInternalState() + " due to event "
+ + event.getType());
+ }
}
} finally {
writeLock.unlock();
@@ -1116,7 +1118,9 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskSpec remoteTaskSpec;
try {
remoteTaskSpec = ta.createRemoteTaskSpec();
- LOG.info("remoteTaskSpec:" + remoteTaskSpec);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("remoteTaskSpec:" + remoteTaskSpec);
+ }
} catch (AMUserCodeException e) {
String msg = "Exception in " + e.getSource() + ", taskAttempt=" + ta;
LOG.error(msg, e);
http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 4d449d4..2f304c8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -497,7 +497,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
int toEventId = actualMax + fromEventId;
events = new ArrayList<TezEvent>(tezEventsForTaskAttempts.subList(fromEventId, toEventId));
LOG.info("TaskAttempt:" + attemptID + " sent events: (" + fromEventId
- + "-" + toEventId + ")");
+ + "-" + toEventId + ").");
// currently not modifying the events so that we dont have to create
// copies of events. e.g. if we have to set taskAttemptId into the TezEvent
// destination metadata then we will need to create a copy of the TezEvent
@@ -756,12 +756,16 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
public boolean canCommit(TezTaskAttemptID taskAttemptID) {
writeLock.lock();
try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Commit go/no-go request from " + taskAttemptID);
+ }
TaskState state = getState();
if (state == TaskState.SCHEDULED) {
// the actual running task ran and is done and asking for commit. we are still stuck
// in the scheduled state which indicates a backlog in event processing. lets wait for the
// backlog to clear. returning false will make the attempt come back to us.
- LOG.debug("Event processing delay. "
+ LOG.info(
+ "Event processing delay. "
+ "Attempt committing before state machine transitioned to running : Task {}", taskId);
return false;
}
@@ -792,7 +796,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
}
} else {
if (commitAttempt.equals(taskAttemptID)) {
- LOG.info(taskAttemptID + " given a go for committing the task output.");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(taskAttemptID + " already given a go for committing the task output.");
+ }
return true;
}
// Don't think this can be a pluggable decision, so simply raise an
@@ -800,9 +806,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
// Wait for commit attempt to succeed. Dont kill this. If commit
// attempt fails then choose a different committer. When commit attempt
// succeeds then this and others will be killed
- LOG.info(commitAttempt
- + " is current committer. Commit waiting for: "
- + taskAttemptID);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(commitAttempt + " is current committer. Commit waiting for: " + taskAttemptID);
+ }
return false;
}
@@ -810,7 +816,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
writeLock.unlock();
}
}
-
+
TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedulingCausalTA) {
return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler,
taskCommunicatorManagerInterface, conf, clock, taskHeartbeatHandler, appContext,
@@ -895,9 +901,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
internalError(event.getType());
}
if (oldState != getInternalState()) {
- LOG.info(taskId + " Task Transitioned from " + oldState + " to "
- + getInternalState() + " due to event "
- + event.getType());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(taskId + " Task Transitioned from " + oldState + " to "
+ + getInternalState() + " due to event "
+ + event.getType());
+ }
}
} finally {
writeLock.unlock();
@@ -1108,7 +1116,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
// other reasons.
!attempt.isFinished()) {
LOG.info("Issuing kill to other attempt " + attempt.getID() + " as attempt: " +
- task.successfulAttempt + " has succeeded");
+ task.successfulAttempt + " has succeeded");
String diagnostics = null;
TaskAttemptTerminationCause errCause = null;
if (attempt.getLaunchTime() < successfulAttempt.getLaunchTime()) {
@@ -1466,7 +1474,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
} else {
// nothing to do
LOG.info("Ignoring kill of attempt: " + attemptId + " because attempt: " +
- task.successfulAttempt + " is already successful");
+ task.successfulAttempt + " is already successful");
return TaskStateInternal.SUCCEEDED;
}
}
@@ -1509,7 +1517,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg, TaskAttemptTerminationCause errorCause) {
if (commitAttempt != null && commitAttempt.equals(attempt.getID())) {
- LOG.info("Removing commit attempt: " + commitAttempt);
+ LOG.info("Unsetting commit attempt: " + commitAttempt + " since attempt is being killed");
commitAttempt = null;
}
if (attempt != null && !attempt.isFinished()) {
http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 946ec19..c9b4205 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -913,6 +913,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
this.clock = clock;
this.appContext = appContext;
this.commitVertexOutputs = commitVertexOutputs;
+ this.logIdentifier = this.getVertexId() + " [" + this.getName() + "]";
this.taskCommunicatorManagerInterface = taskCommunicatorManagerInterface;
this.taskHeartbeatHandler = thh;
@@ -971,6 +972,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
this.containerContext = new ContainerContext(this.localResources,
appContext.getCurrentDAG().getCredentials(), this.environment, this.javaOpts, this);
+ LOG.info("Default container context for " + logIdentifier + "=" + containerContext + ", Default Resources=" + this.taskResource);
if (vertexPlan.getInputsCount() > 0) {
setAdditionalInputs(vertexPlan.getInputsList());
@@ -993,7 +995,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
speculator = new LegacySpeculator(vertexConf, getAppContext(), this);
}
- logIdentifier = this.getVertexId() + " [" + this.getName() + "]";
+
// This "this leak" is okay because the retained pointer is in an
// instance variable.
@@ -1033,16 +1035,25 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
}
}
- LOG.info("Vertex: " + logIdentifier + " configured with TaskScheduler=" + taskSchedulerName +
- ", ContainerLauncher=" + containerLauncherName + ", TaskComm=" + taskCommName);
-
- taskSchedulerIdentifier = appContext.getTaskScheduerIdentifier(taskSchedulerName);
- taskCommunicatorIdentifier = appContext.getTaskCommunicatorIdentifier(taskCommName);
- containerLauncherIdentifier = appContext.getContainerLauncherIdentifier(containerLauncherName);
-
- Preconditions.checkNotNull(taskSchedulerIdentifier, "Unknown taskScheduler: " + taskSchedulerName);
- Preconditions.checkNotNull(taskCommunicatorIdentifier, "Unknown taskCommunicator: " + containerLauncherName);
- Preconditions.checkNotNull(containerLauncherIdentifier, "Unknown containerLauncher: " + taskCommName);
+ try {
+ taskSchedulerIdentifier = appContext.getTaskScheduerIdentifier(taskSchedulerName);
+ } catch (Exception e) {
+ LOG.error("Failed to get index for taskScheduler: " + taskSchedulerName);
+ throw e;
+ }
+ try {
+ taskCommunicatorIdentifier = appContext.getTaskCommunicatorIdentifier(taskCommName);
+ } catch (Exception e) {
+ LOG.error("Failed to get index for taskCommunicator: " + taskCommName);
+ throw e;
+ }
+ try {
+ containerLauncherIdentifier =
+ appContext.getContainerLauncherIdentifier(containerLauncherName);
+ } catch (Exception e) {
+ LOG.error("Failed to get index for containerLauncher: " + containerLauncherName);
+ throw e;
+ }
StringBuilder sb = new StringBuilder();
sb.append("Running vertex: ").append(logIdentifier).append(" : ")
@@ -1076,7 +1087,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
return this.taskCommunicatorIdentifier;
}
- private boolean isSpeculationEnabled() {
+ @Override
+ public boolean isSpeculationEnabled() {
return isSpeculationEnabled;
}
@@ -2110,29 +2122,27 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
}
}
+ private static String constructCheckTasksForCompletionLog(VertexImpl vertex) {
+ String logLine = vertex.logIdentifier
+ + ", tasks=" + vertex.numTasks
+ + ", failed=" + vertex.failedTaskCount
+ + ", killed=" + vertex.killedTaskCount
+ + ", success=" + vertex.succeededTaskCount
+ + ", completed=" + vertex.completedTaskCount
+ + ", commits=" + vertex.commitFutures.size()
+ + ", err=" + vertex.terminationCause;
+ return logLine;
+ }
+
// triggered by task_complete
static VertexState checkTasksForCompletion(final VertexImpl vertex) {
-
- LOG.info("Checking tasks for vertex completion for "
- + vertex.logIdentifier
- + ", numTasks=" + vertex.numTasks
- + ", failedTaskCount=" + vertex.failedTaskCount
- + ", killedTaskCount=" + vertex.killedTaskCount
- + ", successfulTaskCount=" + vertex.succeededTaskCount
- + ", completedTaskCount=" + vertex.completedTaskCount
- + ", commitInProgress=" + vertex.commitFutures.size()
- + ", terminationCause=" + vertex.terminationCause);
-
+ // this log helps quickly count the completion count for a vertex.
+ // grepping and counting for attempts and handling re-tries is time consuming
+ LOG.info("Task Completion: " + constructCheckTasksForCompletionLog(vertex));
//check for vertex failure first
if (vertex.completedTaskCount > vertex.tasks.size()) {
LOG.error("task completion accounting issue: completedTaskCount > nTasks:"
- + " for vertex " + vertex.logIdentifier
- + ", numTasks=" + vertex.numTasks
- + ", failedTaskCount=" + vertex.failedTaskCount
- + ", killedTaskCount=" + vertex.killedTaskCount
- + ", successfulTaskCount=" + vertex.succeededTaskCount
- + ", completedTaskCount=" + vertex.completedTaskCount
- + ", terminationCause=" + vertex.terminationCause);
+ + constructCheckTasksForCompletionLog(vertex));
}
if (vertex.completedTaskCount == vertex.tasks.size()) {
@@ -2141,7 +2151,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
//Only succeed if tasks complete successfully and no terminationCause is registered.
if(vertex.succeededTaskCount == vertex.tasks.size() && vertex.terminationCause == null) {
- LOG.info("All tasks are succeeded, vertex:" + vertex.logIdentifier);
+ LOG.info("All tasks have succeeded, vertex:" + vertex.logIdentifier);
if (vertex.commitVertexOutputs && !vertex.committed.getAndSet(true)) {
// start commit if there're commits or just finish if no commits
return commitOrFinish(vertex);
@@ -2159,16 +2169,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
//triggered by commit_complete
static VertexState checkCommitsForCompletion(final VertexImpl vertex) {
- LOG.info("Checking commits for vertex completion for "
- + vertex.logIdentifier
- + ", numTasks=" + vertex.numTasks
- + ", failedTaskCount=" + vertex.failedTaskCount
- + ", killedTaskCount=" + vertex.killedTaskCount
- + ", successfulTaskCount=" + vertex.succeededTaskCount
- + ", completedTaskCount=" + vertex.completedTaskCount
- + ", commitInProgress=" + vertex.commitFutures.size()
- + ", terminationCause=" + vertex.terminationCause);
-
+ LOG.info("Commits completion: "
+ + constructCheckTasksForCompletionLog(vertex));
// terminationCause is null mean commit is succeeded, otherwise terminationCause will be set.
if (vertex.terminationCause == null) {
Preconditions.checkState(vertex.getState() == VertexState.COMMITTING,
@@ -2289,20 +2291,23 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
private void initializeCommitters() throws Exception {
if (!this.additionalOutputSpecs.isEmpty()) {
- LOG.info("Invoking committer inits for vertex, vertexId=" + logIdentifier);
+ LOG.info("Setting up committers for vertex " + logIdentifier + ", numAdditionalOutputs=" +
+ additionalOutputs.size());
for (Entry<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> entry:
additionalOutputs.entrySet()) {
final String outputName = entry.getKey();
final RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> od = entry.getValue();
if (od.getControllerDescriptor() == null
|| od.getControllerDescriptor().getClassName() == null) {
- LOG.info("Ignoring committer as none specified for output="
- + outputName
- + ", vertexId=" + logIdentifier);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring committer as none specified for output="
+ + outputName
+ + ", vertexId=" + logIdentifier);
+ }
continue;
}
LOG.info("Instantiating committer for output=" + outputName
- + ", vertexId=" + logIdentifier
+ + ", vertex=" + logIdentifier
+ ", committerClass=" + od.getControllerDescriptor().getClassName());
dagUgi.doAs(new PrivilegedExceptionAction<Void>() {
@@ -2319,12 +2324,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
.createClazzInstance(od.getControllerDescriptor().getClassName(),
new Class[]{OutputCommitterContext.class},
new Object[]{outputCommitterContext});
- LOG.info("Invoking committer init for output=" + outputName
- + ", vertexId=" + logIdentifier);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Invoking committer init for output=" + outputName
+ + ", vertex=" + logIdentifier);
+ }
outputCommitter.initialize();
outputCommitters.put(outputName, outputCommitter);
- LOG.info("Invoking committer setup for output=" + outputName
- + ", vertexId=" + logIdentifier);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Invoking committer setup for output=" + outputName
+ + ", vertex=" + logIdentifier);
+ }
outputCommitter.setupOutput();
return null;
}
@@ -4034,8 +4043,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
}
boolean forceTransitionToKillWait = false;
vertex.completedTaskCount++;
- LOG.info("Num completed Tasks for " + vertex.logIdentifier + " : "
- + vertex.completedTaskCount);
VertexEventTaskCompleted taskEvent = (VertexEventTaskCompleted) event;
Task task = vertex.tasks.get(taskEvent.getTaskID());
if (taskEvent.getState() == TaskState.SUCCEEDED) {
@@ -4350,10 +4357,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
int numEventsSent = events.size() - numPreRoutedEvents;
if (numEventsSent > 0) {
StringBuilder builder = new StringBuilder();
- builder.append("Sending ").append(attemptID).append(" numEvents: ").append(numEventsSent)
- .append(" from: ").append(fromEventId).append(" to: ").append(nextFromEventId)
- .append(" out of ").append(currEventCount).append(" on-demand events in vertex: ")
- .append(getLogIdentifier());
+ builder.append("Sending ").append(attemptID).append(" ")
+ .append(numEventsSent)
+ .append(" events [").append(fromEventId).append(",").append(nextFromEventId)
+ .append(") total ").append(currEventCount).append(" ")
+ .append(getLogIdentifier());
LOG.info(builder.toString());
}
}
@@ -4644,9 +4652,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
for (String inputName : inputsWithInitializers) {
inputList.add(rootInputDescriptors.get(inputName));
}
- LOG.info("Vertex will initialize via inputInitializers "
- + logIdentifier + ". Starting root input initializers: "
- + inputsWithInitializers.size());
+ LOG.info("Starting " + inputsWithInitializers.size() + " inputInitializers for vertex " +
+ logIdentifier);
initWaitsForRootInitializers = true;
rootInputInitializerManager.runInputInitializers(inputList);
// Send pending rootInputInitializerEvents
@@ -4730,6 +4737,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
@Override
public void setAdditionalInputs(List<RootInputLeafOutputProto> inputs) {
+ LOG.info("Setting " + inputs.size() + " additional inputs for vertex " + this.logIdentifier);
this.rootInputDescriptors = Maps.newHashMapWithExpectedSize(inputs.size());
for (RootInputLeafOutputProto input : inputs) {
addIO(input.getName());
@@ -4774,7 +4782,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
@Override
public void setAdditionalOutputs(List<RootInputLeafOutputProto> outputs) {
- LOG.info("setting additional outputs for vertex " + this.vertexName);
+ LOG.info("Setting " + outputs.size() + " additional outputs for vertex " + this.logIdentifier);
this.additionalOutputs = Maps.newHashMapWithExpectedSize(outputs.size());
this.outputCommitters = Maps.newHashMapWithExpectedSize(outputs.size());
for (RootInputLeafOutputProto output : outputs) {
http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
index ab74382..d384aef 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
@@ -127,7 +127,7 @@ public class TezContainerLauncherImpl extends ContainerLauncher {
@SuppressWarnings("unchecked")
public synchronized void launch(ContainerLaunchRequest event) {
- LOG.info("Launching Container with Id: " + event.getContainerId());
+ LOG.info("Launching " + event.getContainerId());
if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
state = ContainerState.DONE;
sendContainerLaunchFailedMsg(event.getContainerId(),
@@ -185,8 +185,7 @@ public class TezContainerLauncherImpl extends ContainerLauncher {
if(this.state == ContainerState.PREP) {
this.state = ContainerState.KILLED_BEFORE_LAUNCH;
} else {
- LOG.info("Sending a stop request to the NM for ContainerId: "
- + containerID);
+ LOG.info("Stopping " + containerID);
ContainerManagementProtocolProxyData proxy = null;
try {
@@ -353,6 +352,9 @@ public class TezContainerLauncherImpl extends ContainerLauncher {
// Load ContainerManager tokens before creating a connection.
// TODO: Do it only once per NodeManager.
ContainerId containerID = event.getBaseOperation().getContainerId();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing ContainerOperation {}", event);
+ }
Container c = getContainer(event);
switch(event.getOpType()) {
http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
index 04d7089..dbf8e38 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
@@ -203,7 +203,9 @@ public class TaskSchedulerManager extends AbstractService implements
}
public synchronized void handleEvent(AMSchedulerEvent sEvent) {
- LOG.info("Processing the event " + sEvent.toString());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing the event " + sEvent.toString());
+ }
switch (sEvent.getType()) {
case S_TA_LAUNCH_REQUEST:
handleTaLaunchRequest((AMSchedulerEventTALaunchRequest) sEvent);
@@ -219,7 +221,7 @@ public class TaskSchedulerManager extends AbstractService implements
handleTASucceeded(event);
break;
default:
- throw new TezUncheckedException("Unexecpted TA_ENDED state: " + event.getState());
+ throw new TezUncheckedException("Unexpected TA_ENDED state: " + event.getState());
}
break;
case S_CONTAINER_DEALLOCATE:
@@ -366,8 +368,8 @@ public class TaskSchedulerManager extends AbstractService implements
event);
return;
}
- LOG.info("Attempt: " + taskAttempt.getID() + " has task based affinity to " + taskAffinity
- + " but no locality information exists for it. Ignoring hint.");
+ LOG.info("No attempt for task affinity to " + taskAffinity + " for attempt "
+ + taskAttempt.getID() + " Ignoring.");
// fall through with null hosts/racks
} else {
hosts = (locationHint.getHosts() != null) ? locationHint
@@ -422,7 +424,8 @@ public class TaskSchedulerManager extends AbstractService implements
@VisibleForTesting
TaskScheduler createUberTaskScheduler(TaskSchedulerContext taskSchedulerContext,
int schedulerId) {
- LOG.info("Creating TaskScheduler: Local TaskScheduler");
+ LOG.info("Creating TaskScheduler: Local TaskScheduler with clusterIdentifier={}",
+ taskSchedulerContext.getCustomClusterIdentifier());
return new LocalTaskSchedulerService(taskSchedulerContext);
}
@@ -430,8 +433,8 @@ public class TaskSchedulerManager extends AbstractService implements
TaskScheduler createCustomTaskScheduler(TaskSchedulerContext taskSchedulerContext,
NamedEntityDescriptor taskSchedulerDescriptor,
int schedulerId) throws TezException {
- LOG.info("Creating custom TaskScheduler {}:{}", taskSchedulerDescriptor.getEntityName(),
- taskSchedulerDescriptor.getClassName());
+ LOG.info("Creating custom TaskScheduler {}:{} with clusterIdentifier={}", taskSchedulerDescriptor.getEntityName(),
+ taskSchedulerDescriptor.getClassName(), taskSchedulerContext.getCustomClusterIdentifier());
return ReflectionUtils.createClazzInstance(taskSchedulerDescriptor.getClassName(),
new Class[]{TaskSchedulerContext.class},
new Object[]{taskSchedulerContext});
@@ -450,8 +453,6 @@ public class TaskSchedulerManager extends AbstractService implements
} else {
customAppIdIdentifier = SCHEDULER_APP_ID_BASE + (j++ * SCHEDULER_APP_ID_INCREMENT);
}
- LOG.info("ClusterIdentifier for TaskScheduler [" + i + ":" + taskSchedulerDescriptors[i].getEntityName() + "]=" +
- customAppIdIdentifier);
taskSchedulers[i] = createTaskScheduler(host, port,
trackingUrl, appContext, taskSchedulerDescriptors[i], customAppIdIdentifier, i);
taskSchedulerServiceWrappers[i] = new ServicePluginLifecycleAbstractService<>(taskSchedulers[i]);
http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index a1c4753..aaa6165 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -35,6 +35,7 @@ import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.tez.common.TezUtils;
import org.apache.tez.serviceplugins.api.TaskScheduler;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
@@ -329,7 +330,7 @@ public class YarnTaskSchedulerService extends TaskScheduler
"Heartbeats between preemptions should be >=1");
delayedContainerManager = new DelayedContainerManager();
- LOG.info("TaskScheduler initialized with configuration: " +
+ LOG.info("YarnTaskScheduler initialized with configuration: " +
"maxRMHeartbeatInterval: " + heartbeatIntervalMax +
", containerReuseEnabled: " + shouldReuseContainers +
", reuseRackLocal: " + reuseRackLocal +
@@ -407,8 +408,11 @@ public class YarnTaskSchedulerService extends TaskScheduler
@Override
public void onContainersCompleted(List<ContainerStatus> statuses) {
if (isStopStarted.get()) {
- for (ContainerStatus status : statuses) {
- LOG.info("Container " + status.getContainerId() + " is completed");
+ if (LOG.isDebugEnabled()) {
+ for (ContainerStatus status : statuses) {
+ LOG.debug("Container " + status.getContainerId() + " is completed with ContainerStatus=" +
+ status);
+ }
}
return;
}
@@ -429,8 +433,10 @@ public class YarnTaskSchedulerService extends TaskScheduler
// being released
// completion of a container we had released earlier
// an allocated container completed. notify app
- LOG.info("Released container completed:" + completedId +
- " last allocated to task: " + task);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Released container completed:" + completedId +
+ " last allocated to task: " + task);
+ }
appContainerStatus.put(task, containerStatus);
continue;
}
@@ -446,9 +452,9 @@ public class YarnTaskSchedulerService extends TaskScheduler
}
if(task != null) {
// completion of a container we have allocated currently
- // an allocated container completed. notify app
- LOG.info("Allocated container completed:" + completedId +
- " last allocated to task: " + task);
+ // an allocated container completed. notify app. This will cause attempt to get killed
+ LOG.info(
+ "Allocated container completed:" + completedId + " last allocated to task: " + task);
appContainerStatus.put(task, containerStatus);
continue;
}
@@ -467,9 +473,13 @@ public class YarnTaskSchedulerService extends TaskScheduler
@Override
public void onContainersAllocated(List<Container> containers) {
if (isStopStarted.get()) {
- for (Container container : containers) {
- LOG.info("Release container:" + container.getId() + ", because it is shutting down.");
- releaseContainer(container.getId());
+ LOG.info("Ignoring container allocations because application is shutting down. Num " +
+ containers.size());
+ for (Container container : containers) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Release container:" + container.getId() + ", because App is shutting down.");
+ }
+ releaseContainer(container.getId());
}
return;
}
@@ -528,6 +538,9 @@ public class YarnTaskSchedulerService extends TaskScheduler
}
// Release any unassigned containers given by the RM
+ if (containers.iterator().hasNext()) {
+ LOG.info("Releasing newly assigned containers which could not be allocated");
+ }
releaseUnassignedContainers(containers);
return assignedContainers;
@@ -581,15 +594,15 @@ public class YarnTaskSchedulerService extends TaskScheduler
boolean isNew = heldContainer.isNew();
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to assign a delayed container"
- + ", containerId=" + heldContainer.getContainer().getId()
- + ", nextScheduleTime=" + heldContainer.getNextScheduleTime()
- + ", containerExpiryTime=" + heldContainer.getContainerExpiryTime()
- + ", AMState=" + state
- + ", matchLevel=" + heldContainer.getLocalityMatchLevel()
- + ", taskRequestsCount=" + taskRequests.size()
- + ", heldContainers=" + heldContainers.size()
- + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
- + ", isNew=" + isNew);
+ + ", containerId=" + heldContainer.getContainer().getId()
+ + ", nextScheduleTime=" + heldContainer.getNextScheduleTime()
+ + ", containerExpiryTime=" + heldContainer.getContainerExpiryTime()
+ + ", AMState=" + state
+ + ", matchLevel=" + heldContainer.getLocalityMatchLevel()
+ + ", taskRequestsCount=" + taskRequests.size()
+ + ", heldContainers=" + heldContainers.size()
+ + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
+ + ", isNew=" + isNew);
}
if (state.equals(AMState.IDLE) || taskRequests.isEmpty()) {
@@ -637,7 +650,7 @@ public class YarnTaskSchedulerService extends TaskScheduler
+ ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
+ ", isNew=" + isNew);
releaseUnassignedContainers(
- Lists.newArrayList(heldContainer.getContainer()));
+ Collections.singletonList((heldContainer.getContainer())));
} else {
// no outstanding work and container idle timeout not expired
if (LOG.isDebugEnabled()) {
@@ -690,7 +703,7 @@ public class YarnTaskSchedulerService extends TaskScheduler
assignReUsedContainerWithLocation(containerToAssign,
NODE_LOCAL_ASSIGNER, assignedContainers, true);
if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) {
- LOG.info("Failed to assign tasks to delayed container using node"
+ LOG.debug("Failed to assign tasks to delayed container using node"
+ ", containerId=" + heldContainer.getContainer().getId());
}
}
@@ -706,7 +719,7 @@ public class YarnTaskSchedulerService extends TaskScheduler
assignReUsedContainerWithLocation(containerToAssign,
RACK_LOCAL_ASSIGNER, assignedContainers, false);
if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) {
- LOG.info("Failed to assign tasks to delayed container using rack"
+ LOG.debug("Failed to assign tasks to delayed container using rack"
+ ", containerId=" + heldContainer.getContainer().getId());
}
}
@@ -722,7 +735,7 @@ public class YarnTaskSchedulerService extends TaskScheduler
assignReUsedContainerWithLocation(containerToAssign,
NON_LOCAL_ASSIGNER, assignedContainers, false);
if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) {
- LOG.info("Failed to assign tasks to delayed container using non-local"
+ LOG.debug("Failed to assign tasks to delayed container using non-local"
+ ", containerId=" + heldContainer.getContainer().getId());
}
}
@@ -744,10 +757,10 @@ public class YarnTaskSchedulerService extends TaskScheduler
if (!isNew && heldContainer.getContainerExpiryTime() <= currentTime
&& idleContainerTimeoutMin != -1) {
LOG.info("Container's idle timeout expired. Releasing container"
- + ", containerId=" + heldContainer.container.getId()
- + ", containerExpiryTime="
- + heldContainer.getContainerExpiryTime()
- + ", idleTimeoutMin=" + idleContainerTimeoutMin);
+ + ", containerId=" + heldContainer.container.getId()
+ + ", containerExpiryTime="
+ + heldContainer.getContainerExpiryTime()
+ + ", idleTimeoutMin=" + idleContainerTimeoutMin);
releaseUnassignedContainers(
Lists.newArrayList(heldContainer.container));
} else {
@@ -794,11 +807,11 @@ public class YarnTaskSchedulerService extends TaskScheduler
if (safeToRelease &&
(!taskRequests.isEmpty() || !getContext().isSession())) {
LOG.info("Releasing held container as either there are pending but "
- + " unmatched requests or this is not a session"
- + ", containerId=" + heldContainer.container.getId()
- + ", pendingTasks=" + taskRequests.size()
- + ", isSession=" + getContext().isSession()
- + ". isNew=" + isNew);
+ + " unmatched requests or this is not a session"
+ + ", containerId=" + heldContainer.container.getId()
+ + ", pendingTasks=" + taskRequests.size()
+ + ", isSession=" + getContext().isSession()
+ + ". isNew=" + isNew);
releaseUnassignedContainers(
Lists.newArrayList(heldContainer.container));
} else {
@@ -873,8 +886,8 @@ public class YarnTaskSchedulerService extends TaskScheduler
// TODO this will not handle dynamic changes in resources
totalResources = Resources.clone(getAvailableResources());
LOG.info("App total resource memory: " + totalResources.getMemory() +
- " cpu: " + totalResources.getVirtualCores() +
- " taskAllocations: " + taskAllocations.size());
+ " cpu: " + totalResources.getVirtualCores() +
+ " taskAllocations: " + taskAllocations.size());
}
numHeartbeats++;
@@ -973,9 +986,9 @@ public class YarnTaskSchedulerService extends TaskScheduler
// See if any of the delayedContainers can be used for this task.
delayedContainerManager.triggerScheduling(true);
LOG.info("Allocation request for task: " + task +
- " with request: " + request +
- " host: " + ((hosts!=null&&hosts.length>0)?hosts[0]:"null") +
- " rack: " + ((racks!=null&&racks.length>0)?racks[0]:"null"));
+ " with request: " + request +
+ " host: " + ((hosts != null && hosts.length > 0) ? hosts[0] : "null") +
+ " rack: " + ((racks != null && racks.length > 0) ? racks[0] : "null"));
}
/**
@@ -1008,8 +1021,10 @@ public class YarnTaskSchedulerService extends TaskScheduler
LOG.info("Ignoring removal of unknown task: " + task);
return false;
} else {
- LOG.info("Deallocated task: " + task + " from container: "
- + container.getId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Deallocated task: " + task + " from container: "
+ + container.getId());
+ }
if (!taskSucceeded || !shouldReuseContainers) {
if (LOG.isDebugEnabled()) {
@@ -1029,6 +1044,7 @@ public class YarnTaskSchedulerService extends TaskScheduler
}
assignedContainers = assignDelayedContainer(heldContainer);
} else {
+ // this is a non standard situation
LOG.info("Skipping container after task deallocate as container is"
+ " no longer running, containerId=" + container.getId());
}
@@ -1047,8 +1063,9 @@ public class YarnTaskSchedulerService extends TaskScheduler
public synchronized Object deallocateContainer(ContainerId containerId) {
Object task = unAssignContainer(containerId, true);
if(task != null) {
+ // non-standard case for the app layer to deallocate container
LOG.info("Deallocated container: " + containerId +
- " from task: " + task);
+ " from task: " + task);
return task;
}
@@ -1058,9 +1075,9 @@ public class YarnTaskSchedulerService extends TaskScheduler
@Override
public synchronized void initiateStop() {
- LOG.info("Initiate stop to YarnTaskScheduler");
+ LOG.info("Initiating stop of YarnTaskScheduler");
// release held containers
- LOG.info("Release held containers");
+ LOG.info("Releasing held containers");
isStopStarted.set(true);
// Create a new list for containerIds to iterate, otherwise it would cause ConcurrentModificationException
// because method releaseContainer will change heldContainers.
@@ -1073,7 +1090,7 @@ public class YarnTaskSchedulerService extends TaskScheduler
}
// remove taskRequest from AMRMClient to avoid allocating new containers in the next heartbeat
- LOG.info("Remove all the taskRequests");
+ LOG.info("Removing all pending taskRequests");
// Create a new list for tasks to avoid ConcurrentModificationException
List<Object> tasks = new ArrayList<Object>(taskRequests.size());
for (Object task : taskRequests.keySet()) {
@@ -1634,8 +1651,9 @@ public class YarnTaskSchedulerService extends TaskScheduler
private void releaseUnassignedContainers(Iterable<Container> containers) {
for (Container container : containers) {
- LOG.info("Releasing unused container: "
- + container.getId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Releasing unused container: " + container.getId());
+ }
releaseContainer(container.getId());
}
}
@@ -1704,19 +1722,17 @@ public class YarnTaskSchedulerService extends TaskScheduler
Object task = getTask(assigned);
assert task != null;
- LOG.info("Assigning container to task"
- + ", container=" + container
+ LOG.info("Assigning container to task: "
+ + "containerId=" + container.getId()
+ ", task=" + task
- + ", containerHost=" + container.getNodeId().getHost()
+ + ", containerHost=" + container.getNodeId()
+ + ", containerPriority= " + container.getPriority()
+ + ", containerResources=" + container.getResource()
+ ", localityMatchType=" + locality
+ ", matchedLocation=" + matchedLocation
+ ", honorLocalityFlags=" + honorLocalityFlags
- + ", reusedContainer="
- + containerAssignments.containsKey(container.getId())
- + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
- + ", containerResourceMemory=" + container.getResource().getMemory()
- + ", containerResourceVCores="
- + container.getResource().getVirtualCores());
+ + ", reusedContainer=" + containerAssignments.containsKey(container.getId())
+ + ", delayedContainers=" + delayedContainerManager.delayedContainers.size());
assignContainer(task, container, assigned);
}
@@ -1904,6 +1920,7 @@ public class YarnTaskSchedulerService extends TaskScheduler
heldContainers.get(delayedContainer.getContainer().getId())) {
assignedContainers = assignDelayedContainer(delayedContainer);
} else {
+ // non standard scenario
LOG.info("Skipping delayed container as container is no longer"
+ " running, containerId="
+ delayedContainer.getContainer().getId());
@@ -1958,9 +1975,10 @@ public class YarnTaskSchedulerService extends TaskScheduler
HeldContainer delayedContainer = iter.next();
if (!heldContainers.containsKey(delayedContainer.getContainer().getId())) {
// this container is no longer held by us
+ // non standard scenario
LOG.info("AssignAll - Skipping delayed container as container is no longer"
- + " running, containerId="
- + delayedContainer.getContainer().getId());
+ + " running, containerId="
+ + delayedContainer.getContainer().getId());
iter.remove();
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
index 470fa56..11b5006 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
@@ -113,8 +113,10 @@ public class AMContainerHelpers {
// correctly, even though they may not be used by all tasks which will run
// on this container.
- LOG.info("Adding #" + credentials.numberOfTokens() + " tokens and #"
- + credentials.numberOfSecretKeys() + " secret keys for NM use for launching container");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding #" + credentials.numberOfTokens() + " tokens and #"
+ + credentials.numberOfSecretKeys() + " secret keys for NM use for launching container in common CLC");
+ }
containerCredentials.addAll(credentials);
DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
@@ -123,7 +125,9 @@ public class AMContainerHelpers {
containerTokens_dob.getLength());
// Add shuffle token
- LOG.info("Putting shuffle token in serviceData");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Putting shuffle token in serviceData in common CLC");
+ }
serviceData.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
TezCommonUtils.serializeServiceData(TokenCache.getSessionToken(containerCredentials)));
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 69c21d4..d37d106 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -421,9 +421,11 @@ public class AMContainerImpl implements AMContainer {
// TODO Can't set state to COMPLETED. Add a default error state.
}
if (oldState != getState()) {
- LOG.info("AMContainer " + this.containerId + " transitioned from "
- + oldState + " to " + getState()
- + " via event " + event.getType());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("AMContainer " + this.containerId + " transitioned from "
+ + oldState + " to " + getState()
+ + " via event " + event.getType());
+ }
}
} finally {
writeLock.unlock();
@@ -474,8 +476,10 @@ public class AMContainerImpl implements AMContainer {
// task is not told to die since the TAL does not know about the container.
container.registerWithTAListener();
container.sendStartRequestToNM(clc);
- LOG.info("Sending Launch Request for Container with id: " +
- container.container.getId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sending Launch Request for Container with id: " +
+ container.container.getId());
+ }
}
}
@@ -533,7 +537,7 @@ public class AMContainerImpl implements AMContainer {
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
super.transition(container, cEvent);
container.deAllocate();
- LOG.info(
+ LOG.warn(
"Unexpected event type: " + cEvent.getType() + " while in state: " +
container.getState() + ". Event: " + cEvent);
@@ -597,8 +601,6 @@ public class AMContainerImpl implements AMContainer {
}
}
- LOG.info("Assigned taskAttempt + [" + container.currentAttempt +
- "] to container: [" + container.getContainerId() + "]");
AMContainerTask amContainerTask = new AMContainerTask(
event.getRemoteTaskSpec(), container.additionalLocalResources,
container.credentialsChanged ? container.credentials : null, container.credentialsChanged,
http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java
index 3264708..b1c81af 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java
@@ -95,6 +95,8 @@ public class PerSourceNodeTracker {
AMNode amNode = nodeMap.get(nodeId);
if (amNode == null) {
LOG.info("Ignoring RM Health Update for unknown node: " + nodeId);
+ // This implies that the node exists on the cluster, but is not running a container for
+ // this application.
} else {
amNode.handle(rEvent);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
index 9e275a2..e17a4d4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
@@ -47,8 +47,6 @@ public class HistoryEventHandler extends CompositeService {
@Override
public void serviceInit(Configuration conf) throws Exception {
- LOG.info("Initializing HistoryEventHandler");
-
this.recoveryEnabled = context.getAMConf().getBoolean(TezConfiguration.DAG_RECOVERY_ENABLED,
TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT);
@@ -56,6 +54,10 @@ public class HistoryEventHandler extends CompositeService {
TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS_DEFAULT);
+ LOG.info("Initializing HistoryEventHandler with "
+ + "recoveryEnabled=" + recoveryEnabled
+ + ", historyServiceClassName=" + historyServiceClassName);
+
historyLoggingService =
ReflectionUtils.createClazzInstance(historyServiceClassName);
historyLoggingService.setAppContext(context);
@@ -66,11 +68,11 @@ public class HistoryEventHandler extends CompositeService {
addService(recoveryService);
}
super.serviceInit(conf);
+
}
@Override
public void serviceStart() throws Exception {
- LOG.info("Starting HistoryEventHandler");
super.serviceStart();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
index 9f24151..7d83db2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
@@ -177,6 +177,12 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
@Override
public String toString() {
+ String counterStr = "";
+ if (state != TaskAttemptState.SUCCEEDED) {
+ counterStr = ", counters=" + ( tezCounters == null ? "null" :
+ tezCounters.toString()
+ .replaceAll("\\n", ", ").replaceAll("\\s+", " "));
+ }
return "vertexName=" + vertexName
+ ", taskAttemptId=" + taskAttemptId
+ ", creationTime=" + creationTime
@@ -187,11 +193,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
+ ", status=" + state.name()
+ ", errorEnum=" + (error != null ? error.name() : "")
+ ", diagnostics=" + diagnostics
- + ", lastDataEventSourceTA=" +
- ((dataEvents==null) ? 0:dataEvents.size())
- + ", counters=" + (tezCounters == null ? "null" :
- tezCounters.toString()
- .replaceAll("\\n", ", ").replaceAll("\\s+", " "));
+ + counterStr;
}
public TezTaskAttemptID getTaskAttemptID() {
http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
index a58b49e..71d4419 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
@@ -110,9 +110,7 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
+ ", taskAttemptId=" + taskAttemptId
+ ", startTime=" + launchTime
+ ", containerId=" + containerId
- + ", nodeId=" + nodeId
- + ", inProgressLogs=" + inProgressLogsUrl
- + ", completedLogs=" + completedLogsUrl;
+ + ", nodeId=" + nodeId;
}
public TezTaskAttemptID getTaskAttemptID() {
http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
index 8852e02..4372d8e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
@@ -148,7 +148,9 @@ public class SimpleHistoryLoggingService extends HistoryLoggingService {
if (loggingDisabled) {
return;
}
- LOG.info("Writing event " + event.getHistoryEvent().getEventType() + " to history file");
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Writing event " + event.getHistoryEvent().getEventType() + " to history file");
+ }
try {
try {
JSONObject eventJson = HistoryEventJsonConversion.convertToJson(event.getHistoryEvent());
http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
index d870645..2fe0e6d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
@@ -106,7 +106,6 @@ public class RecoveryService extends AbstractService {
@Override
public void serviceInit(Configuration conf) throws Exception {
- LOG.info("Initializing RecoveryService");
recoveryPath = appContext.getCurrentRecoveryDir();
recoveryDirFS = FileSystem.get(recoveryPath.toUri(), conf);
bufferSize = conf.getInt(TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE,
@@ -120,11 +119,16 @@ public class RecoveryService extends AbstractService {
drainEventsFlag = conf.getBoolean(
TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED,
TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED_DEFAULT);
+
+ LOG.info("RecoveryService initialized with "
+ + "recoveryPath=" + recoveryPath
+ + ", bufferSize(bytes)=" + bufferSize
+ + ", flushInterval(s)=" + flushInterval
+ + ", maxUnflushedEvents=" + maxUnflushedEvents);
}
@Override
public void serviceStart() {
- LOG.info("Starting RecoveryService");
lastFlushTime = appContext.getClock().getTime();
eventHandlingThread = new Thread(new Runnable() {
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/resources/tez-container-log4j.properties
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/resources/tez-container-log4j.properties b/tez-dag/src/main/resources/tez-container-log4j.properties
index 7a2aeab..c53994e 100644
--- a/tez-dag/src/main/resources/tez-container-log4j.properties
+++ b/tez-dag/src/main/resources/tez-container-log4j.properties
@@ -28,7 +28,7 @@ log4j.appender.CLA=org.apache.tez.common.TezContainerLogAppender
log4j.appender.CLA.containerLogDir=${yarn.app.container.log.dir}
log4j.appender.CLA.layout=org.apache.log4j.PatternLayout
-log4j.appender.CLA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}: %m%n
+log4j.appender.CLA.layout.ConversionPattern=%d{ISO8601} [%p] [%t] |%c{2}|: %m%n
#
# Event Counter Appender
http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
index 69237d4..1b66c8e 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
@@ -117,8 +117,9 @@ public class MROutputCommitter extends OutputCommitter {
if (jobConf.getBoolean("mapred.reducer.new-api", false)
|| jobConf.getBoolean("mapred.mapper.new-api", false)) {
newApiCommitter = true;
- LOG.info("Using mapred newApiCommitter.");
}
+ LOG.info("Committer for " + getContext().getVertexName() + ":" + getContext().getOutputName() +
+ " using " + (newApiCommitter ? "new" : "old") + " mapred API");
if (newApiCommitter) {
TaskAttemptID taskAttemptID = new TaskAttemptID(
http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
index dbc7748..b93e4ba 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
@@ -69,35 +69,30 @@ public class MRInputAMSplitGenerator extends InputInitializer {
@Override
public List<Event> initialize() throws Exception {
- Stopwatch sw = null;
- if (LOG.isDebugEnabled()) {
- sw = new Stopwatch().start();
- }
+ Stopwatch sw = new Stopwatch().start();
MRInputUserPayloadProto userPayloadProto = MRInputHelpers
.parseMRInputPayload(getContext().getInputUserPayload());
+ sw.stop();
if (LOG.isDebugEnabled()) {
- sw.stop();
LOG.debug("Time to parse MRInput payload into prot: "
+ sw.elapsedMillis());
}
- if (LOG.isDebugEnabled()) {
- sw.reset().start();
- }
+ sw.reset().start();
Configuration conf = TezUtils.createConfFromByteString(userPayloadProto
.getConfigurationBytes());
sendSerializedEvents = conf.getBoolean(
MRJobConfig.MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLOAD,
MRJobConfig.MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLOAD_DEFAULT);
- LOG.info("Emitting serialized splits: " + sendSerializedEvents);
+
+ sw.stop();
if (LOG.isDebugEnabled()) {
- sw.stop();
+ LOG.debug("Emitting serialized splits: " + sendSerializedEvents + " for input " +
+ getContext().getInputName());
LOG.debug("Time converting ByteString to configuration: " + sw.elapsedMillis());
}
- if (LOG.isDebugEnabled()) {
- sw.reset().start();
- }
+ sw.reset().start();
int totalResource = getContext().getTotalAvailableResource().getMemory();
int taskResource = getContext().getVertexTaskResource().getMemory();
@@ -107,24 +102,26 @@ public class MRInputAMSplitGenerator extends InputInitializer {
int numTasks = (int)((totalResource*waves)/taskResource);
+
+
+ boolean groupSplits = userPayloadProto.getGroupingEnabled();
LOG.info("Input " + getContext().getInputName() + " asking for " + numTasks
+ " tasks. Headroom: " + totalResource + " Task Resource: "
- + taskResource + " waves: " + waves);
+ + taskResource + " waves: " + waves + ", groupingEnabled: " + groupSplits);
// Read all credentials into the credentials instance stored in JobConf.
JobConf jobConf = new JobConf(conf);
jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
InputSplitInfoMem inputSplitInfo = null;
- boolean groupSplits = userPayloadProto.getGroupingEnabled();
+
if (groupSplits) {
- LOG.info("Grouping input splits");
inputSplitInfo = MRInputHelpers.generateInputSplitsToMem(jobConf, true, numTasks);
} else {
inputSplitInfo = MRInputHelpers.generateInputSplitsToMem(jobConf, false, 0);
}
+ sw.stop();
if (LOG.isDebugEnabled()) {
- sw.stop();
LOG.debug("Time to create splits to mem: " + sw.elapsedMillis());
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
index e6b70d2..28d108e 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
@@ -69,14 +69,11 @@ public class MRInputSplitDistributor extends InputInitializer {
@Override
public List<Event> initialize() throws IOException {
- Stopwatch sw = null;
- if (LOG.isDebugEnabled()) {
- sw = new Stopwatch().start();
- }
+ Stopwatch sw = new Stopwatch().start();
MRInputUserPayloadProto userPayloadProto = MRInputHelpers
.parseMRInputPayload(getContext().getInputUserPayload());
+ sw.stop();
if (LOG.isDebugEnabled()) {
- sw.stop();
LOG.debug("Time to parse MRInput payload into prot: "
+ sw.elapsedMillis());
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
index 7f5e0e3..30e4a8c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
@@ -284,21 +284,27 @@ public class MRInputHelpers {
InputSplitInfoMem splitInfoMem = null;
JobConf jobConf = new JobConf(conf);
if (jobConf.getUseNewMapper()) {
- LOG.info("Generating mapreduce api input splits");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Generating mapreduce api input splits");
+ }
Job job = Job.getInstance(conf);
org.apache.hadoop.mapreduce.InputSplit[] splits =
generateNewSplits(job, groupSplits, targetTasks);
splitInfoMem = new InputSplitInfoMem(splits, createTaskLocationHintsFromSplits(splits),
splits.length, job.getCredentials(), job.getConfiguration());
} else {
- LOG.info("Generating mapred api input splits");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Generating mapred api input splits");
+ }
org.apache.hadoop.mapred.InputSplit[] splits =
generateOldSplits(jobConf, groupSplits, targetTasks);
splitInfoMem = new InputSplitInfoMem(splits, createTaskLocationHintsFromSplits(splits),
splits.length, jobConf.getCredentials(), jobConf);
}
- LOG.info("NumSplits: " + splitInfoMem.getNumTasks() + ", SerializedSize: "
- + splitInfoMem.getSplitsProto().getSerializedSize());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("NumSplits: " + splitInfoMem.getNumTasks() + ", SerializedSize: "
+ + splitInfoMem.getSplitsProto().getSerializedSize());
+ }
return splitInfoMem;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
index 720af50..80828d4 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
@@ -51,11 +51,13 @@ public class MRPartitioner implements org.apache.tez.runtime.library.api.Partiti
if (useNewApi) {
oldPartitioner = null;
if (partitions > 1) {
+ Class<? extends org.apache.hadoop.mapreduce.Partitioner<?, ?>> clazz =
+ (Class<? extends org.apache.hadoop.mapreduce.Partitioner<?, ?>>) conf
+ .getClass(MRJobConfig.PARTITIONER_CLASS_ATTR,
+ org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class);
+ LOG.info("Using newApi, MRpartitionerClass=" + clazz.getName());
newPartitioner = (org.apache.hadoop.mapreduce.Partitioner) ReflectionUtils
- .newInstance(
- (Class<? extends org.apache.hadoop.mapreduce.Partitioner<?, ?>>) conf
- .getClass(MRJobConfig.PARTITIONER_CLASS_ATTR,
- org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class), conf);
+ .newInstance(clazz, conf);
} else {
newPartitioner = new org.apache.hadoop.mapreduce.Partitioner() {
@Override
@@ -67,10 +69,12 @@ public class MRPartitioner implements org.apache.tez.runtime.library.api.Partiti
} else {
newPartitioner = null;
if (partitions > 1) {
- oldPartitioner = (org.apache.hadoop.mapred.Partitioner) ReflectionUtils.newInstance(
+ Class<? extends org.apache.hadoop.mapred.Partitioner> clazz =
(Class<? extends org.apache.hadoop.mapred.Partitioner>) conf.getClass(
- "mapred.partitioner.class", org.apache.hadoop.mapred.lib.HashPartitioner.class),
- new JobConf(conf));
+ "mapred.partitioner.class", org.apache.hadoop.mapred.lib.HashPartitioner.class);
+ LOG.info("Using oldApi, MRpartitionerClass=" + clazz.getName());
+ oldPartitioner = (org.apache.hadoop.mapred.Partitioner) ReflectionUtils.newInstance(
+ clazz, new JobConf(conf));
} else {
oldPartitioner = new org.apache.hadoop.mapred.Partitioner() {
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
index d0e935f..6ea21e2 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
@@ -95,7 +95,6 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
+ TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED + " set to false");
return;
}
- LOG.info("Initializing ATSService");
if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
@@ -124,7 +123,12 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
}
sessionDomainId = conf.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID);
- LOG.info("Using " + atsHistoryACLManagerClassName + " to manage Timeline ACLs");
+ LOG.info("Initializing " + ATSHistoryLoggingService.class.getSimpleName() + " with "
+ + "maxEventsPerBatch=" + maxEventsPerBatch
+ + ", maxPollingTime(ms)=" + maxPollingTimeMillis
+ + ", waitTimeForShutdown(ms)=" + maxTimeToWaitOnShutdown
+ + ", TimelineACLManagerClass=" + atsHistoryACLManagerClassName);
+
try {
historyACLPolicyManager = ReflectionUtils.createClazzInstance(
atsHistoryACLManagerClassName);
@@ -146,7 +150,6 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
if (!historyLoggingEnabled || timelineClient == null) {
return;
}
- LOG.info("Starting ATSService");
timelineClient.start();
eventHandlingThread = new Thread(new Runnable() {