You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this copy.
Posted to commits@tez.apache.org by jl...@apache.org on 2015/12/18 22:08:01 UTC

tez git commit: TEZ-2972. Avoid task rescheduling when a node turns unhealthy (jlowe)

Repository: tez
Updated Branches:
  refs/heads/master c34c54f70 -> 4ed7d1aba


TEZ-2972. Avoid task rescheduling when a node turns unhealthy (jlowe)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4ed7d1ab
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4ed7d1ab
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4ed7d1ab

Branch: refs/heads/master
Commit: 4ed7d1abafa9aab7636c1febcb1a63ea63fde9c8
Parents: c34c54f
Author: Jason Lowe <jl...@apache.org>
Authored: Fri Dec 18 21:01:36 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Fri Dec 18 21:01:36 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/tez/dag/api/TezConfiguration.java    | 13 ++++++
 .../apache/tez/dag/app/rm/node/AMNodeImpl.java  | 16 ++++---
 .../tez/dag/app/rm/node/AMNodeTracker.java      | 10 ++++-
 .../dag/app/rm/node/PerSourceNodeTracker.java   |  8 +++-
 .../tez/dag/app/rm/node/TestAMNodeTracker.java  | 47 ++++++++++++++++++++
 6 files changed, 86 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/4ed7d1ab/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0ad2203..a3b0fa6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES
   TEZ-2948. Stop using dagName in the dagComplete notification to TaskCommunicators.
   TEZ-2949. Allow duplicate dag names within session for Tez.
   TEZ-604. Revert temporary changes made in TEZ-603 to kill the provided tez session, if running a MapReduce job.
+  TEZ-2972. Avoid task rescheduling when a node turns unhealthy
 
 ALL CHANGES:
   TEZ-3011. Link Vertex Name in Dag Tasks/Task Attempts to Vertex
@@ -90,6 +91,7 @@ ALL CHANGES:
   TEZ-2866. Tez UI: Newly added columns wont be displayed by default in tables
   TEZ-2887. Tez build failure due to missing dependency in pom files.
   TEZ-1692. Reduce code duplication between TezMapredSplitsGrouper and TezMapreduceSplitsGrouper.
+  TEZ-2972. Avoid task rescheduling when a node turns unhealthy
 
 
 Release 0.8.1-alpha: 2015-10-12

http://git-wip-us.apache.org/repos/asf/tez/blob/4ed7d1ab/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index fabc256..b707857 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -569,6 +569,19 @@ public class TezConfiguration extends Configuration {
       + "node-blacklisting.ignore-threshold-node-percent";
   public static final int TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD_DEFAULT = 33;
 
+  /**
+   * Boolean value. Enable task rescheduling for node updates.
+   * When enabled the task scheduler will reschedule task attempts that
+   * are associated with an unhealthy node to avoid potential data transfer
+   * errors from downstream tasks.
+   */
+  @ConfigurationScope(Scope.AM)
+  @ConfigurationProperty(type="boolean")
+  public static final String TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS =
+      TEZ_AM_PREFIX + "node-unhealthy-reschedule-tasks";
+  public static final boolean
+    TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS_DEFAULT = false;
+
   /** Int value. Number of threads to handle client RPC requests. Expert level setting.*/
   @ConfigurationScope(Scope.AM)
   @ConfigurationProperty(type="integer")

http://git-wip-us.apache.org/repos/asf/tez/blob/4ed7d1ab/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
index 18d5978..bcc38c6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
@@ -59,6 +59,7 @@ public class AMNodeImpl implements AMNode {
   private final int maxTaskFailuresPerNode;
   private boolean blacklistingEnabled;
   private boolean ignoreBlacklisting = false;
+  private boolean nodeUpdatesRescheduleEnabled;
   private Set<TezTaskAttemptID> failedAttemptIds = Sets.newHashSet();
 
   @SuppressWarnings("rawtypes")
@@ -175,7 +176,7 @@ public class AMNodeImpl implements AMNode {
   @SuppressWarnings("rawtypes")
   public AMNodeImpl(NodeId nodeId, int schedulerId, int maxTaskFailuresPerNode,
       EventHandler eventHandler, boolean blacklistingEnabled,
-      AppContext appContext) {
+      boolean rescheduleOnUnhealthyNode, AppContext appContext) {
     ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
     this.readLock = rwLock.readLock();
     this.writeLock = rwLock.writeLock();
@@ -184,6 +185,7 @@ public class AMNodeImpl implements AMNode {
     this.appContext = appContext;
     this.eventHandler = eventHandler;
     this.blacklistingEnabled = blacklistingEnabled;
+    this.nodeUpdatesRescheduleEnabled = rescheduleOnUnhealthyNode;
     this.maxTaskFailuresPerNode = maxTaskFailuresPerNode;
     this.stateMachine = stateMachineFactory.make(this);
     // TODO Handle the case where a node is created due to the RM reporting it's
@@ -323,12 +325,14 @@ public class AMNodeImpl implements AMNode {
       SingleArcTransition<AMNodeImpl, AMNodeEvent> {
     @Override
     public void transition(AMNodeImpl node, AMNodeEvent nEvent) {
-      for (ContainerId c : node.containers) {
-        node.sendEvent(new AMContainerEventNodeFailed(c, "Node failed"));
+      if (node.nodeUpdatesRescheduleEnabled) {
+        for (ContainerId c : node.containers) {
+          node.sendEvent(new AMContainerEventNodeFailed(c, "Node failed"));
+        }
+        // Resetting counters.
+        node.numFailedTAs = 0;
+        node.numSuccessfulTAs = 0;
       }
-      // Resetting counters.
-      node.numFailedTAs = 0;
-      node.numSuccessfulTAs = 0;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/4ed7d1ab/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
index 1aa8472..fdc8a4c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
@@ -50,6 +50,7 @@ public class AMNodeTracker extends AbstractService implements
   private int maxTaskFailuresPerNode;
   private boolean nodeBlacklistingEnabled;
   private int blacklistDisablePercent;
+  private boolean nodeUpdatesRescheduleEnabled;
 
   @SuppressWarnings("rawtypes")
   public AMNodeTracker(EventHandler eventHandler, AppContext appContext) {
@@ -70,10 +71,14 @@ public class AMNodeTracker extends AbstractService implements
     this.blacklistDisablePercent = conf.getInt(
           TezConfiguration.TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD,
           TezConfiguration.TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD_DEFAULT);
+    this.nodeUpdatesRescheduleEnabled = conf.getBoolean(
+          TezConfiguration.TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS,
+          TezConfiguration.TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS_DEFAULT);
 
     LOG.info("blacklistDisablePercent is " + blacklistDisablePercent +
         ", blacklistingEnabled: " + nodeBlacklistingEnabled +
-        ", maxTaskFailuresPerNode: " + maxTaskFailuresPerNode);
+        ", maxTaskFailuresPerNode: " + maxTaskFailuresPerNode +
+        ", nodeUpdatesRescheduleEnabled: " + nodeUpdatesRescheduleEnabled);
 
     if (blacklistDisablePercent < -1 || blacklistDisablePercent > 100) {
       throw new TezUncheckedException("Invalid blacklistDisablePercent: "
@@ -143,7 +148,8 @@ public class AMNodeTracker extends AbstractService implements
     if (nodeTracker == null) {
       nodeTracker =
           new PerSourceNodeTracker(schedulerId, eventHandler, appContext, maxTaskFailuresPerNode,
-              nodeBlacklistingEnabled, blacklistDisablePercent);
+              nodeBlacklistingEnabled, blacklistDisablePercent,
+              nodeUpdatesRescheduleEnabled);
       PerSourceNodeTracker old = perSourceNodeTrackers.putIfAbsent(schedulerId, nodeTracker);
       nodeTracker = old != null ? old : nodeTracker;
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/4ed7d1ab/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java
index b1c81af..72c3230 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java
@@ -42,6 +42,7 @@ public class PerSourceNodeTracker {
   private final int maxTaskFailuresPerNode;
   private final boolean nodeBlacklistingEnabled;
   private final int blacklistDisablePercent;
+  private final boolean nodeUpdatesRescheduleEnabled;
 
   private int numClusterNodes;
   float currentIgnoreBlacklistingCountThreshold = 0;
@@ -50,7 +51,8 @@ public class PerSourceNodeTracker {
   @SuppressWarnings("rawtypes")
   public PerSourceNodeTracker(int sourceId, EventHandler eventHandler, AppContext appContext,
                               int maxTaskFailuresPerNode, boolean nodeBlacklistingEnabled,
-                              int blacklistDisablePercent) {
+                              int blacklistDisablePercent,
+                              boolean nodeUpdatesRescheduleEnabled) {
     this.sourceId = sourceId;
     this.nodeMap = new ConcurrentHashMap<>();
     this.blacklistMap = new ConcurrentHashMap<>();
@@ -60,13 +62,15 @@ public class PerSourceNodeTracker {
     this.maxTaskFailuresPerNode = maxTaskFailuresPerNode;
     this.nodeBlacklistingEnabled = nodeBlacklistingEnabled;
     this.blacklistDisablePercent = blacklistDisablePercent;
+    this.nodeUpdatesRescheduleEnabled = nodeUpdatesRescheduleEnabled;
   }
 
 
 
   public void nodeSeen(NodeId nodeId) {
     if (nodeMap.putIfAbsent(nodeId, new AMNodeImpl(nodeId, sourceId, maxTaskFailuresPerNode,
-        eventHandler, nodeBlacklistingEnabled, appContext)) == null) {
+        eventHandler, nodeBlacklistingEnabled, nodeUpdatesRescheduleEnabled,
+        appContext)) == null) {
       LOG.info("Adding new node {} to nodeTracker {}", nodeId, sourceId);
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/4ed7d1ab/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
index 143fcbf..25d1784 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
@@ -326,6 +326,53 @@ public class TestAMNodeTracker {
     }
   }
 
+  @Test(timeout=10000)
+  public void testNodeUnhealthyRescheduleTasksEnabled() throws Exception {
+    _testNodeUnhealthyRescheduleTasks(true);
+  }
+
+  @Test(timeout=10000)
+  public void testNodeUnhealthyRescheduleTasksDisabled() throws Exception {
+    _testNodeUnhealthyRescheduleTasks(false);
+  }
+
+  private void _testNodeUnhealthyRescheduleTasks(boolean rescheduleTasks) {
+    AppContext appContext = mock(AppContext.class);
+    Configuration conf = new Configuration(false);
+    conf.setBoolean(TezConfiguration.TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS,
+        rescheduleTasks);
+    TestEventHandler handler = new TestEventHandler();
+    AMNodeTracker amNodeTracker = new AMNodeTracker(handler, appContext);
+    doReturn(amNodeTracker).when(appContext).getNodeTracker();
+    amNodeTracker.init(conf);
+    amNodeTracker.start();
+
+    // add a node
+    amNodeTracker.handle(new AMNodeEventNodeCountUpdated(1, 0));
+    NodeId nodeId = NodeId.newInstance("host1", 1234);
+    amNodeTracker.nodeSeen(nodeId, 0);
+    AMNodeImpl node = (AMNodeImpl) amNodeTracker.get(nodeId, 0);
+
+    // simulate task starting on node
+    ContainerId cid = mock(ContainerId.class);
+    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, 0, cid));
+
+    // mark node unhealthy
+    NodeReport nodeReport = generateNodeReport(nodeId, NodeState.UNHEALTHY);
+    amNodeTracker.handle(new AMNodeEventStateChanged(nodeReport, 0));
+    assertEquals(AMNodeState.UNHEALTHY, node.getState());
+
+    // check for task rescheduling events
+    if (rescheduleTasks) {
+      assertEquals(1, handler.events.size());
+      assertEquals(AMContainerEventType.C_NODE_FAILED, handler.events.get(0).getType());
+    } else {
+      assertEquals(0, handler.events.size());
+    }
+
+    amNodeTracker.stop();
+  }
+
   private void _testSingleNodeNotBlacklisted(AMNodeTracker amNodeTracker,
                                              TestEventHandler handler, int schedulerId) {
     amNodeTracker.handle(new AMNodeEventNodeCountUpdated(1, schedulerId));