You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2011/11/30 00:18:10 UTC
svn commit: r1208131 [1/3] - in
/hadoop/common/trunk/hadoop-mapreduce-project: ./ conf/
hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/ja...
Author: mahadev
Date: Tue Nov 29 23:17:54 2011
New Revision: 1208131
URL: http://svn.apache.org/viewvc?rev=1208131&view=rev
Log:
MAPREDUCE-3121. NodeManager should handle disk-failures (Ravi Gummadi via mahadev)
Added:
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthScriptRunner.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeHealthService.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestDiskFailures.java
Removed:
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/NodeHealthCheckerService.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/TestNodeHealthService.java
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/conf/container-executor.cfg
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerExitEvent.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/configuration.c
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/configuration.h
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.h
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/main.c
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/test-container-executor.c
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1208131&r1=1208130&r2=1208131&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Tue Nov 29 23:17:54 2011
@@ -71,6 +71,8 @@ Release 0.23.1 - Unreleased
INCOMPATIBLE CHANGES
NEW FEATURES
+
+ MAPREDUCE-3121. NodeManager should handle disk-failures (Ravi Gummadi via mahadev)
IMPROVEMENTS
Modified: hadoop/common/trunk/hadoop-mapreduce-project/conf/container-executor.cfg
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/conf/container-executor.cfg?rev=1208131&r1=1208130&r2=1208131&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/conf/container-executor.cfg (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/conf/container-executor.cfg Tue Nov 29 23:17:54 2011
@@ -1,3 +1,3 @@
-yarn.nodemanager.local-dirs=#configured value of yarn.nodemanager.local-dirs. It can be a list of comma separated paths.
-yarn.nodemanager.log-dirs=#configured value of yarn.nodemanager.log-dirs.
yarn.nodemanager.linux-container-executor.group=#configured value of yarn.nodemanager.linux-container-executor.group
+banned.users=#comma separated list of users who cannot run applications
+min.user.id=1000#Prevent other super-users
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java?rev=1208131&r1=1208130&r2=1208131&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java Tue Nov 29 23:17:54 2011
@@ -113,9 +113,10 @@ class LocalDistributedCacheManager {
Map<LocalResource, Future<Path>> resourcesToPaths = Maps.newHashMap();
ExecutorService exec = Executors.newCachedThreadPool();
+ Path destPath = localDirAllocator.getLocalPathForWrite(".", conf);
for (LocalResource resource : localResources.values()) {
Callable<Path> download = new FSDownload(localFSFileContext, ugi, conf,
- localDirAllocator, resource, new Random());
+ destPath, resource, new Random());
Future<Path> future = exec.submit(download);
resourcesToPaths.put(resource, future);
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java?rev=1208131&r1=1208130&r2=1208131&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java Tue Nov 29 23:17:54 2011
@@ -56,7 +56,7 @@ public class MiniMRYarnCluster extends M
}
public MiniMRYarnCluster(String testName, int noOfNMs) {
- super(testName, noOfNMs);
+ super(testName, noOfNMs, 4, 4);
//TODO: add the history server
historyServerWrapper = new JobHistoryServerWrapper();
addService(historyServerWrapper);
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java?rev=1208131&r1=1208130&r2=1208131&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java Tue Nov 29 23:17:54 2011
@@ -43,7 +43,8 @@ public class TestDistributedShell {
public static void setup() throws InterruptedException, IOException {
LOG.info("Starting up YARN cluster");
if (yarnCluster == null) {
- yarnCluster = new MiniYARNCluster(TestDistributedShell.class.getName());
+ yarnCluster = new MiniYARNCluster(TestDistributedShell.class.getName(),
+ 1, 1, 1);
yarnCluster.init(conf);
yarnCluster.start();
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1208131&r1=1208130&r2=1208131&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Tue Nov 29 23:17:54 2011
@@ -351,13 +351,39 @@ public class YarnConfiguration extends C
/** Class that calculates containers current resource utilization.*/
public static final String NM_CONTAINER_MON_RESOURCE_CALCULATOR =
NM_PREFIX + "container-monitor.resource-calculator.class";
-
+
+ /**
+ * Enable/Disable disks' health checker. Default is true.
+ * An expert level configuration property.
+ */
+ public static final String NM_DISK_HEALTH_CHECK_ENABLE =
+ NM_PREFIX + "disk-health-checker.enable";
+ /** Frequency of running disks' health checker.*/
+ public static final String NM_DISK_HEALTH_CHECK_INTERVAL_MS =
+ NM_PREFIX + "disk-health-checker.interval-ms";
+ /** By default, disks' health is checked every 2 minutes. */
+ public static final long DEFAULT_NM_DISK_HEALTH_CHECK_INTERVAL_MS =
+ 2 * 60 * 1000;
+
+ /**
+ * The minimum fraction of number of disks to be healthy for the nodemanager
+ * to launch new containers. This applies to nm-local-dirs and nm-log-dirs.
+ */
+ public static final String NM_MIN_HEALTHY_DISKS_FRACTION =
+ NM_PREFIX + "disk-health-checker.min-healthy-disks";
+ /**
+ * By default, at least 25% of disks are to be healthy to say that the node
+ * is healthy in terms of disks.
+ */
+ public static final float DEFAULT_NM_MIN_HEALTHY_DISKS_FRACTION
+ = 0.25F;
+
/** Frequency of running node health script.*/
public static final String NM_HEALTH_CHECK_INTERVAL_MS =
NM_PREFIX + "health-checker.interval-ms";
public static final long DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS = 10 * 60 * 1000;
-
- /** Script time out period.*/
+
+ /** Health check script time out period.*/
public static final String NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS =
NM_PREFIX + "health-checker.script.timeout-ms";
public static final long DEFAULT_NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS =
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java?rev=1208131&r1=1208130&r2=1208131&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java Tue Nov 29 23:17:54 2011
@@ -31,6 +31,7 @@ import java.io.Writer;
import java.security.PrivilegedExceptionAction;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -105,12 +106,12 @@ public class AggregatedLogFormat {
public static class LogValue {
- private final String[] rootLogDirs;
+ private final List<String> rootLogDirs;
private final ContainerId containerId;
// TODO Maybe add a version string here. Instead of changing the version of
// the entire k-v format
- public LogValue(String[] rootLogDirs, ContainerId containerId) {
+ public LogValue(List<String> rootLogDirs, ContainerId containerId) {
this.rootLogDirs = rootLogDirs;
this.containerId = containerId;
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java?rev=1208131&r1=1208130&r2=1208131&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java Tue Nov 29 23:17:54 2011
@@ -33,7 +33,6 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
@@ -56,7 +55,10 @@ public class FSDownload implements Calla
private final UserGroupInformation userUgi;
private Configuration conf;
private LocalResource resource;
- private LocalDirAllocator dirs;
+
+ /** The local FS dir path under which this resource is to be localized to */
+ private Path destDirPath;
+
private static final FsPermission cachePerms = new FsPermission(
(short) 0755);
static final FsPermission PUBLIC_FILE_PERMS = new FsPermission((short) 0555);
@@ -65,10 +67,11 @@ public class FSDownload implements Calla
static final FsPermission PUBLIC_DIR_PERMS = new FsPermission((short) 0755);
static final FsPermission PRIVATE_DIR_PERMS = new FsPermission((short) 0700);
+
public FSDownload(FileContext files, UserGroupInformation ugi, Configuration conf,
- LocalDirAllocator dirs, LocalResource resource, Random rand) {
+ Path destDirPath, LocalResource resource, Random rand) {
this.conf = conf;
- this.dirs = dirs;
+ this.destDirPath = destDirPath;
this.files = files;
this.userUgi = ugi;
this.resource = resource;
@@ -136,15 +139,13 @@ public class FSDownload implements Calla
}
Path tmp;
- Path dst =
- dirs.getLocalPathForWrite(".", getEstimatedSize(resource),
- conf);
do {
- tmp = new Path(dst, String.valueOf(rand.nextLong()));
+ tmp = new Path(destDirPath, String.valueOf(rand.nextLong()));
} while (files.util().exists(tmp));
- dst = tmp;
- files.mkdir(dst, cachePerms, false);
- final Path dst_work = new Path(dst + "_tmp");
+ destDirPath = tmp;
+
+ files.mkdir(destDirPath, cachePerms, false);
+ final Path dst_work = new Path(destDirPath + "_tmp");
files.mkdir(dst_work, cachePerms, false);
Path dFinal = files.makeQualified(new Path(dst_work, sCopy.getName()));
@@ -158,9 +159,9 @@ public class FSDownload implements Calla
});
unpack(new File(dTmp.toUri()), new File(dFinal.toUri()));
changePermissions(dFinal.getFileSystem(conf), dFinal);
- files.rename(dst_work, dst, Rename.OVERWRITE);
+ files.rename(dst_work, destDirPath, Rename.OVERWRITE);
} catch (Exception e) {
- try { files.delete(dst, true); } catch (IOException ignore) { }
+ try { files.delete(destDirPath, true); } catch (IOException ignore) { }
throw e;
} finally {
try {
@@ -170,9 +171,8 @@ public class FSDownload implements Calla
rand = null;
conf = null;
resource = null;
- dirs = null;
}
- return files.makeQualified(new Path(dst, sCopy.getName()));
+ return files.makeQualified(new Path(destDirPath, sCopy.getName()));
}
/**
@@ -221,17 +221,4 @@ public class FSDownload implements Calla
}
}
- private static long getEstimatedSize(LocalResource rsrc) {
- if (rsrc.getSize() < 0) {
- return -1;
- }
- switch (rsrc.getType()) {
- case ARCHIVE:
- return 5 * rsrc.getSize();
- case FILE:
- default:
- return rsrc.getSize();
- }
- }
-
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java?rev=1208131&r1=1208130&r2=1208131&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java Tue Nov 29 23:17:54 2011
@@ -146,13 +146,14 @@ public class TestFSDownload {
vis = LocalResourceVisibility.APPLICATION;
break;
}
-
- LocalResource rsrc = createFile(files, new Path(basedir, "" + i),
- sizes[i], rand, vis);
+ Path p = new Path(basedir, "" + i);
+ LocalResource rsrc = createFile(files, p, sizes[i], rand, vis);
rsrcVis.put(rsrc, vis);
+ Path destPath = dirs.getLocalPathForWrite(
+ basedir.toString(), sizes[i], conf);
FSDownload fsd =
new FSDownload(files, UserGroupInformation.getCurrentUser(), conf,
- dirs, rsrc, new Random(sharedSeed));
+ destPath, rsrc, new Random(sharedSeed));
pending.put(rsrc, exec.submit(fsd));
}
@@ -249,13 +250,15 @@ public class TestFSDownload {
vis = LocalResourceVisibility.APPLICATION;
break;
}
-
- LocalResource rsrc = createJar(files, new Path(basedir, "dir" + i
- + ".jar"), vis);
+
+ Path p = new Path(basedir, "dir" + i + ".jar");
+ LocalResource rsrc = createJar(files, p, vis);
rsrcVis.put(rsrc, vis);
+ Path destPath = dirs.getLocalPathForWrite(
+ basedir.toString(), conf);
FSDownload fsd =
new FSDownload(files, UserGroupInformation.getCurrentUser(), conf,
- dirs, rsrc, new Random(sharedSeed));
+ destPath, rsrc, new Random(sharedSeed));
pending.put(rsrc, exec.submit(fsd));
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml?rev=1208131&r1=1208130&r2=1208131&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml Tue Nov 29 23:17:54 2011
@@ -389,6 +389,22 @@
</property>
<property>
+ <description>Frequency of running disk health checker code.</description>
+ <name>yarn.nodemanager.disk-health-checker.interval-ms</name>
+ <value>120000</value>
+ </property>
+
+ <property>
+ <description>The minimum fraction of number of disks to be healthy for the
+ nodemanager to launch new containers. This corresponds to both
+ yarn.nodemanager.local-dirs and yarn.nodemanager.log-dirs. i.e. If there
+ are fewer healthy local-dirs (or log-dirs) available, then
+ new containers will not be launched on this node.</description>
+ <name>yarn.nodemanager.disk-health-checker.min-healthy-disks</name>
+ <value>0.25</value>
+ </property>
+
+ <property>
<description>The path to the Linux container executor.</description>
<name>yarn.nodemanager.linux-container-executor.path</name>
</property>
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java?rev=1208131&r1=1208130&r2=1208131&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java Tue Nov 29 23:17:54 2011
@@ -45,6 +45,7 @@ public abstract class ContainerExecutor
FsPermission.createImmutable((short) 0700);
private Configuration conf;
+
private ConcurrentMap<ContainerId, Path> pidFiles =
new ConcurrentHashMap<ContainerId, Path>();
@@ -68,7 +69,7 @@ public abstract class ContainerExecutor
* @throws IOException
*/
public abstract void init() throws IOException;
-
+
/**
* Prepare the environment for containers in this application to execute.
* For $x in local.dirs
@@ -82,12 +83,14 @@ public abstract class ContainerExecutor
* @param appId id of the application
* @param nmPrivateContainerTokens path to localized credentials, rsrc by NM
* @param nmAddr RPC address to contact NM
+ * @param localDirs nm-local-dirs
+ * @param logDirs nm-log-dirs
* @throws IOException For most application init failures
* @throws InterruptedException If application init thread is halted by NM
*/
public abstract void startLocalizer(Path nmPrivateContainerTokens,
InetSocketAddress nmAddr, String user, String appId, String locId,
- List<Path> localDirs)
+ List<String> localDirs, List<String> logDirs)
throws IOException, InterruptedException;
@@ -100,12 +103,15 @@ public abstract class ContainerExecutor
* @param user the user of the container
* @param appId the appId of the container
* @param containerWorkDir the work dir for the container
+ * @param localDirs nm-local-dirs to be used for this container
+ * @param logDirs nm-log-dirs to be used for this container
* @return the return status of the launch
* @throws IOException
*/
public abstract int launchContainer(Container container,
Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath,
- String user, String appId, Path containerWorkDir) throws IOException;
+ String user, String appId, Path containerWorkDir, List<String> localDirs,
+ List<String> logDirs) throws IOException;
public abstract boolean signalContainer(String user, String pid,
Signal signal)
@@ -116,7 +122,8 @@ public abstract class ContainerExecutor
public enum ExitCode {
FORCE_KILLED(137),
- TERMINATED(143);
+ TERMINATED(143),
+ DISKS_FAILED(-101);
private final int code;
private ExitCode(int exitCode) {
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java?rev=1208131&r1=1208130&r2=1208131&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java Tue Nov 29 23:17:54 2011
@@ -26,6 +26,7 @@ import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
@@ -39,7 +40,6 @@ import org.apache.hadoop.fs.permission.F
import org.apache.hadoop.util.Shell.ExitCodeException;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
@@ -77,16 +77,17 @@ public class DefaultContainerExecutor ex
@Override
public void startLocalizer(Path nmPrivateContainerTokensPath,
InetSocketAddress nmAddr, String user, String appId, String locId,
- List<Path> localDirs) throws IOException, InterruptedException {
+ List<String> localDirs, List<String> logDirs)
+ throws IOException, InterruptedException {
ContainerLocalizer localizer =
- new ContainerLocalizer(this.lfs, user, appId, locId,
- localDirs, RecordFactoryProvider.getRecordFactory(getConf()));
+ new ContainerLocalizer(lfs, user, appId, locId, getPaths(localDirs),
+ RecordFactoryProvider.getRecordFactory(getConf()));
createUserLocalDirs(localDirs, user);
createUserCacheDirs(localDirs, user);
createAppDirs(localDirs, user, appId);
- createAppLogDirs(appId);
+ createAppLogDirs(appId, logDirs);
// TODO: Why pick first app dir. The same in LCE why not random?
Path appStorageDir = getFirstApplicationDir(localDirs, user, appId);
@@ -104,8 +105,8 @@ public class DefaultContainerExecutor ex
@Override
public int launchContainer(Container container,
Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath,
- String userName, String appId, Path containerWorkDir)
- throws IOException {
+ String userName, String appId, Path containerWorkDir,
+ List<String> localDirs, List<String> logDirs) throws IOException {
ContainerId containerId = container.getContainerID();
@@ -115,10 +116,7 @@ public class DefaultContainerExecutor ex
ConverterUtils.toString(
container.getContainerID().getApplicationAttemptId().
getApplicationId());
- String[] sLocalDirs = getConf().getStrings(
- YarnConfiguration.NM_LOCAL_DIRS,
- YarnConfiguration.DEFAULT_NM_LOCAL_DIRS);
- for (String sLocalDir : sLocalDirs) {
+ for (String sLocalDir : localDirs) {
Path usersdir = new Path(sLocalDir, ContainerLocalizer.USERCACHE);
Path userdir = new Path(usersdir, userName);
Path appCacheDir = new Path(userdir, ContainerLocalizer.APPCACHE);
@@ -128,7 +126,7 @@ public class DefaultContainerExecutor ex
}
// Create the container log-dirs on all disks
- createContainerLogDirs(appIdStr, containerIdStr);
+ createContainerLogDirs(appIdStr, containerIdStr, logDirs);
// copy launch script to work dir
Path launchDst =
@@ -299,9 +297,9 @@ public class DefaultContainerExecutor ex
* $logdir/$user/$appId */
private static final short LOGDIR_PERM = (short)0710;
- private Path getFirstApplicationDir(List<Path> localDirs, String user,
+ private Path getFirstApplicationDir(List<String> localDirs, String user,
String appId) {
- return getApplicationDir(localDirs.get(0), user, appId);
+ return getApplicationDir(new Path(localDirs.get(0)), user, appId);
}
private Path getApplicationDir(Path base, String user, String appId) {
@@ -328,14 +326,14 @@ public class DefaultContainerExecutor ex
* <li>$local.dir/usercache/$user</li>
* </ul>
*/
- private void createUserLocalDirs(List<Path> localDirs, String user)
+ private void createUserLocalDirs(List<String> localDirs, String user)
throws IOException {
boolean userDirStatus = false;
FsPermission userperms = new FsPermission(USER_PERM);
- for (Path localDir : localDirs) {
+ for (String localDir : localDirs) {
// create $local.dir/usercache/$user and its immediate parent
try {
- lfs.mkdir(getUserCacheDir(localDir, user), userperms, true);
+ lfs.mkdir(getUserCacheDir(new Path(localDir), user), userperms, true);
} catch (IOException e) {
LOG.warn("Unable to create the user directory : " + localDir, e);
continue;
@@ -357,7 +355,7 @@ public class DefaultContainerExecutor ex
* <li>$local.dir/usercache/$user/filecache</li>
* </ul>
*/
- private void createUserCacheDirs(List<Path> localDirs, String user)
+ private void createUserCacheDirs(List<String> localDirs, String user)
throws IOException {
LOG.info("Initializing user " + user);
@@ -366,9 +364,10 @@ public class DefaultContainerExecutor ex
FsPermission appCachePerms = new FsPermission(APPCACHE_PERM);
FsPermission fileperms = new FsPermission(FILECACHE_PERM);
- for (Path localDir : localDirs) {
+ for (String localDir : localDirs) {
// create $local.dir/usercache/$user/appcache
- final Path appDir = getAppcacheDir(localDir, user);
+ Path localDirPath = new Path(localDir);
+ final Path appDir = getAppcacheDir(localDirPath, user);
try {
lfs.mkdir(appDir, appCachePerms, true);
appcacheDirStatus = true;
@@ -376,7 +375,7 @@ public class DefaultContainerExecutor ex
LOG.warn("Unable to create app cache directory : " + appDir, e);
}
// create $local.dir/usercache/$user/filecache
- final Path distDir = getFileCacheDir(localDir, user);
+ final Path distDir = getFileCacheDir(localDirPath, user);
try {
lfs.mkdir(distDir, fileperms, true);
distributedCacheDirStatus = true;
@@ -403,12 +402,12 @@ public class DefaultContainerExecutor ex
* </ul>
* @param localDirs
*/
- private void createAppDirs(List<Path> localDirs, String user, String appId)
+ private void createAppDirs(List<String> localDirs, String user, String appId)
throws IOException {
boolean initAppDirStatus = false;
FsPermission appperms = new FsPermission(APPDIR_PERM);
- for (Path localDir : localDirs) {
- Path fullAppDir = getApplicationDir(localDir, user, appId);
+ for (String localDir : localDirs) {
+ Path fullAppDir = getApplicationDir(new Path(localDir), user, appId);
// create $local.dir/usercache/$user/appcache/$appId
try {
lfs.mkdir(fullAppDir, appperms, true);
@@ -427,15 +426,12 @@ public class DefaultContainerExecutor ex
/**
* Create application log directories on all disks.
*/
- private void createAppLogDirs(String appId)
+ private void createAppLogDirs(String appId, List<String> logDirs)
throws IOException {
- String[] rootLogDirs =
- getConf()
- .getStrings(YarnConfiguration.NM_LOG_DIRS, YarnConfiguration.DEFAULT_NM_LOG_DIRS);
-
+
boolean appLogDirStatus = false;
FsPermission appLogDirPerms = new FsPermission(LOGDIR_PERM);
- for (String rootLogDir : rootLogDirs) {
+ for (String rootLogDir : logDirs) {
// create $log.dir/$appid
Path appLogDir = new Path(rootLogDir, appId);
try {
@@ -455,15 +451,12 @@ public class DefaultContainerExecutor ex
/**
* Create application log directories on all disks.
*/
- private void createContainerLogDirs(String appId, String containerId)
- throws IOException {
- String[] rootLogDirs =
- getConf()
- .getStrings(YarnConfiguration.NM_LOG_DIRS, YarnConfiguration.DEFAULT_NM_LOG_DIRS);
-
+ private void createContainerLogDirs(String appId, String containerId,
+ List<String> logDirs) throws IOException {
+
boolean containerLogDirStatus = false;
FsPermission containerLogDirPerms = new FsPermission(LOGDIR_PERM);
- for (String rootLogDir : rootLogDirs) {
+ for (String rootLogDir : logDirs) {
// create $log.dir/$appid/$containerid
Path appLogDir = new Path(rootLogDir, appId);
Path containerLogDir = new Path(appLogDir, containerId);
@@ -483,4 +476,15 @@ public class DefaultContainerExecutor ex
+ containerId);
}
}
+
+ /**
+ * @return the list of paths of given local directories
+ */
+ private static List<Path> getPaths(List<String> dirs) {
+ List<Path> paths = new ArrayList<Path>(dirs.size());
+ for (int i = 0; i < dirs.size(); i++) {
+ paths.add(new Path(dirs.get(i)));
+ }
+ return paths;
+ }
}
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java?rev=1208131&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java Tue Nov 29 23:17:54 2011
@@ -0,0 +1,96 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.ListIterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+
+/**
+ * Manages a list of local storage directories.
+ */
+class DirectoryCollection {
+ private static final Log LOG = LogFactory.getLog(DirectoryCollection.class);
+
+ // Good local storage directories
+ private List<String> localDirs;
+ private List<String> failedDirs;
+ private int numFailures;
+
+  public DirectoryCollection(String[] dirs) {
+    // Every configured directory starts out good; failures accrue over time.
+    localDirs = new ArrayList<String>(Arrays.asList(dirs));
+    failedDirs = new ArrayList<String>();
+  }
+
+ /**
+ * @return the current valid directories
+ */
+  synchronized List<String> getGoodDirs() {
+    // Snapshot: checkDirs() mutates localDirs from the timer thread, so
+    // handing out the live list risks ConcurrentModificationException.
+    return new ArrayList<String>(localDirs);
+  }
+
+ /**
+ * @return the failed directories
+ */
+  synchronized List<String> getFailedDirs() {
+    // Snapshot: checkDirs() appends to failedDirs from the timer thread, so
+    // hand out a copy rather than the live internal list.
+    return new ArrayList<String>(failedDirs);
+  }
+
+ /**
+ * @return total the number of directory failures seen till now
+ */
+  synchronized int getNumFailures() {
+    return this.numFailures;
+  }
+
+ /**
+ * Check the health of current set of local directories, updating the list
+ * of valid directories if necessary.
+ * @return <em>true</em> if there is a new disk-failure identified in
+ * this checking. <em>false</em> otherwise.
+ */
+  synchronized boolean checkDirs() {
+    // Snapshot the count so we can detect failures found in this pass.
+    final int oldNumFailures = numFailures;
+    ListIterator<String> it = localDirs.listIterator();
+    while (it.hasNext()) {
+      final String dir = it.next();
+      try {
+        DiskChecker.checkDir(new File(dir));
+      } catch (DiskErrorException de) {
+        LOG.warn("Directory " + dir + " error " +
+            de.getMessage() + ", removing from the list of valid directories.");
+        // Demote: move the directory from the good list to the failed list.
+        it.remove();
+        failedDirs.add(dir);
+        numFailures++;
+      }
+    }
+    // True iff at least one new failure was recorded during this scan.
+    return numFailures > oldNumFailures;
+  }
+}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java?rev=1208131&r1=1208130&r2=1208131&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java Tue Nov 29 23:17:54 2011
@@ -126,13 +126,18 @@ public class LinuxContainerExecutor exte
@Override
public void startLocalizer(Path nmPrivateContainerTokensPath,
InetSocketAddress nmAddr, String user, String appId, String locId,
- List<Path> localDirs) throws IOException, InterruptedException {
+ List<String> localDirs, List<String> logDirs)
+ throws IOException, InterruptedException {
+
List<String> command = new ArrayList<String>(
Arrays.asList(containerExecutorExe,
user,
Integer.toString(Commands.INITIALIZE_CONTAINER.getValue()),
appId,
- nmPrivateContainerTokensPath.toUri().getPath().toString()));
+ nmPrivateContainerTokensPath.toUri().getPath().toString(),
+ StringUtils.join(",", localDirs),
+ StringUtils.join(",", logDirs)));
+
File jvm = // use same jvm as parent
new File(new File(System.getProperty("java.home"), "bin"), "java");
command.add(jvm.toString());
@@ -148,8 +153,8 @@ public class LinuxContainerExecutor exte
command.add(locId);
command.add(nmAddr.getHostName());
command.add(Integer.toString(nmAddr.getPort()));
- for (Path p : localDirs) {
- command.add(p.toUri().getPath().toString());
+ for (String dir : localDirs) {
+ command.add(dir);
}
String[] commandArray = command.toArray(new String[command.size()]);
ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
@@ -174,7 +179,8 @@ public class LinuxContainerExecutor exte
@Override
public int launchContainer(Container container,
Path nmPrivateCotainerScriptPath, Path nmPrivateTokensPath,
- String user, String appId, Path containerWorkDir) throws IOException {
+ String user, String appId, Path containerWorkDir,
+ List<String> localDirs, List<String> logDirs) throws IOException {
ContainerId containerId = container.getContainerID();
String containerIdStr = ConverterUtils.toString(containerId);
@@ -189,8 +195,10 @@ public class LinuxContainerExecutor exte
.toString(Commands.LAUNCH_CONTAINER.getValue()), appId,
containerIdStr, containerWorkDir.toString(),
nmPrivateCotainerScriptPath.toUri().getPath().toString(),
- nmPrivateTokensPath.toUri().getPath().toString(), pidFilePath
- .toString()));
+ nmPrivateTokensPath.toUri().getPath().toString(),
+ pidFilePath.toString(),
+ StringUtils.join(",", localDirs),
+ StringUtils.join(",", logDirs)));
String[] commandArray = command.toArray(new String[command.size()]);
shExec = new ShellCommandExecutor(commandArray, null, // NM's cwd
container.getLaunchContext().getEnvironment()); // sanitized env
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java?rev=1208131&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java Tue Nov 29 23:17:54 2011
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+/**
+ * The class which provides functionality of checking the health of the local
+ * directories of a node. This specifically manages nodemanager-local-dirs and
+ * nodemanager-log-dirs by periodically checking their health.
+ */
+public class LocalDirsHandlerService extends AbstractService {
+
+ private static Log LOG = LogFactory.getLog(LocalDirsHandlerService.class);
+
+ /** Timer used to schedule disk health monitoring code execution */
+ private Timer dirsHandlerScheduler;
+ private long diskHealthCheckInterval;
+ private boolean isDiskHealthCheckerEnabled;
+ /**
+ * Minimum fraction of disks to be healthy for the node to be healthy in
+ * terms of disks. This applies to nm-local-dirs and nm-log-dirs.
+ */
+ private float minNeededHealthyDisksFactor;
+
+ private MonitoringTimerTask monitoringTimerTask;
+
+ /** Local dirs to store localized files in */
+ private DirectoryCollection localDirs = null;
+
+ /** storage for container logs*/
+ private DirectoryCollection logDirs = null;
+
+ /**
+ * Everybody should go through this LocalDirAllocator object for read/write
+ * of any local path corresponding to {@link YarnConfiguration#NM_LOCAL_DIRS}
+ * instead of creating his/her own LocalDirAllocator objects
+ */
+ private LocalDirAllocator localDirsAllocator;
+ /**
+ * Everybody should go through this LocalDirAllocator object for read/write
+ * of any local path corresponding to {@link YarnConfiguration#NM_LOG_DIRS}
+ * instead of creating his/her own LocalDirAllocator objects
+ */
+ private LocalDirAllocator logDirsAllocator;
+
+ /** when disk health checking code was last run */
+ private long lastDisksCheckTime;
+
+ /**
+ * Class which is used by the {@link Timer} class to periodically execute the
+ * disks' health checker code.
+ */
+ private final class MonitoringTimerTask extends TimerTask {
+
+ public MonitoringTimerTask(Configuration conf) {
+ localDirs = new DirectoryCollection(
+ conf.getTrimmedStrings(YarnConfiguration.NM_LOCAL_DIRS));
+ logDirs = new DirectoryCollection(
+ conf.getTrimmedStrings(YarnConfiguration.NM_LOG_DIRS));
+ localDirsAllocator =
+ new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS);
+ logDirsAllocator = new LocalDirAllocator(YarnConfiguration.NM_LOG_DIRS);
+ }
+
+ @Override
+ public void run() {
+ boolean newFailure = false;
+ if (localDirs.checkDirs()) {
+ newFailure = true;
+ }
+ if (logDirs.checkDirs()) {
+ newFailure = true;
+ }
+
+ if (newFailure) {
+ LOG.info("Disk(s) failed. " + getDisksHealthReport());
+ updateDirsInConfiguration();
+ if (!areDisksHealthy()) {
+ // Just log.
+ LOG.error("Most of the disks failed. " + getDisksHealthReport());
+ }
+ }
+ lastDisksCheckTime = System.currentTimeMillis();
+ }
+ }
+
+ public LocalDirsHandlerService() {
+ super(LocalDirsHandlerService.class.getName());
+ }
+
+ /**
+ * Method which initializes the timertask and its interval time.
+ */
+ @Override
+ public void init(Configuration config) {
+ // Clone the configuration as we may do modifications to dirs-list
+ Configuration conf = new Configuration(config);
+ diskHealthCheckInterval = conf.getLong(
+ YarnConfiguration.NM_DISK_HEALTH_CHECK_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_NM_DISK_HEALTH_CHECK_INTERVAL_MS);
+ monitoringTimerTask = new MonitoringTimerTask(conf);
+ isDiskHealthCheckerEnabled = conf.getBoolean(
+ YarnConfiguration.NM_DISK_HEALTH_CHECK_ENABLE, true);
+ minNeededHealthyDisksFactor = conf.getFloat(
+ YarnConfiguration.NM_MIN_HEALTHY_DISKS_FRACTION,
+ YarnConfiguration.DEFAULT_NM_MIN_HEALTHY_DISKS_FRACTION);
+ lastDisksCheckTime = System.currentTimeMillis();
+ super.init(conf);
+ }
+
+ /**
+ * Method used to start the disk health monitoring, if enabled.
+ */
+ @Override
+ public void start() {
+ if (isDiskHealthCheckerEnabled) {
+ dirsHandlerScheduler = new Timer("DiskHealthMonitor-Timer", true);
+ // Start the timer task for disk health checking immediately and
+ // then run periodically at interval time.
+ dirsHandlerScheduler.scheduleAtFixedRate(monitoringTimerTask, 0,
+ diskHealthCheckInterval);
+ }
+ super.start();
+ }
+
+ /**
+ * Method used to terminate the disk health monitoring service.
+ */
+ @Override
+ public void stop() {
+ if (dirsHandlerScheduler != null) {
+ dirsHandlerScheduler.cancel();
+ }
+ super.stop();
+ }
+
+ /**
+ * @return the good/valid local directories based on disks' health
+ */
+ public List<String> getLocalDirs() {
+ return localDirs.getGoodDirs();
+ }
+
+ /**
+ * @return the good/valid log directories based on disks' health
+ */
+ public List<String> getLogDirs() {
+ return logDirs.getGoodDirs();
+ }
+
+ /**
+ * @return the health report of nm-local-dirs and nm-log-dirs
+ */
+ public String getDisksHealthReport() {
+ if (!isDiskHealthCheckerEnabled) {
+ return "";
+ }
+
+ StringBuilder report = new StringBuilder();
+ List<String> failedLocalDirsList = localDirs.getFailedDirs();
+ List<String> failedLogDirsList = logDirs.getFailedDirs();
+ int numLocalDirs = localDirs.getGoodDirs().size()
+ + failedLocalDirsList.size();
+ int numLogDirs = logDirs.getGoodDirs().size() + failedLogDirsList.size();
+ if (!failedLocalDirsList.isEmpty()) {
+ report.append(failedLocalDirsList.size() + "/" + numLocalDirs
+ + " local-dirs turned bad: "
+ + StringUtils.join(",", failedLocalDirsList) + ";");
+ }
+ if (!failedLogDirsList.isEmpty()) {
+ report.append(failedLogDirsList.size() + "/" + numLogDirs
+ + " log-dirs turned bad: "
+ + StringUtils.join(",", failedLogDirsList));
+ }
+ return report.toString();
+ }
+
+ /**
+ * The minimum fraction of number of disks needed to be healthy for a node to
+ * be considered healthy in terms of disks is configured using
+ * {@link YarnConfiguration#NM_MIN_HEALTHY_DISKS_FRACTION}, with a default
+ * value of {@link YarnConfiguration#DEFAULT_NM_MIN_HEALTHY_DISKS_FRACTION}.
+ * @return <em>false</em> if either (a) more than the allowed percentage of
+ * nm-local-dirs failed or (b) more than the allowed percentage of
+ * nm-log-dirs failed.
+ */
+ public boolean areDisksHealthy() {
+ if (!isDiskHealthCheckerEnabled) {
+ return true;
+ }
+
+ int goodDirs = getLocalDirs().size();
+ int failedDirs = localDirs.getFailedDirs().size();
+ int totalConfiguredDirs = goodDirs + failedDirs;
+ if (goodDirs/(float)totalConfiguredDirs < minNeededHealthyDisksFactor) {
+ return false; // Not enough healthy local-dirs
+ }
+
+ goodDirs = getLogDirs().size();
+ failedDirs = logDirs.getFailedDirs().size();
+ totalConfiguredDirs = goodDirs + failedDirs;
+ if (goodDirs/(float)totalConfiguredDirs < minNeededHealthyDisksFactor) {
+ return false; // Not enough healthy log-dirs
+ }
+
+ return true;
+ }
+
+ public long getLastDisksCheckTime() {
+ return lastDisksCheckTime;
+ }
+
+ /**
+ * Set good local dirs and good log dirs in the configuration so that the
+ * LocalDirAllocator objects will use this updated configuration only.
+ */
+  private void updateDirsInConfiguration() {
+    Configuration conf = getConfig();
+    List<String> localDirs = getLocalDirs();
+    List<String> logDirs = getLogDirs();
+    // Write BOTH dir lists under the conf lock: the getLocalPathForWrite /
+    // getLogPathForWrite readers synchronize on conf, so an unlocked write
+    // of NM_LOCAL_DIRS would race with allocator reads.
+    synchronized (conf) {
+      conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS,
+          localDirs.toArray(new String[localDirs.size()]));
+      conf.setStrings(YarnConfiguration.NM_LOG_DIRS,
+          logDirs.toArray(new String[logDirs.size()]));
+    }
+  }
+
+ public Path getLocalPathForWrite(String pathStr) throws IOException {
+ Configuration conf = getConfig();
+ Path path = null;
+ synchronized (conf) {
+ path = localDirsAllocator.getLocalPathForWrite(pathStr, conf);
+ }
+ return path;
+ }
+
+ public Path getLocalPathForWrite(String pathStr, long size,
+ boolean checkWrite) throws IOException {
+ Configuration conf = getConfig();
+ Path path = null;
+ synchronized (conf) {
+ path = localDirsAllocator.getLocalPathForWrite(pathStr, size, conf,
+ checkWrite);
+ }
+ return path;
+ }
+
+ public Path getLogPathForWrite(String pathStr, boolean checkWrite)
+ throws IOException {
+ Configuration conf = getConfig();
+ Path path = null;
+ synchronized (conf) {
+ path = logDirsAllocator.getLocalPathForWrite(pathStr,
+ LocalDirAllocator.SIZE_UNKNOWN, conf, checkWrite);
+ }
+ return path;
+ }
+
+ public Path getLogPathToRead(String pathStr) throws IOException {
+ Configuration conf = getConfig();
+ Path path = null;
+ synchronized (conf) {
+ path = logDirsAllocator.getLocalPathToRead(pathStr, conf);
+ }
+ return path;
+ }
+}
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java?rev=1208131&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java Tue Nov 29 23:17:54 2011
@@ -0,0 +1,97 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.service.CompositeService;
+
+/**
+ * The class which provides functionality of checking the health of the node and
+ * reporting back to the service for which the health checker has been asked to
+ * report.
+ */
+public class NodeHealthCheckerService extends CompositeService {
+
+ private NodeHealthScriptRunner nodeHealthScriptRunner;
+ private LocalDirsHandlerService dirsHandler;
+
+ static final String SEPARATOR = ";";
+
+ public NodeHealthCheckerService() {
+ super(NodeHealthCheckerService.class.getName());
+ dirsHandler = new LocalDirsHandlerService();
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ if (NodeHealthScriptRunner.shouldRun(conf)) {
+ nodeHealthScriptRunner = new NodeHealthScriptRunner();
+ addService(nodeHealthScriptRunner);
+ }
+ addService(dirsHandler);
+ super.init(conf);
+ }
+
+ /**
+ * @return the reporting string of health of the node
+ */
+  String getHealthReport() {
+    String scriptReport = (nodeHealthScriptRunner == null) ? ""
+        : nodeHealthScriptRunner.getHealthReport();
+    String disksReport = dirsHandler.getDisksHealthReport();
+    // Only insert the separator when both parts are non-empty; otherwise the
+    // report would end (or begin) with a dangling ";".
+    if (scriptReport.isEmpty() || disksReport.isEmpty()) {
+      return scriptReport.concat(disksReport);
+    }
+    return scriptReport.concat(SEPARATOR).concat(disksReport);
+  }
+
+ /**
+ * @return <em>true</em> if the node is healthy
+ */
+  boolean isHealthy() {
+    // Script health is vacuously true when no health script is configured.
+    boolean scriptHealthStatus = (nodeHealthScriptRunner == null)
+        || nodeHealthScriptRunner.isHealthy();
+    return scriptHealthStatus && dirsHandler.areDisksHealthy();
+  }
+
+ /**
+ * @return when the last time the node health status is reported
+ */
+  long getLastHealthReportTime() {
+    long diskCheckTime = dirsHandler.getLastDisksCheckTime();
+    // Without a health script, the disk check is the only report source.
+    if (nodeHealthScriptRunner == null) {
+      return diskCheckTime;
+    }
+    return Math.max(nodeHealthScriptRunner.getLastReportedTime(), diskCheckTime);
+  }
+
+ /**
+ * @return the disk handler
+ */
+ public LocalDirsHandlerService getDiskHandler() {
+ return dirsHandler;
+ }
+
+ /**
+ * @return the node health script runner
+ */
+ NodeHealthScriptRunner getNodeHealthScriptRunner() {
+ return nodeHealthScriptRunner;
+ }
+}
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthScriptRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthScriptRunner.java?rev=1208131&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthScriptRunner.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthScriptRunner.java Tue Nov 29 23:17:54 2011
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Shell.ExitCodeException;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+/**
+ *
+ * The class which provides functionality of checking the health of the node
+ * using the configured node health script and reporting back to the service
+ * for which the health checker has been asked to report.
+ */
+public class NodeHealthScriptRunner extends AbstractService {
+
+ private static Log LOG = LogFactory.getLog(NodeHealthScriptRunner.class);
+
+ /** Absolute path to the health script. */
+ private String nodeHealthScript;
+ /** Delay after which node health script to be executed */
+ private long intervalTime;
+ /** Time after which the script should be timedout */
+ private long scriptTimeout;
+ /** Timer used to schedule node health monitoring script execution */
+ private Timer nodeHealthScriptScheduler;
+
+ /** ShellCommandExecutor used to execute monitoring script */
+ ShellCommandExecutor shexec = null;
+
+ /** Configuration used by the checker */
+ private Configuration conf;
+
+ /** Pattern used for searching in the output of the node health script */
+ static private final String ERROR_PATTERN = "ERROR";
+
+ /** Time out error message */
+ static final String NODE_HEALTH_SCRIPT_TIMED_OUT_MSG = "Node health script timed out";
+
+ private boolean isHealthy;
+
+ private String healthReport;
+
+ private long lastReportedTime;
+
+ private TimerTask timer;
+
+ private enum HealthCheckerExitStatus {
+ SUCCESS,
+ TIMED_OUT,
+ FAILED_WITH_EXIT_CODE,
+ FAILED_WITH_EXCEPTION,
+ FAILED
+ }
+
+
+ /**
+ * Class which is used by the {@link Timer} class to periodically execute the
+ * node health script.
+ *
+ */
+ private class NodeHealthMonitorExecutor extends TimerTask {
+
+ String exceptionStackTrace = "";
+
+ public NodeHealthMonitorExecutor(String[] args) {
+ ArrayList<String> execScript = new ArrayList<String>();
+ execScript.add(nodeHealthScript);
+ if (args != null) {
+ execScript.addAll(Arrays.asList(args));
+ }
+ shexec = new ShellCommandExecutor(execScript
+ .toArray(new String[execScript.size()]), null, null, scriptTimeout);
+ }
+
+ @Override
+ public void run() {
+ HealthCheckerExitStatus status = HealthCheckerExitStatus.SUCCESS;
+ try {
+ shexec.execute();
+ } catch (ExitCodeException e) {
+ // ignore the exit code of the script
+ status = HealthCheckerExitStatus.FAILED_WITH_EXIT_CODE;
+ } catch (Exception e) {
+ LOG.warn("Caught exception : " + e.getMessage());
+ if (!shexec.isTimedOut()) {
+ status = HealthCheckerExitStatus.FAILED_WITH_EXCEPTION;
+ } else {
+ status = HealthCheckerExitStatus.TIMED_OUT;
+ }
+ exceptionStackTrace = StringUtils.stringifyException(e);
+ } finally {
+ if (status == HealthCheckerExitStatus.SUCCESS) {
+ if (hasErrors(shexec.getOutput())) {
+ status = HealthCheckerExitStatus.FAILED;
+ }
+ }
+ reportHealthStatus(status);
+ }
+ }
+
+ /**
+ * Method which is used to parse output from the node health monitor and
+ * send to the report address.
+ *
+ * The timed out script or script which causes IOException output is
+ * ignored.
+ *
+ * The node is marked unhealthy if
+ * <ol>
+ * <li>The node health script times out</li>
+ * <li>The node health scripts output has a line which begins with ERROR</li>
+ * <li>An exception is thrown while executing the script</li>
+ * </ol>
+ * If the script throws {@link IOException} or {@link ExitCodeException} the
+ * output is ignored and node is left remaining healthy, as script might
+ * have syntax error.
+ *
+ * @param status
+ */
+ void reportHealthStatus(HealthCheckerExitStatus status) {
+ long now = System.currentTimeMillis();
+ switch (status) {
+ case SUCCESS:
+ setHealthStatus(true, "", now);
+ break;
+ case TIMED_OUT:
+ setHealthStatus(false, NODE_HEALTH_SCRIPT_TIMED_OUT_MSG);
+ break;
+ case FAILED_WITH_EXCEPTION:
+ setHealthStatus(false, exceptionStackTrace);
+ break;
+ case FAILED_WITH_EXIT_CODE:
+ setHealthStatus(true, "", now);
+ break;
+ case FAILED:
+ setHealthStatus(false, shexec.getOutput());
+ break;
+ }
+ }
+
+ /**
+ * Method to check if the output string has line which begins with ERROR.
+ *
+ * @param output
+ * string
+ * @return true if output string has error pattern in it.
+ */
+ private boolean hasErrors(String output) {
+ String[] splits = output.split("\n");
+ for (String split : splits) {
+ if (split.startsWith(ERROR_PATTERN)) {
+ return true;
+ }
+ }
+ return false;
+ }
+ }
+
+ public NodeHealthScriptRunner() {
+ super(NodeHealthScriptRunner.class.getName());
+ this.lastReportedTime = System.currentTimeMillis();
+ this.isHealthy = true;
+ this.healthReport = "";
+ }
+
+ /*
+ * Method which initializes the values for the script path and interval time.
+ */
+ @Override
+ public void init(Configuration conf) {
+ this.conf = conf;
+ this.nodeHealthScript =
+ conf.get(YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_PATH);
+ this.intervalTime = conf.getLong(YarnConfiguration.NM_HEALTH_CHECK_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS);
+ this.scriptTimeout = conf.getLong(
+ YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS,
+ YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS);
+ String[] args = conf.getStrings(YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_OPTS,
+ new String[] {});
+ timer = new NodeHealthMonitorExecutor(args);
+ }
+
+ /**
+ * Method used to start the Node health monitoring.
+ *
+ */
+ @Override
+ public void start() {
+ // if health script path is not configured don't start the thread.
+ if (!shouldRun(conf)) {
+ LOG.info("Not starting node health monitor");
+ return;
+ }
+ nodeHealthScriptScheduler = new Timer("NodeHealthMonitor-Timer", true);
+ // Start the timer task immediately and
+ // then periodically at interval time.
+ nodeHealthScriptScheduler.scheduleAtFixedRate(timer, 0, intervalTime);
+ }
+
+ /**
+ * Method used to terminate the node health monitoring service.
+ *
+ */
+ @Override
+ public void stop() {
+ if (!shouldRun(conf)) {
+ return;
+ }
+ nodeHealthScriptScheduler.cancel();
+ if (shexec != null) {
+ Process p = shexec.getProcess();
+ if (p != null) {
+ p.destroy();
+ }
+ }
+ }
+
+ /**
+ * Gets the if the node is healthy or not
+ *
+ * @return true if node is healthy
+ */
+ public boolean isHealthy() {
+ return isHealthy;
+ }
+
+ /**
+ * Sets if the node is healhty or not considering disks' health also.
+ *
+ * @param isHealthy
+ * if or not node is healthy
+ */
+ private synchronized void setHealthy(boolean isHealthy) {
+ this.isHealthy = isHealthy;
+ }
+
+ /**
+ * Returns output from health script. if node is healthy then an empty string
+ * is returned.
+ *
+ * @return output from health script
+ */
+ public String getHealthReport() {
+ return healthReport;
+ }
+
+ /**
+ * Sets the health report from the node health script. Also set the disks'
+ * health info obtained from DiskHealthCheckerService.
+ *
+ * @param healthReport
+ */
+ private synchronized void setHealthReport(String healthReport) {
+ this.healthReport = healthReport;
+ }
+
+ /**
+ * Returns time stamp when node health script was last run.
+ *
+ * @return timestamp when node health script was last run
+ */
+ public long getLastReportedTime() {
+ return lastReportedTime;
+ }
+
+ /**
+ * Sets the last run time of the node health script.
+ *
+ * @param lastReportedTime
+ */
+ private synchronized void setLastReportedTime(long lastReportedTime) {
+ this.lastReportedTime = lastReportedTime;
+ }
+
+ /**
+ * Method used to determine if or not node health monitoring service should be
+ * started or not. Returns true if following conditions are met:
+ *
+ * <ol>
+ * <li>Path to Node health check script is not empty</li>
+ * <li>Node health check script file exists</li>
+ * </ol>
+ *
+ * @param conf
+ * @return true if node health monitoring service can be started.
+ */
+ public static boolean shouldRun(Configuration conf) {
+ String nodeHealthScript =
+ conf.get(YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_PATH);
+ if (nodeHealthScript == null || nodeHealthScript.trim().isEmpty()) {
+ return false;
+ }
+ File f = new File(nodeHealthScript);
+ return f.exists() && f.canExecute();
+ }
+
+ private synchronized void setHealthStatus(boolean isHealthy, String output) {
+ this.setHealthy(isHealthy);
+ this.setHealthReport(output);
+ }
+
+ private synchronized void setHealthStatus(boolean isHealthy, String output,
+ long time) {
+ this.setHealthStatus(isHealthy, output);
+ this.setLastReportedTime(time);
+ }
+
+ /**
+ * Used only by tests to access the timer task directly
+ * @return the timer task
+ */
+ TimerTask getTimerTask() {
+ return timer;
+ }
+}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1208131&r1=1208130&r2=1208131&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Tue Nov 29 23:17:54 2011
@@ -25,7 +25,6 @@ import java.util.concurrent.ConcurrentSk
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.NodeHealthCheckerService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.SecurityUtil;
@@ -59,6 +58,8 @@ public class NodeManager extends Composi
protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
protected ContainerTokenSecretManager containerTokenSecretManager;
private ApplicationACLsManager aclsManager;
+ private NodeHealthCheckerService nodeHealthChecker;
+ private LocalDirsHandlerService dirsHandler;
public NodeManager() {
super(NodeManager.class.getName());
@@ -78,14 +79,16 @@ public class NodeManager extends Composi
protected ContainerManagerImpl createContainerManager(Context context,
ContainerExecutor exec, DeletionService del,
NodeStatusUpdater nodeStatusUpdater, ContainerTokenSecretManager
- containerTokenSecretManager, ApplicationACLsManager aclsManager) {
+ containerTokenSecretManager, ApplicationACLsManager aclsManager,
+ LocalDirsHandlerService dirsHandler) {
return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
- metrics, containerTokenSecretManager, aclsManager);
+ metrics, containerTokenSecretManager, aclsManager, dirsHandler);
}
protected WebServer createWebServer(Context nmContext,
- ResourceView resourceView, ApplicationACLsManager aclsManager) {
- return new WebServer(nmContext, resourceView, aclsManager);
+ ResourceView resourceView, ApplicationACLsManager aclsManager,
+ LocalDirsHandlerService dirsHandler) {
+ return new WebServer(nmContext, resourceView, aclsManager, dirsHandler);
}
protected void doSecureLogin() throws IOException {
@@ -121,16 +124,12 @@ public class NodeManager extends Composi
// NodeManager level dispatcher
AsyncDispatcher dispatcher = new AsyncDispatcher();
- NodeHealthCheckerService healthChecker = null;
- if (NodeHealthCheckerService.shouldRun(conf)) {
- healthChecker = new NodeHealthCheckerService();
- addService(healthChecker);
- }
+ nodeHealthChecker = new NodeHealthCheckerService();
+ addService(nodeHealthChecker);
+ dirsHandler = nodeHealthChecker.getDiskHandler();
- NodeStatusUpdater nodeStatusUpdater =
- createNodeStatusUpdater(context, dispatcher, healthChecker,
- this.containerTokenSecretManager);
-
+ NodeStatusUpdater nodeStatusUpdater = createNodeStatusUpdater(context,
+ dispatcher, nodeHealthChecker, this.containerTokenSecretManager);
nodeStatusUpdater.register(this);
NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
@@ -138,11 +137,11 @@ public class NodeManager extends Composi
ContainerManagerImpl containerManager =
createContainerManager(context, exec, del, nodeStatusUpdater,
- this.containerTokenSecretManager, this.aclsManager);
+ this.containerTokenSecretManager, this.aclsManager, dirsHandler);
addService(containerManager);
Service webServer = createWebServer(context, containerManager
- .getContainersMonitor(), this.aclsManager);
+ .getContainersMonitor(), this.aclsManager, dirsHandler);
addService(webServer);
dispatcher.register(ContainerManagerEventType.class, containerManager);
@@ -215,7 +214,14 @@ public class NodeManager extends Composi
}
}
-
+
+ /**
+ * @return the node health checker
+ */
+ public NodeHealthCheckerService getNodeHealthChecker() {
+ return nodeHealthChecker;
+ }
+
@Override
public void stateChanged(Service service) {
// Shutdown the Nodemanager when the NodeStatusUpdater is stopped.
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1208131&r1=1208130&r2=1208131&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Tue Nov 29 23:17:54 2011
@@ -27,7 +27,6 @@ import java.util.Map.Entry;
import org.apache.avro.AvroRuntimeException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.NodeHealthCheckerService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
@@ -222,11 +221,14 @@ public class NodeStatusUpdaterImpl exten
+ numActiveContainers + " containers");
NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
- if (this.healthChecker != null) {
- this.healthChecker.setHealthStatus(nodeHealthStatus);
+ nodeHealthStatus.setHealthReport(healthChecker.getHealthReport());
+ nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy());
+ nodeHealthStatus.setLastHealthReportTime(
+ healthChecker.getLastHealthReportTime());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy()
+ + ", " + nodeHealthStatus.getHealthReport());
}
- LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy()
- + ", " + nodeHealthStatus.getHealthReport());
nodeStatus.setNodeHealthStatus(nodeHealthStatus);
return nodeStatus;
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1208131&r1=1208130&r2=1208131&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Tue Nov 29 23:17:54 2011
@@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
@@ -120,7 +121,8 @@ public class ContainerManagerImpl extend
private ContainerTokenSecretManager containerTokenSecretManager;
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-
+
+ protected LocalDirsHandlerService dirsHandler;
protected final AsyncDispatcher dispatcher;
private final ApplicationACLsManager aclsManager;
@@ -129,9 +131,12 @@ public class ContainerManagerImpl extend
public ContainerManagerImpl(Context context, ContainerExecutor exec,
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
NodeManagerMetrics metrics, ContainerTokenSecretManager
- containerTokenSecretManager, ApplicationACLsManager aclsManager) {
+ containerTokenSecretManager, ApplicationACLsManager aclsManager,
+ LocalDirsHandlerService dirsHandler) {
super(ContainerManagerImpl.class.getName());
this.context = context;
+ this.dirsHandler = dirsHandler;
+
dispatcher = new AsyncDispatcher();
this.deletionService = deletionContext;
this.metrics = metrics;
@@ -190,9 +195,10 @@ public class ContainerManagerImpl extend
if (conf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ENABLED)) {
return new LogAggregationService(this.dispatcher, context,
- deletionService);
+ deletionService, dirsHandler);
} else {
- return new NonAggregatingLogHandler(this.dispatcher, deletionService);
+ return new NonAggregatingLogHandler(this.dispatcher, deletionService,
+ dirsHandler);
}
}
@@ -203,12 +209,12 @@ public class ContainerManagerImpl extend
protected ResourceLocalizationService createResourceLocalizationService(
ContainerExecutor exec, DeletionService deletionContext) {
return new ResourceLocalizationService(this.dispatcher, exec,
- deletionContext);
+ deletionContext, dirsHandler);
}
protected ContainersLauncher createContainersLauncher(Context context,
ContainerExecutor exec) {
- return new ContainersLauncher(context, this.dispatcher, exec);
+ return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler);
}
@Override
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerExitEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerExitEvent.java?rev=1208131&r1=1208130&r2=1208131&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerExitEvent.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerExitEvent.java Tue Nov 29 23:17:54 2011
@@ -22,14 +22,20 @@ import org.apache.hadoop.yarn.api.record
public class ContainerExitEvent extends ContainerEvent {
private int exitCode;
+ private final String diagnosticInfo;
public ContainerExitEvent(ContainerId cID, ContainerEventType eventType,
- int exitCode) {
+ int exitCode, String diagnosticInfo) {
super(cID, eventType);
this.exitCode = exitCode;
+ this.diagnosticInfo = diagnosticInfo;
}
public int getExitCode() {
return this.exitCode;
}
+
+ public String getDiagnosticInfo() {
+ return diagnosticInfo;
+ }
}