You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by rk...@apache.org on 2015/05/06 23:20:26 UTC

hadoop git commit: YARN-3491. PublicLocalizer#addResource is too slow. (zxu via rkanter)

Repository: hadoop
Updated Branches:
  refs/heads/trunk 0d3188fd2 -> b72507810


YARN-3491. PublicLocalizer#addResource is too slow. (zxu via rkanter)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b7250781
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b7250781
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b7250781

Branch: refs/heads/trunk
Commit: b72507810aece08e17ab4b5aae1f7eae1fe98609
Parents: 0d3188f
Author: Robert Kanter <rk...@apache.org>
Authored: Wed May 6 14:19:06 2015 -0700
Committer: Robert Kanter <rk...@apache.org>
Committed: Wed May 6 14:19:06 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  2 +
 .../server/nodemanager/DirectoryCollection.java | 33 ++++++++++++-
 .../nodemanager/LocalDirsHandlerService.java    | 17 +++++++
 .../localizer/ResourceLocalizationService.java  | 51 +++++++++-----------
 .../nodemanager/TestDirectoryCollection.java    | 47 ++++++++++++++++++
 .../TestResourceLocalizationService.java        | 28 ++++++++---
 6 files changed, 142 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b7250781/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 3a8a6a3..dea4482 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -180,6 +180,8 @@ Release 2.8.0 - UNRELEASED
     YARN-3396. Handle URISyntaxException in ResourceLocalizationService. 
     (Brahma Reddy Battula via junping_du)
 
+    YARN-3491. PublicLocalizer#addResource is too slow. (zxu via rkanter)
+
   OPTIMIZATIONS
 
     YARN-3339. TestDockerContainerExecutor should pull a single image and not

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b7250781/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java
index 2658918..32046c5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java
@@ -42,9 +42,12 @@ import org.apache.hadoop.util.DiskChecker;
 /**
  * Manages a list of local storage directories.
  */
-class DirectoryCollection {
+public class DirectoryCollection {
   private static final Log LOG = LogFactory.getLog(DirectoryCollection.class);
 
+  /**
+   * The enum defines disk failure type.
+   */
   public enum DiskErrorCause {
     DISK_FULL, OTHER
   }
@@ -60,6 +63,13 @@ class DirectoryCollection {
   }
 
   /**
+   * The interface provides a callback when localDirs is changed.
+   */
+  public interface DirsChangeListener {
+    void onDirsChanged();
+  }
+
+  /**
    * Returns a merged list which contains all the elements of l1 and l2
    * @param l1 the first list to be included
    * @param l2 the second list to be included
@@ -84,6 +94,8 @@ class DirectoryCollection {
 
   private int goodDirsDiskUtilizationPercentage;
 
+  private Set<DirsChangeListener> dirsChangeListeners;
+
   /**
    * Create collection for the directories specified. No check for free space.
    * 
@@ -154,6 +166,20 @@ class DirectoryCollection {
                 : utilizationPercentageCutOff);
     diskUtilizationSpaceCutoff =
         utilizationSpaceCutOff < 0 ? 0 : utilizationSpaceCutOff;
+
+    dirsChangeListeners = new HashSet<DirsChangeListener>();
+  }
+
+  synchronized void registerDirsChangeListener(
+      DirsChangeListener listener) {
+    if (dirsChangeListeners.add(listener)) {
+      listener.onDirsChanged();
+    }
+  }
+
+  synchronized void deregisterDirsChangeListener(
+      DirsChangeListener listener) {
+    dirsChangeListeners.remove(listener);
   }
 
   /**
@@ -280,6 +306,11 @@ class DirectoryCollection {
       }
     }
     setGoodDirsDiskUtilizationPercentage();
+    if (setChanged) {
+      for (DirsChangeListener listener : dirsChangeListeners) {
+        listener.onDirsChanged();
+      }
+    }
     return setChanged;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b7250781/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
index 493571d..57d4395 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection.DirsChangeListener;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 
 /**
@@ -192,6 +193,22 @@ public class LocalDirsHandlerService extends AbstractService {
     super.serviceStop();
   }
 
+  public void registerLocalDirsChangeListener(DirsChangeListener listener) {
+    localDirs.registerDirsChangeListener(listener);
+  }
+
+  public void registerLogDirsChangeListener(DirsChangeListener listener) {
+    logDirs.registerDirsChangeListener(listener);
+  }
+
+  public void deregisterLocalDirsChangeListener(DirsChangeListener listener) {
+    localDirs.deregisterDirsChangeListener(listener);
+  }
+
+  public void deregisterLogDirsChangeListener(DirsChangeListener listener) {
+    logDirs.deregisterDirsChangeListener(listener);
+  }
+
   /**
    * @return the good/valid local directories based on disks' health
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b7250781/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index 17ea1a9..603e795 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -92,6 +92,7 @@ import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
+import org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection.DirsChangeListener;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
 import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
@@ -161,6 +162,8 @@ public class ResourceLocalizationService extends CompositeService
   private LocalResourcesTracker publicRsrc;
 
   private LocalDirsHandlerService dirsHandler;
+  private DirsChangeListener localDirsChangeListener;
+  private DirsChangeListener logDirsChangeListener;
   private Context nmContext;
 
   /**
@@ -254,6 +257,18 @@ public class ResourceLocalizationService extends CompositeService
     localizerTracker = createLocalizerTracker(conf);
     addService(localizerTracker);
     dispatcher.register(LocalizerEventType.class, localizerTracker);
+    localDirsChangeListener = new DirsChangeListener() {
+      @Override
+      public void onDirsChanged() {
+        checkAndInitializeLocalDirs();
+      }
+    };
+    logDirsChangeListener = new DirsChangeListener() {
+      @Override
+      public void onDirsChanged() {
+        initializeLogDirs(lfs);
+      }
+    };
     super.serviceInit(conf);
   }
 
@@ -345,6 +360,8 @@ public class ResourceLocalizationService extends CompositeService
                                       server.getListenerAddress());
     LOG.info("Localizer started on port " + server.getPort());
     super.serviceStart();
+    dirsHandler.registerLocalDirsChangeListener(localDirsChangeListener);
+    dirsHandler.registerLogDirsChangeListener(logDirsChangeListener);
   }
 
   LocalizerTracker createLocalizerTracker(Configuration conf) {
@@ -375,6 +392,8 @@ public class ResourceLocalizationService extends CompositeService
 
   @Override
   public void serviceStop() throws Exception {
+    dirsHandler.deregisterLocalDirsChangeListener(localDirsChangeListener);
+    dirsHandler.deregisterLogDirsChangeListener(logDirsChangeListener);
     if (server != null) {
       server.stop();
     }
@@ -814,11 +833,6 @@ public class ResourceLocalizationService extends CompositeService
               DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath()));
             }
 
-            // In case this is not a newly initialized nm state, ensure
-            // initialized local/log dirs similar to LocalizerRunner
-            getInitializedLocalDirs();
-            getInitializedLogDirs();
-
             // explicitly synchronize pending here to avoid future task
             // completing and being dequeued before pending updated
             synchronized (pending) {
@@ -1120,8 +1134,6 @@ public class ResourceLocalizationService extends CompositeService
         // 1) write credentials to private dir
         writeCredentials(nmPrivateCTokensPath);
         // 2) exec initApplication and wait
-        List<String> localDirs = getInitializedLocalDirs();
-        List<String> logDirs = getInitializedLogDirs();
         if (dirsHandler.areDisksHealthy()) {
           exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress,
               context.getUser(),
@@ -1387,13 +1399,12 @@ public class ResourceLocalizationService extends CompositeService
   }
   
   /**
-   * Synchronized method to get a list of initialized local dirs. Method will
-   * check each local dir to ensure it has been setup correctly and will attempt
-   * to fix any issues it finds.
-   * 
-   * @return list of initialized local dirs
+   * Checks each local dir to ensure it has been set up correctly and
+   * attempts to fix any issues it finds. Invoked by the local-dirs
+   * change listener.
    */
-  synchronized private List<String> getInitializedLocalDirs() {
+  @VisibleForTesting
+  void checkAndInitializeLocalDirs() {
     List<String> dirs = dirsHandler.getLocalDirs();
     List<String> checkFailedDirs = new ArrayList<String>();
     for (String dir : dirs) {
@@ -1415,7 +1426,6 @@ public class ResourceLocalizationService extends CompositeService
         throw new YarnRuntimeException(msg, e);
       }
     }
-    return dirs;
   }
 
   private boolean checkLocalDir(String localDir) {
@@ -1463,17 +1473,4 @@ public class ResourceLocalizationService extends CompositeService
     localDirPathFsPermissionsMap.put(sysDir, nmPrivatePermission);
     return localDirPathFsPermissionsMap;
   }
-  
-  /**
-   * Synchronized method to get a list of initialized log dirs. Method will
-   * check each local dir to ensure it has been setup correctly and will attempt
-   * to fix any issues it finds.
-   * 
-   * @return list of initialized log dirs
-   */
-  synchronized private List<String> getInitializedLogDirs() {
-    List<String> dirs = dirsHandler.getLogDirs();
-    initializeLogDirs(lfs);
-    return dirs;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b7250781/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java
index e4525a5..2fd89c6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection.DirsChangeListener;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -258,4 +259,50 @@ public class TestDirectoryCollection {
     Assert.assertEquals(100.0F, dc.getDiskUtilizationPercentageCutoff(), delta);
     Assert.assertEquals(0, dc.getDiskUtilizationSpaceCutoff());
   }
+
+  @Test
+  public void testDirsChangeListener() {
+    DirsChangeListenerTest listener1 = new DirsChangeListenerTest();
+    DirsChangeListenerTest listener2 = new DirsChangeListenerTest();
+    DirsChangeListenerTest listener3 = new DirsChangeListenerTest();
+
+    String dirA = new File(testDir, "dirA").getPath();
+    String[] dirs = { dirA };
+    DirectoryCollection dc = new DirectoryCollection(dirs, 0.0F);
+    Assert.assertEquals(1, dc.getGoodDirs().size());
+    Assert.assertEquals(listener1.num, 0);
+    Assert.assertEquals(listener2.num, 0);
+    Assert.assertEquals(listener3.num, 0);
+    dc.registerDirsChangeListener(listener1);
+    dc.registerDirsChangeListener(listener2);
+    dc.registerDirsChangeListener(listener3);
+    Assert.assertEquals(listener1.num, 1);
+    Assert.assertEquals(listener2.num, 1);
+    Assert.assertEquals(listener3.num, 1);
+
+    dc.deregisterDirsChangeListener(listener3);
+    dc.checkDirs();
+    Assert.assertEquals(0, dc.getGoodDirs().size());
+    Assert.assertEquals(listener1.num, 2);
+    Assert.assertEquals(listener2.num, 2);
+    Assert.assertEquals(listener3.num, 1);
+
+    dc.deregisterDirsChangeListener(listener2);
+    dc.setDiskUtilizationPercentageCutoff(100.0F);
+    dc.checkDirs();
+    Assert.assertEquals(1, dc.getGoodDirs().size());
+    Assert.assertEquals(listener1.num, 3);
+    Assert.assertEquals(listener2.num, 2);
+    Assert.assertEquals(listener3.num, 1);
+  }
+
+  static class DirsChangeListenerTest implements DirsChangeListener {
+    public int num = 0;
+    public DirsChangeListenerTest() {
+    }
+    @Override
+    public void onDirsChanged() {
+      num++;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b7250781/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
index 2edaf45..07001ad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
@@ -1098,7 +1098,6 @@ public class TestResourceLocalizationService {
           isA(Configuration.class));
 
       spyService.init(conf);
-      spyService.start();
 
       final FsPermission defaultPerm = new FsPermission((short)0755);
 
@@ -1110,6 +1109,8 @@ public class TestResourceLocalizationService {
             .mkdir(eq(publicCache),eq(defaultPerm), eq(true));
       }
 
+      spyService.start();
+
       final String user = "user0";
       // init application
       final Application app = mock(Application.class);
@@ -1131,21 +1132,32 @@ public class TestResourceLocalizationService {
       r.setSeed(seed);
 
       // Queue up public resource localization
-      final LocalResource pubResource = getPublicMockedResource(r);
-      final LocalResourceRequest pubReq = new LocalResourceRequest(pubResource);
+      final LocalResource pubResource1 = getPublicMockedResource(r);
+      final LocalResourceRequest pubReq1 =
+          new LocalResourceRequest(pubResource1);
+
+      LocalResource pubResource2 = null;
+      do {
+        pubResource2 = getPublicMockedResource(r);
+      } while (pubResource2 == null || pubResource2.equals(pubResource1));
+      // The loop above ensures the two public resources are not identical.
+      final LocalResourceRequest pubReq2 =
+          new LocalResourceRequest(pubResource2);
+
+      Set<LocalResourceRequest> pubRsrcs = new HashSet<LocalResourceRequest>();
+      pubRsrcs.add(pubReq1);
+      pubRsrcs.add(pubReq2);
 
       Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
           new HashMap<LocalResourceVisibility,
               Collection<LocalResourceRequest>>();
-      req.put(LocalResourceVisibility.PUBLIC,
-          Collections.singletonList(pubReq));
-
-      Set<LocalResourceRequest> pubRsrcs = new HashSet<LocalResourceRequest>();
-      pubRsrcs.add(pubReq);
+      req.put(LocalResourceVisibility.PUBLIC, pubRsrcs);
 
       spyService.handle(new ContainerLocalizationRequestEvent(c, req));
       dispatcher.await();
 
+      verify(spyService, times(1)).checkAndInitializeLocalDirs();
+
       // verify directory creation
       for (Path p : localDirs) {
         p = new Path((new URI(p.toString())).getPath());