You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by rk...@apache.org on 2015/05/06 23:20:26 UTC
hadoop git commit: YARN-3491. PublicLocalizer#addResource is too
slow. (zxu via rkanter)
Repository: hadoop
Updated Branches:
refs/heads/trunk 0d3188fd2 -> b72507810
YARN-3491. PublicLocalizer#addResource is too slow. (zxu via rkanter)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b7250781
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b7250781
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b7250781
Branch: refs/heads/trunk
Commit: b72507810aece08e17ab4b5aae1f7eae1fe98609
Parents: 0d3188f
Author: Robert Kanter <rk...@apache.org>
Authored: Wed May 6 14:19:06 2015 -0700
Committer: Robert Kanter <rk...@apache.org>
Committed: Wed May 6 14:19:06 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 2 +
.../server/nodemanager/DirectoryCollection.java | 33 ++++++++++++-
.../nodemanager/LocalDirsHandlerService.java | 17 +++++++
.../localizer/ResourceLocalizationService.java | 51 +++++++++-----------
.../nodemanager/TestDirectoryCollection.java | 47 ++++++++++++++++++
.../TestResourceLocalizationService.java | 28 ++++++++---
6 files changed, 142 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b7250781/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 3a8a6a3..dea4482 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -180,6 +180,8 @@ Release 2.8.0 - UNRELEASED
YARN-3396. Handle URISyntaxException in ResourceLocalizationService.
(Brahma Reddy Battula via junping_du)
+ YARN-3491. PublicLocalizer#addResource is too slow. (zxu via rkanter)
+
OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b7250781/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java
index 2658918..32046c5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java
@@ -42,9 +42,12 @@ import org.apache.hadoop.util.DiskChecker;
/**
* Manages a list of local storage directories.
*/
-class DirectoryCollection {
+public class DirectoryCollection {
private static final Log LOG = LogFactory.getLog(DirectoryCollection.class);
+ /**
+ * The enum defines disk failure type.
+ */
public enum DiskErrorCause {
DISK_FULL, OTHER
}
@@ -60,6 +63,13 @@ class DirectoryCollection {
}
/**
+ * The interface provides a callback when localDirs is changed.
+ */
+ public interface DirsChangeListener {
+ void onDirsChanged();
+ }
+
+ /**
* Returns a merged list which contains all the elements of l1 and l2
* @param l1 the first list to be included
* @param l2 the second list to be included
@@ -84,6 +94,8 @@ class DirectoryCollection {
private int goodDirsDiskUtilizationPercentage;
+ private Set<DirsChangeListener> dirsChangeListeners;
+
/**
* Create collection for the directories specified. No check for free space.
*
@@ -154,6 +166,20 @@ class DirectoryCollection {
: utilizationPercentageCutOff);
diskUtilizationSpaceCutoff =
utilizationSpaceCutOff < 0 ? 0 : utilizationSpaceCutOff;
+
+ dirsChangeListeners = new HashSet<DirsChangeListener>();
+ }
+
+ synchronized void registerDirsChangeListener(
+ DirsChangeListener listener) {
+ if (dirsChangeListeners.add(listener)) {
+ listener.onDirsChanged();
+ }
+ }
+
+ synchronized void deregisterDirsChangeListener(
+ DirsChangeListener listener) {
+ dirsChangeListeners.remove(listener);
}
/**
@@ -280,6 +306,11 @@ class DirectoryCollection {
}
}
setGoodDirsDiskUtilizationPercentage();
+ if (setChanged) {
+ for (DirsChangeListener listener : dirsChangeListeners) {
+ listener.onDirsChanged();
+ }
+ }
return setChanged;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b7250781/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
index 493571d..57d4395 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection.DirsChangeListener;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
/**
@@ -192,6 +193,22 @@ public class LocalDirsHandlerService extends AbstractService {
super.serviceStop();
}
+ public void registerLocalDirsChangeListener(DirsChangeListener listener) {
+ localDirs.registerDirsChangeListener(listener);
+ }
+
+ public void registerLogDirsChangeListener(DirsChangeListener listener) {
+ logDirs.registerDirsChangeListener(listener);
+ }
+
+ public void deregisterLocalDirsChangeListener(DirsChangeListener listener) {
+ localDirs.deregisterDirsChangeListener(listener);
+ }
+
+ public void deregisterLogDirsChangeListener(DirsChangeListener listener) {
+ logDirs.deregisterDirsChangeListener(listener);
+ }
+
/**
* @return the good/valid local directories based on disks' health
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b7250781/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index 17ea1a9..603e795 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -92,6 +92,7 @@ import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
+import org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection.DirsChangeListener;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
@@ -161,6 +162,8 @@ public class ResourceLocalizationService extends CompositeService
private LocalResourcesTracker publicRsrc;
private LocalDirsHandlerService dirsHandler;
+ private DirsChangeListener localDirsChangeListener;
+ private DirsChangeListener logDirsChangeListener;
private Context nmContext;
/**
@@ -254,6 +257,18 @@ public class ResourceLocalizationService extends CompositeService
localizerTracker = createLocalizerTracker(conf);
addService(localizerTracker);
dispatcher.register(LocalizerEventType.class, localizerTracker);
+ localDirsChangeListener = new DirsChangeListener() {
+ @Override
+ public void onDirsChanged() {
+ checkAndInitializeLocalDirs();
+ }
+ };
+ logDirsChangeListener = new DirsChangeListener() {
+ @Override
+ public void onDirsChanged() {
+ initializeLogDirs(lfs);
+ }
+ };
super.serviceInit(conf);
}
@@ -345,6 +360,8 @@ public class ResourceLocalizationService extends CompositeService
server.getListenerAddress());
LOG.info("Localizer started on port " + server.getPort());
super.serviceStart();
+ dirsHandler.registerLocalDirsChangeListener(localDirsChangeListener);
+ dirsHandler.registerLogDirsChangeListener(logDirsChangeListener);
}
LocalizerTracker createLocalizerTracker(Configuration conf) {
@@ -375,6 +392,8 @@ public class ResourceLocalizationService extends CompositeService
@Override
public void serviceStop() throws Exception {
+ dirsHandler.deregisterLocalDirsChangeListener(localDirsChangeListener);
+ dirsHandler.deregisterLogDirsChangeListener(logDirsChangeListener);
if (server != null) {
server.stop();
}
@@ -814,11 +833,6 @@ public class ResourceLocalizationService extends CompositeService
DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath()));
}
- // In case this is not a newly initialized nm state, ensure
- // initialized local/log dirs similar to LocalizerRunner
- getInitializedLocalDirs();
- getInitializedLogDirs();
-
// explicitly synchronize pending here to avoid future task
// completing and being dequeued before pending updated
synchronized (pending) {
@@ -1120,8 +1134,6 @@ public class ResourceLocalizationService extends CompositeService
// 1) write credentials to private dir
writeCredentials(nmPrivateCTokensPath);
// 2) exec initApplication and wait
- List<String> localDirs = getInitializedLocalDirs();
- List<String> logDirs = getInitializedLogDirs();
if (dirsHandler.areDisksHealthy()) {
exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress,
context.getUser(),
@@ -1387,13 +1399,12 @@ public class ResourceLocalizationService extends CompositeService
}
/**
- * Synchronized method to get a list of initialized local dirs. Method will
- * check each local dir to ensure it has been setup correctly and will attempt
- * to fix any issues it finds.
- *
- * @return list of initialized local dirs
+ * Check each local dir to ensure it has been setup correctly and will
+ * attempt to fix any issues it finds.
+ * @return void
*/
- synchronized private List<String> getInitializedLocalDirs() {
+ @VisibleForTesting
+ void checkAndInitializeLocalDirs() {
List<String> dirs = dirsHandler.getLocalDirs();
List<String> checkFailedDirs = new ArrayList<String>();
for (String dir : dirs) {
@@ -1415,7 +1426,6 @@ public class ResourceLocalizationService extends CompositeService
throw new YarnRuntimeException(msg, e);
}
}
- return dirs;
}
private boolean checkLocalDir(String localDir) {
@@ -1463,17 +1473,4 @@ public class ResourceLocalizationService extends CompositeService
localDirPathFsPermissionsMap.put(sysDir, nmPrivatePermission);
return localDirPathFsPermissionsMap;
}
-
- /**
- * Synchronized method to get a list of initialized log dirs. Method will
- * check each local dir to ensure it has been setup correctly and will attempt
- * to fix any issues it finds.
- *
- * @return list of initialized log dirs
- */
- synchronized private List<String> getInitializedLogDirs() {
- List<String> dirs = dirsHandler.getLogDirs();
- initializeLogDirs(lfs);
- return dirs;
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b7250781/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java
index e4525a5..2fd89c6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection.DirsChangeListener;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -258,4 +259,50 @@ public class TestDirectoryCollection {
Assert.assertEquals(100.0F, dc.getDiskUtilizationPercentageCutoff(), delta);
Assert.assertEquals(0, dc.getDiskUtilizationSpaceCutoff());
}
+
+ @Test
+ public void testDirsChangeListener() {
+ DirsChangeListenerTest listener1 = new DirsChangeListenerTest();
+ DirsChangeListenerTest listener2 = new DirsChangeListenerTest();
+ DirsChangeListenerTest listener3 = new DirsChangeListenerTest();
+
+ String dirA = new File(testDir, "dirA").getPath();
+ String[] dirs = { dirA };
+ DirectoryCollection dc = new DirectoryCollection(dirs, 0.0F);
+ Assert.assertEquals(1, dc.getGoodDirs().size());
+ Assert.assertEquals(listener1.num, 0);
+ Assert.assertEquals(listener2.num, 0);
+ Assert.assertEquals(listener3.num, 0);
+ dc.registerDirsChangeListener(listener1);
+ dc.registerDirsChangeListener(listener2);
+ dc.registerDirsChangeListener(listener3);
+ Assert.assertEquals(listener1.num, 1);
+ Assert.assertEquals(listener2.num, 1);
+ Assert.assertEquals(listener3.num, 1);
+
+ dc.deregisterDirsChangeListener(listener3);
+ dc.checkDirs();
+ Assert.assertEquals(0, dc.getGoodDirs().size());
+ Assert.assertEquals(listener1.num, 2);
+ Assert.assertEquals(listener2.num, 2);
+ Assert.assertEquals(listener3.num, 1);
+
+ dc.deregisterDirsChangeListener(listener2);
+ dc.setDiskUtilizationPercentageCutoff(100.0F);
+ dc.checkDirs();
+ Assert.assertEquals(1, dc.getGoodDirs().size());
+ Assert.assertEquals(listener1.num, 3);
+ Assert.assertEquals(listener2.num, 2);
+ Assert.assertEquals(listener3.num, 1);
+ }
+
+ static class DirsChangeListenerTest implements DirsChangeListener {
+ public int num = 0;
+ public DirsChangeListenerTest() {
+ }
+ @Override
+ public void onDirsChanged() {
+ num++;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b7250781/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
index 2edaf45..07001ad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
@@ -1098,7 +1098,6 @@ public class TestResourceLocalizationService {
isA(Configuration.class));
spyService.init(conf);
- spyService.start();
final FsPermission defaultPerm = new FsPermission((short)0755);
@@ -1110,6 +1109,8 @@ public class TestResourceLocalizationService {
.mkdir(eq(publicCache),eq(defaultPerm), eq(true));
}
+ spyService.start();
+
final String user = "user0";
// init application
final Application app = mock(Application.class);
@@ -1131,21 +1132,32 @@ public class TestResourceLocalizationService {
r.setSeed(seed);
// Queue up public resource localization
- final LocalResource pubResource = getPublicMockedResource(r);
- final LocalResourceRequest pubReq = new LocalResourceRequest(pubResource);
+ final LocalResource pubResource1 = getPublicMockedResource(r);
+ final LocalResourceRequest pubReq1 =
+ new LocalResourceRequest(pubResource1);
+
+ LocalResource pubResource2 = null;
+ do {
+ pubResource2 = getPublicMockedResource(r);
+ } while (pubResource2 == null || pubResource2.equals(pubResource1));
+ // above call to make sure we don't get identical resources.
+ final LocalResourceRequest pubReq2 =
+ new LocalResourceRequest(pubResource2);
+
+ Set<LocalResourceRequest> pubRsrcs = new HashSet<LocalResourceRequest>();
+ pubRsrcs.add(pubReq1);
+ pubRsrcs.add(pubReq2);
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
new HashMap<LocalResourceVisibility,
Collection<LocalResourceRequest>>();
- req.put(LocalResourceVisibility.PUBLIC,
- Collections.singletonList(pubReq));
-
- Set<LocalResourceRequest> pubRsrcs = new HashSet<LocalResourceRequest>();
- pubRsrcs.add(pubReq);
+ req.put(LocalResourceVisibility.PUBLIC, pubRsrcs);
spyService.handle(new ContainerLocalizationRequestEvent(c, req));
dispatcher.await();
+ verify(spyService, times(1)).checkAndInitializeLocalDirs();
+
// verify directory creation
for (Path p : localDirs) {
p = new Path((new URI(p.toString())).getPath());