You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2015/09/26 03:15:09 UTC
[1/2] incubator-apex-core git commit: APEX-92 #Comment #resolve Fix
for adding failed nodes to the blacklist
after containers on the same node fail MAX or more consecutive times
Repository: incubator-apex-core
Updated Branches:
refs/heads/devel-3 e6263b5cf -> 0a85586df
APEX-92 #Comment #resolve
Fix for adding failed nodes to the blacklist after containers on the same node fail MAX or more consecutive times
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/d8e1e74d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/d8e1e74d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/d8e1e74d
Branch: refs/heads/devel-3
Commit: d8e1e74da00619a3c0e021ecd167de57fcc0e262
Parents: 90bda5e
Author: ishark <is...@datatorrent.com>
Authored: Wed Sep 23 16:17:04 2015 -0700
Committer: ishark <is...@datatorrent.com>
Committed: Fri Sep 25 16:09:54 2015 -0700
----------------------------------------------------------------------
engine/pom.xml | 2 +-
.../stram/StreamingAppMasterService.java | 83 ++++++++++++++------
2 files changed, 62 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d8e1e74d/engine/pom.xml
----------------------------------------------------------------------
diff --git a/engine/pom.xml b/engine/pom.xml
index 32f2001..15e0565 100644
--- a/engine/pom.xml
+++ b/engine/pom.xml
@@ -145,7 +145,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<configuration>
- <maxAllowedViolations>2248</maxAllowedViolations>
+ <maxAllowedViolations>2238</maxAllowedViolations>
</configuration>
</plugin>
</plugins>
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d8e1e74d/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index bfeedbd..5d84e10 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -32,9 +32,6 @@ import javax.xml.bind.annotation.XmlElement;
import com.google.common.collect.Maps;
import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.hadoop.conf.Configuration;
@@ -63,13 +60,15 @@ import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.datatorrent.api.Attribute;
import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context.DAGContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StringCodec;
-
+import com.datatorrent.common.util.Pair;
import com.datatorrent.stram.StreamingContainerManager.ContainerResource;
import com.datatorrent.stram.api.AppDataSource;
import com.datatorrent.stram.api.BaseContext;
@@ -103,11 +102,13 @@ public class StreamingAppMasterService extends CompositeService
private static final long DELEGATION_TOKEN_RENEW_INTERVAL = Long.MAX_VALUE / 2;
private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL = 24 * 60 * 60 * 1000;
private static final int NUMBER_MISSED_HEARTBEATS = 30;
+ private static final int MAX_CONTAINER_FAILURES_PER_NODE = 3;
+ private static final long BLACKLIST_REMOVAL_TIME = 60 * 60 * 1000;
private AMRMClient<ContainerRequest> amRmClient;
private NMClientAsync nmClient;
private LogicalPlan dag;
// Application Attempt Id ( combination of attemptId and fail count )
- final private ApplicationAttemptId appAttemptID;
+ private final ApplicationAttemptId appAttemptID;
// Hostname of the container
private final String appMasterHostname = "";
// Tracking url to which app master publishes info for clients to monitor
@@ -118,6 +119,8 @@ public class StreamingAppMasterService extends CompositeService
private final AtomicInteger numCompletedContainers = new AtomicInteger();
// Containers that the RM has allocated to us
private final ConcurrentMap<String, AllocatedContainer> allocatedContainers = Maps.newConcurrentMap();
+ private final ConcurrentMap<String, AtomicInteger> failedContainersMap = Maps.newConcurrentMap();
+ private final Queue<Pair<Long, List<String>>> blacklistedNodesQueueWithTimeStamp = new ConcurrentLinkedQueue<Pair<Long, List<String>>>();
// Count of failed containers
private final AtomicInteger numFailedContainers = new AtomicInteger();
private final ConcurrentLinkedQueue<Runnable> pendingTasks = new ConcurrentLinkedQueue<Runnable>();
@@ -279,8 +282,7 @@ public class StreamingAppMasterService extends CompositeService
if (c.getExternalId() == null || c.getState() == PTContainer.State.KILLED) {
if (c.getRequiredVCores() == 0) {
result++;
- }
- else {
+ } else {
result += c.getRequiredVCores();
}
}
@@ -475,15 +477,12 @@ public class StreamingAppMasterService extends CompositeService
LOG.info("System CWD content: " + line);
}
LOG.info("Dumping files in local dir: end");
- }
- finally {
+ } finally {
buf.close();
}
- }
- catch (IOException e) {
+ } catch (IOException e) {
LOG.debug("Exception", e);
- }
- catch (InterruptedException e) {
+ } catch (InterruptedException e) {
LOG.info("Interrupted", e);
}
@@ -492,11 +491,9 @@ public class StreamingAppMasterService extends CompositeService
try {
// find a better way of logging this using the logger.
Configuration.dumpConfiguration(getConfig(), new PrintWriter(System.out));
- }
- catch (Exception e) {
+ } catch (Exception e) {
LOG.error("Error dumping configuration.", e);
}
-
}
@Override
@@ -507,8 +504,7 @@ public class StreamingAppMasterService extends CompositeService
FileInputStream fis = new FileInputStream("./" + LogicalPlan.SER_FILE_NAME);
try {
this.dag = LogicalPlan.read(fis);
- }
- finally {
+ } finally {
fis.close();
}
// "debug" simply dumps all data using LOG.info
@@ -651,6 +647,9 @@ public class StreamingAppMasterService extends CompositeService
int minVcores = conf.getInt("yarn.scheduler.minimum-allocation-vcores", 0);
LOG.info("Max mem {}m, Min mem {}m, Max vcores {} and Min vcores {} capabililty of resources in this cluster ", maxMem, minMem, maxVcores, minVcores);
+ int maxConsecutiveContainerFailures = conf.getInt("MAX_CONSECUTIVE_CONTAINER_FAILURES", MAX_CONTAINER_FAILURES_PER_NODE);
+ long blacklistRemovalTime = conf.getLong("BLACKLIST_REMOVAL_TIME", BLACKLIST_REMOVAL_TIME);
+
// for locality relaxation fall back
Map<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> requestedResources = Maps.newHashMap();
@@ -692,11 +691,9 @@ public class StreamingAppMasterService extends CompositeService
return;
}
resourceRequestor.updateNodeReports(clientRMService.getNodeReports());
- }
- catch (Exception e) {
+ } catch (Exception e) {
throw new RuntimeException("Failed to retrieve cluster nodes report.", e);
- }
- finally {
+ } finally {
clientRMService.stop();
}
@@ -781,6 +778,23 @@ public class StreamingAppMasterService extends CompositeService
}
}
+ /* Remove nodes from blacklist after timeout */
+ long currentTime = System.currentTimeMillis();
+ List<String> blacklistRemovals = new ArrayList<String>();
+ for (Iterator<Pair<Long, List<String>>> it = blacklistedNodesQueueWithTimeStamp.iterator(); it.hasNext();) {
+ Pair<Long, List<String>> entry = it.next();
+ Long timeDiff = currentTime - entry.getFirst();
+ if (timeDiff > blacklistRemovalTime) {
+ blacklistRemovals.addAll(entry.getSecond());
+ it.remove();
+ } else {
+ break;
+ }
+ }
+ if (!blacklistRemovals.isEmpty()) {
+ amRmClient.updateBlacklist(null, blacklistRemovals);
+ }
+
numTotalContainers += containerRequests.size();
numRequestedContainers += containerRequests.size();
AllocateResponse amResp = sendContainerAskToRM(containerRequests, removedContainerRequests, releasedContainers);
@@ -868,6 +882,7 @@ public class StreamingAppMasterService extends CompositeService
// Check the completed containers
List<ContainerStatus> completedContainers = amResp.getCompletedContainersStatuses();
// LOG.debug("Got response from RM for container ask, completedCnt=" + completedContainers.size());
+ List<String> blacklistAdditions = new ArrayList<String>();
for (ContainerStatus containerStatus : completedContainers) {
LOG.info("Completed containerId=" + containerStatus.getContainerId() + ", state=" + containerStatus.getState() + ", exitStatus=" + containerStatus.getExitStatus() + ", diagnostics=" + containerStatus.getDiagnostics());
@@ -883,6 +898,20 @@ public class StreamingAppMasterService extends CompositeService
if (0 != exitStatus) {
if (allocatedContainer != null) {
numFailedContainers.incrementAndGet();
+ if (exitStatus != 1) {
+ // If container failure due to framework
+ String hostname = allocatedContainer.container.getNodeId().getHost();
+ int failedTimes = 1;
+ AtomicInteger failed = failedContainersMap.putIfAbsent(hostname, new AtomicInteger(1));
+ if (failed != null) {
+ failedTimes = failed.incrementAndGet();
+ }
+ if (failedTimes >= maxConsecutiveContainerFailures) {
+ // Blacklist the node
+ LOG.info("Node {} failed {} times consecutively, marking the node blacklisted", hostname, failedTimes);
+ blacklistAdditions.add(hostname);
+ }
+ }
}
// if (exitStatus == 1) {
// // non-recoverable StreamingContainer failure
@@ -902,6 +931,12 @@ public class StreamingAppMasterService extends CompositeService
// container completed successfully
numCompletedContainers.incrementAndGet();
LOG.info("Container completed successfully." + ", containerId=" + containerStatus.getContainerId());
+ // Reset counter for node failure, if exists
+ String hostname = allocatedContainer.container.getNodeId().getHost();
+ AtomicInteger failedTimes = failedContainersMap.get(hostname);
+ if(failedTimes != null) {
+ failedTimes.set(0);
+ }
}
String containerIdStr = containerStatus.getContainerId().toString();
@@ -913,6 +948,10 @@ public class StreamingAppMasterService extends CompositeService
dnmgr.recordEventAsync(ev);
}
+ if (!blacklistAdditions.isEmpty()) {
+ amRmClient.updateBlacklist(blacklistAdditions, null);
+ blacklistedNodesQueueWithTimeStamp.add(new Pair<Long, List<String>>(System.currentTimeMillis(), blacklistAdditions));
+ }
if (dnmgr.forcedShutdown) {
LOG.info("Forced shutdown due to {}", dnmgr.shutdownDiagnosticsMessage);
finalStatus = FinalApplicationStatus.FAILED;
[2/2] incubator-apex-core git commit: Merge branch 'APEX-92' of
https://github.com/ishark/incubator-apex-core into devel-3
Posted by vr...@apache.org.
Merge branch 'APEX-92' of https://github.com/ishark/incubator-apex-core into devel-3
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/0a85586d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/0a85586d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/0a85586d
Branch: refs/heads/devel-3
Commit: 0a85586dfd194a0b77cb438e68a54c16e99d2cf7
Parents: e6263b5 d8e1e74
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Fri Sep 25 18:08:10 2015 -0700
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Fri Sep 25 18:08:10 2015 -0700
----------------------------------------------------------------------
engine/pom.xml | 2 +-
.../stram/StreamingAppMasterService.java | 83 ++++++++++++++------
2 files changed, 62 insertions(+), 23 deletions(-)
----------------------------------------------------------------------