You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vv...@apache.org on 2015/09/07 08:07:36 UTC
[2/2] hadoop git commit: YARN-3591. Resource localization on a bad
disk causes subsequent containers failure. Contributed by Lavkesh Lahngir.
YARN-3591. Resource localization on a bad disk causes subsequent containers failure. Contributed by Lavkesh Lahngir.
(cherry picked from commit 1dbd8e34a7d97c4d8586da79c980d8f2e0aad61d)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/70575286
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/70575286
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/70575286
Branch: refs/heads/branch-2
Commit: 70575286b7566c86d441d1ed075786607986de99
Parents: 8bf5362
Author: Varun Vasudev <vv...@apache.org>
Authored: Mon Sep 7 11:32:12 2015 +0530
Committer: Varun Vasudev <vv...@apache.org>
Committed: Mon Sep 7 11:34:14 2015 +0530
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../localizer/LocalResourcesTrackerImpl.java | 64 +++++++++++++--
.../localizer/ResourceLocalizationService.java | 2 +-
.../TestLocalResourcesTrackerImpl.java | 86 ++++++++++++++++++--
.../localizer/TestResourceRetention.java | 2 +-
5 files changed, 138 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/70575286/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index a021ca3..b9ad53a 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -764,6 +764,9 @@ Release 2.8.0 - UNRELEASED
YARN-4024. YARN RM should avoid unnecessary resolving IP when NMs doing heartbeat.
(Hong Zhiguo via wangda)
+ YARN-3591. Resource localization on a bad disk causes subsequent containers failure.
+ (Lavkesh Lahngir via vvasudev)
+
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/70575286/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
index 7cf6b15..a1e6817 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
@@ -38,6 +39,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
@@ -65,6 +67,7 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
private final Dispatcher dispatcher;
private final ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc;
private Configuration conf;
+ private LocalDirsHandlerService dirsHandler;
/*
* This flag controls whether this resource tracker uses hierarchical
* directories or not. For PRIVATE and PUBLIC resource trackers it
@@ -92,27 +95,38 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
Dispatcher dispatcher, boolean useLocalCacheDirectoryManager,
Configuration conf, NMStateStoreService stateStore) {
this(user, appId, dispatcher,
- new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>(),
- useLocalCacheDirectoryManager, conf, stateStore);
+ new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>(),
+ useLocalCacheDirectoryManager, conf, stateStore, null);
+ }
+
+ /**
+ * Creates a tracker backed by a new, empty resource map.
+ *
+ * @param dirHandler handler whose read-good local dirs are used to verify
+ *        that a localized resource still sits on a good dir; may be null,
+ *        in which case resources are not checked against the good dirs
+ *        (the constructor without this parameter passes null)
+ */
+ public LocalResourcesTrackerImpl(String user, ApplicationId appId,
+ Dispatcher dispatcher, boolean useLocalCacheDirectoryManager,
+ Configuration conf, NMStateStoreService stateStore,
+ LocalDirsHandlerService dirHandler) {
+ this(user, appId, dispatcher,
+ new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>(),
+ useLocalCacheDirectoryManager, conf, stateStore, dirHandler);
}
+ /**
+ * Full constructor; the other constructors delegate here with a fresh,
+ * empty resource map.
+ *
+ * @param localrsrc map from resource request to localized resource; it is
+ *        stored by reference, not copied
+ * @param useLocalCacheDirectoryManager when true, the per-directory
+ *        LocalCacheDirectoryManager map and the in-progress resource map
+ *        are created
+ * @param dirHandler handler used to check that localized resources sit on
+ *        a good local dir; may be null, in which case that check is not
+ *        performed
+ */
LocalResourcesTrackerImpl(String user, ApplicationId appId,
Dispatcher dispatcher,
- ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc,
+ ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc,
boolean useLocalCacheDirectoryManager, Configuration conf,
- NMStateStoreService stateStore) {
+ NMStateStoreService stateStore, LocalDirsHandlerService dirHandler) {
this.appId = appId;
this.user = user;
this.dispatcher = dispatcher;
this.localrsrc = localrsrc;
this.useLocalCacheDirectoryManager = useLocalCacheDirectoryManager;
- if ( this.useLocalCacheDirectoryManager) {
- directoryManagers = new ConcurrentHashMap<Path, LocalCacheDirectoryManager>();
+ if (this.useLocalCacheDirectoryManager) {
+ directoryManagers =
+ new ConcurrentHashMap<Path, LocalCacheDirectoryManager>();
inProgressLocalResourcesMap =
- new ConcurrentHashMap<LocalResourceRequest, Path>();
+ new ConcurrentHashMap<LocalResourceRequest, Path>();
}
this.conf = conf;
this.stateStore = stateStore;
+ this.dirsHandler = dirHandler;
}
/*
@@ -312,11 +326,45 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
toString());
if (!file.exists()) {
ret = false;
+ } else if (dirsHandler != null) {
+ ret = checkLocalResource(rsrc);
}
}
return ret;
}
-
+
+  /**
+   * Checks whether a localized resource lives under one of the local
+   * directories currently reported as good for reading.
+   *
+   * @param rsrc the localized resource whose local path is checked
+   * @return true if the resource's local path lies under one of
+   *         {@code dirsHandler.getLocalDirsForRead()}, false otherwise
+   */
+  @VisibleForTesting
+  boolean checkLocalResource(LocalizedResource rsrc) {
+    List<String> localDirs = dirsHandler.getLocalDirsForRead();
+    for (String dir : localDirs) {
+      if (isParent(rsrc.getLocalPath().toUri().getPath(), dir)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Checks whether {@code parentdir} is {@code path} itself or one of its
+   * ancestor directories.
+   *
+   * <p>Both strings are terminated with a separator before the prefix test,
+   * so only whole path components match: {@code /tmp/dir1} is a parent of
+   * {@code /tmp/dir1/x} but not of {@code /tmp/dir10/x}.
+   *
+   * @param path the path to test
+   * @param parentdir the candidate parent directory, with or without a
+   *        trailing separator
+   * @return true if parentdir is parent of path else false.
+   */
+  private boolean isParent(String path, String parentdir) {
+    // The separator must be appended to parentdir, not only to path;
+    // otherwise "/tmp/dir1" would match "/tmp/dir10/...", and a resource on
+    // a bad dir could be reported as being on a good one.
+    // NOTE(review): URI paths always use '/', while File.separator is '\' on
+    // Windows; confirm this is only relied on for POSIX-style paths.
+    return withTrailingSeparator(path).startsWith(
+        withTrailingSeparator(parentdir));
+  }
+
+  /**
+   * Returns {@code p} unchanged if it already ends with
+   * {@link File#separator}, otherwise {@code p} with a separator appended.
+   * Safe for the empty string, unlike indexing the last character.
+   */
+  private static String withTrailingSeparator(String p) {
+    return p.endsWith(File.separator) ? p : p + File.separator;
+  }
+
@Override
public boolean remove(LocalizedResource rem, DeletionService delService) {
// current synchronization guaranteed by crude RLS event for cleanup
http://git-wip-us.apache.org/repos/asf/hadoop/blob/70575286/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index b417c5e..e239e34 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -229,7 +229,7 @@ public class ResourceLocalizationService extends CompositeService
public void serviceInit(Configuration conf) throws Exception {
this.validateConf(conf);
this.publicRsrc = new LocalResourcesTrackerImpl(null, null, dispatcher,
- true, conf, stateStore);
+ true, conf, stateStore, dirsHandler);
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/70575286/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
index 5695254..350cecb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
@@ -18,22 +18,22 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
-import static org.mockito.Mockito.any;
-import static org.mockito.Matchers.isA;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.timeout;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import org.junit.Assert;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
@@ -64,8 +65,10 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
public class TestLocalResourcesTrackerImpl {
@@ -103,7 +106,7 @@ public class TestLocalResourcesTrackerImpl {
localrsrc.put(req2, lr2);
LocalResourcesTracker tracker =
new LocalResourcesTrackerImpl(user, null, dispatcher, localrsrc,
- false, conf, new NMNullStateStoreService());
+ false, conf, new NMNullStateStoreService(),null);
ResourceEvent req11Event =
new ResourceRequestEvent(req1, LocalResourceVisibility.PUBLIC, lc1);
@@ -187,7 +190,7 @@ public class TestLocalResourcesTrackerImpl {
localrsrc.put(req1, lr1);
LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
null, dispatcher, localrsrc, false, conf,
- new NMNullStateStoreService());
+ new NMNullStateStoreService(), null);
ResourceEvent req11Event = new ResourceRequestEvent(req1,
LocalResourceVisibility.PUBLIC, lc1);
@@ -258,7 +261,7 @@ public class TestLocalResourcesTrackerImpl {
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
LocalResourcesTracker tracker =
new LocalResourcesTrackerImpl(user, null, dispatcher, localrsrc,
- true, conf, new NMNullStateStoreService());
+ true, conf, new NMNullStateStoreService(), null);
LocalResourceRequest lr =
createLocalResourceRequest(user, 1, 1, LocalResourceVisibility.PUBLIC);
@@ -405,7 +408,7 @@ public class TestLocalResourcesTrackerImpl {
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
null, dispatcher, localrsrc, true, conf,
- new NMNullStateStoreService());
+ new NMNullStateStoreService(), null);
// This is a random path. NO File creation will take place at this place.
Path localDir = new Path("/tmp");
@@ -782,6 +785,71 @@ public class TestLocalResourcesTrackerImpl {
}
}
+  /**
+   * Verifies that checkLocalResource reports a localized resource as present
+   * only while the directory it was localized into is still a good dir.
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testResourcePresentInGoodDir() throws IOException {
+    String user = "testuser";
+    DrainDispatcher dispatcher = null;
+    try {
+      Configuration conf = new Configuration();
+      dispatcher = createDispatcher(conf);
+      EventHandler<LocalizerEvent> localizerEventHandler =
+          mock(EventHandler.class);
+      // Registered for container events, so type it as such.
+      EventHandler<ContainerEvent> containerEventHandler =
+          mock(EventHandler.class);
+      dispatcher.register(LocalizerEventType.class, localizerEventHandler);
+      dispatcher.register(ContainerEventType.class, containerEventHandler);
+
+      ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
+      LocalizerContext lc1 = new LocalizerContext(user, cId1, null);
+      LocalResourceRequest req1 =
+          createLocalResourceRequest(user, 1, 1, LocalResourceVisibility.PUBLIC);
+      LocalResourceRequest req2 =
+          createLocalResourceRequest(user, 2, 1, LocalResourceVisibility.PUBLIC);
+      LocalizedResource lr1 = createLocalizedResource(req1, dispatcher);
+      LocalizedResource lr2 = createLocalizedResource(req2, dispatcher);
+      ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc =
+          new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
+      localrsrc.put(req1, lr1);
+      localrsrc.put(req2, lr2);
+      LocalDirsHandlerService dirsHandler = mock(LocalDirsHandlerService.class);
+      // Both dirs start out good. The mock returns this same list object, so
+      // removing an entry later marks that dir bad for subsequent checks.
+      List<String> goodDirs = new ArrayList<String>();
+      goodDirs.add("/tmp/somedir1/");
+      goodDirs.add("/tmp/somedir2");
+      Mockito.when(dirsHandler.getLocalDirs()).thenReturn(goodDirs);
+      Mockito.when(dirsHandler.getLocalDirsForRead()).thenReturn(goodDirs);
+      LocalResourcesTrackerImpl tracker =
+          new LocalResourcesTrackerImpl(user, null, dispatcher, localrsrc,
+              true, conf, new NMNullStateStoreService(), dirsHandler);
+      ResourceEvent req11Event =
+          new ResourceRequestEvent(req1, LocalResourceVisibility.PUBLIC, lc1);
+      ResourceEvent req21Event =
+          new ResourceRequestEvent(req2, LocalResourceVisibility.PUBLIC, lc1);
+      // Localize R1 for C1
+      tracker.handle(req11Event);
+      // Localize R2 for C1
+      tracker.handle(req21Event);
+      dispatcher.await();
+      // Localize R1 under somedir1 and R2 under somedir2
+      Path p1 = tracker.getPathForLocalization(req1, new Path("/tmp/somedir1"));
+      Path p2 = tracker.getPathForLocalization(req2, new Path("/tmp/somedir2"));
+      ResourceLocalizedEvent rle1 = new ResourceLocalizedEvent(req1, p1, 1);
+      tracker.handle(rle1);
+      ResourceLocalizedEvent rle2 = new ResourceLocalizedEvent(req2, p2, 1);
+      tracker.handle(rle2);
+      dispatcher.await();
+      // somedir2 is still good, so R2 is reported present
+      Assert.assertTrue(tracker.checkLocalResource(lr2));
+      // Mark somedir2 bad by removing it from the good dirs
+      goodDirs.remove(1);
+      Assert.assertFalse(tracker.checkLocalResource(lr2));
+    } finally {
+      if (dispatcher != null) {
+        dispatcher.stop();
+      }
+    }
+  }
+
private boolean createdummylocalizefile(Path path) {
boolean ret = false;
File file = new File(path.toUri().getRawPath().toString());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/70575286/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java
index 0e3bf86..81e69e2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java
@@ -83,7 +83,7 @@ public class TestResourceRetention {
ConcurrentMap<LocalResourceRequest,LocalizedResource> trackerResources =
new ConcurrentHashMap<LocalResourceRequest,LocalizedResource>();
LocalResourcesTracker ret = spy(new LocalResourcesTrackerImpl(user, null,
- null, trackerResources, false, conf, new NMNullStateStoreService()));
+ null, trackerResources, false, conf, new NMNullStateStoreService(),null));
for (int i = 0; i < nRsrcs; ++i) {
final LocalResourceRequest req = new LocalResourceRequest(
new Path("file:///" + user + "/rsrc" + i), timestamp + i * tsstep,