You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vv...@apache.org on 2015/09/07 08:07:36 UTC

[2/2] hadoop git commit: YARN-3591. Resource localization on a bad disk causes subsequent containers failure. Contributed by Lavkesh Lahngir.

YARN-3591. Resource localization on a bad disk causes subsequent containers failure. Contributed by Lavkesh Lahngir.

(cherry picked from commit 1dbd8e34a7d97c4d8586da79c980d8f2e0aad61d)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/70575286
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/70575286
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/70575286

Branch: refs/heads/branch-2
Commit: 70575286b7566c86d441d1ed075786607986de99
Parents: 8bf5362
Author: Varun Vasudev <vv...@apache.org>
Authored: Mon Sep 7 11:32:12 2015 +0530
Committer: Varun Vasudev <vv...@apache.org>
Committed: Mon Sep 7 11:34:14 2015 +0530

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 +
 .../localizer/LocalResourcesTrackerImpl.java    | 64 +++++++++++++--
 .../localizer/ResourceLocalizationService.java  |  2 +-
 .../TestLocalResourcesTrackerImpl.java          | 86 ++++++++++++++++++--
 .../localizer/TestResourceRetention.java        |  2 +-
 5 files changed, 138 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/70575286/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index a021ca3..b9ad53a 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -764,6 +764,9 @@ Release 2.8.0 - UNRELEASED
     YARN-4024. YARN RM should avoid unnecessary resolving IP when NMs doing heartbeat.
     (Hong Zhiguo via wangda)
 
+    YARN-3591. Resource localization on a bad disk causes subsequent containers failure.
+    (Lavkesh Lahngir via vvasudev)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/70575286/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
index 7cf6b15..a1e6817 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
 import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
@@ -38,6 +39,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
@@ -65,6 +67,7 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
   private final Dispatcher dispatcher;
   private final ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc;
   private Configuration conf;
+  private LocalDirsHandlerService dirsHandler;
   /*
    * This flag controls whether this resource tracker uses hierarchical
    * directories or not. For PRIVATE and PUBLIC resource trackers it
@@ -92,27 +95,38 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
       Dispatcher dispatcher, boolean useLocalCacheDirectoryManager,
       Configuration conf, NMStateStoreService stateStore) {
     this(user, appId, dispatcher,
-      new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>(),
-      useLocalCacheDirectoryManager, conf, stateStore);
+        new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>(),
+        useLocalCacheDirectoryManager, conf, stateStore, null);
+  }
+
+  public LocalResourcesTrackerImpl(String user, ApplicationId appId,
+      Dispatcher dispatcher, boolean useLocalCacheDirectoryManager,
+      Configuration conf, NMStateStoreService stateStore,
+      LocalDirsHandlerService dirHandler) { // null skips the good-dir check
+    this(user, appId, dispatcher, new ConcurrentHashMap<LocalResourceRequest,
+        LocalizedResource>(), useLocalCacheDirectoryManager, conf, stateStore,
+        dirHandler);
+  }
 
   LocalResourcesTrackerImpl(String user, ApplicationId appId,
       Dispatcher dispatcher,
-      ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc,
+      ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc,
       boolean useLocalCacheDirectoryManager, Configuration conf,
-      NMStateStoreService stateStore) {
+      NMStateStoreService stateStore, LocalDirsHandlerService dirHandler) {
     this.appId = appId;
     this.user = user;
     this.dispatcher = dispatcher;
     this.localrsrc = localrsrc;
     this.useLocalCacheDirectoryManager = useLocalCacheDirectoryManager;
-    if ( this.useLocalCacheDirectoryManager) {
-      directoryManagers = new ConcurrentHashMap<Path, LocalCacheDirectoryManager>();
+    if (this.useLocalCacheDirectoryManager) {
+      directoryManagers =
+          new ConcurrentHashMap<Path, LocalCacheDirectoryManager>();
       inProgressLocalResourcesMap =
-        new ConcurrentHashMap<LocalResourceRequest, Path>();
+          new ConcurrentHashMap<LocalResourceRequest, Path>();
     }
     this.conf = conf;
     this.stateStore = stateStore;
+    this.dirsHandler = dirHandler; // null skips the good-dir check
   }
 
   /*
@@ -312,11 +326,45 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
         toString());
       if (!file.exists()) {
         ret = false;
+      } else if (dirsHandler != null) {
+        ret = checkLocalResource(rsrc);
       }
     }
     return ret;
   }
-  
+
+  /**
+   * Check whether rsrc was localized under a good local dir, i.e. one of
+   * the dirs currently returned by dirsHandler.getLocalDirsForRead().
+   *
+   * @param rsrc localized resource whose local path is checked
+   * @return true if the local path lies under a good local dir, else false
+   */
+  @VisibleForTesting
+  boolean checkLocalResource(LocalizedResource rsrc) {
+    List<String> localDirs = dirsHandler.getLocalDirsForRead();
+    String rsrcPath = rsrc.getLocalPath().toUri().getPath();
+    for (String dir : localDirs) {
+      if (isParent(rsrcPath, dir)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Check whether path equals parentdir or lies beneath it, comparing whole
+   * path components so that /a/dir1 is not treated as parent of /a/dir10.
+   * @return true if parentdir is parent of (or equal to) path else false.
+   */
+  private boolean isParent(String path, String parentdir) {
+    // Add separator to both sides if not present.
+    String child = path.endsWith(File.separator) ? path : path + File.separator;
+    String parent = parentdir.endsWith(File.separator)
+        ? parentdir : parentdir + File.separator;
+    return child.startsWith(parent);
+  }
+
   @Override
   public boolean remove(LocalizedResource rem, DeletionService delService) {
  // current synchronization guaranteed by crude RLS event for cleanup

http://git-wip-us.apache.org/repos/asf/hadoop/blob/70575286/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index b417c5e..e239e34 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -229,7 +229,7 @@ public class ResourceLocalizationService extends CompositeService
   public void serviceInit(Configuration conf) throws Exception {
     this.validateConf(conf);
     this.publicRsrc = new LocalResourcesTrackerImpl(null, null, dispatcher,
-        true, conf, stateStore);
+        true, conf, stateStore, dirsHandler);
     this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
 
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/70575286/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
index 5695254..350cecb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
@@ -18,22 +18,22 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
 
-import static org.mockito.Mockito.any;
-import static org.mockito.Matchers.isA;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.timeout;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
-import org.junit.Assert;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
@@ -64,8 +65,10 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
 
 public class TestLocalResourcesTrackerImpl {
 
@@ -103,7 +106,7 @@ public class TestLocalResourcesTrackerImpl {
       localrsrc.put(req2, lr2);
       LocalResourcesTracker tracker =
           new LocalResourcesTrackerImpl(user, null, dispatcher, localrsrc,
-              false, conf, new NMNullStateStoreService());
+              false, conf, new NMNullStateStoreService(),null);
 
       ResourceEvent req11Event =
           new ResourceRequestEvent(req1, LocalResourceVisibility.PUBLIC, lc1);
@@ -187,7 +190,7 @@ public class TestLocalResourcesTrackerImpl {
       localrsrc.put(req1, lr1);
       LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
           null, dispatcher, localrsrc, false, conf,
-          new NMNullStateStoreService());
+          new NMNullStateStoreService(), null);
 
       ResourceEvent req11Event = new ResourceRequestEvent(req1,
           LocalResourceVisibility.PUBLIC, lc1);
@@ -258,7 +261,7 @@ public class TestLocalResourcesTrackerImpl {
           new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
       LocalResourcesTracker tracker =
           new LocalResourcesTrackerImpl(user, null, dispatcher, localrsrc,
-              true, conf, new NMNullStateStoreService());
+              true, conf, new NMNullStateStoreService(), null);
 
       LocalResourceRequest lr =
           createLocalResourceRequest(user, 1, 1, LocalResourceVisibility.PUBLIC);
@@ -405,7 +408,7 @@ public class TestLocalResourcesTrackerImpl {
           new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
       LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
           null, dispatcher, localrsrc, true, conf,
-          new NMNullStateStoreService());
+          new NMNullStateStoreService(), null);
 
       // This is a random path. NO File creation will take place at this place.
       Path localDir = new Path("/tmp");
@@ -782,6 +785,71 @@ public class TestLocalResourcesTrackerImpl {
     }
   }
 
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testResourcePresentInGoodDir() throws IOException {
+    String user = "testuser";
+    DrainDispatcher dispatcher = null;
+    try {
+      Configuration conf = new Configuration();
+      dispatcher = createDispatcher(conf);
+      EventHandler<LocalizerEvent> localizerEventHandler =
+          mock(EventHandler.class);
+      EventHandler<ContainerEvent> containerEventHandler =
+          mock(EventHandler.class);
+      dispatcher.register(LocalizerEventType.class, localizerEventHandler);
+      dispatcher.register(ContainerEventType.class, containerEventHandler);
+
+      ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
+      LocalizerContext lc1 = new LocalizerContext(user, cId1, null);
+      LocalResourceRequest req1 =
+          createLocalResourceRequest(user, 1, 1, LocalResourceVisibility.PUBLIC);
+      LocalResourceRequest req2 =
+          createLocalResourceRequest(user, 2, 1, LocalResourceVisibility.PUBLIC);
+      LocalizedResource lr1 = createLocalizedResource(req1, dispatcher);
+      LocalizedResource lr2 = createLocalizedResource(req2, dispatcher);
+      ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc =
+          new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
+      localrsrc.put(req1, lr1);
+      localrsrc.put(req2, lr2);
+      LocalDirsHandlerService dirsHandler = mock(LocalDirsHandlerService.class);
+      List<String> goodDirs = new ArrayList<String>();
+      // Both dirs start out good; somedir2 is removed below to mark it bad
+      goodDirs.add("/tmp/somedir1/");
+      goodDirs.add("/tmp/somedir2");
+      Mockito.when(dirsHandler.getLocalDirs()).thenReturn(goodDirs);
+      Mockito.when(dirsHandler.getLocalDirsForRead()).thenReturn(goodDirs);
+      LocalResourcesTrackerImpl tracker =
+          new LocalResourcesTrackerImpl(user, null, dispatcher, localrsrc,
+              true, conf, new NMNullStateStoreService(), dirsHandler);
+      ResourceEvent req11Event =
+          new ResourceRequestEvent(req1, LocalResourceVisibility.PUBLIC, lc1);
+      ResourceEvent req21Event =
+          new ResourceRequestEvent(req2, LocalResourceVisibility.PUBLIC, lc1);
+      // Localize R1 for C1
+      tracker.handle(req11Event);
+      // Localize R2 for C1
+      tracker.handle(req21Event);
+      dispatcher.await();
+      // Localize R1 under somedir1 and R2 under somedir2
+      Path p1 = tracker.getPathForLocalization(req1, new Path("/tmp/somedir1"));
+      Path p2 = tracker.getPathForLocalization(req2, new Path("/tmp/somedir2"));
+      tracker.handle(new ResourceLocalizedEvent(req1, p1, 1));
+      tracker.handle(new ResourceLocalizedEvent(req2, p2, 1));
+      dispatcher.await();
+      Assert.assertTrue(tracker.checkLocalResource(lr1));
+      Assert.assertTrue(tracker.checkLocalResource(lr2));
+      // somedir2 goes bad: R2 is no longer on a good dir, R1 is unaffected
+      goodDirs.remove("/tmp/somedir2");
+      Assert.assertFalse(tracker.checkLocalResource(lr2));
+      Assert.assertTrue(tracker.checkLocalResource(lr1));
+    } finally {
+      if (dispatcher != null) {
+        dispatcher.stop();
+      }
+    }
+  }
+
   private boolean createdummylocalizefile(Path path) {
     boolean ret = false;
     File file = new File(path.toUri().getRawPath().toString());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/70575286/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java
index 0e3bf86..81e69e2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java
@@ -83,7 +83,7 @@ public class TestResourceRetention {
     ConcurrentMap<LocalResourceRequest,LocalizedResource> trackerResources =
       new ConcurrentHashMap<LocalResourceRequest,LocalizedResource>();
     LocalResourcesTracker ret = spy(new LocalResourcesTrackerImpl(user, null,
-      null, trackerResources, false, conf, new NMNullStateStoreService()));
+      null, trackerResources, false, conf, new NMNullStateStoreService(),null));
     for (int i = 0; i < nRsrcs; ++i) {
       final LocalResourceRequest req = new LocalResourceRequest(
           new Path("file:///" + user + "/rsrc" + i), timestamp + i * tsstep,