You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/09/17 00:20:19 UTC
[1/2] tez git commit: TEZ-2830. Backport TEZ-2774 to branch-0.7. (sseth)
Repository: tez
Updated Branches:
refs/heads/branch-0.7 fd7c2fc13 -> e15faa56c
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index fb8b530..92035e1 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -202,11 +202,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
* @throws Exception
*/
public void initialize() throws Exception {
- LOG.info("Initializing LogicalProcessorIORuntimeTask");
Preconditions.checkState(this.state.get() == State.NEW, "Already initialized");
this.state.set(State.INITED);
- LOG.info("Creating processor" + ", processorClassName=" + processorDescriptor.getClassName());
this.processorContext = createProcessorContext();
this.processor = createProcessor(processorDescriptor.getClassName(), processorContext);
@@ -406,7 +404,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
@Override
protected Void callInternal() throws Exception {
- LOG.info("Initializing Input using InputSpec: " + inputSpec);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Initializing Input using InputSpec: " + inputSpec);
+ }
String edgeName = inputSpec.getSourceVertexName();
InputContext inputContext = createInputContext(inputsMap, inputSpec, inputIndex);
LogicalInput input = createInput(inputSpec, inputContext);
@@ -414,13 +414,14 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
inputsMap.put(edgeName, input);
inputContextMap.put(edgeName, inputContext);
- LOG.info("Initializing Input with src edge: " + edgeName);
List<Event> events = ((InputFrameworkInterface)input).initialize();
sendTaskGeneratedEvents(events, EventProducerConsumerType.INPUT,
inputContext.getTaskVertexName(), inputContext.getSourceVertexName(),
taskSpec.getTaskAttemptID());
initializedInputs.put(edgeName, input);
- LOG.info("Initialized Input with src edge: " + edgeName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Initialized Input with src edge: " + edgeName);
+ }
return null;
}
}
@@ -436,7 +437,10 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
@Override
protected Void callInternal() throws Exception {
- LOG.info("Starting Input with src edge: " + srcVertexName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Starting Input with src edge: " + srcVertexName);
+ }
+
input.start();
LOG.info("Started Input with src edge: " + srcVertexName);
return null;
@@ -455,7 +459,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
@Override
protected Void callInternal() throws Exception {
- LOG.info("Initializing Output using OutputSpec: " + outputSpec);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Initializing Output using OutputSpec: " + outputSpec);
+ }
String edgeName = outputSpec.getDestinationVertexName();
OutputContext outputContext = createOutputContext(outputSpec, outputIndex);
LogicalOutput output = createOutput(outputSpec, outputContext);
@@ -463,13 +469,14 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
outputsMap.put(edgeName, output);
outputContextMap.put(edgeName, outputContext);
- LOG.info("Initializing Output with dest edge: " + edgeName);
List<Event> events = ((OutputFrameworkInterface)output).initialize();
sendTaskGeneratedEvents(events, EventProducerConsumerType.OUTPUT,
outputContext.getTaskVertexName(),
outputContext.getDestinationVertexName(), taskSpec.getTaskAttemptID());
initializedOutputs.put(edgeName, output);
- LOG.info("Initialized Output with dest edge: " + edgeName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Initialized Output with dest edge: " + edgeName);
+ }
return null;
}
}
@@ -486,7 +493,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
if (groupInputSpecs != null && !groupInputSpecs.isEmpty()) {
groupInputsMap = new ConcurrentHashMap<String, MergedLogicalInput>(groupInputSpecs.size());
for (GroupInputSpec groupInputSpec : groupInputSpecs) {
- LOG.info("Initializing GroupInput using GroupInputSpec: " + groupInputSpec);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Initializing GroupInput using GroupInputSpec: " + groupInputSpec);
+ }
MergedInputContext mergedInputContext =
new TezMergedInputContextImpl(groupInputSpec.getMergedInputDescriptor().getUserPayload(),
groupInputSpec.getGroupName(), groupInputsMap, inputReadyTracker, localDirs);
@@ -505,11 +514,12 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
}
private void initializeLogicalIOProcessor() throws Exception {
- LOG.info("Initializing processor" + ", processorClassName="
- + processorDescriptor.getClassName());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Initializing processor" + ", processorClassName="
+ + processorDescriptor.getClassName());
+ }
processor.initialize();
- LOG.info("Initialized processor" + ", processorClassName="
- + processorDescriptor.getClassName());
+ LOG.info("Initialized processor");
}
private InputContext createInputContext(Map<String, LogicalInput> inputMap,
@@ -556,7 +566,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
}
private LogicalInput createInput(InputSpec inputSpec, InputContext inputContext) throws TezException {
- LOG.info("Creating Input");
InputDescriptor inputDesc = inputSpec.getInputDescriptor();
Input input = ReflectionUtils.createClazzInstance(inputDesc.getClassName(),
new Class[]{InputContext.class, Integer.TYPE},
@@ -579,7 +588,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
}
private LogicalOutput createOutput(OutputSpec outputSpec, OutputContext outputContext) throws TezException {
- LOG.info("Creating Output");
OutputDescriptor outputDesc = outputSpec.getOutputDescriptor();
Output output = ReflectionUtils.createClazzInstance(outputDesc.getClassName(),
new Class[]{OutputContext.class, Integer.TYPE},
@@ -704,7 +712,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
if (e == null) {
continue;
}
- // TODO TODONEWTEZ
if (!handleEvent(e)) {
LOG.warn("Stopping Event Router thread as failed to handle"
+ " event: " + e);
@@ -798,7 +805,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
"Ignoring exception when closing input {}(cleanup). Exception class={}, message={}",
srcVertexName, e.getClass().getName(), e.getMessage());
} finally {
- LOG.info("Close input for vertex={}, sourceVertex={}", processor
+ LOG.info("Closed input for vertex={}, sourceVertex={}", processor
.getContext().getTaskVertexName(), srcVertexName);
}
}
@@ -816,7 +823,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
"Ignoring exception when closing output {}(cleanup). Exception class={}, message={}",
destVertexName, e.getClass().getName(), e.getMessage());
} finally {
- LOG.info("Close input for vertex={}, sourceVertex={}", processor
+ LOG.info("Closed input for vertex={}, sourceVertex={}", processor
.getContext().getTaskVertexName(), destVertexName);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
index 8d6466a..4431150 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
@@ -176,6 +176,8 @@ public class TezInputContextImpl extends TezTaskContextImpl
super.close();
this.userPayload = null;
this.inputReadyTracker = null;
- LOG.info("Cleared TezInputContextImpl related information");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cleared TezInputContextImpl related information");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
index 71e96db..1e5b6a5 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
@@ -156,6 +156,8 @@ public class TezOutputContextImpl extends TezTaskContextImpl
public void close() throws IOException {
super.close();
this.userPayload = null;
- LOG.info("Cleared TezOutputContextImpl related information");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cleared TezOutputContextImpl related information");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
index a191ae8..6dc30ff 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -121,7 +121,9 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce
super.close();
this.userPayload = null;
this.inputReadyTracker = null;
- LOG.info("Cleared TezProcessorContextImpl related information");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cleared TezProcessorContextImpl related information");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
index 2622b1f..c822357 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
@@ -61,6 +61,7 @@ public class MemoryDistributor {
private long totalJvmMemory;
private final boolean isEnabled;
+ private final String allocatorClassName;
private final Set<TaskContext> dupSet = Collections
.newSetFromMap(new ConcurrentHashMap<TaskContext, Boolean>());
private final List<RequestorInfo> requestList;
@@ -77,7 +78,13 @@ public class MemoryDistributor {
this.conf = conf;
isEnabled = conf.getBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ENABLED,
TezConfiguration.TEZ_TASK_SCALE_MEMORY_ENABLED_DEFAULT);
-
+
+ if (isEnabled) {
+ allocatorClassName = conf.get(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ALLOCATOR_CLASS,
+ TezConfiguration.TEZ_TASK_SCALE_MEMORY_ALLOCATOR_CLASS_DEFAULT);
+ } else {
+ allocatorClassName = null;
+ }
this.numTotalInputs = numTotalInputs;
this.numTotalOutputs = numTotalOutputs;
@@ -85,7 +92,8 @@ public class MemoryDistributor {
this.requestList = Collections.synchronizedList(new LinkedList<RequestorInfo>());
LOG.info("InitialMemoryDistributor (isEnabled=" + isEnabled + ") invoked with: numInputs="
+ numTotalInputs + ", numOutputs=" + numTotalOutputs
- + ", JVM.maxFree=" + totalJvmMemory);
+ + ", JVM.maxFree=" + totalJvmMemory
+ + ", allocatorClassName=" + allocatorClassName);
}
@@ -97,7 +105,7 @@ public class MemoryDistributor {
TaskContext taskContext, EntityDescriptor<?> descriptor) {
registerRequest(requestSize, callback, taskContext, descriptor);
}
-
+
/**
* Used by the Tez framework to distribute initial memory after components
* have made their initial requests.
@@ -106,6 +114,9 @@ public class MemoryDistributor {
public void makeInitialAllocations() throws TezException {
Preconditions.checkState(numInputsSeen.get() == numTotalInputs, "All inputs are expected to ask for memory");
Preconditions.checkState(numOutputsSeen.get() == numTotalOutputs, "All outputs are expected to ask for memory");
+
+ logInitialRequests(requestList);
+
Iterable<InitialMemoryRequestContext> requestContexts = Iterables.transform(requestList,
new Function<RequestorInfo, InitialMemoryRequestContext>() {
public InitialMemoryRequestContext apply(RequestorInfo requestInfo) {
@@ -121,14 +132,12 @@ public class MemoryDistributor {
}
});
} else {
- String allocatorClassName = conf.get(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ALLOCATOR_CLASS,
- TezConfiguration.TEZ_TASK_SCALE_MEMORY_ALLOCATOR_CLASS_DEFAULT);
- LOG.info("Using Allocator class: " + allocatorClassName);
InitialMemoryAllocator allocator = ReflectionUtils.createClazzInstance(allocatorClassName);
allocator.setConf(conf);
allocations = allocator.assignMemory(totalJvmMemory, numTotalInputs, numTotalOutputs,
Iterables.unmodifiableIterable(requestContexts));
validateAllocations(allocations, requestList.size());
+ logFinalAllocations(allocations, requestList);
}
// Making the callbacks directly for now, instead of spawning threads. The
@@ -137,14 +146,18 @@ public class MemoryDistributor {
Iterator<Long> allocatedIter = allocations.iterator();
for (RequestorInfo rInfo : requestList) {
long allocated = allocatedIter.next();
- LOG.info("Informing: " + rInfo.getRequestContext().getComponentType() + ", "
- + rInfo.getRequestContext().getComponentVertexName() + ", "
- + rInfo.getRequestContext().getComponentClassName() + ": requested="
- + rInfo.getRequestContext().getRequestedSize() + ", allocated=" + allocated);
+ if (LOG.isDebugEnabled()) {
+ LOG.info("Informing: " + rInfo.getRequestContext().getComponentType() + ", "
+ + rInfo.getRequestContext().getComponentVertexName() + ", "
+ + rInfo.getRequestContext().getComponentClassName() + ": requested="
+ + rInfo.getRequestContext().getRequestedSize() + ", allocated=" + allocated);
+ }
rInfo.getCallback().memoryAssigned(allocated);
}
}
+
+
/**
* Allow tests to set memory.
* @param size
@@ -233,8 +246,6 @@ public class MemoryDistributor {
this.requestContext = new InitialMemoryRequestContext(requestSize, descriptor.getClassName(),
type, componentVertexName);
this.callback = callback;
- LOG.info("Received request: " + requestSize + ", type: " + type + ", componentVertexName: "
- + componentVertexName);
}
public MemoryUpdateCallback getCallback() {
@@ -246,4 +257,45 @@ public class MemoryDistributor {
}
}
+
+ private void logInitialRequests(List<RequestorInfo> initialRequests) {
+ if (initialRequests != null && !initialRequests.isEmpty()) {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < initialRequests.size(); i++) {
+ InitialMemoryRequestContext context = initialRequests.get(i).getRequestContext();
+ sb.append("[");
+ sb.append(context.getComponentVertexName()).append(":");
+ sb.append(context.getComponentType()).append(":");
+ sb.append(context.getRequestedSize()).append(":").append(context.getComponentClassName());
+ sb.append("]");
+ if (i < initialRequests.size() - 1) {
+ sb.append(", ");
+ }
+ }
+ LOG.info("InitialRequests=" + sb.toString());
+ }
+ }
+
+ private void logFinalAllocations(Iterable<Long> allocations, List<RequestorInfo> requestList) {
+ if (requestList != null && !requestList.isEmpty()) {
+ Iterator<Long> allocatedIter = allocations.iterator();
+ StringBuilder sb = new StringBuilder();
+
+ for (int i = 0 ; i < requestList.size() ; i++) {
+ long allocated = allocatedIter.next();
+ InitialMemoryRequestContext context = requestList.get(i).getRequestContext();
+ sb.append("[");
+ sb.append(context.getComponentVertexName()).append(":");
+ sb.append(context.getComponentClassName()).append(":");
+ sb.append(context.getComponentType()).append(":");
+ sb.append(context.getRequestedSize()).append(":").append(allocated);
+ sb.append("]");
+ if (i < requestList.size() - 1) {
+ sb.append(", ");
+ }
+ }
+ LOG.info("Allocations=" + sb.toString());
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java
index ebb94c6..2472c51 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java
@@ -44,7 +44,7 @@ import org.apache.tez.dag.api.TezConfiguration;
public class TaskCounterUpdater {
private static final Logger LOG = LoggerFactory.getLogger(TaskCounterUpdater.class);
-
+
private final TezCounters tezCounters;
private final Configuration conf;
@@ -149,6 +149,6 @@ public class TaskCounterUpdater {
pTree = ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(pid, clazz, conf);
- LOG.info(" Using ResourceCalculatorProcessTree : " + pTree);
+ LOG.info("Using ResourceCalculatorProcessTree : " + clazz.getName());
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
index 8ee30c5..0ece227 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
@@ -40,7 +40,7 @@ public class ContainerReporter extends CallableWithNdc<ContainerTask> {
private final TezTaskUmbilicalProtocol umbilical;
private final ContainerContext containerContext;
private final int getTaskMaxSleepTime;
- private final long LOG_INTERVAL = 2000l;
+ private final long LOG_INTERVAL = 30000l;
private long nextGetTaskPrintTime;
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index 062b497..7f03992 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -287,10 +287,10 @@ public class TezChild {
Preconditions.checkState(!containerTask.shouldDie());
Preconditions.checkState(containerTask.getTaskSpec() != null);
if (containerTask.haveCredentialsChanged()) {
- LOG.info("Refreshing UGI since Credentials have changed");
Credentials taskCreds = containerTask.getCredentials();
if (taskCreds != null) {
- LOG.info("Credentials : #Tokens=" + taskCreds.numberOfTokens() + ", #SecretKeys="
+ LOG.info("Refreshing UGI since Credentials have changed. Credentials : #Tokens=" +
+ taskCreds.numberOfTokens() + ", #SecretKeys="
+ taskCreds.numberOfSecretKeys());
childUGI = UserGroupInformation.createRemoteUser(user);
childUGI.addCredentials(containerTask.getCredentials());
@@ -315,20 +315,20 @@ public class TezChild {
LOG.debug("Additional Resources added to container: " + additionalResources);
}
- LOG.info("Localizing additional local resources for Task : " + additionalResources);
- List<URL> downloadedUrls = RelocalizationUtils.processAdditionalResources(
- Maps.transformValues(additionalResources, new Function<TezLocalResource, URI>() {
- @Override
- public URI apply(TezLocalResource input) {
- return input.getUri();
- }
- }), defaultConf, workingDir);
- RelocalizationUtils.addUrlsToClassPath(downloadedUrls);
- LOG.info("Done localizing additional resources");
- final TaskSpec taskSpec = containerTask.getTaskSpec();
- if (LOG.isDebugEnabled()) {
- LOG.debug("New container task context:" + taskSpec.toString());
+ if (additionalResources != null && !additionalResources.isEmpty()) {
+ LOG.info("Localizing additional local resources for Task : " + additionalResources);
+
+ List<URL> downloadedUrls = RelocalizationUtils.processAdditionalResources(
+ Maps.transformValues(additionalResources, new Function<TezLocalResource, URI>() {
+ @Override
+ public URI apply(TezLocalResource input) {
+ return input.getUri();
+ }
+ }), defaultConf, workingDir);
+ RelocalizationUtils.addUrlsToClassPath(downloadedUrls);
+
+ LOG.info("Done localizing additional resources");
}
}
@@ -456,7 +456,8 @@ public class TezChild {
final Configuration defaultConf = new Configuration();
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
- LOG.info("TezChild starting");
+ final String pid = System.getenv().get("JVM_PID");
+
assert args.length == 5;
String host = args[0];
@@ -466,8 +467,7 @@ public class TezChild {
final int attemptNumber = Integer.parseInt(args[4]);
final String[] localDirs = TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS
.name()));
- final String pid = System.getenv().get("JVM_PID");
- LOG.info("PID, containerIdentifier: " + pid + ", " + containerIdentifier);
+ LOG.info("TezChild starting with PID=" + pid + ", containerIdentifier=" + containerIdentifier);
if (LOG.isDebugEnabled()) {
LOG.debug("Info from cmd line: AM-host: " + host + " AM-port: " + port
+ " containerIdentifier: " + containerIdentifier + " appAttemptNumber: " + attemptNumber
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
index ad0eaf9..69436ba 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
@@ -62,7 +62,6 @@ public class TezRuntimeUtils {
Class<? extends Combiner> clazz;
String className = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS);
if (className == null) {
- LOG.info("No combiner specified via " + TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS + ". Combiner will not be used");
return null;
}
LOG.info("Using Combiner class: " + className);
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
index b70c9d7..8477300 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
@@ -168,7 +168,6 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator
Request request = new Request(context.getComponentClassName(), context.getRequestedSize(),
requestType, typeScaleFactor);
requests.add(request);
- LOG.info("ScaleFactor: " + typeScaleFactor + ", for type: " + requestType);
numRequestsScaled += typeScaleFactor;
}
@@ -194,7 +193,9 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator
requestType = RequestType.PARTITIONED_UNSORTED_OUTPUT;
} else {
requestType = RequestType.OTHER;
- LOG.info("Falling back to RequestType.OTHER for class: " + className);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Falling back to RequestType.OTHER for class: " + className);
+ }
}
return requestType;
}
@@ -219,6 +220,7 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator
}
}
+ StringBuilder sb = new StringBuilder();
Set<RequestType> seenTypes = new HashSet<RequestType>();
for (String ratio : ratios) {
@@ -232,7 +234,9 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator
}
Preconditions.checkState(ratioVal >= 0, "Ratio must be >= 0");
typeScaleMap.put(requestType, ratioVal);
+ sb.append("[").append(requestType).append(":").append(ratioVal).append("]");
}
+ LOG.info("ScaleRatiosUsed=" + sb.toString());
}
private double computeReservedFraction(int numTotalRequests) {
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java
index 6b8bd0d..bf3e9db 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java
@@ -151,9 +151,7 @@ public class RPCLoadGen extends TezExampleBase {
random.nextBytes(diskPayload);
fs = FileSystem.get(conf);
resourcePath = new Path(Path.SEPARATOR + "tmp", DISK_PAYLOAD_NAME);
- System.err.println("ZZZ: HDFSPath: " + resourcePath);
resourcePath = fs.makeQualified(resourcePath);
- System.err.println("ZZZ: HDFSPathResolved: " + resourcePath);
FSDataOutputStream dataOut = fs.create(resourcePath, true);
dataOut.write(diskPayload);
dataOut.close();
[2/2] tez git commit: TEZ-2830. Backport TEZ-2774 to branch-0.7. (sseth)
Posted by ss...@apache.org.
TEZ-2830. Backport TEZ-2774 to branch-0.7. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e15faa56
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e15faa56
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e15faa56
Branch: refs/heads/branch-0.7
Commit: e15faa56c5a1a9ab678fc15583189be8882228f1
Parents: fd7c2fc
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Sep 16 15:20:01 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Sep 16 15:20:01 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/tez/common/AsyncDispatcher.java | 14 +-
.../tez/common/AsyncDispatcherConcurrent.java | 9 +-
.../org/apache/tez/common/TezUtilsInternal.java | 14 +-
.../org/apache/tez/dag/app/DAGAppMaster.java | 27 +++-
.../app/dag/RootInputInitializerManager.java | 2 +-
.../java/org/apache/tez/dag/app/dag/Vertex.java | 2 +
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 14 +-
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 34 ++--
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 99 ++++++------
.../dag/app/launcher/ContainerLauncherImpl.java | 9 +-
.../dag/app/rm/TaskSchedulerEventHandler.java | 10 +-
.../dag/app/rm/YarnTaskSchedulerService.java | 155 +++++++++++--------
.../app/rm/container/AMContainerHelpers.java | 10 +-
.../dag/app/rm/container/AMContainerImpl.java | 18 ++-
.../tez/dag/app/rm/node/AMNodeTracker.java | 2 +
.../tez/dag/history/HistoryEventHandler.java | 8 +-
.../events/TaskAttemptFinishedEvent.java | 12 +-
.../history/events/TaskAttemptStartedEvent.java | 4 +-
.../impl/SimpleHistoryLoggingService.java | 4 +-
.../dag/history/recovery/RecoveryService.java | 8 +-
.../resources/tez-container-log4j.properties | 2 +-
.../mapreduce/committer/MROutputCommitter.java | 3 +-
.../common/MRInputAMSplitGenerator.java | 31 ++--
.../common/MRInputSplitDistributor.java | 7 +-
.../tez/mapreduce/hadoop/MRInputHelpers.java | 14 +-
.../tez/mapreduce/partition/MRPartitioner.java | 18 ++-
.../logging/ats/ATSHistoryLoggingService.java | 9 +-
.../runtime/LogicalIOProcessorRuntimeTask.java | 45 +++---
.../runtime/api/impl/TezInputContextImpl.java | 4 +-
.../runtime/api/impl/TezOutputContextImpl.java | 4 +-
.../api/impl/TezProcessorContextImpl.java | 4 +-
.../common/resources/MemoryDistributor.java | 76 +++++++--
.../tez/runtime/metrics/TaskCounterUpdater.java | 4 +-
.../tez/runtime/task/ContainerReporter.java | 2 +-
.../org/apache/tez/runtime/task/TezChild.java | 36 ++---
.../runtime/library/common/TezRuntimeUtils.java | 1 -
.../WeightedScalingMemoryDistributor.java | 8 +-
.../tez/mapreduce/examples/RPCLoadGen.java | 2 -
39 files changed, 437 insertions(+), 289 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9c8bde4..3e56d6c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.7.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2830. Backport TEZ-2774 to branch-0.7. Improvements to logging in the AM and part of the runtime.
TEZ-2829. Tez UI: minor fixes to in-progress update of UI from AM
TEZ-2663. SessionNotRunning exceptions are wrapped in a ServiceException from a dying AM.
TEZ-2825. Report progress in terms of completed tasks to reduce load on AM for Tez UI
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
index 4319f4f..159ccd9 100644
--- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
+++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
@@ -130,7 +130,7 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
@Override
protected void serviceStart() throws Exception {
eventHandlingThread = new Thread(createThread());
- eventHandlingThread.setName("Dispatcher thread: " + name);
+ eventHandlingThread.setName("Dispatcher thread {" + name + "}");
eventHandlingThread.start();
//start all the components
@@ -211,7 +211,7 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
private void checkForExistingConcurrentDispatcher(Class<? extends Enum> eventType) {
AsyncDispatcherConcurrent concurrentDispatcher = concurrentEventDispatchers.get(eventType);
- Preconditions.checkState(concurrentDispatcher == null,
+ Preconditions.checkState(concurrentDispatcher == null,
"Multiple concurrent dispatchers cannot be registered for: " + eventType.getName());
}
@@ -259,7 +259,8 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
/* check to see if we have a listener registered */
checkForExistingDispatchers(true, eventType);
- LOG.info("Registering " + eventType + " for independent dispatch using: " + handler.getClass());
+ LOG.info(
+ "Registering " + eventType + " for independent dispatch using: " + handler.getClass());
AsyncDispatcher dispatcher = new AsyncDispatcher(dispatcherName);
dispatcher.register(eventType, handler);
eventDispatchers.put(eventType, dispatcher);
@@ -272,7 +273,8 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
/* check to see if we have a listener registered */
checkForExistingDispatchers(true, eventType);
- LOG.info("Registering " + eventType + " for concurrent dispatch using: " + handler.getClass());
+ LOG.info(
+ "Registering " + eventType + " for concurrent dispatch using: " + handler.getClass());
AsyncDispatcherConcurrent dispatcher = new AsyncDispatcherConcurrent(dispatcherName, numThreads);
dispatcher.register(eventType, handler);
concurrentEventDispatchers.put(eventType, dispatcher);
@@ -286,8 +288,8 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
/* check to see if we have a listener registered */
checkForExistingDispatchers(true, eventType);
- LOG.info("Registering " + eventType + " wit existing concurrent dispatch using: "
- + handler.getClass());
+ LOG.info("Registering " + eventType + " with existing concurrent dispatch using: "
+ + handler.getClass());
dispatcher.register(eventType, handler);
concurrentEventDispatchers.put(eventType, dispatcher);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java
index d19bf9e..321ea8b 100644
--- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java
+++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java
@@ -136,7 +136,7 @@ public class AsyncDispatcherConcurrent extends CompositeService implements Dispa
@Override
protected void serviceStart() throws Exception {
execService = Executors.newFixedThreadPool(numThreads, new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("Dispatcher [" + this.name + "] #%d").build());
+ .setNameFormat("Dispatcher {" + this.name + "} #%d").build());
for (int i=0; i<numThreads; ++i) {
eventQueues.add(new LinkedBlockingQueue<Event>());
}
@@ -215,7 +215,7 @@ public class AsyncDispatcherConcurrent extends CompositeService implements Dispa
private void checkForExistingDispatcher(Class<? extends Enum> eventType) {
AsyncDispatcherConcurrent registeredDispatcher = eventDispatchers.get(eventType);
- Preconditions.checkState(registeredDispatcher == null,
+ Preconditions.checkState(registeredDispatcher == null,
"Multiple dispatchers cannot be registered for: " + eventType.getName());
}
@@ -263,7 +263,8 @@ public class AsyncDispatcherConcurrent extends CompositeService implements Dispa
/* check to see if we have a listener registered */
checkForExistingDispatchers(true, eventType);
- LOG.info("Registering " + eventType + " for independent dispatch using: " + handler.getClass());
+ LOG.info(
+ "Registering " + eventType + " for independent dispatch using: " + handler.getClass());
AsyncDispatcherConcurrent dispatcher = new AsyncDispatcherConcurrent(dispatcherName, numThreads);
dispatcher.register(eventType, handler);
eventDispatchers.put(eventType, dispatcher);
@@ -278,7 +279,7 @@ public class AsyncDispatcherConcurrent extends CompositeService implements Dispa
/* check to see if we have a listener registered */
checkForExistingDispatchers(true, eventType);
LOG.info("Registering " + eventType + " wit existing concurrent dispatch using: "
- + handler.getClass());
+ + handler.getClass());
dispatcher.register(eventType, handler);
eventDispatchers.put(eventType, dispatcher);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
index 9c78377..55e1e6e 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
@@ -81,13 +81,10 @@ public class TezUtilsInternal {
public static byte[] compressBytes(byte[] inBytes) throws IOException {
- Stopwatch sw = null;
- if (LOG.isDebugEnabled()) {
- sw = new Stopwatch().start();
- }
+ Stopwatch sw = new Stopwatch().start();
byte[] compressed = compressBytesInflateDeflate(inBytes);
+ sw.stop();
if (LOG.isDebugEnabled()) {
- sw.stop();
LOG.debug("UncompressedSize: " + inBytes.length + ", CompressedSize: " + compressed.length
+ ", CompressTime: " + sw.elapsedMillis());
}
@@ -95,13 +92,10 @@ public class TezUtilsInternal {
}
public static byte[] uncompressBytes(byte[] inBytes) throws IOException {
- Stopwatch sw = null;
- if (LOG.isDebugEnabled()) {
- sw = new Stopwatch().start();
- }
+ Stopwatch sw = new Stopwatch().start();
byte[] uncompressed = uncompressBytesInflateDeflate(inBytes);
+ sw.stop();
if (LOG.isDebugEnabled()) {
- sw.stop();
LOG.debug("CompressedSize: " + inBytes.length + ", UncompressedSize: " + uncompressed.length
+ ", UncompressTimeTaken: " + sw.elapsedMillis());
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 49ba802..f2194fa 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -435,7 +435,6 @@ public class DAGAppMaster extends AbstractService {
// Prepare the TaskAttemptListener server for authentication of Containers
// TaskAttemptListener gets the information via jobTokenSecretManager.
- LOG.info("Adding session token to jobTokenSecretManager for application");
jobTokenSecretManager.addTokenForJob(
appAttemptID.getApplicationId().toString(), sessionToken);
@@ -461,8 +460,11 @@ public class DAGAppMaster extends AbstractService {
dispatcher.register(DAGAppMasterEventType.class, new DAGAppMasterEventHandler());
dispatcher.register(DAGEventType.class, dagEventDispatcher);
dispatcher.register(VertexEventType.class, vertexEventDispatcher);
- if (!conf.getBoolean(TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER,
- TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER_DEFAULT)) {
+ boolean useConcurrentDispatcher =
+ conf.getBoolean(TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER,
+ TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER_DEFAULT);
+ LOG.info("Using concurrent dispatcher: " + useConcurrentDispatcher);
+ if (!useConcurrentDispatcher) {
dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
} else {
@@ -522,7 +524,7 @@ public class DAGAppMaster extends AbstractService {
currentRecoveryDataDir = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir,
appAttemptID.getAttemptId());
if (LOG.isDebugEnabled()) {
- LOG.info("Stage directory information for AppAttemptId :" + this.appAttemptID
+ LOG.debug("Stage directory information for AppAttemptId :" + this.appAttemptID
+ " tezSystemStagingDir :" + tezSystemStagingDir + " recoveryDataDir :" + recoveryDataDir
+ " recoveryAttemptDir :" + currentRecoveryDataDir);
}
@@ -888,7 +890,7 @@ public class DAGAppMaster extends AbstractService {
try {
if (LOG.isDebugEnabled()) {
- LOG.info("JSON dump for submitted DAG, dagId=" + dagId.toString()
+ LOG.debug("JSON dump for submitted DAG, dagId=" + dagId.toString()
+ ", json="
+ DAGUtils.generateSimpleJSONPlan(dagPB).toString());
}
@@ -2027,6 +2029,7 @@ public class DAGAppMaster extends AbstractService {
public static void main(String[] args) {
try {
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+ final String pid = System.getenv().get("JVM_PID");
String containerIdStr =
System.getenv(Environment.CONTAINER_ID.name());
String nodeHostString = System.getenv(Environment.NM_HOST.name());
@@ -2066,6 +2069,18 @@ public class DAGAppMaster extends AbstractService {
false, "Run Tez Application Master in Session mode");
CommandLine cliParser = new GnuParser().parse(opts, args);
+ boolean sessionModeCliOption = cliParser.hasOption(TezConstants.TEZ_SESSION_MODE_CLI_OPTION);
+
+ LOG.info("Creating DAGAppMaster for "
+ + "applicationId=" + applicationAttemptId.getApplicationId()
+ + ", attemptNum=" + applicationAttemptId.getAttemptId()
+ + ", AMContainerId=" + containerId
+ + ", jvmPid=" + pid
+ + ", userFromEnv=" + jobUserName
+ + ", cliSessionOption=" + sessionModeCliOption
+ + ", pwd=" + System.getenv(Environment.PWD.name())
+ + ", localDirs=" + System.getenv(Environment.LOCAL_DIRS.name())
+ + ", logDirs=" + System.getenv(Environment.LOG_DIRS.name()));
// TODO Does this really need to be a YarnConfiguration ?
Configuration conf = new Configuration(new YarnConfiguration());
@@ -2077,7 +2092,7 @@ public class DAGAppMaster extends AbstractService {
new DAGAppMaster(applicationAttemptId, containerId, nodeHostString,
Integer.parseInt(nodePortString),
Integer.parseInt(nodeHttpPortString), new SystemClock(), appSubmitTime,
- cliParser.hasOption(TezConstants.TEZ_SESSION_MODE_CLI_OPTION),
+ sessionModeCliOption,
System.getenv(Environment.PWD.name()),
TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name())),
TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOG_DIRS.name())),
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index 4a8a286..13128f8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -100,7 +100,7 @@ public class RootInputInitializerManager {
this.vertex = vertex;
this.eventHandler = appContext.getEventHandler();
this.rawExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
- .setDaemon(true).setNameFormat("InputInitializer [" + this.vertex.getName() + "] #%d").build());
+ .setDaemon(true).setNameFormat("InputInitializer {" + this.vertex.getName() + "} #%d").build());
this.executor = MoreExecutors.listeningDecorator(rawExecutor);
this.dagUgi = dagUgi;
this.entityStateTracker = stateTracker;
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index f03dcd1..88dcc8d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -181,4 +181,6 @@ public interface Vertex extends Comparable<Vertex> {
public int getKilledTaskAttemptCount();
public Configuration getConf();
+
+ public boolean isSpeculationEnabled();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index bdc0207..f63f461 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -785,10 +785,12 @@ public class TaskAttemptImpl implements TaskAttempt,
);
}
if (oldState != getInternalState()) {
- LOG.info(attemptId + " TaskAttempt Transitioned from "
- + oldState + " to "
- + getInternalState() + " due to event "
- + event.getType());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(attemptId + " TaskAttempt Transitioned from "
+ + oldState + " to "
+ + getInternalState() + " due to event "
+ + event.getType());
+ }
}
} finally {
writeLock.unlock();
@@ -1118,7 +1120,9 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskSpec remoteTaskSpec;
try {
remoteTaskSpec = ta.createRemoteTaskSpec();
- LOG.info("remoteTaskSpec:" + remoteTaskSpec);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("remoteTaskSpec:" + remoteTaskSpec);
+ }
} catch (AMUserCodeException e) {
String msg = "Exception in " + e.getSource() + ", taskAttempt=" + ta;
LOG.error(msg, e);
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index e6027f5..46215d0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -501,7 +501,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
int toEventId = actualMax + fromEventId;
events = new ArrayList<TezEvent>(tezEventsForTaskAttempts.subList(fromEventId, toEventId));
LOG.info("TaskAttempt:" + attemptID + " sent events: (" + fromEventId
- + "-" + toEventId + ")");
+ + "-" + toEventId + ").");
// currently not modifying the events so that we dont have to create
// copies of events. e.g. if we have to set taskAttemptId into the TezEvent
// destination metadata then we will need to create a copy of the TezEvent
@@ -760,12 +760,16 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
public boolean canCommit(TezTaskAttemptID taskAttemptID) {
writeLock.lock();
try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Commit go/no-go request from " + taskAttemptID);
+ }
TaskState state = getState();
if (state == TaskState.SCHEDULED) {
// the actual running task ran and is done and asking for commit. we are still stuck
// in the scheduled state which indicates a backlog in event processing. lets wait for the
// backlog to clear. returning false will make the attempt come back to us.
- LOG.debug("Event processing delay. "
+ LOG.info(
+ "Event processing delay. "
+ "Attempt committing before state machine transitioned to running : Task {}", taskId);
return false;
}
@@ -796,7 +800,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
}
} else {
if (commitAttempt.equals(taskAttemptID)) {
- LOG.info(taskAttemptID + " given a go for committing the task output.");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(taskAttemptID + " already given a go for committing the task output.");
+ }
return true;
}
// Don't think this can be a pluggable decision, so simply raise an
@@ -804,9 +810,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
// Wait for commit attempt to succeed. Dont kill this. If commit
// attempt fails then choose a different committer. When commit attempt
// succeeds then this and others will be killed
- LOG.info(commitAttempt
- + " is current committer. Commit waiting for: "
- + taskAttemptID);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(commitAttempt + " is current committer. Commit waiting for: " + taskAttemptID);
+ }
return false;
}
@@ -814,7 +820,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
writeLock.unlock();
}
}
-
+
TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedulingCausalTA) {
return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler,
taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext,
@@ -899,9 +905,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
internalError(event.getType());
}
if (oldState != getInternalState()) {
- LOG.info(taskId + " Task Transitioned from " + oldState + " to "
- + getInternalState() + " due to event "
- + event.getType());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(taskId + " Task Transitioned from " + oldState + " to "
+ + getInternalState() + " due to event "
+ + event.getType());
+ }
}
} finally {
writeLock.unlock();
@@ -1112,7 +1120,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
// other reasons.
!attempt.isFinished()) {
LOG.info("Issuing kill to other attempt " + attempt.getID() + " as attempt: " +
- task.successfulAttempt + " has succeeded");
+ task.successfulAttempt + " has succeeded");
String diagnostics = null;
TaskAttemptTerminationCause errCause = null;
if (attempt.getLaunchTime() < successfulAttempt.getLaunchTime()) {
@@ -1469,7 +1477,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
} else {
// nothing to do
LOG.info("Ignoring kill of attempt: " + attemptId + " because attempt: " +
- task.successfulAttempt + " is already successful");
+ task.successfulAttempt + " is already successful");
return TaskStateInternal.SUCCEEDED;
}
}
@@ -1512,7 +1520,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg, TaskAttemptTerminationCause errorCause) {
if (commitAttempt != null && commitAttempt.equals(attempt.getID())) {
- LOG.info("Removing commit attempt: " + commitAttempt);
+ LOG.info("Unsetting commit attempt: " + commitAttempt + " since attempt is being killed");
commitAttempt = null;
}
if (attempt != null && !attempt.isFinished()) {
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index daeae3f..4a8309e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -877,6 +877,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
this.clock = clock;
this.appContext = appContext;
this.commitVertexOutputs = commitVertexOutputs;
+ this.logIdentifier = this.getVertexId() + " [" + this.getName() + "]";
this.taskAttemptListener = taskAttemptListener;
this.taskHeartbeatHandler = thh;
@@ -935,6 +936,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
this.containerContext = new ContainerContext(this.localResources,
appContext.getCurrentDAG().getCredentials(), this.environment, this.javaOpts, this);
+ LOG.info("Default container context for " + logIdentifier + "=" + containerContext + ", Default Resources=" + this.taskResource);
if (vertexPlan.getInputsCount() > 0) {
setAdditionalInputs(vertexPlan.getInputsList());
@@ -957,7 +959,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
speculator = new LegacySpeculator(vertexConf, getAppContext(), this);
}
- logIdentifier = this.getVertexId() + " [" + this.getName() + "]";
+
// This "this leak" is okay because the retained pointer is in an
// instance variable.
@@ -971,7 +973,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
return vertexConf;
}
- private boolean isSpeculationEnabled() {
+ @Override
+ public boolean isSpeculationEnabled() {
return isSpeculationEnabled;
}
@@ -2005,29 +2008,27 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
}
}
+ private static String constructCheckTasksForCompletionLog(VertexImpl vertex) {
+ String logLine = vertex.logIdentifier
+ + ", tasks=" + vertex.numTasks
+ + ", failed=" + vertex.failedTaskCount
+ + ", killed=" + vertex.killedTaskCount
+ + ", success=" + vertex.succeededTaskCount
+ + ", completed=" + vertex.completedTaskCount
+ + ", commits=" + vertex.commitFutures.size()
+ + ", err=" + vertex.terminationCause;
+ return logLine;
+ }
+
// triggered by task_complete
static VertexState checkTasksForCompletion(final VertexImpl vertex) {
-
- LOG.info("Checking tasks for vertex completion for "
- + vertex.logIdentifier
- + ", numTasks=" + vertex.numTasks
- + ", failedTaskCount=" + vertex.failedTaskCount
- + ", killedTaskCount=" + vertex.killedTaskCount
- + ", successfulTaskCount=" + vertex.succeededTaskCount
- + ", completedTaskCount=" + vertex.completedTaskCount
- + ", commitInProgress=" + vertex.commitFutures.size()
- + ", terminationCause=" + vertex.terminationCause);
-
+ // This log line makes it quick to count task completions for a vertex;
+ // grepping for attempts and accounting for retries is time-consuming.
+ LOG.info("Task Completion: " + constructCheckTasksForCompletionLog(vertex));
//check for vertex failure first
if (vertex.completedTaskCount > vertex.tasks.size()) {
LOG.error("task completion accounting issue: completedTaskCount > nTasks:"
- + " for vertex " + vertex.logIdentifier
- + ", numTasks=" + vertex.numTasks
- + ", failedTaskCount=" + vertex.failedTaskCount
- + ", killedTaskCount=" + vertex.killedTaskCount
- + ", successfulTaskCount=" + vertex.succeededTaskCount
- + ", completedTaskCount=" + vertex.completedTaskCount
- + ", terminationCause=" + vertex.terminationCause);
+ + constructCheckTasksForCompletionLog(vertex));
}
if (vertex.completedTaskCount == vertex.tasks.size()) {
@@ -2036,7 +2037,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
//Only succeed if tasks complete successfully and no terminationCause is registered.
if(vertex.succeededTaskCount == vertex.tasks.size() && vertex.terminationCause == null) {
- LOG.info("All tasks are succeeded, vertex:" + vertex.logIdentifier);
+ LOG.info("All tasks have succeeded, vertex:" + vertex.logIdentifier);
if (vertex.commitVertexOutputs && !vertex.committed.getAndSet(true)) {
// start commit if there're commits or just finish if no commits
return commitOrFinish(vertex);
@@ -2054,16 +2055,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
//triggered by commit_complete
static VertexState checkCommitsForCompletion(final VertexImpl vertex) {
- LOG.info("Checking commits for vertex completion for "
- + vertex.logIdentifier
- + ", numTasks=" + vertex.numTasks
- + ", failedTaskCount=" + vertex.failedTaskCount
- + ", killedTaskCount=" + vertex.killedTaskCount
- + ", successfulTaskCount=" + vertex.succeededTaskCount
- + ", completedTaskCount=" + vertex.completedTaskCount
- + ", commitInProgress=" + vertex.commitFutures.size()
- + ", terminationCause=" + vertex.terminationCause);
-
+ LOG.info("Commits completion: "
+ + constructCheckTasksForCompletionLog(vertex));
// terminationCause is null mean commit is succeeded, otherwise terminationCause will be set.
if (vertex.terminationCause == null) {
Preconditions.checkState(vertex.getState() == VertexState.COMMITTING,
@@ -2184,20 +2177,23 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
private void initializeCommitters() throws Exception {
if (!this.additionalOutputSpecs.isEmpty()) {
- LOG.info("Invoking committer inits for vertex, vertexId=" + logIdentifier);
+ LOG.info("Setting up committers for vertex " + logIdentifier + ", numAdditionalOutputs=" +
+ additionalOutputs.size());
for (Entry<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> entry:
additionalOutputs.entrySet()) {
final String outputName = entry.getKey();
final RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> od = entry.getValue();
if (od.getControllerDescriptor() == null
|| od.getControllerDescriptor().getClassName() == null) {
- LOG.info("Ignoring committer as none specified for output="
- + outputName
- + ", vertexId=" + logIdentifier);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring committer as none specified for output="
+ + outputName
+ + ", vertexId=" + logIdentifier);
+ }
continue;
}
LOG.info("Instantiating committer for output=" + outputName
- + ", vertexId=" + logIdentifier
+ + ", vertex=" + logIdentifier
+ ", committerClass=" + od.getControllerDescriptor().getClassName());
dagUgi.doAs(new PrivilegedExceptionAction<Void>() {
@@ -2214,12 +2210,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
.createClazzInstance(od.getControllerDescriptor().getClassName(),
new Class[]{OutputCommitterContext.class},
new Object[]{outputCommitterContext});
- LOG.info("Invoking committer init for output=" + outputName
- + ", vertexId=" + logIdentifier);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Invoking committer init for output=" + outputName
+ + ", vertex=" + logIdentifier);
+ }
outputCommitter.initialize();
outputCommitters.put(outputName, outputCommitter);
- LOG.info("Invoking committer setup for output=" + outputName
- + ", vertexId=" + logIdentifier);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Invoking committer setup for output=" + outputName
+ + ", vertex=" + logIdentifier);
+ }
outputCommitter.setupOutput();
return null;
}
@@ -3913,8 +3913,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
public VertexState transition(VertexImpl vertex, VertexEvent event) {
boolean forceTransitionToKillWait = false;
vertex.completedTaskCount++;
- LOG.info("Num completed Tasks for " + vertex.logIdentifier + " : "
- + vertex.completedTaskCount);
VertexEventTaskCompleted taskEvent = (VertexEventTaskCompleted) event;
Task task = vertex.tasks.get(taskEvent.getTaskID());
if (taskEvent.getState() == TaskState.SUCCEEDED) {
@@ -4210,10 +4208,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
int numEventsSent = events.size() - numPreRoutedEvents;
if (numEventsSent > 0) {
StringBuilder builder = new StringBuilder();
- builder.append("Sending ").append(attemptID).append(" numEvents: ").append(numEventsSent)
- .append(" from: ").append(fromEventId).append(" to: ").append(nextFromEventId)
- .append(" out of ").append(currEventCount).append(" on-demand events in vertex: ")
- .append(getLogIdentifier());
+ builder.append("Sending ").append(attemptID).append(" ")
+ .append(numEventsSent)
+ .append(" events [").append(fromEventId).append(",").append(nextFromEventId)
+ .append(") total ").append(currEventCount).append(" ")
+ .append(getLogIdentifier());
LOG.info(builder.toString());
}
}
@@ -4478,9 +4477,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
for (String inputName : inputsWithInitializers) {
inputList.add(rootInputDescriptors.get(inputName));
}
- LOG.info("Vertex will initialize via inputInitializers "
- + logIdentifier + ". Starting root input initializers: "
- + inputsWithInitializers.size());
+ LOG.info("Starting " + inputsWithInitializers.size() + " inputInitializers for vertex " +
+ logIdentifier);
initWaitsForRootInitializers = true;
rootInputInitializerManager.runInputInitializers(inputList);
// Send pending rootInputInitializerEvents
@@ -4564,6 +4562,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
@Override
public void setAdditionalInputs(List<RootInputLeafOutputProto> inputs) {
+ LOG.info("Setting " + inputs.size() + " additional inputs for vertex " + this.logIdentifier);
this.rootInputDescriptors = Maps.newHashMapWithExpectedSize(inputs.size());
for (RootInputLeafOutputProto input : inputs) {
addIO(input.getName());
@@ -4608,7 +4607,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
@Override
public void setAdditionalOutputs(List<RootInputLeafOutputProto> outputs) {
- LOG.info("setting additional outputs for vertex " + this.vertexName);
+ LOG.info("Setting " + outputs.size() + " additional outputs for vertex " + this.logIdentifier);
this.additionalOutputs = Maps.newHashMapWithExpectedSize(outputs.size());
this.outputCommitters = Maps.newHashMapWithExpectedSize(outputs.size());
for (RootInputLeafOutputProto output : outputs) {
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
index 94889a1..da2200e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
@@ -147,7 +147,7 @@ public class ContainerLauncherImpl extends AbstractService implements
@SuppressWarnings("unchecked")
public synchronized void launch(NMCommunicatorLaunchRequestEvent event) {
- LOG.info("Launching Container with Id: " + event.getContainerId());
+ LOG.info("Launching " + event.getContainerId());
if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
state = ContainerState.DONE;
sendContainerLaunchFailedMsg(event.getContainerId(),
@@ -211,8 +211,7 @@ public class ContainerLauncherImpl extends AbstractService implements
if(this.state == ContainerState.PREP) {
this.state = ContainerState.KILLED_BEFORE_LAUNCH;
} else {
- LOG.info("Sending a stop request to the NM for ContainerId: "
- + containerID);
+ LOG.info("Stopping " + containerID);
ContainerManagementProtocolProxyData proxy = null;
try {
@@ -378,7 +377,9 @@ public class ContainerLauncherImpl extends AbstractService implements
@Override
public void run() {
- LOG.info("Processing the event " + event.toString());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing event: " + event.toString());
+ }
// Load ContainerManager tokens before creating a connection.
// TODO: Do it only once per NodeManager.
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 6d57737..785caf7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -144,7 +144,9 @@ public class TaskSchedulerEventHandler extends AbstractService
}
public synchronized void handleEvent(AMSchedulerEvent sEvent) {
- LOG.info("Processing the event " + sEvent.toString());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing the event " + sEvent.toString());
+ }
switch (sEvent.getType()) {
case S_TA_LAUNCH_REQUEST:
handleTaLaunchRequest((AMSchedulerEventTALaunchRequest) sEvent);
@@ -160,7 +162,7 @@ public class TaskSchedulerEventHandler extends AbstractService
handleTASucceeded(event);
break;
default:
- throw new TezUncheckedException("Unexecpted TA_ENDED state: " + event.getState());
+ throw new TezUncheckedException("Unexpected TA_ENDED state: " + event.getState());
}
break;
case S_CONTAINER_DEALLOCATE:
@@ -302,8 +304,8 @@ public class TaskSchedulerEventHandler extends AbstractService
event);
return;
}
- LOG.info("Attempt: " + taskAttempt.getID() + " has task based affinity to " + taskAffinity
- + " but no locality information exists for it. Ignoring hint.");
+ LOG.info("No attempt for task affinity to " + taskAffinity + " for attempt "
+ + taskAttempt.getID() + " Ignoring.");
// fall through with null hosts/racks
} else {
hosts = (locationHint.getHosts() != null) ? locationHint
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index 73fcb3d..f35a45f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -348,7 +348,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
"Heartbeats between preemptions should be >=1");
delayedContainerManager = new DelayedContainerManager();
- LOG.info("TaskScheduler initialized with configuration: " +
+ LOG.info("YarnTaskScheduler initialized with configuration: " +
"maxRMHeartbeatInterval: " + heartbeatIntervalMax +
", containerReuseEnabled: " + shouldReuseContainers +
", reuseRackLocal: " + reuseRackLocal +
@@ -428,8 +428,11 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
@Override
public void onContainersCompleted(List<ContainerStatus> statuses) {
if (isStopStarted.get()) {
- for (ContainerStatus status : statuses) {
- LOG.info("Container " + status.getContainerId() + " is completed");
+ if (LOG.isDebugEnabled()) {
+ for (ContainerStatus status : statuses) {
+ LOG.debug("Container " + status.getContainerId() + " is completed with ContainerStatus=" +
+ status);
+ }
}
return;
}
@@ -450,8 +453,10 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
// being released
// completion of a container we had released earlier
// an allocated container completed. notify app
- LOG.info("Released container completed:" + completedId +
- " last allocated to task: " + task);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Released container completed:" + completedId +
+ " last allocated to task: " + task);
+ }
appContainerStatus.put(task, containerStatus);
continue;
}
@@ -467,9 +472,9 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
}
if(task != null) {
// completion of a container we have allocated currently
- // an allocated container completed. notify app
- LOG.info("Allocated container completed:" + completedId +
- " last allocated to task: " + task);
+ // an allocated container completed. notify app. This will cause attempt to get killed
+ LOG.info(
+ "Allocated container completed:" + completedId + " last allocated to task: " + task);
appContainerStatus.put(task, containerStatus);
continue;
}
@@ -488,9 +493,13 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
@Override
public void onContainersAllocated(List<Container> containers) {
if (isStopStarted.get()) {
- for (Container container : containers) {
- LOG.info("Release container:" + container.getId() + ", because it is shutting down.");
- releaseContainer(container.getId());
+ LOG.info("Ignoring container allocations because application is shutting down. Num " +
+ containers.size());
+ for (Container container : containers) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Release container:" + container.getId() + ", because App is shutting down.");
+ }
+ releaseContainer(container.getId());
}
return;
}
@@ -549,6 +558,9 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
}
// Release any unassigned containers given by the RM
+ if (containers.iterator().hasNext()) {
+ LOG.info("Releasing newly assigned containers which could not be allocated");
+ }
releaseUnassignedContainers(containers);
return assignedContainers;
@@ -602,15 +614,15 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
boolean isNew = heldContainer.isNew();
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to assign a delayed container"
- + ", containerId=" + heldContainer.getContainer().getId()
- + ", nextScheduleTime=" + heldContainer.getNextScheduleTime()
- + ", containerExpiryTime=" + heldContainer.getContainerExpiryTime()
- + ", AMState=" + state
- + ", matchLevel=" + heldContainer.getLocalityMatchLevel()
- + ", taskRequestsCount=" + taskRequests.size()
- + ", heldContainers=" + heldContainers.size()
- + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
- + ", isNew=" + isNew);
+ + ", containerId=" + heldContainer.getContainer().getId()
+ + ", nextScheduleTime=" + heldContainer.getNextScheduleTime()
+ + ", containerExpiryTime=" + heldContainer.getContainerExpiryTime()
+ + ", AMState=" + state
+ + ", matchLevel=" + heldContainer.getLocalityMatchLevel()
+ + ", taskRequestsCount=" + taskRequests.size()
+ + ", heldContainers=" + heldContainers.size()
+ + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
+ + ", isNew=" + isNew);
}
if (state.equals(DAGAppMasterState.IDLE) || taskRequests.isEmpty()) {
@@ -658,7 +670,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
+ ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
+ ", isNew=" + isNew);
releaseUnassignedContainers(
- Lists.newArrayList(heldContainer.getContainer()));
+ Collections.singletonList((heldContainer.getContainer())));
} else {
// no outstanding work and container idle timeout not expired
if (LOG.isDebugEnabled()) {
@@ -711,7 +723,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
assignReUsedContainerWithLocation(containerToAssign,
NODE_LOCAL_ASSIGNER, assignedContainers, true);
if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) {
- LOG.info("Failed to assign tasks to delayed container using node"
+ LOG.debug("Failed to assign tasks to delayed container using node"
+ ", containerId=" + heldContainer.getContainer().getId());
}
}
@@ -727,7 +739,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
assignReUsedContainerWithLocation(containerToAssign,
RACK_LOCAL_ASSIGNER, assignedContainers, false);
if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) {
- LOG.info("Failed to assign tasks to delayed container using rack"
+ LOG.debug("Failed to assign tasks to delayed container using rack"
+ ", containerId=" + heldContainer.getContainer().getId());
}
}
@@ -743,7 +755,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
assignReUsedContainerWithLocation(containerToAssign,
NON_LOCAL_ASSIGNER, assignedContainers, false);
if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) {
- LOG.info("Failed to assign tasks to delayed container using non-local"
+ LOG.debug("Failed to assign tasks to delayed container using non-local"
+ ", containerId=" + heldContainer.getContainer().getId());
}
}
@@ -765,10 +777,10 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
if (!isNew && heldContainer.getContainerExpiryTime() <= currentTime
&& idleContainerTimeoutMin != -1) {
LOG.info("Container's idle timeout expired. Releasing container"
- + ", containerId=" + heldContainer.container.getId()
- + ", containerExpiryTime="
- + heldContainer.getContainerExpiryTime()
- + ", idleTimeoutMin=" + idleContainerTimeoutMin);
+ + ", containerId=" + heldContainer.container.getId()
+ + ", containerExpiryTime="
+ + heldContainer.getContainerExpiryTime()
+ + ", idleTimeoutMin=" + idleContainerTimeoutMin);
releaseUnassignedContainers(
Lists.newArrayList(heldContainer.container));
} else {
@@ -815,11 +827,11 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
if (safeToRelease &&
(!taskRequests.isEmpty() || !appContext.isSession())) {
LOG.info("Releasing held container as either there are pending but "
- + " unmatched requests or this is not a session"
- + ", containerId=" + heldContainer.container.getId()
- + ", pendingTasks=" + taskRequests.size()
- + ", isSession=" + appContext.isSession()
- + ". isNew=" + isNew);
+ + " unmatched requests or this is not a session"
+ + ", containerId=" + heldContainer.container.getId()
+ + ", pendingTasks=" + taskRequests.size()
+ + ", isSession=" + appContext.isSession()
+ + ". isNew=" + isNew);
releaseUnassignedContainers(
Lists.newArrayList(heldContainer.container));
} else {
@@ -894,8 +906,8 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
// TODO this will not handle dynamic changes in resources
totalResources = Resources.clone(getAvailableResources());
LOG.info("App total resource memory: " + totalResources.getMemory() +
- " cpu: " + totalResources.getVirtualCores() +
- " taskAllocations: " + taskAllocations.size());
+ " cpu: " + totalResources.getVirtualCores() +
+ " taskAllocations: " + taskAllocations.size());
}
numHeartbeats++;
@@ -994,9 +1006,9 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
// See if any of the delayedContainers can be used for this task.
delayedContainerManager.triggerScheduling(true);
LOG.info("Allocation request for task: " + task +
- " with request: " + request +
- " host: " + ((hosts!=null&&hosts.length>0)?hosts[0]:"null") +
- " rack: " + ((racks!=null&&racks.length>0)?racks[0]:"null"));
+ " with request: " + request +
+ " host: " + ((hosts != null && hosts.length > 0) ? hosts[0] : "null") +
+ " rack: " + ((racks != null && racks.length > 0) ? racks[0] : "null"));
}
/**
@@ -1025,8 +1037,10 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
LOG.info("Ignoring removal of unknown task: " + task);
return false;
} else {
- LOG.info("Deallocated task: " + task + " from container: "
- + container.getId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Deallocated task: " + task + " from container: "
+ + container.getId());
+ }
if (!taskSucceeded || !shouldReuseContainers) {
if (LOG.isDebugEnabled()) {
@@ -1046,6 +1060,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
}
assignedContainers = assignDelayedContainer(heldContainer);
} else {
+ // this is a non standard situation
LOG.info("Skipping container after task deallocate as container is"
+ " no longer running, containerId=" + container.getId());
}
@@ -1064,8 +1079,9 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
public synchronized Object deallocateContainer(ContainerId containerId) {
Object task = unAssignContainer(containerId, true);
if(task != null) {
+ // non-standard case for the app layer to deallocate container
LOG.info("Deallocated container: " + containerId +
- " from task: " + task);
+ " from task: " + task);
return task;
}
@@ -1075,9 +1091,9 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
@Override
public synchronized void initiateStop() {
- LOG.info("Initiate stop to YarnTaskScheduler");
+ LOG.info("Initiating stop of YarnTaskScheduler");
// release held containers
- LOG.info("Release held containers");
+ LOG.info("Releasing held containers");
isStopStarted.set(true);
// Create a new list for containerIds to iterate, otherwise it would cause ConcurrentModificationException
// because method releaseContainer will change heldContainers.
@@ -1090,7 +1106,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
}
// remove taskRequest from AMRMClient to avoid allocating new containers in the next heartbeat
- LOG.info("Remove all the taskRequests");
+ LOG.info("Removing all pending taskRequests");
// Create a new list for tasks to avoid ConcurrentModificationException
List<Object> tasks = new ArrayList<Object>(taskRequests.size());
for (Object task : taskRequests.keySet()) {
@@ -1117,6 +1133,14 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
return (int) Math.ceil((original * percent)/100.f);
}
+ private String constructPreemptionPeriodicLog(Resource freeResource) {
+ return "Allocated: " + allocatedResources +
+ " Free: " + freeResource +
+ " pendingRequests: " + taskRequests.size() +
+ " delayedContainers: " + delayedContainerManager.delayedContainers.size() +
+ " heartbeats: " + numHeartbeats + " lastPreemptionHeartbeat: " + heartbeatAtLastPreemption;
+ }
+
void preemptIfNeeded() {
if (preemptionPercentage == 0) {
// turned off
@@ -1127,10 +1151,11 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
synchronized (this) {
Resource freeResources = amRmClient.getAvailableResources();
if (LOG.isDebugEnabled()) {
- LOG.debug("Allocated resource memory: " + allocatedResources.getMemory() +
- " cpu:" + allocatedResources.getVirtualCores() +
- " delayedContainers: " + delayedContainerManager.delayedContainers.size() +
- " heartbeats: " + numHeartbeats + " lastPreemptionHeartbeat: " + heartbeatAtLastPreemption);
+ LOG.debug(constructPreemptionPeriodicLog(freeResources));
+ } else {
+ if (numHeartbeats % 50 == 1) {
+ LOG.info(constructPreemptionPeriodicLog(freeResources));
+ }
}
assert freeResources.getMemory() >= 0;
@@ -1156,8 +1181,11 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
if(fitsIn(highestPriRequest.getCapability(), freeResources)) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Highest pri request: " + highestPriRequest + " fits in available resources "
- + freeResources);
+ LOG.debug(highestPriRequest + " fits in free resources");
+ } else {
+ if (numHeartbeats % 50 == 1) {
+ LOG.info(highestPriRequest + " fits in free resources");
+ }
}
return;
}
@@ -1651,8 +1679,9 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
private void releaseUnassignedContainers(Iterable<Container> containers) {
for (Container container : containers) {
- LOG.info("Releasing unused container: "
- + container.getId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Releasing unused container: " + container.getId());
+ }
releaseContainer(container.getId());
}
}
@@ -1721,19 +1750,17 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
Object task = getTask(assigned);
assert task != null;
- LOG.info("Assigning container to task"
- + ", container=" + container
+ LOG.info("Assigning container to task: "
+ + "containerId=" + container.getId()
+ ", task=" + task
- + ", containerHost=" + container.getNodeId().getHost()
+ + ", containerHost=" + container.getNodeId()
+ + ", containerPriority=" + container.getPriority()
+ + ", containerResources=" + container.getResource()
+ ", localityMatchType=" + locality
+ ", matchedLocation=" + matchedLocation
+ ", honorLocalityFlags=" + honorLocalityFlags
- + ", reusedContainer="
- + containerAssignments.containsKey(container.getId())
- + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
- + ", containerResourceMemory=" + container.getResource().getMemory()
- + ", containerResourceVCores="
- + container.getResource().getVirtualCores());
+ + ", reusedContainer=" + containerAssignments.containsKey(container.getId())
+ + ", delayedContainers=" + delayedContainerManager.delayedContainers.size());
assignContainer(task, container, assigned);
}
@@ -1921,6 +1948,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
heldContainers.get(delayedContainer.getContainer().getId())) {
assignedContainers = assignDelayedContainer(delayedContainer);
} else {
+ // non standard scenario
LOG.info("Skipping delayed container as container is no longer"
+ " running, containerId="
+ delayedContainer.getContainer().getId());
@@ -1975,9 +2003,10 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
HeldContainer delayedContainer = iter.next();
if (!heldContainers.containsKey(delayedContainer.getContainer().getId())) {
// this container is no longer held by us
+ // non standard scenario
LOG.info("AssignAll - Skipping delayed container as container is no longer"
- + " running, containerId="
- + delayedContainer.getContainer().getId());
+ + " running, containerId="
+ + delayedContainer.getContainer().getId());
iter.remove();
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
index 470fa56..11b5006 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
@@ -113,8 +113,10 @@ public class AMContainerHelpers {
// correctly, even though they may not be used by all tasks which will run
// on this container.
- LOG.info("Adding #" + credentials.numberOfTokens() + " tokens and #"
- + credentials.numberOfSecretKeys() + " secret keys for NM use for launching container");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding #" + credentials.numberOfTokens() + " tokens and #"
+ + credentials.numberOfSecretKeys() + " secret keys for NM use for launching container in common CLC");
+ }
containerCredentials.addAll(credentials);
DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
@@ -123,7 +125,9 @@ public class AMContainerHelpers {
containerTokens_dob.getLength());
// Add shuffle token
- LOG.info("Putting shuffle token in serviceData");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Putting shuffle token in serviceData in common CLC");
+ }
serviceData.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
TezCommonUtils.serializeServiceData(TokenCache.getSessionToken(containerCredentials)));
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 9b90752..8b6e861 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -397,9 +397,11 @@ public class AMContainerImpl implements AMContainer {
// TODO Can't set state to COMPLETED. Add a default error state.
}
if (oldState != getState()) {
- LOG.info("AMContainer " + this.containerId + " transitioned from "
- + oldState + " to " + getState()
- + " via event " + event.getType());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("AMContainer " + this.containerId + " transitioned from "
+ + oldState + " to " + getState()
+ + " via event " + event.getType());
+ }
}
} finally {
writeLock.unlock();
@@ -450,8 +452,10 @@ public class AMContainerImpl implements AMContainer {
// task is not told to die since the TAL does not know about the container.
container.registerWithTAListener();
container.sendStartRequestToNM(clc);
- LOG.info("Sending Launch Request for Container with id: " +
- container.container.getId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sending Launch Request for Container with id: " +
+ container.container.getId());
+ }
}
}
@@ -509,7 +513,7 @@ public class AMContainerImpl implements AMContainer {
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
super.transition(container, cEvent);
container.deAllocate();
- LOG.info(
+ LOG.warn(
"Unexpected event type: " + cEvent.getType() + " while in state: " +
container.getState() + ". Event: " + cEvent);
@@ -573,8 +577,6 @@ public class AMContainerImpl implements AMContainer {
}
}
- LOG.info("Assigned taskAttempt + [" + container.currentAttempt +
- "] to container: [" + container.getContainerId() + "]");
AMContainerTask amContainerTask = new AMContainerTask(
event.getRemoteTaskSpec(), container.additionalLocalResources,
container.credentialsChanged ? container.credentials : null, container.credentialsChanged,
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
index 102cbe9..a067cee 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
@@ -132,6 +132,8 @@ public class AMNodeTracker extends AbstractService implements
AMNode amNode = nodeMap.get(nodeId);
if (amNode == null) {
LOG.info("Ignoring RM Health Update for unknown node: " + nodeId);
+ // This implies the node exists on the cluster, but is not running a container for
+ // this application
} else {
amNode.handle(rEvent);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
index 9e275a2..e17a4d4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
@@ -47,8 +47,6 @@ public class HistoryEventHandler extends CompositeService {
@Override
public void serviceInit(Configuration conf) throws Exception {
- LOG.info("Initializing HistoryEventHandler");
-
this.recoveryEnabled = context.getAMConf().getBoolean(TezConfiguration.DAG_RECOVERY_ENABLED,
TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT);
@@ -56,6 +54,10 @@ public class HistoryEventHandler extends CompositeService {
TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS_DEFAULT);
+ LOG.info("Initializing HistoryEventHandler with "
+ + "recoveryEnabled=" + recoveryEnabled
+ + ", historyServiceClassName=" + historyServiceClassName);
+
historyLoggingService =
ReflectionUtils.createClazzInstance(historyServiceClassName);
historyLoggingService.setAppContext(context);
@@ -66,11 +68,11 @@ public class HistoryEventHandler extends CompositeService {
addService(recoveryService);
}
super.serviceInit(conf);
+
}
@Override
public void serviceStart() throws Exception {
- LOG.info("Starting HistoryEventHandler");
super.serviceStart();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
index 9f24151..7d83db2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
@@ -177,6 +177,12 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
@Override
public String toString() {
+ String counterStr = "";
+ if (state != TaskAttemptState.SUCCEEDED) {
+ counterStr = ", counters=" + ( tezCounters == null ? "null" :
+ tezCounters.toString()
+ .replaceAll("\\n", ", ").replaceAll("\\s+", " "));
+ }
return "vertexName=" + vertexName
+ ", taskAttemptId=" + taskAttemptId
+ ", creationTime=" + creationTime
@@ -187,11 +193,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
+ ", status=" + state.name()
+ ", errorEnum=" + (error != null ? error.name() : "")
+ ", diagnostics=" + diagnostics
- + ", lastDataEventSourceTA=" +
- ((dataEvents==null) ? 0:dataEvents.size())
- + ", counters=" + (tezCounters == null ? "null" :
- tezCounters.toString()
- .replaceAll("\\n", ", ").replaceAll("\\s+", " "));
+ + counterStr;
}
public TezTaskAttemptID getTaskAttemptID() {
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
index a58b49e..71d4419 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
@@ -110,9 +110,7 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
+ ", taskAttemptId=" + taskAttemptId
+ ", startTime=" + launchTime
+ ", containerId=" + containerId
- + ", nodeId=" + nodeId
- + ", inProgressLogs=" + inProgressLogsUrl
- + ", completedLogs=" + completedLogsUrl;
+ + ", nodeId=" + nodeId;
}
public TezTaskAttemptID getTaskAttemptID() {
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
index 8852e02..4372d8e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
@@ -148,7 +148,9 @@ public class SimpleHistoryLoggingService extends HistoryLoggingService {
if (loggingDisabled) {
return;
}
- LOG.info("Writing event " + event.getHistoryEvent().getEventType() + " to history file");
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Writing event " + event.getHistoryEvent().getEventType() + " to history file");
+ }
try {
try {
JSONObject eventJson = HistoryEventJsonConversion.convertToJson(event.getHistoryEvent());
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
index d870645..2fe0e6d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
@@ -106,7 +106,6 @@ public class RecoveryService extends AbstractService {
@Override
public void serviceInit(Configuration conf) throws Exception {
- LOG.info("Initializing RecoveryService");
recoveryPath = appContext.getCurrentRecoveryDir();
recoveryDirFS = FileSystem.get(recoveryPath.toUri(), conf);
bufferSize = conf.getInt(TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE,
@@ -120,11 +119,16 @@ public class RecoveryService extends AbstractService {
drainEventsFlag = conf.getBoolean(
TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED,
TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED_DEFAULT);
+
+ LOG.info("RecoveryService initialized with "
+ + "recoveryPath=" + recoveryPath
+ + ", bufferSize(bytes)=" + bufferSize
+ + ", flushInterval(s)=" + flushInterval
+ + ", maxUnflushedEvents=" + maxUnflushedEvents);
}
@Override
public void serviceStart() {
- LOG.info("Starting RecoveryService");
lastFlushTime = appContext.getClock().getTime();
eventHandlingThread = new Thread(new Runnable() {
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/resources/tez-container-log4j.properties
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/resources/tez-container-log4j.properties b/tez-dag/src/main/resources/tez-container-log4j.properties
index 7a2aeab..c53994e 100644
--- a/tez-dag/src/main/resources/tez-container-log4j.properties
+++ b/tez-dag/src/main/resources/tez-container-log4j.properties
@@ -28,7 +28,7 @@ log4j.appender.CLA=org.apache.tez.common.TezContainerLogAppender
log4j.appender.CLA.containerLogDir=${yarn.app.container.log.dir}
log4j.appender.CLA.layout=org.apache.log4j.PatternLayout
-log4j.appender.CLA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}: %m%n
+log4j.appender.CLA.layout.ConversionPattern=%d{ISO8601} [%p] [%t] |%c{2}|: %m%n
#
# Event Counter Appender
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
index 69237d4..1b66c8e 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
@@ -117,8 +117,9 @@ public class MROutputCommitter extends OutputCommitter {
if (jobConf.getBoolean("mapred.reducer.new-api", false)
|| jobConf.getBoolean("mapred.mapper.new-api", false)) {
newApiCommitter = true;
- LOG.info("Using mapred newApiCommitter.");
}
+ LOG.info("Committer for " + getContext().getVertexName() + ":" + getContext().getOutputName() +
+ " using " + (newApiCommitter ? "new" : "old") + " mapred API");
if (newApiCommitter) {
TaskAttemptID taskAttemptID = new TaskAttemptID(
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
index dbc7748..b93e4ba 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
@@ -69,35 +69,30 @@ public class MRInputAMSplitGenerator extends InputInitializer {
@Override
public List<Event> initialize() throws Exception {
- Stopwatch sw = null;
- if (LOG.isDebugEnabled()) {
- sw = new Stopwatch().start();
- }
+ Stopwatch sw = new Stopwatch().start();
MRInputUserPayloadProto userPayloadProto = MRInputHelpers
.parseMRInputPayload(getContext().getInputUserPayload());
+ sw.stop();
if (LOG.isDebugEnabled()) {
- sw.stop();
LOG.debug("Time to parse MRInput payload into prot: "
+ sw.elapsedMillis());
}
- if (LOG.isDebugEnabled()) {
- sw.reset().start();
- }
+ sw.reset().start();
Configuration conf = TezUtils.createConfFromByteString(userPayloadProto
.getConfigurationBytes());
sendSerializedEvents = conf.getBoolean(
MRJobConfig.MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLOAD,
MRJobConfig.MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLOAD_DEFAULT);
- LOG.info("Emitting serialized splits: " + sendSerializedEvents);
+
+ sw.stop();
if (LOG.isDebugEnabled()) {
- sw.stop();
+ LOG.debug("Emitting serialized splits: " + sendSerializedEvents + " for input " +
+ getContext().getInputName());
LOG.debug("Time converting ByteString to configuration: " + sw.elapsedMillis());
}
- if (LOG.isDebugEnabled()) {
- sw.reset().start();
- }
+ sw.reset().start();
int totalResource = getContext().getTotalAvailableResource().getMemory();
int taskResource = getContext().getVertexTaskResource().getMemory();
@@ -107,24 +102,26 @@ public class MRInputAMSplitGenerator extends InputInitializer {
int numTasks = (int)((totalResource*waves)/taskResource);
+
+
+ boolean groupSplits = userPayloadProto.getGroupingEnabled();
LOG.info("Input " + getContext().getInputName() + " asking for " + numTasks
+ " tasks. Headroom: " + totalResource + " Task Resource: "
- + taskResource + " waves: " + waves);
+ + taskResource + " waves: " + waves + ", groupingEnabled: " + groupSplits);
// Read all credentials into the credentials instance stored in JobConf.
JobConf jobConf = new JobConf(conf);
jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
InputSplitInfoMem inputSplitInfo = null;
- boolean groupSplits = userPayloadProto.getGroupingEnabled();
+
if (groupSplits) {
- LOG.info("Grouping input splits");
inputSplitInfo = MRInputHelpers.generateInputSplitsToMem(jobConf, true, numTasks);
} else {
inputSplitInfo = MRInputHelpers.generateInputSplitsToMem(jobConf, false, 0);
}
+ sw.stop();
if (LOG.isDebugEnabled()) {
- sw.stop();
LOG.debug("Time to create splits to mem: " + sw.elapsedMillis());
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
index e6b70d2..28d108e 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
@@ -69,14 +69,11 @@ public class MRInputSplitDistributor extends InputInitializer {
@Override
public List<Event> initialize() throws IOException {
- Stopwatch sw = null;
- if (LOG.isDebugEnabled()) {
- sw = new Stopwatch().start();
- }
+ Stopwatch sw = new Stopwatch().start();
MRInputUserPayloadProto userPayloadProto = MRInputHelpers
.parseMRInputPayload(getContext().getInputUserPayload());
+ sw.stop();
if (LOG.isDebugEnabled()) {
- sw.stop();
LOG.debug("Time to parse MRInput payload into prot: "
+ sw.elapsedMillis());
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
index 7f5e0e3..30e4a8c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
@@ -284,21 +284,27 @@ public class MRInputHelpers {
InputSplitInfoMem splitInfoMem = null;
JobConf jobConf = new JobConf(conf);
if (jobConf.getUseNewMapper()) {
- LOG.info("Generating mapreduce api input splits");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Generating mapreduce api input splits");
+ }
Job job = Job.getInstance(conf);
org.apache.hadoop.mapreduce.InputSplit[] splits =
generateNewSplits(job, groupSplits, targetTasks);
splitInfoMem = new InputSplitInfoMem(splits, createTaskLocationHintsFromSplits(splits),
splits.length, job.getCredentials(), job.getConfiguration());
} else {
- LOG.info("Generating mapred api input splits");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Generating mapred api input splits");
+ }
org.apache.hadoop.mapred.InputSplit[] splits =
generateOldSplits(jobConf, groupSplits, targetTasks);
splitInfoMem = new InputSplitInfoMem(splits, createTaskLocationHintsFromSplits(splits),
splits.length, jobConf.getCredentials(), jobConf);
}
- LOG.info("NumSplits: " + splitInfoMem.getNumTasks() + ", SerializedSize: "
- + splitInfoMem.getSplitsProto().getSerializedSize());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("NumSplits: " + splitInfoMem.getNumTasks() + ", SerializedSize: "
+ + splitInfoMem.getSplitsProto().getSerializedSize());
+ }
return splitInfoMem;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
index 720af50..80828d4 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
@@ -51,11 +51,13 @@ public class MRPartitioner implements org.apache.tez.runtime.library.api.Partiti
if (useNewApi) {
oldPartitioner = null;
if (partitions > 1) {
+ Class<? extends org.apache.hadoop.mapreduce.Partitioner<?, ?>> clazz =
+ (Class<? extends org.apache.hadoop.mapreduce.Partitioner<?, ?>>) conf
+ .getClass(MRJobConfig.PARTITIONER_CLASS_ATTR,
+ org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class);
+ LOG.info("Using newApi, MRpartitionerClass=" + clazz.getName());
newPartitioner = (org.apache.hadoop.mapreduce.Partitioner) ReflectionUtils
- .newInstance(
- (Class<? extends org.apache.hadoop.mapreduce.Partitioner<?, ?>>) conf
- .getClass(MRJobConfig.PARTITIONER_CLASS_ATTR,
- org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class), conf);
+ .newInstance(clazz, conf);
} else {
newPartitioner = new org.apache.hadoop.mapreduce.Partitioner() {
@Override
@@ -67,10 +69,12 @@ public class MRPartitioner implements org.apache.tez.runtime.library.api.Partiti
} else {
newPartitioner = null;
if (partitions > 1) {
- oldPartitioner = (org.apache.hadoop.mapred.Partitioner) ReflectionUtils.newInstance(
+ Class<? extends org.apache.hadoop.mapred.Partitioner> clazz =
(Class<? extends org.apache.hadoop.mapred.Partitioner>) conf.getClass(
- "mapred.partitioner.class", org.apache.hadoop.mapred.lib.HashPartitioner.class),
- new JobConf(conf));
+ "mapred.partitioner.class", org.apache.hadoop.mapred.lib.HashPartitioner.class);
+ LOG.info("Using oldApi, MRpartitionerClass=" + clazz.getName());
+ oldPartitioner = (org.apache.hadoop.mapred.Partitioner) ReflectionUtils.newInstance(
+ clazz, new JobConf(conf));
} else {
oldPartitioner = new org.apache.hadoop.mapred.Partitioner() {
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
index d0e935f..6ea21e2 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
@@ -95,7 +95,6 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
+ TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED + " set to false");
return;
}
- LOG.info("Initializing ATSService");
if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
@@ -124,7 +123,12 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
}
sessionDomainId = conf.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID);
- LOG.info("Using " + atsHistoryACLManagerClassName + " to manage Timeline ACLs");
+ LOG.info("Initializing " + ATSHistoryLoggingService.class.getSimpleName() + " with "
+ + "maxEventsPerBatch=" + maxEventsPerBatch
+ + ", maxPollingTime(ms)=" + maxPollingTimeMillis
+ + ", waitTimeForShutdown(ms)=" + maxTimeToWaitOnShutdown
+ + ", TimelineACLManagerClass=" + atsHistoryACLManagerClassName);
+
try {
historyACLPolicyManager = ReflectionUtils.createClazzInstance(
atsHistoryACLManagerClassName);
@@ -146,7 +150,6 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
if (!historyLoggingEnabled || timelineClient == null) {
return;
}
- LOG.info("Starting ATSService");
timelineClient.start();
eventHandlingThread = new Thread(new Runnable() {