Posted to commits@apex.apache.org by th...@apache.org on 2016/03/23 02:37:34 UTC

incubator-apex-core git commit: APEXCORE-393 #resolve Adding DAG context attributes with increased default value for blacklisting of failed nodes Added resetting of failure count for nodes after blacklist removal interval. Cleaned up code to remove concurrent Map for maintaining failed counts.

Repository: incubator-apex-core
Updated Branches:
  refs/heads/master 854966439 -> 0f1e2bb03


APEXCORE-393 #resolve Adding DAG context attributes with increased default value for blacklisting of failed nodes
Added resetting of failure count for nodes after blacklist removal interval.
Cleaned up code to remove concurrent Map for maintaining failed counts.

Reduced checkstyle violation count in engine pom.xml.
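
The two DAG attributes introduced by this commit are intended to be set by the application (or through configuration) before launch. Below is a minimal, hypothetical sketch of an application enabling node blacklisting through the Java API; the class name and the threshold values (3 failures, 30 minutes) are illustrative and not part of this commit.

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.Context.DAGContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;

public class BlacklistingDemoApplication implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // Blacklist a node after 3 consecutive framework-caused container failures on it
    dag.setAttribute(DAGContext.MAX_CONSECUTIVE_CONTAINER_FAILURES_FOR_BLACKLIST, 3);
    // Remove a blacklisted node from the blacklist 30 minutes after it was added
    dag.setAttribute(DAGContext.BLACKLISTED_NODE_REMOVAL_TIME_MILLIS, 30 * 60 * 1000L);
    // ... add operators and streams as usual ...
  }
}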


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/0f1e2bb0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/0f1e2bb0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/0f1e2bb0

Branch: refs/heads/master
Commit: 0f1e2bb038fbf66955990e9ff8772bef4ac2d874
Parents: 8549664
Author: ishark <is...@datatorrent.com>
Authored: Wed Mar 16 17:39:19 2016 -0700
Committer: ishark <is...@datatorrent.com>
Committed: Tue Mar 22 18:28:14 2016 -0700

----------------------------------------------------------------------
 .../main/java/com/datatorrent/api/Context.java  | 16 ++++
 engine/pom.xml                                  |  2 +-
 .../stram/StreamingAppMasterService.java        | 90 +++++++++++++-------
 3 files changed, 76 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/0f1e2bb0/api/src/main/java/com/datatorrent/api/Context.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java
index d34d682..ee90100 100644
--- a/api/src/main/java/com/datatorrent/api/Context.java
+++ b/api/src/main/java/com/datatorrent/api/Context.java
@@ -495,6 +495,22 @@ public interface Context
      * Only supports string codecs that have a constructor with no arguments
      */
     Attribute<Map<Class<?>, Class<? extends StringCodec<?>>>> STRING_CODECS = new Attribute<Map<Class<?>, Class<? extends StringCodec<?>>>>(new Map2String<Class<?>, Class<? extends StringCodec<?>>>(",", "=", new Class2String<Object>(), new Class2String<StringCodec<?>>()));
+
+    /**
+     * The number of consecutive container failures on a node after which the
+     * application master blacklists that node.
+     * Blacklisting is disabled when this attribute is left at its default value.
+     */
+    Attribute<Integer> MAX_CONSECUTIVE_CONTAINER_FAILURES_FOR_BLACKLIST = new Attribute<Integer>(Integer.MAX_VALUE);
+
+    /**
+     * The time in milliseconds to wait before removing a failed node from the blacklist
+     */
+    Attribute<Long> BLACKLISTED_NODE_REMOVAL_TIME_MILLIS = new Attribute<Long>(60 * 60 * 1000L);
+
+    /**
+     * Serial version computed by initializing the attributes declared in this interface.
+     */
     @SuppressWarnings(value = "FieldNameHidesFieldInSuperclass")
     long serialVersionUID = AttributeMap.AttributeInitializer.initialize(DAGContext.class);
   }
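
The defaults above disable blacklisting (Integer.MAX_VALUE consecutive failures) and use a one-hour blacklist removal interval. Besides the Java API, DAG attributes can normally also be supplied through configuration; the snippet below is a hypothetical properties example that assumes the standard dt.attr.<ATTRIBUTE> prefix for DAG-level attributes, with illustrative values.

# Hypothetical configuration, not part of this commit
dt.attr.MAX_CONSECUTIVE_CONTAINER_FAILURES_FOR_BLACKLIST=3
dt.attr.BLACKLISTED_NODE_REMOVAL_TIME_MILLIS=1800000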

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/0f1e2bb0/engine/pom.xml
----------------------------------------------------------------------
diff --git a/engine/pom.xml b/engine/pom.xml
index f13c2f2..f968686 100644
--- a/engine/pom.xml
+++ b/engine/pom.xml
@@ -149,7 +149,7 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-checkstyle-plugin</artifactId>
         <configuration>
-          <maxAllowedViolations>3173</maxAllowedViolations>
+          <maxAllowedViolations>3161</maxAllowedViolations>
           <logViolationsToConsole>${checkstyle.console}</logViolationsToConsole>
         </configuration>
       </plugin>

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/0f1e2bb0/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index 31a7fc8..fbc0b36 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -31,7 +31,7 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -82,13 +82,13 @@ import org.apache.hadoop.yarn.webapp.WebApp;
 import org.apache.hadoop.yarn.webapp.WebApps;
 
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 import com.datatorrent.api.Attribute;
 import com.datatorrent.api.AutoMetric;
 import com.datatorrent.api.Context.DAGContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.StringCodec;
-import com.datatorrent.common.util.Pair;
 import com.datatorrent.stram.StreamingContainerManager.ContainerResource;
 import com.datatorrent.stram.api.AppDataSource;
 import com.datatorrent.stram.api.BaseContext;
@@ -123,8 +123,6 @@ public class StreamingAppMasterService extends CompositeService
   private static final long DELEGATION_TOKEN_RENEW_INTERVAL = Long.MAX_VALUE / 2;
   private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL = 24 * 60 * 60 * 1000;
   private static final int NUMBER_MISSED_HEARTBEATS = 30;
-  private static final int MAX_CONTAINER_FAILURES_PER_NODE = 3;
-  private static final long BLACKLIST_REMOVAL_TIME = 60 * 60 * 1000;
   private AMRMClient<ContainerRequest> amRmClient;
   private NMClientAsync nmClient;
   private LogicalPlan dag;
@@ -140,8 +138,10 @@ public class StreamingAppMasterService extends CompositeService
   private final AtomicInteger numCompletedContainers = new AtomicInteger();
   // Containers that the RM has allocated to us
   private final ConcurrentMap<String, AllocatedContainer> allocatedContainers = Maps.newConcurrentMap();
-  private final ConcurrentMap<String, AtomicInteger> failedContainersMap = Maps.newConcurrentMap();
-  private final Queue<Pair<Long, List<String>>> blacklistedNodesQueueWithTimeStamp = new ConcurrentLinkedQueue<Pair<Long, List<String>>>();
+  // Nodes currently blacklisted due to consecutive container failures
+  private final Set<String> failedBlackListedNodes = Sets.newHashSet();
+  // Per-node consecutive container failure stats, used to decide blacklisting
+  private final Map<String, NodeFailureStats> failedContainerNodesMap = Maps.newHashMap();
   // Count of failed containers
   private final AtomicInteger numFailedContainers = new AtomicInteger();
   private final ConcurrentLinkedQueue<Runnable> pendingTasks = new ConcurrentLinkedQueue<Runnable>();
@@ -161,6 +161,19 @@ public class StreamingAppMasterService extends CompositeService
     this.appAttemptID = appAttemptID;
   }
 
+  private class NodeFailureStats
+  {
+    long lastFailureTimeStamp;
+    int failureCount;
+    long blackListAdditionTime;
+
+    public NodeFailureStats(long lastFailureTimeStamp, int failureCount)
+    {
+      this.lastFailureTimeStamp = lastFailureTimeStamp;
+      this.failureCount = failureCount;
+    }
+  }
+
   /**
    * Overrides getters to pull live info.
    */
@@ -671,9 +684,9 @@ public class StreamingAppMasterService extends CompositeService
     int minVcores = conf.getInt("yarn.scheduler.minimum-allocation-vcores", 0);
     LOG.info("Max mem {}m, Min mem {}m, Max vcores {} and Min vcores {} capabililty of resources in this cluster ", maxMem, minMem, maxVcores, minVcores);
 
-    int maxConsecutiveContainerFailures = conf.getInt("MAX_CONSECUTIVE_CONTAINER_FAILURES", MAX_CONTAINER_FAILURES_PER_NODE);
-    long blacklistRemovalTime = conf.getLong("BLACKLIST_REMOVAL_TIME", BLACKLIST_REMOVAL_TIME);
-
+    long blacklistRemovalTime = dag.getValue(DAGContext.BLACKLISTED_NODE_REMOVAL_TIME_MILLIS);
+    int maxConsecutiveContainerFailures = dag.getValue(DAGContext.MAX_CONSECUTIVE_CONTAINER_FAILURES_FOR_BLACKLIST);
+    LOG.info("Blacklist removal time in millis = {}, max consecutive node failure count = {}", blacklistRemovalTime, maxConsecutiveContainerFailures);
     // for locality relaxation fall back
     Map<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> requestedResources = Maps.newHashMap();
 
@@ -805,18 +818,17 @@ public class StreamingAppMasterService extends CompositeService
      /* Remove nodes from blacklist after timeout */
       long currentTime = System.currentTimeMillis();
       List<String> blacklistRemovals = new ArrayList<String>();
-      for (Iterator<Pair<Long, List<String>>> it = blacklistedNodesQueueWithTimeStamp.iterator(); it.hasNext();) {
-        Pair<Long, List<String>> entry = it.next();
-        Long timeDiff = currentTime - entry.getFirst();
-        if (timeDiff > blacklistRemovalTime) {
-          blacklistRemovals.addAll(entry.getSecond());
-          it.remove();
-        } else {
-          break;
+      for (String hostname : failedBlackListedNodes) {
+        long timeDiff = currentTime - failedContainerNodesMap.get(hostname).blackListAdditionTime;
+        if (timeDiff >= blacklistRemovalTime) {
+          blacklistRemovals.add(hostname);
+          failedContainerNodesMap.remove(hostname);
         }
       }
       if (!blacklistRemovals.isEmpty()) {
         amRmClient.updateBlacklist(null, blacklistRemovals);
+        LOG.info("Removing nodes {} from blacklist: time elapsed since last blacklisting due to failure is greater than specified timeout", blacklistRemovals.toString());
+        failedBlackListedNodes.removeAll(blacklistRemovals);
       }
 
       numTotalContainers += containerRequests.size();
@@ -922,18 +934,30 @@ public class StreamingAppMasterService extends CompositeService
         if (0 != exitStatus) {
           if (allocatedContainer != null) {
             numFailedContainers.incrementAndGet();
-            if (exitStatus != 1) {
+            if (exitStatus != 1 && maxConsecutiveContainerFailures != Integer.MAX_VALUE) {
               // If container failure due to framework
               String hostname = allocatedContainer.container.getNodeId().getHost();
-              int failedTimes = 1;
-              AtomicInteger failed = failedContainersMap.putIfAbsent(hostname, new AtomicInteger(1));
-              if (failed != null) {
-                failedTimes = failed.incrementAndGet();
-              }
-              if (failedTimes >= maxConsecutiveContainerFailures) {
-                // Blacklist the node
-                LOG.info("Node {} failed {} times consecutively, marking the node blacklisted", hostname, failedTimes);
-                blacklistAdditions.add(hostname);
+              if (!failedBlackListedNodes.contains(hostname)) {
+                // Blacklist the node if not already blacklisted
+                if (failedContainerNodesMap.containsKey(hostname)) {
+                  NodeFailureStats stats = failedContainerNodesMap.get(hostname);
+                  long timeStamp = System.currentTimeMillis();
+                  if (timeStamp - stats.lastFailureTimeStamp >= blacklistRemovalTime) {
+                    // Reset failure count if the last failure is older than the blacklist removal interval
+                    stats.failureCount = 1;
+                    stats.lastFailureTimeStamp = timeStamp;
+                  } else {
+                    stats.lastFailureTimeStamp = timeStamp;
+                    stats.failureCount++;
+                    if (stats.failureCount >= maxConsecutiveContainerFailures) {
+                      LOG.info("Node {} failed {} times consecutively within {} minutes, marking the node blacklisted", hostname, stats.failureCount, blacklistRemovalTime / (60 * 1000));
+                      blacklistAdditions.add(hostname);
+                      failedBlackListedNodes.add(hostname);
+                    }
+                  }
+                } else {
+                  failedContainerNodesMap.put(hostname, new NodeFailureStats(System.currentTimeMillis(), 1));
+                }
               }
             }
           }
@@ -957,9 +981,9 @@ public class StreamingAppMasterService extends CompositeService
           LOG.info("Container completed successfully." + ", containerId=" + containerStatus.getContainerId());
           // Reset counter for node failure, if exists
           String hostname = allocatedContainer.container.getNodeId().getHost();
-          AtomicInteger failedTimes = failedContainersMap.get(hostname);
-          if(failedTimes != null) {
-            failedTimes.set(0);
+          NodeFailureStats stats = failedContainerNodesMap.get(hostname);
+          if (stats != null) {
+            stats.failureCount = 0;
           }
         }
 
@@ -974,7 +998,11 @@ public class StreamingAppMasterService extends CompositeService
 
       if (!blacklistAdditions.isEmpty()) {
         amRmClient.updateBlacklist(blacklistAdditions, null);
-        blacklistedNodesQueueWithTimeStamp.add(new Pair<Long, List<String>>(System.currentTimeMillis(), blacklistAdditions));
+        long timeStamp = System.currentTimeMillis();
+        for (String hostname : blacklistAdditions) {
+          NodeFailureStats stats = failedContainerNodesMap.get(hostname);
+          stats.blackListAdditionTime = timeStamp;
+        }
       }
       if (dnmgr.forcedShutdown) {
         LOG.info("Forced shutdown due to {}", dnmgr.shutdownDiagnosticsMessage);