You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by jl...@apache.org on 2016/01/05 20:32:11 UTC

tez git commit: TEZ-2972. Avoid task rescheduling when a node turns unhealthy (jlowe)

Repository: tez
Updated Branches:
  refs/heads/branch-0.7 9880c414c -> d80e30d3a


TEZ-2972. Avoid task rescheduling when a node turns unhealthy (jlowe)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d80e30d3
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d80e30d3
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d80e30d3

Branch: refs/heads/branch-0.7
Commit: d80e30d3af44471c5da98e5e9a1dd08929dc3931
Parents: 9880c41
Author: Jason Lowe <jl...@apache.org>
Authored: Tue Jan 5 19:31:35 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Tue Jan 5 19:31:35 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/tez/dag/api/TezConfiguration.java    | 12 +++++
 tez-dag/findbugs-exclude.xml                    |  1 +
 .../apache/tez/dag/app/rm/node/AMNodeImpl.java  | 16 ++++---
 .../tez/dag/app/rm/node/AMNodeTracker.java      | 14 ++++--
 .../tez/dag/app/rm/node/TestAMNodeTracker.java  | 47 ++++++++++++++++++++
 6 files changed, 82 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/d80e30d3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4f8897e..c2364a7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,8 +4,10 @@ Apache Tez Change Log
 INCOMPATIBLE CHANGES
   TEZ-2679. Admin forms of launch env settings
   TEZ-2949. Allow duplicate dag names within session for Tez.
+  TEZ-2972. Avoid task rescheduling when a node turns unhealthy
 
 ALL CHANGES
+  TEZ-2972. Avoid task rescheduling when a node turns unhealthy
   TEZ-3017. HistoryACLManager does not have a close method for cleanup
   TEZ-2914. Ability to limit vertex concurrency
   TEZ-3011. Link Vertex Name in Dag Tasks/Task Attempts to Vertex

http://git-wip-us.apache.org/repos/asf/tez/blob/d80e30d3/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 8997327..d4d5759 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -541,6 +541,18 @@ public class TezConfiguration extends Configuration {
       + "node-blacklisting.ignore-threshold-node-percent";
   public static final int TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD_DEFAULT = 33;
 
+  /**
+   * Boolean value. Enable task rescheduling for node updates.
+   * When enabled the task scheduler will reschedule task attempts that
+   * are associated with an unhealthy node to avoid potential data transfer
+   * errors from downstream tasks.
+   */
+  @ConfigurationScope(Scope.AM)
+  public static final String TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS =
+      TEZ_AM_PREFIX + "node-unhealthy-reschedule-tasks";
+  public static final boolean
+    TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS_DEFAULT = false;
+
   /** Int value. Number of threads to handle client RPC requests. Expert level setting.*/
   @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_CLIENT_THREAD_COUNT =

http://git-wip-us.apache.org/repos/asf/tez/blob/d80e30d3/tez-dag/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-dag/findbugs-exclude.xml b/tez-dag/findbugs-exclude.xml
index 4c01edc..2842a50 100644
--- a/tez-dag/findbugs-exclude.xml
+++ b/tez-dag/findbugs-exclude.xml
@@ -18,6 +18,7 @@
     <Or>
       <Field name="blacklistDisablePercent" />
       <Field name="maxTaskFailuresPerNode" />
+      <Field name="nodeUpdatesRescheduleEnabled" />
     </Or>
     <Bug pattern="IS2_INCONSISTENT_SYNC" />
   </Match>

http://git-wip-us.apache.org/repos/asf/tez/blob/d80e30d3/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
index b93cab3..f4d89e4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
@@ -58,6 +58,7 @@ public class AMNodeImpl implements AMNode {
   private final int maxTaskFailuresPerNode;
   private boolean blacklistingEnabled;
   private boolean ignoreBlacklisting = false;
+  private boolean nodeUpdatesRescheduleEnabled;
   private Set<TezTaskAttemptID> failedAttemptIds = Sets.newHashSet();
 
   @SuppressWarnings("rawtypes")
@@ -174,7 +175,7 @@ public class AMNodeImpl implements AMNode {
   @SuppressWarnings("rawtypes")
   public AMNodeImpl(NodeId nodeId, int maxTaskFailuresPerNode,
       EventHandler eventHandler, boolean blacklistingEnabled,
-      AppContext appContext) {
+      boolean rescheduleOnUnhealthyNode, AppContext appContext) {
     ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
     this.readLock = rwLock.readLock();
     this.writeLock = rwLock.writeLock();
@@ -182,6 +183,7 @@ public class AMNodeImpl implements AMNode {
     this.appContext = appContext;
     this.eventHandler = eventHandler;
     this.blacklistingEnabled = blacklistingEnabled;
+    this.nodeUpdatesRescheduleEnabled = rescheduleOnUnhealthyNode;
     this.maxTaskFailuresPerNode = maxTaskFailuresPerNode;
     this.stateMachine = stateMachineFactory.make(this);
     // TODO Handle the case where a node is created due to the RM reporting it's
@@ -321,12 +323,14 @@ public class AMNodeImpl implements AMNode {
       SingleArcTransition<AMNodeImpl, AMNodeEvent> {
     @Override
     public void transition(AMNodeImpl node, AMNodeEvent nEvent) {
-      for (ContainerId c : node.containers) {
-        node.sendEvent(new AMContainerEventNodeFailed(c, "Node failed"));
+      if (node.nodeUpdatesRescheduleEnabled) {
+        for (ContainerId c : node.containers) {
+          node.sendEvent(new AMContainerEventNodeFailed(c, "Node failed"));
+        }
+        // Resetting counters.
+        node.numFailedTAs = 0;
+        node.numSuccessfulTAs = 0;
       }
-      // Resetting counters.
-      node.numFailedTAs = 0;
-      node.numSuccessfulTAs = 0;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/d80e30d3/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
index a067cee..38c154b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
@@ -53,7 +53,8 @@ public class AMNodeTracker extends AbstractService implements
   private boolean nodeBlacklistingEnabled;
   private int blacklistDisablePercent;
   float currentIgnoreBlacklistingCountThreshold = 0;
-  
+  private boolean nodeUpdatesRescheduleEnabled;
+
   @SuppressWarnings("rawtypes")
   public AMNodeTracker(EventHandler eventHandler, AppContext appContext) {
     super("AMNodeMap");
@@ -74,10 +75,14 @@ public class AMNodeTracker extends AbstractService implements
     this.blacklistDisablePercent = conf.getInt(
           TezConfiguration.TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD,
           TezConfiguration.TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD_DEFAULT);
+    this.nodeUpdatesRescheduleEnabled = conf.getBoolean(
+          TezConfiguration.TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS,
+          TezConfiguration.TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS_DEFAULT);
 
     LOG.info("blacklistDisablePercent is " + blacklistDisablePercent +
-        ", blacklistingEnabled: " + nodeBlacklistingEnabled + 
-        ", maxTaskFailuresPerNode: " + maxTaskFailuresPerNode);
+        ", blacklistingEnabled: " + nodeBlacklistingEnabled +
+        ", maxTaskFailuresPerNode: " + maxTaskFailuresPerNode +
+        ", nodeUpdatesRescheduleEnabled: " + nodeUpdatesRescheduleEnabled);
 
     if (blacklistDisablePercent < -1 || blacklistDisablePercent > 100) {
       throw new TezUncheckedException("Invalid blacklistDisablePercent: "
@@ -88,7 +93,8 @@ public class AMNodeTracker extends AbstractService implements
   
   public void nodeSeen(NodeId nodeId) {
     if (nodeMap.putIfAbsent(nodeId, new AMNodeImpl(nodeId, maxTaskFailuresPerNode,
-        eventHandler, nodeBlacklistingEnabled, appContext)) == null) {
+        eventHandler, nodeBlacklistingEnabled, nodeUpdatesRescheduleEnabled,
+        appContext)) == null) {
       LOG.info("Adding new node: " + nodeId);
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/d80e30d3/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
index d907ea0..0072f6a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
@@ -321,6 +321,53 @@ public class TestAMNodeTracker {
     amNodeTracker.stop();
   }
 
+  @Test(timeout=10000)
+  public void testNodeUnhealthyRescheduleTasksEnabled() throws Exception {
+    _testNodeUnhealthyRescheduleTasks(true);
+  }
+
+  @Test(timeout=10000)
+  public void testNodeUnhealthyRescheduleTasksDisabled() throws Exception {
+    _testNodeUnhealthyRescheduleTasks(false);
+  }
+
+  private void _testNodeUnhealthyRescheduleTasks(boolean rescheduleTasks) {
+    AppContext appContext = mock(AppContext.class);
+    Configuration conf = new Configuration(false);
+    conf.setBoolean(TezConfiguration.TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS,
+        rescheduleTasks);
+    TestEventHandler handler = new TestEventHandler();
+    AMNodeTracker amNodeTracker = new AMNodeTracker(handler, appContext);
+    doReturn(amNodeTracker).when(appContext).getNodeTracker();
+    amNodeTracker.init(conf);
+    amNodeTracker.start();
+
+    // add a node
+    amNodeTracker.handle(new AMNodeEventNodeCountUpdated(1));
+    NodeId nodeId = NodeId.newInstance("host1", 1234);
+    amNodeTracker.nodeSeen(nodeId);
+    AMNodeImpl node = (AMNodeImpl) amNodeTracker.get(nodeId);
+
+    // simulate task starting on node
+    ContainerId cid = mock(ContainerId.class);
+    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, cid));
+
+    // mark node unhealthy
+    NodeReport nodeReport = generateNodeReport(nodeId, NodeState.UNHEALTHY);
+    amNodeTracker.handle(new AMNodeEventStateChanged(nodeReport));
+    assertEquals(AMNodeState.UNHEALTHY, node.getState());
+
+    // check for task rescheduling events
+    if (rescheduleTasks) {
+      assertEquals(1, handler.events.size());
+      assertEquals(AMContainerEventType.C_NODE_FAILED, handler.events.get(0).getType());
+    } else {
+      assertEquals(0, handler.events.size());
+    }
+
+    amNodeTracker.stop();
+  }
+
   private static NodeReport generateNodeReport(NodeId nodeId, NodeState nodeState) {
     NodeReport nodeReport = mock(NodeReport.class);
     doReturn(nodeId).when(nodeReport).getNodeId();