You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by jl...@apache.org on 2015/12/18 22:08:01 UTC
tez git commit: TEZ-2972. Avoid task rescheduling when a node turns unhealthy (jlowe)
Repository: tez
Updated Branches:
refs/heads/master c34c54f70 -> 4ed7d1aba
TEZ-2972. Avoid task rescheduling when a node turns unhealthy (jlowe)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4ed7d1ab
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4ed7d1ab
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4ed7d1ab
Branch: refs/heads/master
Commit: 4ed7d1abafa9aab7636c1febcb1a63ea63fde9c8
Parents: c34c54f
Author: Jason Lowe <jl...@apache.org>
Authored: Fri Dec 18 21:01:36 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Fri Dec 18 21:01:36 2015 +0000
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/tez/dag/api/TezConfiguration.java | 13 ++++++
.../apache/tez/dag/app/rm/node/AMNodeImpl.java | 16 ++++---
.../tez/dag/app/rm/node/AMNodeTracker.java | 10 ++++-
.../dag/app/rm/node/PerSourceNodeTracker.java | 8 +++-
.../tez/dag/app/rm/node/TestAMNodeTracker.java | 47 ++++++++++++++++++++
6 files changed, 86 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/4ed7d1ab/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0ad2203..a3b0fa6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES
TEZ-2948. Stop using dagName in the dagComplete notification to TaskCommunicators.
TEZ-2949. Allow duplicate dag names within session for Tez.
TEZ-604. Revert temporary changes made in TEZ-603 to kill the provided tez session, if running a MapReduce job.
+ TEZ-2972. Avoid task rescheduling when a node turns unhealthy
ALL CHANGES:
TEZ-3011. Link Vertex Name in Dag Tasks/Task Attempts to Vertex
@@ -90,6 +91,7 @@ ALL CHANGES:
TEZ-2866. Tez UI: Newly added columns wont be displayed by default in tables
TEZ-2887. Tez build failure due to missing dependency in pom files.
TEZ-1692. Reduce code duplication between TezMapredSplitsGrouper and TezMapreduceSplitsGrouper.
+ TEZ-2972. Avoid task rescheduling when a node turns unhealthy
Release 0.8.1-alpha: 2015-10-12
http://git-wip-us.apache.org/repos/asf/tez/blob/4ed7d1ab/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index fabc256..b707857 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -569,6 +569,19 @@ public class TezConfiguration extends Configuration {
+ "node-blacklisting.ignore-threshold-node-percent";
public static final int TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD_DEFAULT = 33;
+ /**
+ * Boolean value. Enable task rescheduling for node updates.
+ * When enabled the task scheduler will reschedule task attempts that
+ * are associated with an unhealthy node to avoid potential data transfer
+ * errors from downstream tasks.
+ */
+ @ConfigurationScope(Scope.AM)
+ @ConfigurationProperty(type="boolean")
+ public static final String TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS =
+ TEZ_AM_PREFIX + "node-unhealthy-reschedule-tasks";
+ public static final boolean
+ TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS_DEFAULT = false;
+
/** Int value. Number of threads to handle client RPC requests. Expert level setting.*/
@ConfigurationScope(Scope.AM)
@ConfigurationProperty(type="integer")
http://git-wip-us.apache.org/repos/asf/tez/blob/4ed7d1ab/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
index 18d5978..bcc38c6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
@@ -59,6 +59,7 @@ public class AMNodeImpl implements AMNode {
private final int maxTaskFailuresPerNode;
private boolean blacklistingEnabled;
private boolean ignoreBlacklisting = false;
+ private boolean nodeUpdatesRescheduleEnabled;
private Set<TezTaskAttemptID> failedAttemptIds = Sets.newHashSet();
@SuppressWarnings("rawtypes")
@@ -175,7 +176,7 @@ public class AMNodeImpl implements AMNode {
@SuppressWarnings("rawtypes")
public AMNodeImpl(NodeId nodeId, int schedulerId, int maxTaskFailuresPerNode,
EventHandler eventHandler, boolean blacklistingEnabled,
- AppContext appContext) {
+ boolean rescheduleOnUnhealthyNode, AppContext appContext) {
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
this.readLock = rwLock.readLock();
this.writeLock = rwLock.writeLock();
@@ -184,6 +185,7 @@ public class AMNodeImpl implements AMNode {
this.appContext = appContext;
this.eventHandler = eventHandler;
this.blacklistingEnabled = blacklistingEnabled;
+ this.nodeUpdatesRescheduleEnabled = rescheduleOnUnhealthyNode;
this.maxTaskFailuresPerNode = maxTaskFailuresPerNode;
this.stateMachine = stateMachineFactory.make(this);
// TODO Handle the case where a node is created due to the RM reporting it's
@@ -323,12 +325,14 @@ public class AMNodeImpl implements AMNode {
SingleArcTransition<AMNodeImpl, AMNodeEvent> {
@Override
public void transition(AMNodeImpl node, AMNodeEvent nEvent) {
- for (ContainerId c : node.containers) {
- node.sendEvent(new AMContainerEventNodeFailed(c, "Node failed"));
+ if (node.nodeUpdatesRescheduleEnabled) {
+ for (ContainerId c : node.containers) {
+ node.sendEvent(new AMContainerEventNodeFailed(c, "Node failed"));
+ }
+ // Resetting counters.
+ node.numFailedTAs = 0;
+ node.numSuccessfulTAs = 0;
}
- // Resetting counters.
- node.numFailedTAs = 0;
- node.numSuccessfulTAs = 0;
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4ed7d1ab/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
index 1aa8472..fdc8a4c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
@@ -50,6 +50,7 @@ public class AMNodeTracker extends AbstractService implements
private int maxTaskFailuresPerNode;
private boolean nodeBlacklistingEnabled;
private int blacklistDisablePercent;
+ private boolean nodeUpdatesRescheduleEnabled;
@SuppressWarnings("rawtypes")
public AMNodeTracker(EventHandler eventHandler, AppContext appContext) {
@@ -70,10 +71,14 @@ public class AMNodeTracker extends AbstractService implements
this.blacklistDisablePercent = conf.getInt(
TezConfiguration.TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD,
TezConfiguration.TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD_DEFAULT);
+ this.nodeUpdatesRescheduleEnabled = conf.getBoolean(
+ TezConfiguration.TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS,
+ TezConfiguration.TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS_DEFAULT);
LOG.info("blacklistDisablePercent is " + blacklistDisablePercent +
", blacklistingEnabled: " + nodeBlacklistingEnabled +
- ", maxTaskFailuresPerNode: " + maxTaskFailuresPerNode);
+ ", maxTaskFailuresPerNode: " + maxTaskFailuresPerNode +
+ ", nodeUpdatesRescheduleEnabled: " + nodeUpdatesRescheduleEnabled);
if (blacklistDisablePercent < -1 || blacklistDisablePercent > 100) {
throw new TezUncheckedException("Invalid blacklistDisablePercent: "
@@ -143,7 +148,8 @@ public class AMNodeTracker extends AbstractService implements
if (nodeTracker == null) {
nodeTracker =
new PerSourceNodeTracker(schedulerId, eventHandler, appContext, maxTaskFailuresPerNode,
- nodeBlacklistingEnabled, blacklistDisablePercent);
+ nodeBlacklistingEnabled, blacklistDisablePercent,
+ nodeUpdatesRescheduleEnabled);
PerSourceNodeTracker old = perSourceNodeTrackers.putIfAbsent(schedulerId, nodeTracker);
nodeTracker = old != null ? old : nodeTracker;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4ed7d1ab/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java
index b1c81af..72c3230 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java
@@ -42,6 +42,7 @@ public class PerSourceNodeTracker {
private final int maxTaskFailuresPerNode;
private final boolean nodeBlacklistingEnabled;
private final int blacklistDisablePercent;
+ private final boolean nodeUpdatesRescheduleEnabled;
private int numClusterNodes;
float currentIgnoreBlacklistingCountThreshold = 0;
@@ -50,7 +51,8 @@ public class PerSourceNodeTracker {
@SuppressWarnings("rawtypes")
public PerSourceNodeTracker(int sourceId, EventHandler eventHandler, AppContext appContext,
int maxTaskFailuresPerNode, boolean nodeBlacklistingEnabled,
- int blacklistDisablePercent) {
+ int blacklistDisablePercent,
+ boolean nodeUpdatesRescheduleEnabled) {
this.sourceId = sourceId;
this.nodeMap = new ConcurrentHashMap<>();
this.blacklistMap = new ConcurrentHashMap<>();
@@ -60,13 +62,15 @@ public class PerSourceNodeTracker {
this.maxTaskFailuresPerNode = maxTaskFailuresPerNode;
this.nodeBlacklistingEnabled = nodeBlacklistingEnabled;
this.blacklistDisablePercent = blacklistDisablePercent;
+ this.nodeUpdatesRescheduleEnabled = nodeUpdatesRescheduleEnabled;
}
public void nodeSeen(NodeId nodeId) {
if (nodeMap.putIfAbsent(nodeId, new AMNodeImpl(nodeId, sourceId, maxTaskFailuresPerNode,
- eventHandler, nodeBlacklistingEnabled, appContext)) == null) {
+ eventHandler, nodeBlacklistingEnabled, nodeUpdatesRescheduleEnabled,
+ appContext)) == null) {
LOG.info("Adding new node {} to nodeTracker {}", nodeId, sourceId);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4ed7d1ab/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
index 143fcbf..25d1784 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
@@ -326,6 +326,53 @@ public class TestAMNodeTracker {
}
}
+ @Test(timeout=10000)
+ public void testNodeUnhealthyRescheduleTasksEnabled() throws Exception {
+ _testNodeUnhealthyRescheduleTasks(true);
+ }
+
+ @Test(timeout=10000)
+ public void testNodeUnhealthyRescheduleTasksDisabled() throws Exception {
+ _testNodeUnhealthyRescheduleTasks(false);
+ }
+
+ private void _testNodeUnhealthyRescheduleTasks(boolean rescheduleTasks) {
+ AppContext appContext = mock(AppContext.class);
+ Configuration conf = new Configuration(false);
+ conf.setBoolean(TezConfiguration.TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS,
+ rescheduleTasks);
+ TestEventHandler handler = new TestEventHandler();
+ AMNodeTracker amNodeTracker = new AMNodeTracker(handler, appContext);
+ doReturn(amNodeTracker).when(appContext).getNodeTracker();
+ amNodeTracker.init(conf);
+ amNodeTracker.start();
+
+ // add a node
+ amNodeTracker.handle(new AMNodeEventNodeCountUpdated(1, 0));
+ NodeId nodeId = NodeId.newInstance("host1", 1234);
+ amNodeTracker.nodeSeen(nodeId, 0);
+ AMNodeImpl node = (AMNodeImpl) amNodeTracker.get(nodeId, 0);
+
+ // simulate task starting on node
+ ContainerId cid = mock(ContainerId.class);
+ amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, 0, cid));
+
+ // mark node unhealthy
+ NodeReport nodeReport = generateNodeReport(nodeId, NodeState.UNHEALTHY);
+ amNodeTracker.handle(new AMNodeEventStateChanged(nodeReport, 0));
+ assertEquals(AMNodeState.UNHEALTHY, node.getState());
+
+ // check for task rescheduling events
+ if (rescheduleTasks) {
+ assertEquals(1, handler.events.size());
+ assertEquals(AMContainerEventType.C_NODE_FAILED, handler.events.get(0).getType());
+ } else {
+ assertEquals(0, handler.events.size());
+ }
+
+ amNodeTracker.stop();
+ }
+
private void _testSingleNodeNotBlacklisted(AMNodeTracker amNodeTracker,
TestEventHandler handler, int schedulerId) {
amNodeTracker.handle(new AMNodeEventNodeCountUpdated(1, schedulerId));