You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2015/09/26 06:11:48 UTC

[30/34] incubator-apex-core git commit: APEX-92 #Comment #resolve Fix: add a node to the blacklist once the number of consecutive container failures observed on that node reaches the maximum number of tries

APEX-92 #Comment #resolve
Fix: add a node to the blacklist once the number of consecutive container failures observed on that node reaches the maximum number of tries


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/d8e1e74d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/d8e1e74d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/d8e1e74d

Branch: refs/heads/feature-module
Commit: d8e1e74da00619a3c0e021ecd167de57fcc0e262
Parents: 90bda5e
Author: ishark <is...@datatorrent.com>
Authored: Wed Sep 23 16:17:04 2015 -0700
Committer: ishark <is...@datatorrent.com>
Committed: Fri Sep 25 16:09:54 2015 -0700

----------------------------------------------------------------------
 engine/pom.xml                                  |  2 +-
 .../stram/StreamingAppMasterService.java        | 83 ++++++++++++++------
 2 files changed, 62 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d8e1e74d/engine/pom.xml
----------------------------------------------------------------------
diff --git a/engine/pom.xml b/engine/pom.xml
index 32f2001..15e0565 100644
--- a/engine/pom.xml
+++ b/engine/pom.xml
@@ -145,7 +145,7 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-checkstyle-plugin</artifactId>
         <configuration>
-          <maxAllowedViolations>2248</maxAllowedViolations>
+          <maxAllowedViolations>2238</maxAllowedViolations>
         </configuration>
       </plugin>
     </plugins>

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d8e1e74d/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index bfeedbd..5d84e10 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -32,9 +32,6 @@ import javax.xml.bind.annotation.XmlElement;
 import com.google.common.collect.Maps;
 
 import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.hadoop.conf.Configuration;
@@ -63,13 +60,15 @@ import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.webapp.WebApp;
 import org.apache.hadoop.yarn.webapp.WebApps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.datatorrent.api.Attribute;
 import com.datatorrent.api.AutoMetric;
 import com.datatorrent.api.Context.DAGContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.StringCodec;
-
+import com.datatorrent.common.util.Pair;
 import com.datatorrent.stram.StreamingContainerManager.ContainerResource;
 import com.datatorrent.stram.api.AppDataSource;
 import com.datatorrent.stram.api.BaseContext;
@@ -103,11 +102,13 @@ public class StreamingAppMasterService extends CompositeService
   private static final long DELEGATION_TOKEN_RENEW_INTERVAL = Long.MAX_VALUE / 2;
   private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL = 24 * 60 * 60 * 1000;
   private static final int NUMBER_MISSED_HEARTBEATS = 30;
+  private static final int MAX_CONTAINER_FAILURES_PER_NODE = 3;
+  private static final long BLACKLIST_REMOVAL_TIME = 60 * 60 * 1000;
   private AMRMClient<ContainerRequest> amRmClient;
   private NMClientAsync nmClient;
   private LogicalPlan dag;
   // Application Attempt Id ( combination of attemptId and fail count )
-  final private ApplicationAttemptId appAttemptID;
+  private final ApplicationAttemptId appAttemptID;
   // Hostname of the container
   private final String appMasterHostname = "";
   // Tracking url to which app master publishes info for clients to monitor
@@ -118,6 +119,8 @@ public class StreamingAppMasterService extends CompositeService
   private final AtomicInteger numCompletedContainers = new AtomicInteger();
   // Containers that the RM has allocated to us
   private final ConcurrentMap<String, AllocatedContainer> allocatedContainers = Maps.newConcurrentMap();
+  private final ConcurrentMap<String, AtomicInteger> failedContainersMap = Maps.newConcurrentMap();
+  private final Queue<Pair<Long, List<String>>> blacklistedNodesQueueWithTimeStamp = new ConcurrentLinkedQueue<Pair<Long, List<String>>>();
   // Count of failed containers
   private final AtomicInteger numFailedContainers = new AtomicInteger();
   private final ConcurrentLinkedQueue<Runnable> pendingTasks = new ConcurrentLinkedQueue<Runnable>();
@@ -279,8 +282,7 @@ public class StreamingAppMasterService extends CompositeService
         if (c.getExternalId() == null || c.getState() == PTContainer.State.KILLED) {
           if (c.getRequiredVCores() == 0) {
             result++;
-          }
-          else {
+          } else {
             result += c.getRequiredVCores();
           }
         }
@@ -475,15 +477,12 @@ public class StreamingAppMasterService extends CompositeService
           LOG.info("System CWD content: " + line);
         }
         LOG.info("Dumping files in local dir: end");
-      }
-      finally {
+      } finally {
         buf.close();
       }
-    }
-    catch (IOException e) {
+    } catch (IOException e) {
       LOG.debug("Exception", e);
-    }
-    catch (InterruptedException e) {
+    } catch (InterruptedException e) {
       LOG.info("Interrupted", e);
     }
 
@@ -492,11 +491,9 @@ public class StreamingAppMasterService extends CompositeService
     try {
       // find a better way of logging this using the logger.
       Configuration.dumpConfiguration(getConfig(), new PrintWriter(System.out));
-    }
-    catch (Exception e) {
+    } catch (Exception e) {
       LOG.error("Error dumping configuration.", e);
     }
-
   }
 
   @Override
@@ -507,8 +504,7 @@ public class StreamingAppMasterService extends CompositeService
     FileInputStream fis = new FileInputStream("./" + LogicalPlan.SER_FILE_NAME);
     try {
       this.dag = LogicalPlan.read(fis);
-    }
-    finally {
+    } finally {
       fis.close();
     }
     // "debug" simply dumps all data using LOG.info
@@ -651,6 +647,9 @@ public class StreamingAppMasterService extends CompositeService
     int minVcores = conf.getInt("yarn.scheduler.minimum-allocation-vcores", 0);
     LOG.info("Max mem {}m, Min mem {}m, Max vcores {} and Min vcores {} capabililty of resources in this cluster ", maxMem, minMem, maxVcores, minVcores);
 
+    int maxConsecutiveContainerFailures = conf.getInt("MAX_CONSECUTIVE_CONTAINER_FAILURES", MAX_CONTAINER_FAILURES_PER_NODE);
+    long blacklistRemovalTime = conf.getLong("BLACKLIST_REMOVAL_TIME", BLACKLIST_REMOVAL_TIME);
+
     // for locality relaxation fall back
     Map<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> requestedResources = Maps.newHashMap();
 
@@ -692,11 +691,9 @@ public class StreamingAppMasterService extends CompositeService
         return;
       }
       resourceRequestor.updateNodeReports(clientRMService.getNodeReports());
-    }
-    catch (Exception e) {
+    } catch (Exception e) {
       throw new RuntimeException("Failed to retrieve cluster nodes report.", e);
-    }
-    finally {
+    } finally {
       clientRMService.stop();
     }
 
@@ -781,6 +778,23 @@ public class StreamingAppMasterService extends CompositeService
         }
       }
 
+     /* Remove nodes from blacklist after timeout */
+      long currentTime = System.currentTimeMillis();
+      List<String> blacklistRemovals = new ArrayList<String>();
+      for (Iterator<Pair<Long, List<String>>> it = blacklistedNodesQueueWithTimeStamp.iterator(); it.hasNext();) {
+        Pair<Long, List<String>> entry = it.next();
+        Long timeDiff = currentTime - entry.getFirst();
+        if (timeDiff > blacklistRemovalTime) {
+          blacklistRemovals.addAll(entry.getSecond());
+          it.remove();
+        } else {
+          break;
+        }
+      }
+      if (!blacklistRemovals.isEmpty()) {
+        amRmClient.updateBlacklist(null, blacklistRemovals);
+      }
+
       numTotalContainers += containerRequests.size();
       numRequestedContainers += containerRequests.size();
       AllocateResponse amResp = sendContainerAskToRM(containerRequests, removedContainerRequests, releasedContainers);
@@ -868,6 +882,7 @@ public class StreamingAppMasterService extends CompositeService
       // Check the completed containers
       List<ContainerStatus> completedContainers = amResp.getCompletedContainersStatuses();
       // LOG.debug("Got response from RM for container ask, completedCnt=" + completedContainers.size());
+      List<String> blacklistAdditions = new ArrayList<String>();
       for (ContainerStatus containerStatus : completedContainers) {
         LOG.info("Completed containerId=" + containerStatus.getContainerId() + ", state=" + containerStatus.getState() + ", exitStatus=" + containerStatus.getExitStatus() + ", diagnostics=" + containerStatus.getDiagnostics());
 
@@ -883,6 +898,20 @@ public class StreamingAppMasterService extends CompositeService
         if (0 != exitStatus) {
           if (allocatedContainer != null) {
             numFailedContainers.incrementAndGet();
+            if (exitStatus != 1) {
+              // If container failure due to framework
+              String hostname = allocatedContainer.container.getNodeId().getHost();
+              int failedTimes = 1;
+              AtomicInteger failed = failedContainersMap.putIfAbsent(hostname, new AtomicInteger(1));
+              if (failed != null) {
+                failedTimes = failed.incrementAndGet();
+              }
+              if (failedTimes >= maxConsecutiveContainerFailures) {
+                // Blacklist the node
+                LOG.info("Node {} failed {} times consecutively, marking the node blacklisted", hostname, failedTimes);
+                blacklistAdditions.add(hostname);
+              }
+            }
           }
 //          if (exitStatus == 1) {
 //            // non-recoverable StreamingContainer failure
@@ -902,6 +931,12 @@ public class StreamingAppMasterService extends CompositeService
           // container completed successfully
           numCompletedContainers.incrementAndGet();
           LOG.info("Container completed successfully." + ", containerId=" + containerStatus.getContainerId());
+          // Reset counter for node failure, if exists
+          String hostname = allocatedContainer.container.getNodeId().getHost();
+          AtomicInteger failedTimes = failedContainersMap.get(hostname);
+          if(failedTimes != null) {
+            failedTimes.set(0);
+          }
         }
 
         String containerIdStr = containerStatus.getContainerId().toString();
@@ -913,6 +948,10 @@ public class StreamingAppMasterService extends CompositeService
         dnmgr.recordEventAsync(ev);
       }
 
+      if (!blacklistAdditions.isEmpty()) {
+        amRmClient.updateBlacklist(blacklistAdditions, null);
+        blacklistedNodesQueueWithTimeStamp.add(new Pair<Long, List<String>>(System.currentTimeMillis(), blacklistAdditions));
+      }
       if (dnmgr.forcedShutdown) {
         LOG.info("Forced shutdown due to {}", dnmgr.shutdownDiagnosticsMessage);
         finalStatus = FinalApplicationStatus.FAILED;