You are viewing a plain-text version of this content. The canonical link to it was not preserved in this plain-text copy.
Posted to commits@tez.apache.org by jl...@apache.org on 2016/01/05 20:32:11 UTC
tez git commit: TEZ-2972. Avoid task rescheduling when a node turns unhealthy (jlowe)
Repository: tez
Updated Branches:
refs/heads/branch-0.7 9880c414c -> d80e30d3a
TEZ-2972. Avoid task rescheduling when a node turns unhealthy (jlowe)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d80e30d3
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d80e30d3
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d80e30d3
Branch: refs/heads/branch-0.7
Commit: d80e30d3af44471c5da98e5e9a1dd08929dc3931
Parents: 9880c41
Author: Jason Lowe <jl...@apache.org>
Authored: Tue Jan 5 19:31:35 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Tue Jan 5 19:31:35 2016 +0000
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/tez/dag/api/TezConfiguration.java | 12 +++++
tez-dag/findbugs-exclude.xml | 1 +
.../apache/tez/dag/app/rm/node/AMNodeImpl.java | 16 ++++---
.../tez/dag/app/rm/node/AMNodeTracker.java | 14 ++++--
.../tez/dag/app/rm/node/TestAMNodeTracker.java | 47 ++++++++++++++++++++
6 files changed, 82 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/d80e30d3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4f8897e..c2364a7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,8 +4,10 @@ Apache Tez Change Log
INCOMPATIBLE CHANGES
TEZ-2679. Admin forms of launch env settings
TEZ-2949. Allow duplicate dag names within session for Tez.
+ TEZ-2972. Avoid task rescheduling when a node turns unhealthy
ALL CHANGES
+ TEZ-2972. Avoid task rescheduling when a node turns unhealthy
TEZ-3017. HistoryACLManager does not have a close method for cleanup
TEZ-2914. Ability to limit vertex concurrency
TEZ-3011. Link Vertex Name in Dag Tasks/Task Attempts to Vertex
http://git-wip-us.apache.org/repos/asf/tez/blob/d80e30d3/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 8997327..d4d5759 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -541,6 +541,18 @@ public class TezConfiguration extends Configuration {
+ "node-blacklisting.ignore-threshold-node-percent";
public static final int TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD_DEFAULT = 33;
+ /**
+ * Boolean value. Enable task rescheduling for node updates.
+ * When enabled the task scheduler will reschedule task attempts that
+ * are associated with an unhealthy node to avoid potential data transfer
+ * errors from downstream tasks.
+ */
+ @ConfigurationScope(Scope.AM)
+ public static final String TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS =
+ TEZ_AM_PREFIX + "node-unhealthy-reschedule-tasks";
+ public static final boolean
+ TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS_DEFAULT = false;
+
/** Int value. Number of threads to handle client RPC requests. Expert level setting.*/
@ConfigurationScope(Scope.AM)
public static final String TEZ_AM_CLIENT_THREAD_COUNT =
http://git-wip-us.apache.org/repos/asf/tez/blob/d80e30d3/tez-dag/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-dag/findbugs-exclude.xml b/tez-dag/findbugs-exclude.xml
index 4c01edc..2842a50 100644
--- a/tez-dag/findbugs-exclude.xml
+++ b/tez-dag/findbugs-exclude.xml
@@ -18,6 +18,7 @@
<Or>
<Field name="blacklistDisablePercent" />
<Field name="maxTaskFailuresPerNode" />
+ <Field name="nodeUpdatesRescheduleEnabled" />
</Or>
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
http://git-wip-us.apache.org/repos/asf/tez/blob/d80e30d3/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
index b93cab3..f4d89e4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
@@ -58,6 +58,7 @@ public class AMNodeImpl implements AMNode {
private final int maxTaskFailuresPerNode;
private boolean blacklistingEnabled;
private boolean ignoreBlacklisting = false;
+ private boolean nodeUpdatesRescheduleEnabled;
private Set<TezTaskAttemptID> failedAttemptIds = Sets.newHashSet();
@SuppressWarnings("rawtypes")
@@ -174,7 +175,7 @@ public class AMNodeImpl implements AMNode {
@SuppressWarnings("rawtypes")
public AMNodeImpl(NodeId nodeId, int maxTaskFailuresPerNode,
EventHandler eventHandler, boolean blacklistingEnabled,
- AppContext appContext) {
+ boolean rescheduleOnUnhealthyNode, AppContext appContext) {
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
this.readLock = rwLock.readLock();
this.writeLock = rwLock.writeLock();
@@ -182,6 +183,7 @@ public class AMNodeImpl implements AMNode {
this.appContext = appContext;
this.eventHandler = eventHandler;
this.blacklistingEnabled = blacklistingEnabled;
+ this.nodeUpdatesRescheduleEnabled = rescheduleOnUnhealthyNode;
this.maxTaskFailuresPerNode = maxTaskFailuresPerNode;
this.stateMachine = stateMachineFactory.make(this);
// TODO Handle the case where a node is created due to the RM reporting it's
@@ -321,12 +323,14 @@ public class AMNodeImpl implements AMNode {
SingleArcTransition<AMNodeImpl, AMNodeEvent> {
@Override
public void transition(AMNodeImpl node, AMNodeEvent nEvent) {
- for (ContainerId c : node.containers) {
- node.sendEvent(new AMContainerEventNodeFailed(c, "Node failed"));
+ if (node.nodeUpdatesRescheduleEnabled) {
+ for (ContainerId c : node.containers) {
+ node.sendEvent(new AMContainerEventNodeFailed(c, "Node failed"));
+ }
+ // Resetting counters.
+ node.numFailedTAs = 0;
+ node.numSuccessfulTAs = 0;
}
- // Resetting counters.
- node.numFailedTAs = 0;
- node.numSuccessfulTAs = 0;
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/d80e30d3/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
index a067cee..38c154b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
@@ -53,7 +53,8 @@ public class AMNodeTracker extends AbstractService implements
private boolean nodeBlacklistingEnabled;
private int blacklistDisablePercent;
float currentIgnoreBlacklistingCountThreshold = 0;
-
+ private boolean nodeUpdatesRescheduleEnabled;
+
@SuppressWarnings("rawtypes")
public AMNodeTracker(EventHandler eventHandler, AppContext appContext) {
super("AMNodeMap");
@@ -74,10 +75,14 @@ public class AMNodeTracker extends AbstractService implements
this.blacklistDisablePercent = conf.getInt(
TezConfiguration.TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD,
TezConfiguration.TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD_DEFAULT);
+ this.nodeUpdatesRescheduleEnabled = conf.getBoolean(
+ TezConfiguration.TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS,
+ TezConfiguration.TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS_DEFAULT);
LOG.info("blacklistDisablePercent is " + blacklistDisablePercent +
- ", blacklistingEnabled: " + nodeBlacklistingEnabled +
- ", maxTaskFailuresPerNode: " + maxTaskFailuresPerNode);
+ ", blacklistingEnabled: " + nodeBlacklistingEnabled +
+ ", maxTaskFailuresPerNode: " + maxTaskFailuresPerNode +
+ ", nodeUpdatesRescheduleEnabled: " + nodeUpdatesRescheduleEnabled);
if (blacklistDisablePercent < -1 || blacklistDisablePercent > 100) {
throw new TezUncheckedException("Invalid blacklistDisablePercent: "
@@ -88,7 +93,8 @@ public class AMNodeTracker extends AbstractService implements
public void nodeSeen(NodeId nodeId) {
if (nodeMap.putIfAbsent(nodeId, new AMNodeImpl(nodeId, maxTaskFailuresPerNode,
- eventHandler, nodeBlacklistingEnabled, appContext)) == null) {
+ eventHandler, nodeBlacklistingEnabled, nodeUpdatesRescheduleEnabled,
+ appContext)) == null) {
LOG.info("Adding new node: " + nodeId);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/d80e30d3/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
index d907ea0..0072f6a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
@@ -321,6 +321,53 @@ public class TestAMNodeTracker {
amNodeTracker.stop();
}
+ @Test(timeout=10000)
+ public void testNodeUnhealthyRescheduleTasksEnabled() throws Exception {
+ _testNodeUnhealthyRescheduleTasks(true);
+ }
+
+ @Test(timeout=10000)
+ public void testNodeUnhealthyRescheduleTasksDisabled() throws Exception {
+ _testNodeUnhealthyRescheduleTasks(false);
+ }
+
+ private void _testNodeUnhealthyRescheduleTasks(boolean rescheduleTasks) {
+ AppContext appContext = mock(AppContext.class);
+ Configuration conf = new Configuration(false);
+ conf.setBoolean(TezConfiguration.TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS,
+ rescheduleTasks);
+ TestEventHandler handler = new TestEventHandler();
+ AMNodeTracker amNodeTracker = new AMNodeTracker(handler, appContext);
+ doReturn(amNodeTracker).when(appContext).getNodeTracker();
+ amNodeTracker.init(conf);
+ amNodeTracker.start();
+
+ // add a node
+ amNodeTracker.handle(new AMNodeEventNodeCountUpdated(1));
+ NodeId nodeId = NodeId.newInstance("host1", 1234);
+ amNodeTracker.nodeSeen(nodeId);
+ AMNodeImpl node = (AMNodeImpl) amNodeTracker.get(nodeId);
+
+ // simulate task starting on node
+ ContainerId cid = mock(ContainerId.class);
+ amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, cid));
+
+ // mark node unhealthy
+ NodeReport nodeReport = generateNodeReport(nodeId, NodeState.UNHEALTHY);
+ amNodeTracker.handle(new AMNodeEventStateChanged(nodeReport));
+ assertEquals(AMNodeState.UNHEALTHY, node.getState());
+
+ // check for task rescheduling events
+ if (rescheduleTasks) {
+ assertEquals(1, handler.events.size());
+ assertEquals(AMContainerEventType.C_NODE_FAILED, handler.events.get(0).getType());
+ } else {
+ assertEquals(0, handler.events.size());
+ }
+
+ amNodeTracker.stop();
+ }
+
private static NodeReport generateNodeReport(NodeId nodeId, NodeState nodeState) {
NodeReport nodeReport = mock(NodeReport.class);
doReturn(nodeId).when(nodeReport).getNodeId();