You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/04/11 04:08:12 UTC
svn commit: r1466756 - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/dev-support/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/
hadoop-yar...
Author: vinodkv
Date: Thu Apr 11 02:08:11 2013
New Revision: 1466756
URL: http://svn.apache.org/r1466756
Log:
YARN-539. Addressed memory leak of LocalResource objects in NM when a resource localization fails. Contributed by Omkar Vinit Joshi.
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceFailedLocalizationEvent.java
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceState.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1466756&r1=1466755&r2=1466756&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Thu Apr 11 02:08:11 2013
@@ -220,6 +220,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-534. Change RM restart recovery to also account for AM max-attempts
configuration after the restart. (Jian He via vinodkv)
+ YARN-539. Addressed memory leak of LocalResource objects NM when a resource
+ localization fails. (Omkar Vinit Joshi via vinodkv)
+
Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml?rev=1466756&r1=1466755&r2=1466756&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml Thu Apr 11 02:08:11 2013
@@ -270,4 +270,11 @@
<Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE" />
</Match>
+ <!-- This type cast problem will never occur. -->
+ <Match>
+ <Class name="org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourcesTrackerImpl" />
+ <Method name="handle" />
+ <Bug pattern="BC_UNCONFIRMED_CAST" />
+ </Match>
+
</FindBugsFilter>
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java?rev=1466756&r1=1466755&r2=1466756&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java Thu Apr 11 02:08:11 2013
@@ -40,8 +40,5 @@ interface LocalResourcesTracker
String getUser();
- // TODO: Remove this in favour of EventHandler.handle
- void localizationCompleted(LocalResourceRequest req, boolean success);
-
long nextUniqueNumber();
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1466756&r1=1466755&r2=1466756&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Thu Apr 11 02:08:11 2013
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
/**
@@ -96,13 +97,22 @@ class LocalResourcesTrackerImpl implemen
this.conf = conf;
}
+ /*
+ * Synchronized to avoid races between multiple ResourceEvents arriving at
+ * LocalResourcesTracker from the Public/Private localizers and the
+ * Resource Localization Service.
+ */
@Override
- public void handle(ResourceEvent event) {
+ public synchronized void handle(ResourceEvent event) {
LocalResourceRequest req = event.getLocalResourceRequest();
LocalizedResource rsrc = localrsrc.get(req);
switch (event.getType()) {
- case REQUEST:
case LOCALIZED:
+ if (useLocalCacheDirectoryManager) {
+ inProgressLocalResourcesMap.remove(req);
+ }
+ break;
+ case REQUEST:
if (rsrc != null && (!isResourcePresent(rsrc))) {
LOG.info("Resource " + rsrc.getLocalPath()
+ " is missing, localizing it again");
@@ -117,10 +127,24 @@ class LocalResourcesTrackerImpl implemen
break;
case RELEASE:
if (null == rsrc) {
- LOG.info("Release unknown rsrc null (discard)");
+ // The container sent a release event on a resource which either
+ // 1) failed to localize, or
+ // 2) was removed for some reason (e.g. disk is no longer accessible)
+ ResourceReleaseEvent relEvent = (ResourceReleaseEvent) event;
+ LOG.info("Container " + relEvent.getContainer()
+ + " sent RELEASE event on a resource request " + req
+ + " not present in cache.");
return;
}
break;
+ case LOCALIZATION_FAILED:
+ decrementFileCountForLocalCacheDirectory(req, null);
+ /*
+ * If resource localization fails then Localized resource will be
+ * removed from local cache.
+ */
+ localrsrc.remove(req);
+ break;
}
rsrc.handle(event);
}
@@ -280,18 +304,6 @@ class LocalResourcesTrackerImpl implemen
}
@Override
- public void localizationCompleted(LocalResourceRequest req,
- boolean success) {
- if (useLocalCacheDirectoryManager) {
- if (!success) {
- decrementFileCountForLocalCacheDirectory(req, null);
- } else {
- inProgressLocalResourcesMap.remove(req);
- }
- }
- }
-
- @Override
public long nextUniqueNumber() {
return uniqueNumberGenerator.incrementAndGet();
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java?rev=1466756&r1=1466755&r2=1466756&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java Thu Apr 11 02:08:11 2013
@@ -32,10 +32,12 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceLocalizedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
@@ -89,6 +91,8 @@ public class LocalizedResource implement
.addTransition(ResourceState.DOWNLOADING,
EnumSet.of(ResourceState.DOWNLOADING, ResourceState.INIT),
ResourceEventType.RELEASE, new ReleasePendingTransition())
+ .addTransition(ResourceState.DOWNLOADING, ResourceState.FAILED,
+ ResourceEventType.LOCALIZATION_FAILED, new FetchFailedTransition())
// From LOCALIZED (ref >= 0, on disk)
.addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED,
@@ -126,12 +130,14 @@ public class LocalizedResource implement
}
private void release(ContainerId container) {
- if (!ref.remove(container)) {
- LOG.info("Attempt to release claim on " + this +
- " from unregistered container " + container);
- assert false; // TODO: FIX
+ if (ref.remove(container)) {
+ // updating the timestamp only in case of success.
+ timestamp.set(currentTime());
+ } else {
+ LOG.info("Container " + container
+ + " doesn't exist in the container list of the Resource " + this
+ + " to which it sent RELEASE event");
}
- timestamp.set(currentTime());
}
private long currentTime() {
@@ -251,6 +257,25 @@ public class LocalizedResource implement
}
/**
+ * Resource localization failed, notify waiting containers.
+ */
+ @SuppressWarnings("unchecked")
+ private static class FetchFailedTransition extends ResourceTransition {
+ @Override
+ public void transition(LocalizedResource rsrc, ResourceEvent event) {
+ ResourceFailedLocalizationEvent failedEvent =
+ (ResourceFailedLocalizationEvent) event;
+ Queue<ContainerId> containers = rsrc.ref;
+ Throwable failureCause = failedEvent.getCause();
+ for (ContainerId container : containers) {
+ rsrc.dispatcher.getEventHandler().handle(
+ new ContainerResourceFailedEvent(container, failedEvent
+ .getLocalResourceRequest(), failureCause));
+ }
+ }
+ }
+
+ /**
* Resource already localized, notify immediately.
*/
@SuppressWarnings("unchecked") // dispatcher not typed
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1466756&r1=1466755&r2=1466756&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Thu Apr 11 02:08:11 2013
@@ -84,7 +84,6 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
-import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
@@ -101,6 +100,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
@@ -683,7 +683,6 @@ public class ResourceLocalizationService
}
@Override
- @SuppressWarnings("unchecked") // dispatcher not typed
public void run() {
try {
// TODO shutdown, better error handling esp. DU
@@ -699,10 +698,8 @@ public class ResourceLocalizationService
return;
}
LocalResourceRequest key = assoc.getResource().getRequest();
- assoc.getResource().handle(
- new ResourceLocalizedEvent(key,
- local, FileUtil.getDU(new File(local.toUri()))));
- publicRsrc.localizationCompleted(key, true);
+ publicRsrc.handle(new ResourceLocalizedEvent(key, local, FileUtil
+ .getDU(new File(local.toUri()))));
synchronized (attempts) {
attempts.remove(key);
}
@@ -710,13 +707,10 @@ public class ResourceLocalizationService
LOG.info("Failed to download rsrc " + assoc.getResource(),
e.getCause());
LocalResourceRequest req = assoc.getResource().getRequest();
- dispatcher.getEventHandler().handle(
- new ContainerResourceFailedEvent(
- assoc.getContext().getContainerId(),
- req, e.getCause()));
- publicRsrc.localizationCompleted(req, false);
- List<LocalizerResourceRequestEvent> reqs;
+ publicRsrc.handle(new ResourceFailedLocalizationEvent(req, e
+ .getCause()));
synchronized (attempts) {
+ List<LocalizerResourceRequestEvent> reqs;
reqs = attempts.get(req);
if (null == reqs) {
LOG.error("Missing pending list for " + req);
@@ -724,13 +718,6 @@ public class ResourceLocalizationService
}
attempts.remove(req);
}
- // let the other containers know about the localization failure
- for (LocalizerResourceRequestEvent reqEvent : reqs) {
- dispatcher.getEventHandler().handle(
- new ContainerResourceFailedEvent(
- reqEvent.getContext().getContainerId(),
- reqEvent.getResource().getRequest(), e.getCause()));
- }
} catch (CancellationException e) {
// ignore; shutting down
}
@@ -810,13 +797,14 @@ public class ResourceLocalizationService
return null;
}
- // TODO this sucks. Fix it later
- @SuppressWarnings("unchecked") // dispatcher not typed
LocalizerHeartbeatResponse update(
List<LocalResourceStatus> remoteResourceStatuses) {
LocalizerHeartbeatResponse response =
recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
+ String user = context.getUser();
+ ApplicationId applicationId =
+ context.getContainerId().getApplicationAttemptId().getApplicationId();
// The localizer has just spawned. Start giving it resources for
// remote-fetching.
if (remoteResourceStatuses.isEmpty()) {
@@ -847,6 +835,11 @@ public class ResourceLocalizationService
}
ArrayList<ResourceLocalizationSpec> rsrcs =
new ArrayList<ResourceLocalizationSpec>();
+ /*
+ * TODO : It doesn't support multiple downloads per ContainerLocalizer
+ * at the same time. We need to think whether we should support this.
+ */
+
for (LocalResourceStatus stat : remoteResourceStatuses) {
LocalResource rsrc = stat.getResource();
LocalResourceRequest req = null;
@@ -865,11 +858,10 @@ public class ResourceLocalizationService
case FETCH_SUCCESS:
// notify resource
try {
- assoc.getResource().handle(
- new ResourceLocalizedEvent(req,
- ConverterUtils.getPathFromYarnURL(stat.getLocalPath()),
- stat.getLocalSize()));
- localizationCompleted(stat);
+ getLocalResourcesTracker(req.getVisibility(), user, applicationId)
+ .handle(
+ new ResourceLocalizedEvent(req, ConverterUtils
+ .getPathFromYarnURL(stat.getLocalPath()), stat.getLocalSize()));
} catch (URISyntaxException e) { }
if (pending.isEmpty()) {
// TODO: Synchronization
@@ -899,19 +891,16 @@ public class ResourceLocalizationService
LOG.info("DEBUG: FAILED " + req, stat.getException());
assoc.getResource().unlock();
response.setLocalizerAction(LocalizerAction.DIE);
- localizationCompleted(stat);
- // TODO: Why is this event going directly to the container. Why not
- // the resource itself? What happens to the resource? Is it removed?
- dispatcher.getEventHandler().handle(
- new ContainerResourceFailedEvent(context.getContainerId(),
- req, stat.getException()));
+ getLocalResourcesTracker(req.getVisibility(), user, applicationId)
+ .handle(
+ new ResourceFailedLocalizationEvent(req, stat.getException()));
break;
default:
LOG.info("Unknown status: " + stat.getStatus());
response.setLocalizerAction(LocalizerAction.DIE);
- dispatcher.getEventHandler().handle(
- new ContainerResourceFailedEvent(context.getContainerId(),
- req, stat.getException()));
+ getLocalResourcesTracker(req.getVisibility(), user, applicationId)
+ .handle(
+ new ResourceFailedLocalizationEvent(req, stat.getException()));
break;
}
}
@@ -919,27 +908,6 @@ public class ResourceLocalizationService
return response;
}
- private void localizationCompleted(LocalResourceStatus stat) {
- try {
- LocalResource rsrc = stat.getResource();
- LocalResourceRequest key = new LocalResourceRequest(rsrc);
- String user = context.getUser();
- ApplicationId appId =
- context.getContainerId().getApplicationAttemptId()
- .getApplicationId();
- LocalResourceVisibility vis = rsrc.getVisibility();
- LocalResourcesTracker tracker =
- getLocalResourcesTracker(vis, user, appId);
- if (stat.getStatus() == ResourceStatusType.FETCH_SUCCESS) {
- tracker.localizationCompleted(key, true);
- } else {
- tracker.localizationCompleted(key, false);
- }
- } catch (URISyntaxException e) {
- LOG.error("Invalid resource URL specified", e);
- }
- }
-
private Path getPathForLocalization(LocalResource rsrc) throws IOException,
URISyntaxException {
String user = context.getUser();
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceState.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceState.java?rev=1466756&r1=1466755&r2=1466756&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceState.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceState.java Thu Apr 11 02:08:11 2013
@@ -20,5 +20,6 @@ package org.apache.hadoop.yarn.server.no
enum ResourceState {
INIT,
DOWNLOADING,
- LOCALIZED
+ LOCALIZED,
+ FAILED
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java?rev=1466756&r1=1466755&r2=1466756&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java Thu Apr 11 02:08:11 2013
@@ -29,5 +29,7 @@ public enum ResourceEventType {
/** See {@link ResourceLocalizedEvent} */
LOCALIZED,
/** See {@link ResourceReleaseEvent} */
- RELEASE
+ RELEASE,
+ /** See {@link ResourceFailedLocalizationEvent} */
+ LOCALIZATION_FAILED
}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceFailedLocalizationEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceFailedLocalizationEvent.java?rev=1466756&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceFailedLocalizationEvent.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceFailedLocalizationEvent.java Thu Apr 11 02:08:11 2013
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
+
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
+
+/**
+ * This event is sent by the localizer in case resource localization fails for
+ * the requested resource.
+ */
+public class ResourceFailedLocalizationEvent extends ResourceEvent {
+
+ private Throwable cause;
+
+ public ResourceFailedLocalizationEvent(LocalResourceRequest rsrc,
+ Throwable cause) {
+ super(rsrc, ResourceEventType.LOCALIZATION_FAILED);
+ this.cause = cause;
+ }
+
+ public Throwable getCause() {
+ return cause;
+ }
+}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java?rev=1466756&r1=1466755&r2=1466756&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java Thu Apr 11 02:08:11 2013
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import static org.mockito.Mockito.any;
+import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -41,11 +42,15 @@ import org.apache.hadoop.yarn.event.Disp
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceLocalizedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
@@ -224,6 +229,142 @@ public class TestLocalResourcesTrackerIm
}
}
+ @Test(timeout = 1000)
+ @SuppressWarnings("unchecked")
+ public void testLocalResourceCache() {
+ String user = "testuser";
+ DrainDispatcher dispatcher = null;
+ try {
+ Configuration conf = new Configuration();
+ dispatcher = createDispatcher(conf);
+
+ EventHandler<LocalizerEvent> localizerEventHandler =
+ mock(EventHandler.class);
+ EventHandler<ContainerEvent> containerEventHandler =
+ mock(EventHandler.class);
+
+ // Registering event handlers.
+ dispatcher.register(LocalizerEventType.class, localizerEventHandler);
+ dispatcher.register(ContainerEventType.class, containerEventHandler);
+
+ ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc =
+ new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
+ LocalResourcesTracker tracker =
+ new LocalResourcesTrackerImpl(user, dispatcher, localrsrc, true, conf);
+
+ LocalResourceRequest lr =
+ createLocalResourceRequest(user, 1, 1, LocalResourceVisibility.PUBLIC);
+
+ // Creating 2 containers for same application which will be requesting
+ // same local resource.
+ // Container 1 requesting local resource.
+ ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
+ LocalizerContext lc1 = new LocalizerContext(user, cId1, null);
+ ResourceEvent reqEvent1 =
+ new ResourceRequestEvent(lr, LocalResourceVisibility.PRIVATE, lc1);
+
+ // No resource request is initially present in local cache
+ Assert.assertEquals(0, localrsrc.size());
+
+ // Container-1 requesting local resource.
+ tracker.handle(reqEvent1);
+
+ // New localized Resource should have been added to local resource map
+ // and the requesting container will be added to its waiting queue.
+ Assert.assertEquals(1, localrsrc.size());
+ Assert.assertTrue(localrsrc.containsKey(lr));
+ Assert.assertEquals(1, localrsrc.get(lr).getRefCount());
+ Assert.assertTrue(localrsrc.get(lr).ref.contains(cId1));
+ Assert.assertEquals(ResourceState.DOWNLOADING, localrsrc.get(lr)
+ .getState());
+
+ // Container 2 requesting the resource
+ ContainerId cId2 = BuilderUtils.newContainerId(1, 1, 1, 2);
+ LocalizerContext lc2 = new LocalizerContext(user, cId2, null);
+ ResourceEvent reqEvent2 =
+ new ResourceRequestEvent(lr, LocalResourceVisibility.PRIVATE, lc2);
+ tracker.handle(reqEvent2);
+
+ // Container 2 should have been added to the waiting queue of the local
+ // resource
+ Assert.assertEquals(2, localrsrc.get(lr).getRefCount());
+ Assert.assertTrue(localrsrc.get(lr).ref.contains(cId2));
+
+ // Failing resource localization
+ ResourceEvent resourceFailedEvent =
+ new ResourceFailedLocalizationEvent(lr, new Exception("test"));
+
+ // Backing up the resource to track its state change as it will be
+ // removed after the failed event.
+ LocalizedResource localizedResource = localrsrc.get(lr);
+
+ tracker.handle(resourceFailedEvent);
+
+ // After receiving failed resource event; all waiting containers will be
+ // notified with Container Resource Failed Event.
+ Assert.assertEquals(0, localrsrc.size());
+ verify(containerEventHandler, times(2)).handle(
+ isA(ContainerResourceFailedEvent.class));
+ Assert.assertEquals(ResourceState.FAILED, localizedResource.getState());
+
+ // Container 1 trying to release the resource. (This resource is already
+ // deleted from the cache. This call should return silently without
+ // exception.)
+ ResourceReleaseEvent relEvent1 = new ResourceReleaseEvent(lr, cId1);
+ tracker.handle(relEvent1);
+
+ // Container-3 now requests for the same resource. This request call
+ // is coming prior to Container-2's release call.
+ ContainerId cId3 = BuilderUtils.newContainerId(1, 1, 1, 3);
+ LocalizerContext lc3 = new LocalizerContext(user, cId3, null);
+ ResourceEvent reqEvent3 =
+ new ResourceRequestEvent(lr, LocalResourceVisibility.PRIVATE, lc3);
+ tracker.handle(reqEvent3);
+
+ // Local resource cache now should have the requested resource and the
+ // number of waiting containers should be 1.
+ Assert.assertEquals(1, localrsrc.size());
+ Assert.assertTrue(localrsrc.containsKey(lr));
+ Assert.assertEquals(1, localrsrc.get(lr).getRefCount());
+ Assert.assertTrue(localrsrc.get(lr).ref.contains(cId3));
+
+ // Container-2 Releases the resource
+ ResourceReleaseEvent relEvent2 = new ResourceReleaseEvent(lr, cId2);
+ tracker.handle(relEvent2);
+
+ // Making sure that there is no change in the cache after the release.
+ Assert.assertEquals(1, localrsrc.size());
+ Assert.assertTrue(localrsrc.containsKey(lr));
+ Assert.assertEquals(1, localrsrc.get(lr).getRefCount());
+ Assert.assertTrue(localrsrc.get(lr).ref.contains(cId3));
+
+ // Sending ResourceLocalizedEvent to tracker. In turn resource should
+ // send Container Resource Localized Event to waiting containers.
+ Path localizedPath = new Path("/tmp/file1");
+ ResourceLocalizedEvent localizedEvent =
+ new ResourceLocalizedEvent(lr, localizedPath, 123L);
+ tracker.handle(localizedEvent);
+
+ // Verifying ContainerResourceLocalizedEvent.
+ verify(containerEventHandler, times(1)).handle(
+ isA(ContainerResourceLocalizedEvent.class));
+ Assert.assertEquals(ResourceState.LOCALIZED, localrsrc.get(lr)
+ .getState());
+ Assert.assertEquals(1, localrsrc.get(lr).getRefCount());
+
+ // Container-3 releasing the resource.
+ ResourceReleaseEvent relEvent3 = new ResourceReleaseEvent(lr, cId3);
+ tracker.handle(relEvent3);
+
+ Assert.assertEquals(0, localrsrc.get(lr).getRefCount());
+
+ } finally {
+ if (dispatcher != null) {
+ dispatcher.stop();
+ }
+ }
+ }
+
@Test(timeout = 100000)
@SuppressWarnings("unchecked")
public void testHierarchicalLocalCacheDirectories() {
@@ -266,19 +407,25 @@ public class TestLocalResourcesTrackerIm
// Simulate the process of localization of lr1
Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir);
// Simulate lr1 getting localized
- ResourceLocalizedEvent rle =
+ ResourceLocalizedEvent rle1 =
new ResourceLocalizedEvent(lr1,
new Path(hierarchicalPath1.toUri().toString() +
Path.SEPARATOR + "file1"), 120);
- tracker.handle(rle);
+ tracker.handle(rle1);
// Localization successful.
- tracker.localizationCompleted(lr1, true);
LocalResourceRequest lr2 = createLocalResourceRequest(user, 3, 3,
LocalResourceVisibility.PUBLIC);
+ // Container 1 requests lr2 to be localized.
+ ResourceEvent reqEvent2 =
+ new ResourceRequestEvent(lr2, LocalResourceVisibility.PUBLIC, lc1);
+ tracker.handle(reqEvent2);
+
Path hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir);
// localization failed.
- tracker.localizationCompleted(lr2, false);
+ ResourceFailedLocalizationEvent rfe2 =
+ new ResourceFailedLocalizationEvent(lr2, new Exception("Test"));
+ tracker.handle(rfe2);
/*
* The path returned for two localization should be different because we
@@ -292,7 +439,11 @@ public class TestLocalResourcesTrackerIm
LocalResourceVisibility.PUBLIC, lc1);
tracker.handle(reqEvent3);
Path hierarchicalPath3 = tracker.getPathForLocalization(lr3, localDir);
- tracker.localizationCompleted(lr3, true);
+ // localization successful
+ ResourceLocalizedEvent rle3 =
+ new ResourceLocalizedEvent(lr3, new Path(hierarchicalPath3.toUri()
+ .toString() + Path.SEPARATOR + "file3"), 120);
+ tracker.handle(rle3);
// Verifying that path created is inside the subdirectory
Assert.assertEquals(hierarchicalPath3.toUri().toString(),