You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2017/02/06 02:25:27 UTC
tez git commit: TEZ-3601. Add another HistoryLogLevel to suppress
TaskAttempts at specific levels. Contributed by Harish Jaiprakash.
Repository: tez
Updated Branches:
refs/heads/branch-0.8 c797d6e3f -> 26d179f8c
TEZ-3601. Add another HistoryLogLevel to suppress TaskAttempts at
specific levels. Contributed by Harish Jaiprakash.
(cherry picked from commit c0270cb30a582ab2b5cbc8442054ce0c2a766c15)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/26d179f8
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/26d179f8
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/26d179f8
Branch: refs/heads/branch-0.8
Commit: 26d179f8c74e6ddfdfd3f4aaaa357c6e9ed2eb81
Parents: c797d6e
Author: Siddharth Seth <ss...@apache.org>
Authored: Sun Feb 5 18:24:15 2017 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Sun Feb 5 18:25:23 2017 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/tez/dag/api/HistoryLogLevel.java | 1 +
.../apache/tez/dag/api/TezConfiguration.java | 12 +-
.../org/apache/tez/common/TezUtilsInternal.java | 19 ++++
.../tez/dag/history/HistoryEventHandler.java | 114 ++++++++++++++++---
.../tez/dag/history/HistoryEventType.java | 4 +-
.../dag/history/TestHistoryEventHandler.java | 79 ++++++++++---
7 files changed, 198 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/26d179f8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6c1e7c1..55fdfc9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3601. Add another HistoryLogLevel to suppress TaskAttempts at specific levels
TEZ-3582. Exception swallowed in PipelinedSorter causing incorrect results.
TEZ-3462. Task attempt failure during container shutdown loses useful container diagnostics
TEZ-3574. Container reuse won't pickup extra dag level local resource.
http://git-wip-us.apache.org/repos/asf/tez/blob/26d179f8/tez-api/src/main/java/org/apache/tez/dag/api/HistoryLogLevel.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/HistoryLogLevel.java b/tez-api/src/main/java/org/apache/tez/dag/api/HistoryLogLevel.java
index 5eb4785..96d74f9 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/HistoryLogLevel.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/HistoryLogLevel.java
@@ -34,6 +34,7 @@ public enum HistoryLogLevel {
DAG,
VERTEX,
TASK,
+ TASK_ATTEMPT,
ALL;
public static final HistoryLogLevel DEFAULT = ALL;
http://git-wip-us.apache.org/repos/asf/tez/blob/26d179f8/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 62d9c9a..6144399 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -20,7 +20,6 @@ package org.apache.tez.dag.api;
import java.lang.reflect.Field;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -1244,6 +1243,17 @@ public class TezConfiguration extends Configuration {
TEZ_PREFIX + "history.logging.log.level";
/**
+ * Comma-separated list of enum values naming task attempt termination causes. Task attempts
+ * that finish with one of the listed causes are suppressed from being logged to ATS. The valid
+ * values are defined in the enum TaskAttemptTerminationCause. The filters are applied only if
+ * tez.history.logging.log.level is set to TASK_ATTEMPT.
+ */
+ @ConfigurationScope(Scope.DAG)
+ @ConfigurationProperty
+ public static final String TEZ_HISTORY_LOGGING_TASKATTEMPT_FILTERS =
+ TEZ_PREFIX + "history.logging.taskattempt-filters";
+
+ /**
* Comma separated list of Integers. These are the values that were set for the config value
* for {@value #TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP}. The older values are required so
* that the groupIds generated previously will continue to be generated by the plugin. If an older
http://git-wip-us.apache.org/repos/asf/tez/blob/26d179f8/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
index b8c05e7..5f90875 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
@@ -26,8 +26,10 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.charset.Charset;
import java.util.BitSet;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -42,6 +44,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.log4j.Appender;
import org.apache.tez.dag.api.DagTypeConverters;
@@ -338,6 +341,22 @@ public class TezUtilsInternal {
}
}
+ public static <T extends Enum<T>> Set<T> getEnums(Configuration conf, String confName,
+ Class<T> enumType, String defaultValues) {
+ String[] names = conf.getStrings(confName);
+ if (names == null) {
+ names = StringUtils.getStrings(defaultValues);
+ }
+ if (names == null) {
+ return null;
+ }
+ Set<T> enums = new HashSet<>();
+ for (String name : names) {
+ enums.add(Enum.valueOf(enumType, name));
+ }
+ return enums;
+ }
+
@Private
public static void setHadoopCallerContext(HadoopShim hadoopShim, TezTaskAttemptID attemptID) {
hadoopShim.setHadoopCallerContext("tez_ta:" + attemptID.toString());
http://git-wip-us.apache.org/repos/asf/tez/blob/26d179f8/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
index 042d022..79d1fc3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.history;
import java.io.IOException;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
@@ -27,13 +28,18 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.dag.api.HistoryLogLevel;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
import org.apache.tez.dag.history.logging.HistoryLoggingService;
import org.apache.tez.dag.history.recovery.RecoveryService;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
public class HistoryEventHandler extends CompositeService {
@@ -45,8 +51,13 @@ public class HistoryEventHandler extends CompositeService {
private HistoryLoggingService historyLoggingService;
private HistoryLogLevel amHistoryLogLevel;
- private Map<TezDAGID, HistoryLogLevel> dagIdToLogLevel =
- new ConcurrentHashMap<TezDAGID, HistoryLogLevel>();
+ private final Map<TezDAGID, HistoryLogLevel> dagIdToLogLevel = new ConcurrentHashMap<>();
+ private Set<TaskAttemptTerminationCause> amTaskAttemptFilters;
+ private final Map<TezDAGID, Set<TaskAttemptTerminationCause>> dagIdToTaskAttemptFilters =
+ new ConcurrentHashMap<>();
+
+ private final ConcurrentHashMap<TezTaskAttemptID, DAGHistoryEvent> suppressedEvents =
+ new ConcurrentHashMap<>();
public HistoryEventHandler(AppContext context) {
super(HistoryEventHandler.class.getName());
@@ -80,6 +91,11 @@ public class HistoryEventHandler extends CompositeService {
}
amHistoryLogLevel = HistoryLogLevel.getLogLevel(context.getAMConf(), HistoryLogLevel.DEFAULT);
+ amTaskAttemptFilters = TezUtilsInternal.getEnums(
+ context.getAMConf(),
+ TezConfiguration.TEZ_HISTORY_LOGGING_TASKATTEMPT_FILTERS,
+ TaskAttemptTerminationCause.class,
+ null);
super.serviceInit(conf);
}
@@ -108,15 +124,20 @@ public class HistoryEventHandler extends CompositeService {
if(dagId != null) {
dagIdStr = dagId.toString();
}
+ HistoryEvent historyEvent = event.getHistoryEvent();
if (LOG.isDebugEnabled()) {
LOG.debug("Handling history event"
- + ", eventType=" + event.getHistoryEvent().getEventType());
+ + ", eventType=" + historyEvent.getEventType());
}
- if (recoveryEnabled && event.getHistoryEvent().isRecoveryEvent()) {
+ if (recoveryEnabled && historyEvent.isRecoveryEvent()) {
recoveryService.handle(event);
}
- if (event.getHistoryEvent().isHistoryEvent() && shouldLogEvent(event)) {
+ if (historyEvent.isHistoryEvent() && shouldLogEvent(event)) {
+ DAGHistoryEvent suppressedEvent = getSupressedEvent(historyEvent);
+ if (suppressedEvent != null) {
+ historyLoggingService.handle(suppressedEvent);
+ }
historyLoggingService.handle(event);
}
@@ -140,23 +161,86 @@ public class HistoryEventHandler extends CompositeService {
}
HistoryEvent historyEvent = event.getHistoryEvent();
- if (historyEvent.getEventType() == HistoryEventType.DAG_SUBMITTED) {
- dagLogLevel = HistoryLogLevel.getLogLevel(((DAGSubmittedEvent)historyEvent).getConf(),
- amHistoryLogLevel);
+ HistoryEventType eventType = historyEvent.getEventType();
+ if (eventType == HistoryEventType.DAG_SUBMITTED) {
+ Configuration dagConf = ((DAGSubmittedEvent)historyEvent).getConf();
+ dagLogLevel = HistoryLogLevel.getLogLevel(dagConf, amHistoryLogLevel);
dagIdToLogLevel.put(dagId, dagLogLevel);
- } else if (historyEvent.getEventType() == HistoryEventType.DAG_RECOVERED) {
+ maybeUpdateDagTaskAttemptFilters(dagId, dagLogLevel, dagConf);
+ } else if (eventType == HistoryEventType.DAG_RECOVERED) {
if (context.getCurrentDAG() != null) {
- dagLogLevel = HistoryLogLevel.getLogLevel(context.getCurrentDAG().getConf(),
- amHistoryLogLevel);
+ Configuration dagConf = context.getCurrentDAG().getConf();
+ dagLogLevel = HistoryLogLevel.getLogLevel(dagConf, amHistoryLogLevel);
dagIdToLogLevel.put(dagId, dagLogLevel);
+ maybeUpdateDagTaskAttemptFilters(dagId, dagLogLevel, dagConf);
+ }
+ } else if (eventType == HistoryEventType.DAG_FINISHED) {
+ dagIdToLogLevel.remove(dagId);
+ dagIdToTaskAttemptFilters.remove(dagId);
+ suppressedEvents.clear();
+ }
+
+ if (dagLogLevel.shouldLog(historyEvent.getEventType().getHistoryLogLevel())) {
+ return shouldLogTaskAttemptEvents(event, dagLogLevel);
+ }
+ return false;
+ }
+
+ // If the log level is set to TASK_ATTEMPT and filters are configured, hold back the start event
+ // and publish it together with the TaskAttemptFinishedEvent only if the attempt's termination
+ // cause is not in the filters; if the cause is in the filters, both events are dropped.
+ // Note: if the AM is killed before the finished event arrives, the start event is lost.
+ private boolean shouldLogTaskAttemptEvents(DAGHistoryEvent event, HistoryLogLevel dagLogLevel) {
+ HistoryEvent historyEvent = event.getHistoryEvent();
+ HistoryEventType eventType = historyEvent.getEventType();
+ if (dagLogLevel == HistoryLogLevel.TASK_ATTEMPT &&
+ (eventType == HistoryEventType.TASK_ATTEMPT_STARTED ||
+ eventType == HistoryEventType.TASK_ATTEMPT_FINISHED)) {
+ TezDAGID dagId = event.getDagID();
+ Set<TaskAttemptTerminationCause> filters = null;
+ if (dagId != null) {
+ filters = dagIdToTaskAttemptFilters.get(dagId);
+ }
+ if (filters == null) {
+ filters = amTaskAttemptFilters;
+ }
+ if (filters == null) {
+ return true;
}
- } else if (historyEvent.getEventType() == HistoryEventType.DAG_FINISHED) {
- if (dagIdToLogLevel.containsKey(dagId)) {
- dagIdToLogLevel.remove(dagId);
+ if (eventType == HistoryEventType.TASK_ATTEMPT_STARTED) {
+ suppressedEvents.put(((TaskAttemptStartedEvent)historyEvent).getTaskAttemptID(), event);
+ return false;
+ } else { // TaskAttemptFinishedEvent
+ TaskAttemptFinishedEvent finishedEvent = (TaskAttemptFinishedEvent)historyEvent;
+ if (filters.contains(finishedEvent.getTaskAttemptError())) {
+ suppressedEvents.remove(finishedEvent.getTaskAttemptID());
+ return false;
+ }
}
}
+ return true;
+ }
- return dagLogLevel.shouldLog(historyEvent.getEventType().getHistoryLogLevel());
+ private void maybeUpdateDagTaskAttemptFilters(TezDAGID dagId, HistoryLogLevel dagLogLevel,
+ Configuration dagConf) {
+ if (dagLogLevel == HistoryLogLevel.TASK_ATTEMPT) {
+ Set<TaskAttemptTerminationCause> filters = TezUtilsInternal.getEnums(
+ dagConf,
+ TezConfiguration.TEZ_HISTORY_LOGGING_TASKATTEMPT_FILTERS,
+ TaskAttemptTerminationCause.class,
+ null);
+ if (filters != null) {
+ dagIdToTaskAttemptFilters.put(dagId, filters);
+ }
+ }
+ }
+
+ private DAGHistoryEvent getSupressedEvent(HistoryEvent historyEvent) {
+ if (historyEvent.getEventType() == HistoryEventType.TASK_ATTEMPT_FINISHED) {
+ TaskAttemptFinishedEvent finishedEvent = (TaskAttemptFinishedEvent)historyEvent;
+ return suppressedEvents.remove(finishedEvent.getTaskAttemptID());
+ }
+ return null;
}
public void handle(DAGHistoryEvent event) {
http://git-wip-us.apache.org/repos/asf/tez/blob/26d179f8/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
index a41d0e6..a536fdf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
@@ -36,8 +36,8 @@ public enum HistoryEventType {
VERTEX_FINISHED(HistoryLogLevel.VERTEX),
TASK_STARTED(HistoryLogLevel.TASK),
TASK_FINISHED(HistoryLogLevel.TASK),
- TASK_ATTEMPT_STARTED(HistoryLogLevel.ALL),
- TASK_ATTEMPT_FINISHED(HistoryLogLevel.ALL),
+ TASK_ATTEMPT_STARTED(HistoryLogLevel.TASK_ATTEMPT),
+ TASK_ATTEMPT_FINISHED(HistoryLogLevel.TASK_ATTEMPT),
CONTAINER_LAUNCHED(HistoryLogLevel.ALL),
CONTAINER_STOPPED(HistoryLogLevel.ALL),
DAG_COMMIT_STARTED(HistoryLogLevel.DAG),
http://git-wip-us.apache.org/repos/asf/tez/blob/26d179f8/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java
index c8a076d..4c0fe3f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java
@@ -32,23 +32,29 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.tez.dag.api.HistoryLogLevel;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.history.events.ContainerStoppedEvent;
import org.apache.tez.dag.history.events.DAGFinishedEvent;
import org.apache.tez.dag.history.events.DAGRecoveredEvent;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
import org.apache.tez.dag.history.events.TaskStartedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.history.logging.HistoryLoggingService;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.hadoop.shim.HadoopShim;
+import org.junit.Before;
import org.junit.Test;
public class TestHistoryEventHandler {
@@ -56,42 +62,69 @@ public class TestHistoryEventHandler {
private static ApplicationId appId = ApplicationId.newInstance(1000l, 1);
private static ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
private static String user = "TEST_USER";
+ private Configuration baseConfig;
+
+ @Before
+ public void setupConfig() {
+ baseConfig = new Configuration(false);
+ }
@Test
public void testAll() {
- testLogLevel(null, 6);
+ testLogLevel(null, 11);
testLogLevel(HistoryLogLevel.NONE, 0);
testLogLevel(HistoryLogLevel.AM, 1);
testLogLevel(HistoryLogLevel.DAG, 3);
testLogLevel(HistoryLogLevel.VERTEX, 4);
testLogLevel(HistoryLogLevel.TASK, 5);
- testLogLevel(HistoryLogLevel.ALL, 6);
+ testLogLevel(HistoryLogLevel.TASK_ATTEMPT, 9);
+ testLogLevel(HistoryLogLevel.ALL, 11);
+ }
+
+ @Test
+ public void testTaskAttemptFilters() {
+ baseConfig.set(TezConfiguration.TEZ_HISTORY_LOGGING_TASKATTEMPT_FILTERS,
+ "EXTERNAL_PREEMPTION,INTERRUPTED_BY_USER");
+ testLogLevel(HistoryLogLevel.TASK_ATTEMPT, 5);
+ testLogLevelWithRecovery(HistoryLogLevel.TASK_ATTEMPT, 5);
+
+ baseConfig.set(TezConfiguration.TEZ_HISTORY_LOGGING_TASKATTEMPT_FILTERS,
+ "EXTERNAL_PREEMPTION");
+ testLogLevel(HistoryLogLevel.TASK_ATTEMPT, 7);
+ testLogLevelWithRecovery(HistoryLogLevel.TASK_ATTEMPT, 7);
+
+ baseConfig.set(TezConfiguration.TEZ_HISTORY_LOGGING_TASKATTEMPT_FILTERS, "INTERNAL_PREEMPTION");
+ testLogLevel(HistoryLogLevel.TASK_ATTEMPT, 9);
+ testLogLevelWithRecovery(HistoryLogLevel.TASK_ATTEMPT, 9);
}
@Test
public void testWithDAGRecovery() {
- testLogLevelWithRecovery(null, 6);
+ testLogLevelWithRecovery(null, 11);
testLogLevelWithRecovery(HistoryLogLevel.AM, 1);
testLogLevelWithRecovery(HistoryLogLevel.DAG, 3);
testLogLevelWithRecovery(HistoryLogLevel.VERTEX, 4);
testLogLevelWithRecovery(HistoryLogLevel.TASK, 5);
- testLogLevelWithRecovery(HistoryLogLevel.ALL, 6);
+ testLogLevelWithRecovery(HistoryLogLevel.TASK_ATTEMPT, 9);
+ testLogLevelWithRecovery(HistoryLogLevel.ALL, 11);
}
@Test
public void testMultipleDag() {
- testLogLevel(null, HistoryLogLevel.NONE, 7);
- testLogLevel(null, HistoryLogLevel.AM, 7);
- testLogLevel(null, HistoryLogLevel.DAG, 9);
- testLogLevel(null, HistoryLogLevel.VERTEX, 10);
- testLogLevel(null, HistoryLogLevel.TASK, 11);
- testLogLevel(null, HistoryLogLevel.ALL, 12);
+ testLogLevel(null, HistoryLogLevel.NONE, 14);
+ testLogLevel(null, HistoryLogLevel.AM, 14);
+ testLogLevel(null, HistoryLogLevel.DAG, 16);
+ testLogLevel(null, HistoryLogLevel.VERTEX, 17);
+ testLogLevel(null, HistoryLogLevel.TASK, 18);
+ testLogLevel(null, HistoryLogLevel.TASK_ATTEMPT, 22);
+ testLogLevel(null, HistoryLogLevel.ALL, 22);
testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.NONE, 5);
testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.AM, 5);
testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.DAG, 7);
testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.VERTEX, 8);
testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.TASK, 9);
- testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.ALL, 10);
+ testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.TASK_ATTEMPT, 13);
+ testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.ALL, 13);
testLogLevel(HistoryLogLevel.NONE, HistoryLogLevel.NONE, 0);
}
@@ -153,7 +186,7 @@ public class TestHistoryEventHandler {
}
private HistoryEventHandler createHandler(HistoryLogLevel logLevel) {
- Configuration conf = new Configuration(false);
+ Configuration conf = new Configuration(baseConfig);
conf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, false);
conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
InMemoryHistoryLoggingService.class.getName());
@@ -181,6 +214,7 @@ public class TestHistoryEventHandler {
long time = System.currentTimeMillis();
Configuration conf = new Configuration(inConf);
+
historyEvents.add(new DAGHistoryEvent(null,
new AMStartedEvent(attemptId, time, user)));
historyEvents.add(new DAGHistoryEvent(dagId,
@@ -189,16 +223,33 @@ public class TestHistoryEventHandler {
TezVertexID vertexID = TezVertexID.getInstance(dagId, 1);
historyEvents.add(new DAGHistoryEvent(dagId,
new VertexStartedEvent(vertexID, time, time)));
+ ContainerId containerId = ContainerId.newContainerId(attemptId, dagId.getId());
TezTaskID tezTaskID = TezTaskID.getInstance(vertexID, 1);
historyEvents.add(new DAGHistoryEvent(dagId,
new TaskStartedEvent(tezTaskID, "test", time, time)));
+ historyEvents.add(
+ new DAGHistoryEvent(new ContainerLaunchedEvent(containerId, time, attemptId)));
historyEvents.add(new DAGHistoryEvent(dagId,
new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(tezTaskID, 1), "test", time,
- ContainerId.newContainerId(attemptId, 1), NodeId.newInstance("localhost", 8765), null,
- null, null)));
+ containerId, NodeId.newInstance("localhost", 8765), null, null, null)));
+ historyEvents.add(new DAGHistoryEvent(dagId,
+ new TaskAttemptFinishedEvent(TezTaskAttemptID.getInstance(tezTaskID, 1), "test", time,
+ time + 1, TaskAttemptState.KILLED, null,
+ TaskAttemptTerminationCause.EXTERNAL_PREEMPTION, "", null, null, null, time, null, time,
+ containerId, NodeId.newInstance("localhost", 8765), null, null, null)));
+ historyEvents.add(new DAGHistoryEvent(dagId,
+ new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(tezTaskID, 2), "test", time,
+ containerId, NodeId.newInstance("localhost", 8765), null, null, null)));
+ historyEvents.add(new DAGHistoryEvent(dagId,
+ new TaskAttemptFinishedEvent(TezTaskAttemptID.getInstance(tezTaskID, 2), "test", time + 2,
+ time + 3, TaskAttemptState.KILLED, null,
+ TaskAttemptTerminationCause.INTERRUPTED_BY_USER, "", null, null, null, time, null,
+ time + 2, containerId, NodeId.newInstance("localhost", 8765), null, null, null)));
historyEvents.add(new DAGHistoryEvent(dagId,
new DAGFinishedEvent(dagId, time, time, DAGState.SUCCEEDED, null, null, user, "test", null,
attemptId, DAGPlan.getDefaultInstance())));
+ historyEvents.add(
+ new DAGHistoryEvent(new ContainerStoppedEvent(containerId, time + 4, 0, attemptId)));
return historyEvents;
}
}