You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text copy).
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/05/01 09:01:28 UTC
[11/50] [abbrv] hadoop git commit: YARN-3464. Race condition in LocalizerRunner kills localizer before localizing all resources. (Zhihai Xu via kasha)
YARN-3464. Race condition in LocalizerRunner kills localizer before localizing all resources. (Zhihai Xu via kasha)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/47279c32
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/47279c32
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/47279c32
Branch: refs/heads/HDFS-7240
Commit: 47279c3228185548ed09c36579b420225e4894f5
Parents: 22b70e7
Author: Karthik Kambatla <ka...@apache.org>
Authored: Sun Apr 26 09:13:46 2015 -0700
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Sun Apr 26 09:13:46 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 ++
.../container/ContainerImpl.java | 8 ++-
.../localizer/ResourceLocalizationService.java | 53 +++++++++++++++-----
.../localizer/event/LocalizationEventType.java | 1 +
.../TestResourceLocalizationService.java | 12 ++++-
5 files changed, 62 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/47279c32/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index a626f82..87db291 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -268,6 +268,9 @@ Release 2.8.0 - UNRELEASED
YARN-3537. NPE when NodeManager.serviceInit fails and stopRecoveryStore
invoked (Brahma Reddy Battula via jlowe)
+ YARN-3464. Race condition in LocalizerRunner kills localizer before
+ localizing all resources. (Zhihai Xu via kasha)
+
Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/47279c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index c9874a6..68669aa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -59,7 +59,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
@@ -716,7 +718,12 @@ public class ContainerImpl implements Container {
return ContainerState.LOCALIZING;
}
+ container.dispatcher.getEventHandler().handle(
+ new ContainerLocalizationEvent(LocalizationEventType.
+ CONTAINER_RESOURCES_LOCALIZED, container));
+
container.sendLaunchEvent();
+ container.metrics.endInitingContainer();
// If this is a recovered container that has already launched, skip
// uploading resources to the shared cache. We do this to avoid uploading
@@ -734,7 +741,6 @@ public class ContainerImpl implements Container {
SharedCacheUploadEventType.UPLOAD));
}
- container.metrics.endInitingContainer();
return ContainerState.LOCALIZED;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/47279c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index 611fe80..cdd252c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -35,6 +35,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
@@ -108,6 +109,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
@@ -389,6 +391,9 @@ public class ResourceLocalizationService extends CompositeService
case INIT_CONTAINER_RESOURCES:
handleInitContainerResources((ContainerLocalizationRequestEvent) event);
break;
+ case CONTAINER_RESOURCES_LOCALIZED:
+ handleContainerResourcesLocalized((ContainerLocalizationEvent) event);
+ break;
case CACHE_CLEANUP:
handleCacheCleanup(event);
break;
@@ -455,7 +460,18 @@ public class ResourceLocalizationService extends CompositeService
}
}
}
-
+
+ /**
+ * Once a container's resources are localized, kill the corresponding
+ * {@link ContainerLocalizer}
+ */
+ private void handleContainerResourcesLocalized(
+ ContainerLocalizationEvent event) {
+ Container c = event.getContainer();
+ String locId = ConverterUtils.toString(c.getContainerId());
+ localizerTracker.endContainerLocalization(locId);
+ }
+
private void handleCacheCleanup(LocalizationEvent event) {
ResourceRetentionSet retain =
new ResourceRetentionSet(delService, cacheTargetSize);
@@ -670,7 +686,7 @@ public class ResourceLocalizationService extends CompositeService
response.setLocalizerAction(LocalizerAction.DIE);
return response;
}
- return localizer.update(status.getResources());
+ return localizer.processHeartbeat(status.getResources());
}
}
@@ -724,6 +740,17 @@ public class ResourceLocalizationService extends CompositeService
localizer.interrupt();
}
}
+
+ public void endContainerLocalization(String locId) {
+ LocalizerRunner localizer;
+ synchronized (privLocalizers) {
+ localizer = privLocalizers.get(locId);
+ if (null == localizer) {
+ return; // ignore
+ }
+ }
+ localizer.endContainerLocalization();
+ }
}
@@ -878,6 +905,7 @@ public class ResourceLocalizationService extends CompositeService
final Map<LocalResourceRequest,LocalizerResourceRequestEvent> scheduled;
// Its a shared list between Private Localizer and dispatcher thread.
final List<LocalizerResourceRequestEvent> pending;
+ private AtomicBoolean killContainerLocalizer = new AtomicBoolean(false);
// TODO: threadsafe, use outer?
private final RecordFactory recordFactory =
@@ -898,6 +926,10 @@ public class ResourceLocalizationService extends CompositeService
pending.add(request);
}
+ public void endContainerLocalization() {
+ killContainerLocalizer.set(true);
+ }
+
/**
* Find next resource to be given to a spawned localizer.
*
@@ -944,7 +976,7 @@ public class ResourceLocalizationService extends CompositeService
}
}
- LocalizerHeartbeatResponse update(
+ LocalizerHeartbeatResponse processHeartbeat(
List<LocalResourceStatus> remoteResourceStatuses) {
LocalizerHeartbeatResponse response =
recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
@@ -953,7 +985,7 @@ public class ResourceLocalizationService extends CompositeService
ApplicationId applicationId =
context.getContainerId().getApplicationAttemptId().getApplicationId();
- LocalizerAction action = LocalizerAction.LIVE;
+ boolean fetchFailed = false;
// Update resource statuses.
for (LocalResourceStatus stat : remoteResourceStatuses) {
LocalResource rsrc = stat.getResource();
@@ -989,7 +1021,7 @@ public class ResourceLocalizationService extends CompositeService
case FETCH_FAILURE:
final String diagnostics = stat.getException().toString();
LOG.warn(req + " failed: " + diagnostics);
- action = LocalizerAction.DIE;
+ fetchFailed = true;
getLocalResourcesTracker(req.getVisibility(), user, applicationId)
.handle(new ResourceFailedLocalizationEvent(
req, diagnostics));
@@ -1001,15 +1033,15 @@ public class ResourceLocalizationService extends CompositeService
break;
default:
LOG.info("Unknown status: " + stat.getStatus());
- action = LocalizerAction.DIE;
+ fetchFailed = true;
getLocalResourcesTracker(req.getVisibility(), user, applicationId)
.handle(new ResourceFailedLocalizationEvent(
req, stat.getException().getMessage()));
break;
}
}
- if (action == LocalizerAction.DIE) {
- response.setLocalizerAction(action);
+ if (fetchFailed || killContainerLocalizer.get()) {
+ response.setLocalizerAction(LocalizerAction.DIE);
return response;
}
@@ -1037,12 +1069,9 @@ public class ResourceLocalizationService extends CompositeService
} catch (URISyntaxException e) {
//TODO fail? Already translated several times...
}
- } else if (pending.isEmpty()) {
- // TODO: Synchronization
- action = LocalizerAction.DIE;
}
- response.setLocalizerAction(action);
+ response.setLocalizerAction(LocalizerAction.LIVE);
response.setResourceSpecs(rsrcs);
return response;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/47279c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java
index 5134349..4785fba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java
@@ -23,4 +23,5 @@ public enum LocalizationEventType {
CACHE_CLEANUP,
CLEANUP_CONTAINER_RESOURCES,
DESTROY_APPLICATION_RESOURCES,
+ CONTAINER_RESOURCES_LOCALIZED,
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/47279c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
index d3c3521..2edaf45 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
@@ -125,6 +125,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.Reso
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.PublicLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
@@ -975,7 +976,8 @@ public class TestResourceLocalizationService {
.thenReturn(Collections.<LocalResourceStatus>emptyList())
.thenReturn(Collections.singletonList(rsrc1success))
.thenReturn(Collections.singletonList(rsrc2pending))
- .thenReturn(rsrcs4);
+ .thenReturn(rsrcs4)
+ .thenReturn(Collections.<LocalResourceStatus>emptyList());
String localPath = Path.SEPARATOR + ContainerLocalizer.USERCACHE +
Path.SEPARATOR + "user0" + Path.SEPARATOR +
@@ -1019,7 +1021,13 @@ public class TestResourceLocalizationService {
assertTrue(localizedPath.getFile().endsWith(
localPath + Path.SEPARATOR + "1" + Path.SEPARATOR + "12"));
- // get shutdown
+ response = spyService.heartbeat(stat);
+ assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
+
+ spyService.handle(new ContainerLocalizationEvent(
+ LocalizationEventType.CONTAINER_RESOURCES_LOCALIZED, c));
+
+ // get shutdown after receive CONTAINER_RESOURCES_LOCALIZED event
response = spyService.heartbeat(stat);
assertEquals(LocalizerAction.DIE, response.getLocalizerAction());